Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ jobs:
run: |
coverage run -a --source=woudc_data_registry -m unittest woudc_data_registry.tests.test_data_registry
coverage run -a --source=woudc_data_registry -m unittest woudc_data_registry.tests.test_report_generation
coverage run -a --source=woudc_data_registry -m unittest woudc_data_registry.tests.test_commands
coverage report -m
coveralls --service=github
- name: Build Python package 🏗️
Expand Down
5 changes: 5 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,11 @@ To generate emails for contributors:
woudc-data-registry data generate-emails /path/to/dir
```

#### Publishing Notifications to MQTT Server

```bash
woudc-data-registry data publish-notification --hours number_of_hours
```

#### Delete Record

```bash
Expand Down
8 changes: 7 additions & 1 deletion default.env
Original file line number Diff line number Diff line change
Expand Up @@ -64,4 +64,10 @@ export WDR_FTP_PASS=<secret>
export WDR_FTP_KEEP_FILES=True
export WDR_FTP_BASEDIR_INCOMING='/'
export WDR_ACKNOWLEDGE_SUBMISSION_HOURS=24
export WDR_ACKNOWLEDGE_TEMPLATE_PATH=/path/to/etc/woudc-acknowledge-email.txt
export WDR_ACKNOWLEDGE_TEMPLATE_PATH=/path/to/etc/woudc-acknowledge-email.txt

# MQTT Broker configuration
export WDR_MQTT_BROKER_HOST=broker.woudc.org
export WDR_MQTT_BROKER_PORT=8883
export WDR_MQTT_BROKER_USERNAME=everyone
export WDR_MQTT_BROKER_PASSWORD=everyone
32 changes: 32 additions & 0 deletions etc/wis2-notification-message-template.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
{
"id": "uuid.uuid4()",
"conformsTo": [
"http://wis.wmo.int/spec/wnm/1/conf/core"
],
"type": "Feature",
"geometry": {
"type": "Point",
"coordinates": [
"x",
"y",
"z"
]
},
"properties": {
"pubtime": "data_record.publication_datetime",
"datetime": "data_record.timestamp_dateTdata_record.timestamp_timedata_record.timestamp_utcoffset",
"integrity": {
"method": "sha512",
"value": "A2KNxvks...S8qfSCw=="
},
"data_id": "data_record.data_id",
"metadata_id": "urn:wmo:md:org-woudc:data_record.dataset_id"
},
"links": [
{
"href": "data_record.url",
"rel": "canonical",
"type": "text/csv"
}
]
}
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ click
elasticsearch
ftputil
jsonschema
paho-mqtt
pyyaml
rarfile
requests
Expand Down
6 changes: 6 additions & 0 deletions woudc_data_registry/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,12 @@
WDR_ACKNOWLEDGE_TEMPLATE_PATH = os.getenv('WDR_ACKNOWLEDGE_TEMPLATE_PATH')
WDR_FTP_SKIP_DIRS_INCOMING = os.getenv('WDR_FTP_SKIP_DIRS_INCOMING')
WDR_FTP_KEEP_FILES = str2bool(os.getenv('WDR_FTP_KEEP_FILES', 'True'))
# Notification message and MQTT broker settings, read from the environment.
# os.getenv returns a string (or None when the variable is unset), so
# WDR_MQTT_BROKER_PORT is NOT an int here; consumers must convert it.
WDR_NOTIFICATION_MESSAGE = os.getenv('WDR_NOTIFICATION_MESSAGE')
WDR_MQTT_BROKER_HOST = os.getenv('WDR_MQTT_BROKER_HOST')
WDR_MQTT_BROKER_PORT = os.getenv('WDR_MQTT_BROKER_PORT')
WDR_MQTT_BROKER_USERNAME = os.getenv('WDR_MQTT_BROKER_USERNAME')
WDR_MQTT_BROKER_PASSWORD = os.getenv('WDR_MQTT_BROKER_PASSWORD')
WDR_MQTT_CLIENT_ID = os.getenv('WDR_MQTT_CLIENT_ID')

if not WDR_SEARCH_INDEX_BASENAME:
msg = 'WDR_SEARCH_INDEX_BASENAME was not set. \
Expand Down
60 changes: 59 additions & 1 deletion woudc_data_registry/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import click
import logging
import os
import datetime

from pathlib import Path
from woudc_extcsv import (ExtendedCSV, NonStandardDataError,
Expand All @@ -54,7 +55,10 @@
from woudc_data_registry import config
from woudc_data_registry.gather import gather as gather_
from woudc_data_registry.util import (is_text_file, read_file,
send_email, delete_file_from_record)
send_email, delete_file_from_record,
get_HTTP_HEAD_response,
publish_to_MQTT_Broker,
generate_geojson_payload)


from woudc_data_registry.processing import Process
Expand Down Expand Up @@ -368,9 +372,63 @@ def gather(ctx, path):
LOGGER.info("Done Gathering files")


