Skip to content

Commit

Permalink
refactor glow.py (#78)
Browse files Browse the repository at this point in the history
  • Loading branch information
mnbf9rca committed Oct 22, 2023
1 parent 219150a commit b20505b
Show file tree
Hide file tree
Showing 2 changed files with 202 additions and 26 deletions.
72 changes: 46 additions & 26 deletions shared_code/glow.py
Original file line number Diff line number Diff line change
@@ -1,44 +1,38 @@
import json
from typing import Any, List
from typing import Any, List, Optional

from azure.functions import EventHubEvent

from .timeseries import create_record_recursive
from .helpers import to_datetime, create_correlation_id


def glow_to_timescale(
event: EventHubEvent,
messagebody: dict,
topic: str,
publisher: str,
) -> List[dict[str, Any]]:
"""
Convert a message from the Glow MQTT broker to a list of records for TimescaleDB
@param event: the eventhub event
@param messagebody: the message body
@param topic: the topic
@param publisher: the publisher
@return: a list of timescale records
"""
# examine the topic. We're only interested in topics where the last part is in events_of_interest
events_of_interest = ["electricitymeter", "gasmeter"]
topic_parts = topic.split("/")
def validate_publisher_and_topic(publisher: str, topic: str) -> Optional[str]:
    """
    Check that a message comes from Glow and extract its measurement subject
    @param publisher: the publisher; compared case-insensitively against "glow"
    @param topic: the "/"-separated topic the message was published on
    @return: the last topic segment when it is "electricitymeter" or "gasmeter", otherwise None
    @raises ValueError: if the publisher is not Glow
    """
    if publisher.lower() != "glow":
        raise ValueError(
            f"Invalid publisher: Glow processor only handles Glow messages, not {publisher}"
        )
    # only the final topic segment identifies what is being measured
    measurement_subject = topic.rsplit("/", 1)[-1]
    if measurement_subject not in ("electricitymeter", "gasmeter"):
        return None
    return measurement_subject


# convert the message to a json object
def parse_message_payload(messagebody: dict, measurement_subject: str) -> tuple:
    """
    Decode the JSON payload of a Glow message and extract its timestamp
    @param messagebody: the message body; its "payload" entry is a JSON string
    @param measurement_subject: key of the payload section that holds the timestamp
    @return: a tuple of (decoded payload, timestamp as converted by to_datetime)
    @raises KeyError: if "payload", the measurement subject or its "timestamp" is missing
    """
    message_payload = json.loads(messagebody["payload"])
    # look the raw value up first so a missing key fails before any conversion is attempted
    raw_timestamp = message_payload[measurement_subject]["timestamp"]
    return message_payload, to_datetime(raw_timestamp)


def create_records_for_subject(
message_payload: dict,
timestamp: str,
correlation_id: str,
publisher: str,
measurement_subject: str,
records: List[dict],
) -> List[dict]:
ignore_keys = [
"units",
"mpan",
Expand All @@ -47,6 +41,8 @@ def glow_to_timescale(
"dayweekmonthvolunits",
"cumulativevolunits",
]
if measurement_subject not in message_payload:
return []
records = create_record_recursive(
payload=message_payload[measurement_subject]["energy"]["import"],
records=records,
Expand All @@ -57,7 +53,6 @@ def glow_to_timescale(
ignore_keys=ignore_keys,
measurement_of_prefix="import",
)

if measurement_subject == "electricitymeter":
records = create_record_recursive(
payload=message_payload[measurement_subject]["power"],
Expand All @@ -69,5 +64,30 @@ def glow_to_timescale(
ignore_keys=ignore_keys,
measurement_of_prefix="power",
)
return records


def glow_to_timescale(
    event: EventHubEvent,
    messagebody: dict,
    topic: str,
    publisher: str,
) -> Optional[List[dict[str, Any]]]:
    """
    Convert a message from the Glow MQTT broker to a list of records for TimescaleDB
    @param event: the eventhub event
    @param messagebody: the message body
    @param topic: the topic
    @param publisher: the publisher
    @return: a list of timescale records, or None when the topic is not of interest
    @raises ValueError: if the publisher is not Glow
    """
    measurement_subject = validate_publisher_and_topic(publisher, topic)
    if measurement_subject is None:
        # the last topic segment is not a subject we convert: nothing to do
        return None

    message_payload, timestamp = parse_message_payload(messagebody, measurement_subject)
    correlation_id = create_correlation_id(event)

    # start from a fresh list; the helper returns the records it built
    return create_records_for_subject(
        message_payload,
        timestamp,
        correlation_id,
        publisher,
        measurement_subject,
        [],
    )
156 changes: 156 additions & 0 deletions test/test_glow.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
import pytest
import json
from shared_code.glow import validate_publisher_and_topic, parse_message_payload, create_records_for_subject


class TestValidatePublisherAndTopic:
    """Tests for validate_publisher_and_topic."""

    def test_valid_publisher_and_topic(self):
        # exact-case publisher, topic ending in a subject of interest
        result = validate_publisher_and_topic('Glow', 'some/valid/electricitymeter')
        assert result == 'electricitymeter'

    def test_valid_publisher_case_insensitive_and_topic(self):
        # the publisher comparison ignores case
        result = validate_publisher_and_topic('gLoW', 'some/valid/gasmeter')
        assert result == 'gasmeter'

    def test_invalid_publisher(self):
        with pytest.raises(ValueError) as e:
            validate_publisher_and_topic('NotGlow', 'some/valid/electricitymeter')
        # checked after the with-block, once the exception has been captured
        assert str(e.value) == 'Invalid publisher: Glow processor only handles Glow messages, not NotGlow'

    def test_invalid_topic(self):
        result = validate_publisher_and_topic('Glow', 'some/invalid/invalidsubject')
        assert result is None

    def test_missing_measurement_subject(self):
        # a trailing slash leaves an empty final segment
        result = validate_publisher_and_topic('Glow', 'some/invalid/')
        assert result is None

    def test_valid_publisher_and_topic_but_not_of_interest(self):
        result = validate_publisher_and_topic('Glow', 'some/valid/notofinterest')
        assert result is None


class TestParseMessagePayload:
    """Tests for parse_message_payload."""

    def test_valid_messagebody_and_subject(self):
        expected_payload = {'electricitymeter': {'timestamp': '2022-01-01T12:34:56Z'}}
        messagebody = {'payload': json.dumps(expected_payload)}
        # NOTE(review): assumes to_datetime yields this string form - replace with
        # the actual datetime object if to_datetime converts it
        expected_timestamp = '2022-01-01T12:34:56.000000Z'
        payload, timestamp = parse_message_payload(messagebody, 'electricitymeter')
        assert payload == expected_payload
        assert timestamp == expected_timestamp

    def test_missing_payload(self):
        # no "payload" entry in the message body at all
        with pytest.raises(KeyError):
            parse_message_payload({}, 'electricitymeter')

    def test_missing_measurement_subject_in_payload(self):
        # the payload only carries a gasmeter section
        gas_only = {'gasmeter': {'timestamp': '2022-01-01T12:34:56Z'}}
        with pytest.raises(KeyError):
            parse_message_payload({'payload': json.dumps(gas_only)}, 'electricitymeter')

    def test_missing_timestamp_in_subject(self):
        # the subject section exists but has no timestamp
        no_timestamp = {'electricitymeter': {}}
        with pytest.raises(KeyError):
            parse_message_payload({'payload': json.dumps(no_timestamp)}, 'electricitymeter')

class TestCreateRecordsForSubject:
    """Tests for create_records_for_subject using electricitymeter payloads."""

    @staticmethod
    def _record(measurement_of, measurement_value):
        """Build the expected record for one numeric electricitymeter reading."""
        return {
            'timestamp': '2022-01-01T12:34:56Z',
            'correlation_id': 'some_id',
            'measurement_publisher': 'Glow',
            'measurement_subject': 'electricitymeter',
            'measurement_of': measurement_of,
            'measurement_value': measurement_value,
            'measurement_data_type': 'number',
        }

    @staticmethod
    def _create(message_payload):
        """Call create_records_for_subject with the arguments shared by every test."""
        return create_records_for_subject(
            message_payload, '2022-01-01T12:34:56Z', 'some_id', 'Glow', 'electricitymeter', [])

    def test_all_valid_parameters(self):
        message_payload = {
            'electricitymeter': {
                'energy': {'import': {'value': 100}},
                'power': {'value': 200},
            }
        }
        expected_records = [
            self._record('import_value', 100),
            self._record('power_value', 200),
        ]
        assert self._create(message_payload) == expected_records

    def test_missing_measurement_subject_in_payload(self):
        # an empty payload has no electricitymeter section, so no records are expected
        assert self._create({}) == []

    def test_ignore_keys(self):
        # 'units' is one of the ignored keys, so it must not produce a record
        message_payload = {
            'electricitymeter': {
                'energy': {'import': {'value': 100, 'units': 'kW'}},
                'power': {'value': 200},
            }
        }
        expected_records = [
            self._record('import_value', 100),
            self._record('power_value', 200),
        ]
        assert self._create(message_payload) == expected_records

    def test_electricitymeter_with_both_import_and_power(self):
        message_payload = {
            'electricitymeter': {
                'energy': {'import': {'value': 100}},
                'power': {'value': 200},
            }
        }
        expected_records = [
            self._record('import_value', 100),
            self._record('power_value', 200),
        ]
        assert self._create(message_payload) == expected_records

0 comments on commit b20505b

Please sign in to comment.