diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml
index 7bf83987..96382ea5 100644
--- a/.github/workflows/main.yml
+++ b/.github/workflows/main.yml
@@ -73,6 +73,7 @@ jobs:
       run: |
         coverage run -a --source=woudc_data_registry -m unittest woudc_data_registry.tests.test_data_registry
         coverage run -a --source=woudc_data_registry -m unittest woudc_data_registry.tests.test_report_generation
+        coverage run -a --source=woudc_data_registry -m unittest woudc_data_registry.tests.test_commands
         coverage report -m
         coveralls --service=github
     - name: Build Python package 🏗️
diff --git a/README.md b/README.md
index 79c32165..7cda86d6 100644
--- a/README.md
+++ b/README.md
@@ -230,6 +230,11 @@ To generate emails for contributors:
 woudc-data-registry data generate-emails /path/to/dir
 ```
 
+#### Publishing Notifications to MQTT Server
+```bash
+woudc-data-registry data publish-notification --hours number_of_hours
+```
+
 #### Delete Record
 
 ```bash
diff --git a/default.env b/default.env
index f396fac5..8cd50647 100644
--- a/default.env
+++ b/default.env
@@ -64,4 +64,10 @@ export WDR_FTP_PASS=
 export WDR_FTP_KEEP_FILES=True
 export WDR_FTP_BASEDIR_INCOMING='/'
 export WDR_ACKNOWLEDGE_SUBMISSION_HOURS=24
-export WDR_ACKNOWLEDGE_TEMPLATE_PATH=/path/to/etc/woudc-acknowledge-email.txt
\ No newline at end of file
+export WDR_ACKNOWLEDGE_TEMPLATE_PATH=/path/to/etc/woudc-acknowledge-email.txt
+
+# MQTT Broker configuration
+export WDR_MQTT_BROKER_HOST=broker.woudc.org
+export WDR_MQTT_BROKER_PORT=8883
+export WDR_MQTT_BROKER_USERNAME=everyone
+export WDR_MQTT_BROKER_PASSWORD=everyone
diff --git a/etc/wis2-notification-message-template.json b/etc/wis2-notification-message-template.json
new file mode 100644
index 00000000..617a3e5b
--- /dev/null
+++ b/etc/wis2-notification-message-template.json
@@ -0,0 +1,32 @@
+{
+  "id": "uuid.uuid4()",
+  "conformsTo": [
+    "http://wis.wmo.int/spec/wnm/1/conf/core"
+  ],
+  "type": "Feature",
+  "geometry": {
+    "type": "Point",
+    "coordinates": [
+      "x",
+      "y",
+      "z"
+    ]
+  },
+  "properties": {
+    "pubtime": "data_record.publication_datetime",
+    "datetime": "data_record.timestamp_dateTdata_record.timestamp_timedata_record.timestamp_utcoffset",
+    "integrity": {
+      "method": "sha512",
+      "value": "A2KNxvks...S8qfSCw=="
+    },
+    "data_id": "data_record.data_id",
+    "metadata_id": "urn:wmo:md:org-woudc:data_record.dataset_id"
+  },
+  "links": [
+    {
+      "href": "data_record.url",
+      "rel": "canonical",
+      "type": "text/csv"
+    }
+  ]
+}
diff --git a/requirements.txt b/requirements.txt
index 9e5f40a9..7562d939 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -2,6 +2,7 @@ click
 elasticsearch
 ftputil
 jsonschema
+paho-mqtt
 pyyaml
 rarfile
 requests
diff --git a/woudc_data_registry/config.py b/woudc_data_registry/config.py
index c5773580..9752774d 100644
--- a/woudc_data_registry/config.py
+++ b/woudc_data_registry/config.py
@@ -90,6 +90,12 @@ WDR_ACKNOWLEDGE_TEMPLATE_PATH = os.getenv('WDR_ACKNOWLEDGE_TEMPLATE_PATH')
 WDR_FTP_SKIP_DIRS_INCOMING = os.getenv('WDR_FTP_SKIP_DIRS_INCOMING')
 WDR_FTP_KEEP_FILES = str2bool(os.getenv('WDR_FTP_KEEP_FILES', 'True'))
 
