diff --git a/shared_code/glow.py b/shared_code/glow.py index 60218e2..cbb9bb4 100644 --- a/shared_code/glow.py +++ b/shared_code/glow.py @@ -1,5 +1,5 @@ import json -from typing import Any, List +from typing import Any, List, Optional from azure.functions import EventHubEvent @@ -7,38 +7,32 @@ from .helpers import to_datetime, create_correlation_id -def glow_to_timescale( - event: EventHubEvent, - messagebody: dict, - topic: str, - publisher: str, -) -> List[dict[str, Any]]: - """ - Convert a message from the Glow MQTT broker to a list of records for TimescaleDB - @param event: the eventhub event - @param messagebody: the message body - @param topic: the topic - @param publisher: the publisher - @return: a list of timescale records - """ - # examine the topic. We're only interested in topics where the last part is in events_of_interest - events_of_interest = ["electricitymeter", "gasmeter"] - topic_parts = topic.split("/") +def validate_publisher_and_topic(publisher: str, topic: str) -> Optional[str]: if publisher.lower() != "glow": raise ValueError( f"Invalid publisher: Glow processor only handles Glow messages, not {publisher}" ) + topic_parts = topic.split("/") measurement_subject = topic_parts[-1] - if measurement_subject not in events_of_interest: - return + if measurement_subject not in ["electricitymeter", "gasmeter"]: + return None + return measurement_subject + - # convert the message to a json object +def parse_message_payload(messagebody: dict, measurement_subject: str) -> tuple: message_payload = json.loads(messagebody["payload"]) timestamp = to_datetime(message_payload[measurement_subject]["timestamp"]) - correlation_id = create_correlation_id(event) - # for these messages, we need to construct an array of records, one for each value - records = [] - # ignore text fields which we dont care about: + return message_payload, timestamp + + +def create_records_for_subject( + message_payload: dict, + timestamp: str, + correlation_id: str, + publisher: str, + measurement_subject: str, + records: List[dict], +) -> List[dict]: ignore_keys = [ "units", "mpan", @@ -47,6 +41,8 @@ def glow_to_timescale( "dayweekmonthvolunits", "cumulativevolunits", ] + if measurement_subject not in message_payload: + return [] records = create_record_recursive( payload=message_payload[measurement_subject]["energy"]["import"], records=records, @@ -57,7 +53,6 @@ def glow_to_timescale( ignore_keys=ignore_keys, measurement_of_prefix="import", ) - if measurement_subject == "electricitymeter": records = create_record_recursive( payload=message_payload[measurement_subject]["power"], @@ -69,5 +64,30 @@ def glow_to_timescale( ignore_keys=ignore_keys, measurement_of_prefix="power", ) + return records + + +def glow_to_timescale( + event: EventHubEvent, + messagebody: dict, + topic: str, + publisher: str, +) -> List[dict[str, Any]]: + measurement_subject = validate_publisher_and_topic(publisher, topic) + if measurement_subject is None: + return + + message_payload, timestamp = parse_message_payload(messagebody, measurement_subject) + correlation_id = create_correlation_id(event) + + records = [] + records = create_records_for_subject( + message_payload, + timestamp, + correlation_id, + publisher, + measurement_subject, + records, + ) return records diff --git a/test/test_glow.py b/test/test_glow.py new file mode 100644 index 0000000..ee81eef --- /dev/null +++ b/test/test_glow.py @@ -0,0 +1,156 @@ +import pytest +import json +from shared_code.glow import validate_publisher_and_topic, parse_message_payload, create_records_for_subject + + +class TestValidatePublisherAndTopic: + def test_valid_publisher_and_topic(self): + publisher = 'Glow' + topic = 'some/valid/electricitymeter' + assert validate_publisher_and_topic(publisher, topic) == 'electricitymeter' + + def test_valid_publisher_case_insensitive_and_topic(self): + publisher = 'gLoW' + topic = 'some/valid/gasmeter' + assert validate_publisher_and_topic(publisher, topic) == 'gasmeter' + + def test_invalid_publisher(self): + publisher = 'NotGlow' + topic = 'some/valid/electricitymeter' + with pytest.raises(ValueError) as e: + validate_publisher_and_topic(publisher, topic) + assert str(e.value) == 'Invalid publisher: Glow processor only handles Glow messages, not NotGlow' + + def test_invalid_topic(self): + publisher = 'Glow' + topic = 'some/invalid/invalidsubject' + assert validate_publisher_and_topic(publisher, topic) is None + + def test_missing_measurement_subject(self): + publisher = 'Glow' + topic = 'some/invalid/' + assert validate_publisher_and_topic(publisher, topic) is None + + def test_valid_publisher_and_topic_but_not_of_interest(self): + publisher = 'Glow' + topic = 'some/valid/notofinterest' + assert validate_publisher_and_topic(publisher, topic) is None + + +class TestParseMessagePayload: + + def test_valid_messagebody_and_subject(self): + messagebody = {'payload': json.dumps({'electricitymeter': {'timestamp': '2022-01-01T12:34:56Z'}})} + measurement_subject = 'electricitymeter' + expected_payload = {'electricitymeter': {'timestamp': '2022-01-01T12:34:56Z'}} + expected_timestamp = '2022-01-01T12:34:56.000000Z' # Replace with the actual datetime object if to_datetime converts it + payload, timestamp = parse_message_payload(messagebody, measurement_subject) + assert payload == expected_payload + assert timestamp == expected_timestamp + + def test_missing_payload(self): + messagebody = {} + measurement_subject = 'electricitymeter' + with pytest.raises(KeyError): + parse_message_payload(messagebody, measurement_subject) + + def test_missing_measurement_subject_in_payload(self): + messagebody = {'payload': json.dumps({'gasmeter': {'timestamp': '2022-01-01T12:34:56Z'}})} + measurement_subject = 'electricitymeter' + with pytest.raises(KeyError): + parse_message_payload(messagebody, measurement_subject) + + def test_missing_timestamp_in_subject(self): + messagebody = {'payload': json.dumps({'electricitymeter': {}})} + measurement_subject = 'electricitymeter' + with pytest.raises(KeyError): + parse_message_payload(messagebody, measurement_subject) + +class TestCreateRecordsForSubject: + + def test_all_valid_parameters(self): + message_payload = { + 'electricitymeter': { + 'energy': {'import': {'value': 100}}, + 'power': {'value': 200} + } + } + timestamp = '2022-01-01T12:34:56Z' + correlation_id = 'some_id' + publisher = 'Glow' + measurement_subject = 'electricitymeter' + records = [] + expected_records = [ + {'timestamp': timestamp, 'correlation_id': correlation_id, 'measurement_publisher': publisher, + 'measurement_subject': measurement_subject, 'measurement_of': 'import_value', 'measurement_value': 100, + 'measurement_data_type': 'number'}, + {'timestamp': timestamp, 'correlation_id': correlation_id, 'measurement_publisher': publisher, + 'measurement_subject': measurement_subject, 'measurement_of': 'power_value', 'measurement_value': 200, + 'measurement_data_type': 'number'} + ] + + output_records = create_records_for_subject( + message_payload, timestamp, correlation_id, publisher, measurement_subject, records) + assert output_records == expected_records + + def test_missing_measurement_subject_in_payload(self): + message_payload = {} + timestamp = '2022-01-01T12:34:56Z' + correlation_id = 'some_id' + publisher = 'Glow' + measurement_subject = 'electricitymeter' + records = [] + expected_records = [] + output_records = create_records_for_subject( + message_payload, timestamp, correlation_id, publisher, measurement_subject, records) + assert output_records == expected_records + + def test_ignore_keys(self): + message_payload = { + 'electricitymeter': { + 'energy': {'import': {'value': 100, 'units': 'kW'}}, + 'power': {'value': 200} + } + } + timestamp = '2022-01-01T12:34:56Z' + correlation_id = 'some_id' + publisher = 'Glow' + measurement_subject = 'electricitymeter' + records = [] + expected_records = [ + {'timestamp': '2022-01-01T12:34:56Z', 'correlation_id': 'some_id', + 'measurement_publisher': 'Glow', 'measurement_subject': 'electricitymeter', + 'measurement_of': 'import_value', 'measurement_value': 100, 'measurement_data_type': 'number'}, + {'timestamp': '2022-01-01T12:34:56Z', 'correlation_id': 'some_id', + 'measurement_publisher': 'Glow', 'measurement_subject': 'electricitymeter', + 'measurement_of': 'power_value', 'measurement_value': 200, 'measurement_data_type': 'number'} + ] + + output_records = create_records_for_subject( + message_payload, timestamp, correlation_id, publisher, measurement_subject, records) + assert output_records == expected_records + + def test_electricitymeter_with_both_import_and_power(self): + message_payload = { + 'electricitymeter': { + 'energy': {'import': {'value': 100}}, + 'power': {'value': 200} + } + } + timestamp = '2022-01-01T12:34:56Z' + correlation_id = 'some_id' + publisher = 'Glow' + measurement_subject = 'electricitymeter' + records = [] + expected_records = [ + {'timestamp': '2022-01-01T12:34:56Z', 'correlation_id': 'some_id', + 'measurement_publisher': 'Glow', 'measurement_subject': 'electricitymeter', + 'measurement_of': 'import_value', 'measurement_value': 100, 'measurement_data_type': 'number'}, + {'timestamp': '2022-01-01T12:34:56Z', 'correlation_id': 'some_id', + 'measurement_publisher': 'Glow', 'measurement_subject': 'electricitymeter', + 'measurement_of': 'power_value', 'measurement_value': 200, 'measurement_data_type': 'number'} + ] + + output_records = create_records_for_subject( + message_payload, timestamp, correlation_id, publisher, measurement_subject, records) + assert output_records == expected_records \ No newline at end of file