@click.command()
@click.pass_context
@click.option('--hours', type=int, required=True)
def publish_notification(ctx, hours):
    """Publish a notification to WMO WIS2"""
    # NOTE: the docstring above is kept to one line on purpose; click
    # prints it verbatim as the command's --help text.
    #
    # Flow:
    #   1. query data records whose `published_datetime` lies between
    #      (now - `hours`, truncated to the top of the hour) and now;
    #   2. build each record's public archive URL and probe it with
    #      `get_HTTP_HEAD_response`;
    #   3. for every URL answering 200, classify the record as new or
    #      updated and collect it in `responses`;
    #   4. turn `responses` into payloads with `generate_geojson_payload`
    #      and hand each one to `publish_to_MQTT_Broker`.
    #
    # `ctx` is unused; it is supplied by `click.pass_context`.
    # `hours` is the `int` size of the look-back window, in hours.

    now = datetime.datetime.now()
    # Start of the look-back window, truncated to the top of the hour.
    since = (now - datetime.timedelta(hours=hours)).replace(
        minute=0, second=0, microsecond=0)

    registry = Registry()
    ingested_records = registry.query_by_field_range(
        DataRecord, "published_datetime", since, now)
    LOGGER.info(f"Found records sooner than {since}")

    url_template = 'https://woudc.org/archive/Archive-NewFormat'
    responses = {}   # ingest_filepath -> record / status code / message
    no_message = []  # ingest_filepaths for which nothing will be published

    for record in ingested_records:
        instrument = record.instrument_id.split(':')[0].lower()
        year = record.timestamp_date.year
        dataset = (
            f'{record.content_category}_{record.content_level}_'
            f'{record.content_form}'
        )
        ingest_filepath = record.ingest_filepath
        url = (
            f'{url_template}/{dataset}/stn{record.station_id}/'
            f'{instrument}/{year}/{record.filename}'
        )

        LOGGER.info(f'Found {url}')
        http_response = get_HTTP_HEAD_response(url)
        if http_response != 200:
            no_message.append(ingest_filepath)
            continue

        query = registry.query_distinct_by_fields(
            DataRecord.ingest_filepath, DataRecord, {
                "ingest_filepath": ingest_filepath})
        match_count = len(query)
        if match_count == 0:
            # Previously `message` was left unassigned here, which raised
            # UnboundLocalError on the first record or silently reused
            # the previous record's message on later ones.
            LOGGER.warning(
                f'No registry match for {ingest_filepath}; skipping')
            no_message.append(ingest_filepath)
            continue

        message = 'new record' if match_count == 1 else 'update record'
        responses[ingest_filepath] = {
            'record': record,
            'status_code': http_response,
            'message': message
        }

    LOGGER.debug(f'Responses: {responses}')
    LOGGER.debug(f'No message: {no_message}')

    notifications = generate_geojson_payload(responses)
    LOGGER.debug('geoJSON Generated.')
    for notification in notifications:
        publish_to_MQTT_Broker(notification)


# Register the subcommands on `data` (presumably the click group defined
# earlier in this module -- confirm). Where `name=` is given, the command
# is exposed under that hyphenated name instead of the function's own name.
data.add_command(ingest)
data.add_command(verify)
data.add_command(generate_emails, name='generate-emails')
data.add_command(send_feedback, name='send-feedback')
data.add_command(delete_record, name='delete-record')
data.add_command(gather)
data.add_command(publish_notification, name='publish-notification')
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,7 @@ def execute(path, bypass):
LOGGER.info(f'Inserted {ozone_object}')
success += 1
except Exception as err:
msg = (f'Unable to insert UV index {ipath}:'
msg = (f'Unable to insert Ozonesonde {ipath}:'
f' {err}')
LOGGER.error(msg)
continue
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ def execute(path, bypass):
LOGGER.info(f'Inserted {ozone_object}')
success += 1
except Exception as err:
msg = (f'Unable to insert UV index {ipath}:'
msg = (f'Unable to insert totalozone {ipath}:'
f' {err}')
LOGGER.error(msg)
continue
Expand Down
24 changes: 24 additions & 0 deletions woudc_data_registry/registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,30 @@ def query_by_pattern(self, obj, by, pattern, case_insensitive=False):

return self.session.query(obj).filter(condition).first()

def query_by_field_range(self, obj, by, start, end,
                         case_insensitive=False):
    """
    Fetch every row of a table whose field satisfies a BETWEEN condition

    :param obj: Class of the table to query in
    :param by: Name of the field the condition is applied to
    :param start: Lower bound handed to BETWEEN
    :param end: Upper bound handed to BETWEEN
    :param case_insensitive: `bool`; when true, the field and both
                             bounds are lower-cased before comparing
                             (the bounds must then support `.lower()`)
    :returns: Result of `.all()` on the filtered query
    """

    column = getattr(obj, by)

    if not case_insensitive:
        LOGGER.debug(f'Querying for {column} BETWEEN {start} AND {end}')
        in_range = column.between(start, end)
    else:
        LOGGER.debug(f'Querying for LOWER({column}) BETWEEN '
                     f'LOWER({start}) AND LOWER({end})')
        lowered = func.lower(column)
        in_range = lowered.between(start.lower(), end.lower())

    matches = self.session.query(obj).filter(in_range)
    return matches.all()

def query_multiple_fields(self, table, values, fields=None,
case_insensitive=()):
"""
Expand Down
Loading
Loading