+WDR_NOTIFICATION_MESSAGE = os.getenv('WDR_NOTIFICATION_MESSAGE')
+WDR_MQTT_BROKER_HOST = os.getenv('WDR_MQTT_BROKER_HOST')
+WDR_MQTT_BROKER_PORT = os.getenv('WDR_MQTT_BROKER_PORT')
+WDR_MQTT_BROKER_USERNAME = os.getenv('WDR_MQTT_BROKER_USERNAME')
+WDR_MQTT_BROKER_PASSWORD = os.getenv('WDR_MQTT_BROKER_PASSWORD')
+WDR_MQTT_CLIENT_ID = os.getenv('WDR_MQTT_CLIENT_ID')
+
 if not WDR_SEARCH_INDEX_BASENAME:
     msg = 'WDR_SEARCH_INDEX_BASENAME was not set. \
diff --git a/woudc_data_registry/controller.py b/woudc_data_registry/controller.py
index fbcbf25a..2f461d4f 100644
--- a/woudc_data_registry/controller.py
+++ b/woudc_data_registry/controller.py
@@ -46,6 +46,7 @@
 import click
 import logging
 import os
+import datetime
 
 from pathlib import Path
 from woudc_extcsv import (ExtendedCSV, NonStandardDataError,
@@ -54,7 +55,10 @@
 from woudc_data_registry import config
 from woudc_data_registry.gather import gather as gather_
 from woudc_data_registry.util import (is_text_file, read_file,
-                                      send_email, delete_file_from_record)
+                                      send_email, delete_file_from_record,
+                                      get_HTTP_HEAD_response,
+                                      publish_to_MQTT_Broker,
+                                      generate_geojson_payload)
 
 from woudc_data_registry.processing import Process
 
@@ -368,9 +372,63 @@ def gather(ctx, path):
     LOGGER.info("Done Gathering files")
 
 
+@click.command()
+@click.pass_context
+@click.option('--hours', type=int, required=True)
+def publish_notification(ctx, hours):
+    """Publish a notification to WMO WIS2"""
+
+    today = datetime.datetime.now()
+    date = today - datetime.timedelta(hours=hours)
+    date = date.replace(minute=0, second=0, microsecond=0)
+    registry = Registry()
+    ingested_records = registry.query_by_field_range(
+        DataRecord, "published_datetime", date, today)
+    LOGGER.info(f"Found records published since {date}")
+    url_template = 'https://woudc.org/archive/Archive-NewFormat'
+    responses = {}
+    no_message = []
+    for record in ingested_records:
+        instrument = record.instrument_id.split(':')[0].lower()
+        year = record.timestamp_date.year
+        dataset = (
+            f'{record.content_category}_{record.content_level}_'
+            f'{record.content_form}'
+        )
+        ingest_filepath = record.ingest_filepath
+        url = (
+            f'{url_template}/{dataset}/stn{record.station_id}/'
+            f'{instrument}/{year}/{record.filename}'
+        )
+
+        LOGGER.info(f'Found {url}')
+        http_response = get_HTTP_HEAD_response(url)
+        if http_response == 200:
+            query = registry.query_distinct_by_fields(
+                DataRecord.ingest_filepath, DataRecord, {
+                    "ingest_filepath": ingest_filepath})
+            if len(query) == 1:
+                message = 'new record'
+            elif len(query) > 1:
+                message = 'update record'
+            responses[ingest_filepath] = {
+                'record': record,
+                'status_code': http_response,
+                'message': message
+            }
+        else:
+            no_message.append(ingest_filepath)
+    LOGGER.debug(f'Responses: {responses}')
+    LOGGER.debug(f'No message: {no_message}')
+    notifications = generate_geojson_payload(responses)
+    LOGGER.debug('geoJSON Generated.')
+    for notification in notifications:
+        publish_to_MQTT_Broker(notification)
+
+
 data.add_command(ingest)
 data.add_command(verify)
 data.add_command(generate_emails, name='generate-emails')
 data.add_command(send_feedback, name='send-feedback')
 data.add_command(delete_record, name='delete-record')
 data.add_command(gather)
+data.add_command(publish_notification, name='publish-notification')
diff --git a/woudc_data_registry/product/ozonesonde/ozonesonde_generator.py b/woudc_data_registry/product/ozonesonde/ozonesonde_generator.py
index ec082e3d..c14a6d00 100644
--- a/woudc_data_registry/product/ozonesonde/ozonesonde_generator.py
+++ b/woudc_data_registry/product/ozonesonde/ozonesonde_generator.py
@@ -295,7 +295,7 @@ def execute(path, bypass):
             LOGGER.info(f'Inserted {ozone_object}')
             success += 1
         except Exception as err:
-            msg = (f'Unable to insert UV index {ipath}:'
+            msg = (f'Unable to insert Ozonesonde {ipath}:'
                    f' {err}')
             LOGGER.error(msg)
             continue
diff --git a/woudc_data_registry/product/totalozone/totalozone_generator.py b/woudc_data_registry/product/totalozone/totalozone_generator.py
index 73e87689..ae5853ca 100644
--- a/woudc_data_registry/product/totalozone/totalozone_generator.py
+++ b/woudc_data_registry/product/totalozone/totalozone_generator.py
@@ -262,7 +262,7 @@ def execute(path, bypass):
             LOGGER.info(f'Inserted {ozone_object}')
             success += 1
         except Exception as err:
-            msg = (f'Unable to insert UV index {ipath}:'
+            msg = (f'Unable to insert totalozone {ipath}:'
                    f' {err}')
             LOGGER.error(msg)
             continue
diff --git a/woudc_data_registry/registry.py b/woudc_data_registry/registry.py
index c7b997e5..ae265547 100644
--- a/woudc_data_registry/registry.py
+++ b/woudc_data_registry/registry.py
@@ -260,6 +260,30 @@ def query_by_pattern(self, obj, by, pattern, case_insensitive=False):
 
         return self.session.query(obj).filter(condition).first()
 
+    def query_by_field_range(self, obj, by, start, end,
+                             case_insensitive=False):
+        """
+        query data by field range
+
+        :param obj: Object instance of the table to query in
+        :param by: Field name to be queried
+        :param start: Start of the range
+        :param end: End of the range
+        :param case_insensitive: `bool` of whether to query strings
+                                 case-insensitively
+        :returns: Query results
+        """
+
+        field = getattr(obj, by)
+        if case_insensitive:
+            LOGGER.debug(f'Querying for LOWER({field}) BETWEEN \
+                LOWER({start}) AND LOWER({end})')
+            condition = func.lower(field).between(start.lower(), end.lower())
+        else:
+            LOGGER.debug(f'Querying for {field} BETWEEN {start} AND {end}')
+            condition = field.between(start, end)
+
+        return self.session.query(obj).filter(condition).all()
+
     def query_multiple_fields(self, table, values, fields=None,
                               case_insensitive=()):
         """
diff --git a/woudc_data_registry/tests/test_commands.py b/woudc_data_registry/tests/test_commands.py
new file mode 100644
index 00000000..45877421
--- /dev/null
+++ b/woudc_data_registry/tests/test_commands.py
@@ -0,0 +1,376 @@
+# =================================================================
+#
+# Terms and Conditions of Use
+#
+# Unless otherwise noted, computer program source code of this
+# distribution # is covered under Crown Copyright, Government of
+# Canada, and is distributed under the MIT License.
+#
+# The Canada wordmark and related graphics associated with this
+# distribution are protected under trademark law and copyright law.
+# No permission is granted to use them outside the parameters of
+# the Government of Canada's corporate identity program. For
+# more information, see
+# http://www.tbs-sct.gc.ca/fip-pcim/index-eng.asp
+#
+# Copyright title to all 3rd party software distributed with this
+# software is held by the respective copyright holders as noted in
+# those files. Users are asked to read the 3rd Party Licenses
+# referenced with those assets.
+#
+# Copyright (c) 2024 Government of Canada
+#
+# Permission is hereby granted, free of charge, to any person
+# obtaining a copy of this software and associated documentation
+# files (the "Software"), to deal in the Software without
+# restriction, including without limitation the rights to use,
+# copy, modify, merge, publish, distribute, sublicense, and/or sell
+# copies of the Software, and to permit persons to whom the
+# Software is furnished to do so, subject to the following
+# conditions:
+#
+# The above copyright notice and this permission notice shall be
+# included in all copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
+# OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
+# HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
+# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
+# OTHER DEALINGS IN THE SOFTWARE.
+#
+# =================================================================
+
+import unittest
+import os
+import subprocess
+import uuid
+from click.testing import CliRunner
+
+
+from unittest.mock import patch, mock_open
+from sqlalchemy import create_engine
+from sqlalchemy.orm import sessionmaker
+from woudc_data_registry.models import DataRecord
+from woudc_data_registry import config
+from woudc_data_registry.util import generate_geojson_payload
+from woudc_data_registry.controller import gather, delete_record
+
+"""
+You need to set up a test environment for these tests, so set up and populate
+a database and a directory with files that have been ingested.
+
+Change WDR_DB_NAME and WDR_SEARCH_INDEX for testing purposes.
+"""
+
+
+class TestBasicDeletion(unittest.TestCase):
+    """Test case for basic functionality of deleting a record."""
+
+    def test_01_file_deletion(self):
+        """Run bash commands and verify the outcome."""
+
+        # Bash commands to run
+        commands = [
+            'woudc-data-registry data ingest '
+            './woudc_data_registry/tests/data/totalozone/'
+            'totalozone-correct.csv',
+            'rm ' + config.WDR_FILE_TRASH + '/totalozone-correct.csv'
+        ]
+
+        runner = CliRunner()
+
+        engine = create_engine(config.WDR_DATABASE_URL,
+                               echo=config.WDR_DB_DEBUG)
+        Session = sessionmaker(bind=engine, expire_on_commit=False)
+        session = Session()
+
+        filenames_OG = [
+            file for file in os.listdir(config.WDR_FILE_TRASH)
+            if os.path.isfile(os.path.join(config.WDR_FILE_TRASH, file))
+        ]
+
+        file_count_OG = len(filenames_OG)
+
+        result_OG = session.query(DataRecord.output_filepath).all()
+        result_list_OG = [row[0] for row in result_OG]
+        row_count_OG = len(result_list_OG)
+        print(result_list_OG)
+
+        # Ingesting the File
+        subprocess.run(commands[0], shell=True, check=True)
+
+        result = session.query(DataRecord.output_filepath).all()
+        result_list = [row[0] for row in result]
+        row_count = len(result_list)
+
+        output_filepath = (
+            config.WDR_WAF_BASEDIR +
+            '/Archive-NewFormat/TotalOzone_1.0_1/stn077/brewer/2010/'
+            'totalozone-correct.csv'
+        )
+
+        self.assertEqual(row_count, row_count_OG + 1)
+        self.assertTrue(output_filepath in result_list)
+
+        # Deleting the File
+        result = runner.invoke(
+            delete_record,
+            [
+                config.WDR_WAF_BASEDIR +
+                '/Archive-NewFormat/TotalOzone_1.0_1/stn077/brewer/2010/'
+                'totalozone-correct.csv'
+            ]
+        )
+
+        assert result.exit_code == 0
+
+        filenames_01 = [
+            file for file in os.listdir(config.WDR_FILE_TRASH)
+            if os.path.isfile(os.path.join(config.WDR_FILE_TRASH, file))
+        ]
+        file_count_01 = len(filenames_01)
+
+        result2 = session.query(DataRecord.output_filepath).all()
+        result_list2 = [row[0] for row in result2]
+        row_count2 = len(result_list2)
+
+        self.assertEqual(file_count_01, file_count_OG + 1)
+        self.assertEqual(row_count2, row_count_OG)
+        self.assertEqual(result_list2, result_list_OG)
+        self.assertFalse(commands[0].split('/')[-1] in result_list2)
+
+        subprocess.run(commands[1], shell=True, check=True)
+
+        session.close()
+
+    def test_02_absent_file_deletion(self):
+        """
+        Run bash commands and verify the outcome where the file
+        path does not exist.
+        """
+        runner = CliRunner()
+
+        # Deleting the File
+        result = runner.invoke(
+            delete_record,
+            [
+                config.WDR_WAF_BASEDIR +
+                '/Archive-NewFormat/TotalOzone_1.0_1/stn077/brewer/2010/'
+                'totalozone-correct.csv'
+            ]
+        )
+
+        # Check that it failed (non-zero exit code)
+        self.assertNotEqual(result.exit_code, 0)
+        self.assertIn("does not exist", result.output)
+
+    def test_03_absent_file_DB_deletion(self):
+        """
+        Run bash commands and verify the outcome where the file path
+        exists but the row does not.
+        """
+        commands = [
+            'cp ./woudc_data_registry/tests/data/totalozone/'
+            'totalozone-correct.csv ' +
+            config.WDR_WAF_BASEDIR + '/Archive-NewFormat'
+            '/TotalOzone_1.0_1/stn077/brewer/2010',
+            'rm ' + config.WDR_WAF_BASEDIR + '/Archive-NewFormat'
+            '/TotalOzone_1.0_1/stn077/brewer/2010/totalozone-correct.csv'
+        ]
+        # Get information
+        engine = create_engine(config.WDR_DATABASE_URL,
+                               echo=config.WDR_DB_DEBUG)
+        Session = sessionmaker(bind=engine, expire_on_commit=False)
+        session = Session()
+
+        filenames_OG = [
+            file for file in os.listdir(config.WDR_FILE_TRASH)
+            if os.path.isfile(os.path.join(config.WDR_FILE_TRASH, file))
+        ]
+        file_count_OG = len(filenames_OG)
+
+        result_OG = session.query(DataRecord.output_filepath).all()
+        result_list_OG = [row[0] for row in result_OG]
+        row_count_OG = len(result_list_OG)
+
+        # Copy the file to the WAF so the path exists
+        # but the file is not in the DB
+        subprocess.run(commands[0], shell=True, check=True)
+
+        filenames_01 = [
+            file for file in os.listdir(config.WDR_FILE_TRASH)
+            if os.path.isfile(os.path.join(config.WDR_FILE_TRASH, file))
+        ]
+        file_count_01 = len(filenames_01)
+
+        result_01 = session.query(DataRecord.output_filepath).all()
+        result_list_01 = [row[0] for row in result_01]
+        row_count_01 = len(result_list_01)
+
+        self.assertEqual(filenames_OG, filenames_01)
+        self.assertEqual(file_count_OG, file_count_01)
+
+        self.assertEqual(result_list_OG, result_list_01)
+        self.assertEqual(row_count_OG, row_count_01)
+
+        subprocess.run(commands[1], shell=True, check=True)
+
+
+class DummyRecord:
+    def __init__(self, x, y, z, published_datetime, timestamp_date,
+                 timestamp_time, timestamp_utcoffset, data_record_id,
+                 dataset_id, url):
+        self.x = x
+        self.y = y
+        self.z = z
+        self.published_datetime = published_datetime
+        self.timestamp_date = timestamp_date
+        self.timestamp_time = timestamp_time
+        self.timestamp_utcoffset = timestamp_utcoffset
+        self.data_record_id = data_record_id
+        self.dataset_id = dataset_id
+        self.url = url
+
+
+class TestGenerateGeoJsonPayload(unittest.TestCase):
+    def setUp(self):
+        self.geojson_template = {
+            "geometry": {"type": "Point", "coordinates": []},
+            "properties": {},
+            "links": [{"href": ""}],
+            "id": None
+        }
+        self.info = {
+            "file1": {
+                "record": DummyRecord(
+                    x=1, y=2, z=3,
+                    published_datetime="2024-06-04T12:34:56Z",
+                    timestamp_date="2024-06-04",
+                    timestamp_time="12:00:00",
+                    timestamp_utcoffset="+00:00",
+                    data_record_id="DATA123",
+                    dataset_id="DSID123",
+                    url="http://example.com"
+                ),
+                "status_code": 200,
+                "message": "OK"
+            }
+        }
+
+    @patch("woudc_data_registry.config.WDR_NOTIFICATION_MESSAGE",
+           "dummy_path.json")
+    @patch(
+        "woudc_data_registry.util.open",
+        new_callable=mock_open,
+        read_data=(
+            '{"geometry": {"type": "Point", "coordinates": []}, '
+            '"properties": {}, "links": [{"href": ""}], "id": null}'
+        )
+    )
+    @patch("woudc_data_registry.util.json.load")
+    @patch("woudc_data_registry.util.uuid.uuid4")
+    def test_basic_payload(self, mock_uuid, mock_json_load, mock_file_open):
+        mock_json_load.return_value = self.geojson_template.copy()
+        mock_uuid.return_value = uuid.UUID("12345678123456781234567812345678")
+
+        payload = generate_geojson_payload(self.info)
+        self.assertEqual(len(payload), 1)
+        notif = payload[0]
+        self.assertEqual(notif["geometry"]["coordinates"], [1, 2, 3])
+        self.assertEqual(
+            notif["properties"]["pubtime"], "2024-06-04T12:34:56Z"
+        )
+        self.assertEqual(
+            notif["properties"]["datetime"], "2024-06-04T12:00:00+00:00"
+        )
+        self.assertEqual(notif["properties"]["data_id"], "DATA123")
+        self.assertEqual(
+            notif["properties"]["metadata_id"], "urn:wmo:md:org-woudc:DSID123"
+        )
+        self.assertEqual(notif["links"][0]["href"], "http://example.com")
+        self.assertEqual(
+            str(notif["id"]), "12345678-1234-5678-1234-567812345678"
+        )
+
+    @patch("woudc_data_registry.util.LOGGER")
+    @patch("woudc_data_registry.util.uuid.uuid4")
+    @patch("woudc_data_registry.util.json.load")
+    @patch(
+        "woudc_data_registry.util.open",
+        new_callable=mock_open,
+        read_data='{}'
+    )
+    @patch("woudc_data_registry.config", autospec=True)
+    def test_none_x_or_y(self, mock_config, mock_file_open, mock_json_load,
+                          mock_uuid, mock_logger):
+        # Set x to None to trigger geometry=null and error log
+        record = DummyRecord(
+            x=None, y=2, z=3,
+            published_datetime="2024-06-04T12:34:56Z",
+            timestamp_date="2024-06-04", timestamp_time="12:00:00",
+            timestamp_utcoffset="+00:00",
+            data_record_id="DATA456", dataset_id="DSID456",
+            url="http://example.com/2"
+        )
+        info = {
+            "file2": {
+                "record": record,
+                "status_code": 200,
+                "message": "OK"
+            }
+        }
+        mock_json_load.return_value = self.geojson_template.copy()
+        mock_uuid.return_value = uuid.UUID("12345678123456781234567812345678")
+
+        payload = generate_geojson_payload(info)
+        notif = payload[0]
+        self.assertIsNone(notif["geometry"])
+        # Optionally, check that LOGGER.error was called
+        mock_logger.error.assert_called_once_with('x or y is None')
+
+
+class TestGathering(unittest.TestCase):
+    def testconnection(self):
+        """Test connection to the ftp account."""
+        commands = [
+            'rm -r yyyy-mm-dd'
+        ]
+
+        runner = CliRunner()
+        result = runner.invoke(gather, ['yyyy-mm-dd'])
+
+        assert result.exit_code == 0
+
+        folders = [
+            name for name in os.listdir('yyyy-mm-dd')
+            if os.path.isdir(os.path.join('yyyy-mm-dd', name))
+        ]
+
+        folder_files = {
+            folder: [
+                file for file in os.listdir(os.path.join('yyyy-mm-dd', folder))
+                if os.path.isfile(os.path.join('yyyy-mm-dd', folder, file))
+            ]
+            for folder in os.listdir('yyyy-mm-dd')
+            if os.path.isdir(os.path.join('yyyy-mm-dd', folder))
+        }
+
+        shadoz_files = folder_files.get('shadoz', [])
+
+        # Edit this accordingly
+        self.assertGreater(len(folders), 0, "No folders found in yyyy-mm-dd")
+        self.assertIn('shadoz', folders,
+                      "shadoz folder not found in yyyy-mm-dd")
+
+        self.assertGreater(len(shadoz_files), 0, "No files in yyyy-mm-dd")
+        self.assertEqual(len(shadoz_files), 1659)
+
+        subprocess.run(commands[0], shell=True, check=True,
+                       stdout=subprocess.PIPE, text=True)
+
+
+if __name__ == '__main__':
+    unittest.main()
diff --git a/woudc_data_registry/tests/test_delete_record.py b/woudc_data_registry/tests/test_delete_record.py
deleted file mode 100644
index 44968ce7..00000000
--- a/woudc_data_registry/tests/test_delete_record.py
+++ /dev/null
@@ -1,156 +0,0 @@
-import unittest
-import os
-import subprocess
-from sqlalchemy import create_engine
-from sqlalchemy.orm import sessionmaker
-from woudc_data_registry.models import DataRecord
-from woudc_data_registry import config
-
-"""
-You need to set up a test environment for your tests. So setup and populate a
-database and directory with files that have been ingested.
-
-Change WDR_DB_NAME and WDR_SEARCH_INDEX for testing perposes.
-"""
-
-
-class TestBasicDeletion(unittest.TestCase):
-    """Test case for basic functionality of deleting a record."""
-    # I need to run 2 bash commands and then do some checks
-
-    def test_01_file_deletion(self):
-        """Run bash commands and verify the outcome."""
-
-        # Bash commands to run
-        commands = [
-            'woudc-data-registry data ingest '
-            './data/totalozone/totalozone-correct.csv',
-            'woudc-data-registry data delete-record '
-            + config.WDR_WAF_BASEDIR + '/Archive-NewFormat'
-            '/TotalOzone_1.0_1/stn077/brewer/2010/totalozone-correct.csv',
-            'rm ' + config.WDR_FILE_TRASH + '/totalozone-correct.csv'
-        ]
-
-        engine = create_engine(config.WDR_DATABASE_URL,
-                               echo=config.WDR_DB_DEBUG)
-        Session = sessionmaker(bind=engine, expire_on_commit=False)
-        session = Session()
-
-        filenames_OG = [
-            file for file in os.listdir(config.WDR_FILE_TRASH)
-            if os.path.isfile(os.path.join(config.WDR_FILE_TRASH, file))
-        ]
-
-        file_count_OG = len(filenames_OG)
-
-        result_OG = session.query(DataRecord.output_filepath).all()
-        result_list_OG = [row[0] for row in result_OG]
-        row_count_OG = len(result_list_OG)
-        print(result_list_OG)
-
-        # Ingesting the File
-        subprocess.run(commands[0], shell=True, check=True)
-
-        result = session.query(DataRecord.output_filepath).all()
-        result_list = [row[0] for row in result]
-        row_count = len(result_list)
-
-        self.assertEqual(row_count, row_count_OG + 1)
-        self.assertTrue(commands[1].split(' ')[-1] in result_list)
-
-        # Deleting the File
-        subprocess.run(commands[1], shell=True, check=True)
-
-        filenames_01 = [
-            file for file in os.listdir(config.WDR_FILE_TRASH)
-            if os.path.isfile(os.path.join(config.WDR_FILE_TRASH, file))
-        ]
-        file_count_01 = len(filenames_01)
-
-        result2 = session.query(DataRecord.output_filepath).all()
-        result_list2 = [row[0] for row in result2]
-        row_count2 = len(result_list2)
-
-        self.assertEqual(file_count_01, file_count_OG + 1)
-        self.assertEqual(row_count2, row_count_OG)
-        self.assertEqual(result_list2, result_list_OG)
-        self.assertFalse(commands[0].split('/')[-1] in result_list2)
-
-        subprocess.run(commands[2], shell=True, check=True)
-
-        session.close()
-
-    def test_02_absent_file_deletion(self):
-        """
-        Run bash commands and verify the outcome where the file
-        path does not exist.
-        """
-
-        # Bash commands to run
-        commands = [
-            'woudc-data-registry data delete-record '
-            + config.WDR_WAF_BASEDIR + '/Archive-NewFormat'
-            '/TotalOzone_1.0_1/stn077/brewer/2010/totalozone-correct.csv'
-        ]
-
-        # Deleting the File
-        with self.assertRaises(subprocess.CalledProcessError) as context:
-            subprocess.run(commands[0], shell=True, check=True)
-
-        # Optional: Verify the error message or exit code
-        self.assertEqual(context.exception.returncode, 2)
-        self.assertIn("woudc-data-registry", context.exception.cmd)
-
-    def test_03_absent_file_DB_deletion(self):
-        """
-        Run bash commands and verify the outcome where the file path
-        exists but the row does not.
- """ - commands = [ - 'cp ./data/totalozone/totalozone-correct.csv ' - + config.WDR_WAF_BASEDIR + '/Archive-NewFormat' - '/TotalOzone_1.0_1/stn077/brewer/2010', - 'rm ' + config.WDR_WAF_BASEDIR + '/Archive-NewFormat' - '/TotalOzone_1.0_1/stn077/brewer/2010/totalozone-correct.csv' - ] - # Get information - engine = create_engine(config.WDR_DATABASE_URL, - echo=config.WDR_DB_DEBUG) - Session = sessionmaker(bind=engine, expire_on_commit=False) - session = Session() - - filenames_OG = [ - file for file in os.listdir(config.WDR_FILE_TRASH) - if os.path.isfile(os.path.join(config.WDR_FILE_TRASH, file)) - ] - file_count_OG = len(filenames_OG) - - result_OG = session.query(DataRecord.output_filepath).all() - result_list_OG = [row[0] for row in result_OG] - row_count_OG = len(result_list_OG) - - # Copy the file to the WAF so the path exists - # but the file is not in the DB - subprocess.run(commands[0], shell=True, check=True) - - filenames_01 = [ - file for file in os.listdir(config.WDR_FILE_TRASH) - if os.path.isfile(os.path.join(config.WDR_FILE_TRASH, file)) - ] - file_count_01 = len(filenames_OG) - - result_01 = session.query(DataRecord.output_filepath).all() - result_list_01 = [row[0] for row in result_01] - row_count_01 = len(result_list_01) - - self.assertEqual(filenames_OG, filenames_01) - self.assertEqual(file_count_OG, file_count_01) - - self.assertEqual(result_list_OG, result_list_01) - self.assertEqual(row_count_OG, row_count_01) - - subprocess.run(commands[1], shell=True, check=True) - - -if __name__ == '__main__': - unittest.main() diff --git a/woudc_data_registry/util.py b/woudc_data_registry/util.py index f37f17bc..9879f6b7 100644 --- a/woudc_data_registry/util.py +++ b/woudc_data_registry/util.py @@ -48,7 +48,9 @@ import io import os import shutil +import ssl import smtplib +import uuid import tarfile import rarfile @@ -56,8 +58,11 @@ import ftputil +import requests +import json from email.mime.multipart import MIMEMultipart from email.mime.text import MIMEText +import paho.mqtt.client as mqtt from woudc_data_registry.registry import Registry import woudc_data_registry.config as config @@ -240,6 +245,114 @@ def decompress(ipath): return file_list +def get_HTTP_HEAD_response(url): + """ + Get HTTP HEAD response + + :param url: URL to be evaluated + :returns: `requests.Response` object + """ + + try: + response = requests.head(url) + response.raise_for_status() + LOGGER.info(f"HTTP Headers: {response}") + LOGGER.info(f"Status Code: {response.status_code}") + return response.status_code + except requests.exceptions.RequestException as e: + LOGGER.info(f"Status Code: {response.status_code}") + LOGGER.error(f"An error occurred while making a request to {url}: {e}") + return '404' + + +def generate_geojson_payload(info): + """ + Generate GeoJSON payload + + :returns: `dict` of GeoJSON payload + """ + notifications = [] + for key in info: + with open(config.WDR_NOTIFICATION_MESSAGE, 'r') as file: + geojson = json.load(file) + x = info[key]["record"].x + y = info[key]["record"].y + z = getattr(info[key]["record"], "z", None) + if x is None or y is None: + LOGGER.error('x or y is None') + geojson["geometry"] = None + else: + geojson["geometry"]["coordinates"] = [x, y, z] + geojson["properties"]["pubtime"] = ( + f"{info[key]['record'].published_datetime}" + ) + geojson["properties"]["datetime"] = ( + f"{info[key]['record'].timestamp_date}T" + f"{info[key]['record'].timestamp_time}" + f"{info[key]['record'].timestamp_utcoffset}" + ) + geojson["properties"]["data_id"] = 
info[key]["record"].data_record_id + geojson["properties"]["metadata_id"] = ( + f"urn:wmo:md:org-woudc:{info[key]['record'].dataset_id}" + ) + geojson["links"][0]["href"] = info[key]["record"].url + geojson["id"] = str(uuid.uuid4()) + notifications.append(geojson) + return notifications + + +def publish_to_MQTT_Broker(info): + """ + Publish to MQTT Broker + + :param Info: `dict` of information to be published + :returns: `bool` of whether the publish was successful + """ + try: + client = mqtt.Client( + client_id=config.WDR_MQTT_CLIENT_ID, + clean_session=True + ) + + client.username_pw_set( + config.WDR_MQTT_BROKER_USERNAME, + config.WDR_MQTT_BROKER_PASSWORD + ) + client.tls_set( + ca_certs="/etc/ssl/certs/ca-certificates.crt", + certfile=None, + keyfile=None, + cert_reqs=ssl.CERT_REQUIRED, + tls_version=ssl.PROTOCOL_TLS, + ciphers=None + ) + + client.tls_insecure_set(False) + + client.connect( + config.WDR_MQTT_BROKER_HOST, + int(config.WDR_MQTT_BROKER_PORT), + 60 + ) + client.loop_start() + + href = info.get('links')[0].get('href') + path_extension = '/'.join(href.split('/')[5:]) + mqtt_path = "origin/a/wis2/org-woudc/data/core/atmospheric-composition" + # Send the payload (uncomment this for real data): + payload = json.dumps(info) + + client.publish(f"{mqtt_path}/{path_extension}", payload) + + client.loop_stop() + client.disconnect() + + LOGGER.info("MQTT publish successful") + + except Exception as e: + LOGGER.error(f"MQTT error: {e}") + + def send_email(message, subject, from_email_address, to_email_addresses, host, port, cc_addresses=None, bcc_addresses=None, secure=False, from_email_password=None):