From 31842111bc17ea01b8e8eb0e40205d39f7c9662a Mon Sep 17 00:00:00 2001
From: "S. Co1"
Date: Mon, 7 Aug 2023 12:11:30 -0400
Subject: [PATCH] Add initial parsing tests

Split the file-reading pipeline (`log_parse_pipeline`) from the line
parser (`_parse_raw_log`) so the parser can be tested on in-memory lines.

`DropRecord.__eq__` keeps returning `NotImplemented` for foreign types
rather than raising `NotImplementedError`, so `==`, `!=` and `in` against
unrelated objects evaluate to "not equal" instead of crashing.
---
 dropmate_py/parser.py     |  36 ++++++---
 tests/test_faux_series.py |  28 +++++++
 tests/test_log_objects.py | 154 ++++++++++++++++++++++++++++++++++++++
 3 files changed, 208 insertions(+), 10 deletions(-)
 create mode 100644 tests/test_faux_series.py
 create mode 100644 tests/test_log_objects.py

diff --git a/dropmate_py/parser.py b/dropmate_py/parser.py
index a46a723..251b433 100644
--- a/dropmate_py/parser.py
+++ b/dropmate_py/parser.py
@@ -73,7 +73,11 @@ class Health(str, Enum):  # noqa: D101
 
 @dataclass
 class FauxSeries:
-    """Helper container to support column access by name of a raw log line."""
+    """
+    Helper container to support column access by name of a raw log line.
+
+    NOTE: `raw_columns` is assumed to already be split.
+    """
 
     raw_columns: abc.Sequence[str]
     indices: ColumnIndices
@@ -84,7 +88,8 @@ def __getitem__(self, key: str) -> str:
             raise KeyError(f"Column {key} not present in log file.")
 
         val = self.raw_columns[idx]
-        if not isinstance(val, str):
+        if not isinstance(val, str):  # pragma: no cover
+            # Shouldn't ever get here but add the guard just in case
             raise ValueError("Provided log data contains non-string value(s).")
 
         return val
@@ -149,10 +154,10 @@ class Dropmate:  # noqa: D101
     firmware_version: float
     last_scanned_time_utc: dt.datetime
 
-    def __len__(self) -> int:
+    def __len__(self) -> int:  # pragma: no cover
         return len(self.drops)
 
-    def __str__(self) -> str:
+    def __str__(self) -> str:  # pragma: no cover
         scanned_pretty = self.last_scanned_time_utc.strftime(r"%Y-%m-%d %H:%M")
         return f"UID: {self.uid}, FW: {self.firmware_version}, {len(self.drops)} drops, Scanned: {scanned_pretty} UTC"  # noqa: E501
 
@@ -175,13 +180,24 @@ def _group_by_uid(drop_logs: list[DropRecord]) -> list[Dropmate]:
     return dropmates
 
 
-def parse_raw_log(log_filepath: Path) -> list[Dropmate]:
-    """Parse the provided compiled Dropmate log CSV into a list of drops, grouped by device."""
-    full_log = log_filepath.read_text().splitlines()
-    indices = ColumnIndices.from_header(full_log[0])
+def _parse_raw_log(log_lines: abc.Sequence[str]) -> list[DropRecord]:
+    """
+    Parse the provided compiled Dropmate log lines into a list of drop records.
+
+    NOTE: The provided `log_lines` is assumed to include the header line.
+    """
+    indices = ColumnIndices.from_header(log_lines[0])
 
     drop_logs = []
-    for line in full_log[1:]:
+    for line in log_lines[1:]:
         drop_logs.append(DropRecord.from_raw(line, indices))
 
-    return _group_by_uid(drop_logs)
+    return drop_logs
+
+
+def log_parse_pipeline(log_filepath: Path) -> list[Dropmate]:
+    """Parse the provided compiled Dropmate log CSV into a list of drops, grouped by device."""
+    log_lines = log_filepath.read_text().splitlines()
+    parsed_records = _parse_raw_log(log_lines)
+
+    return _group_by_uid(parsed_records)
diff --git a/tests/test_faux_series.py b/tests/test_faux_series.py
new file mode 100644
index 0000000..d9a28ee
--- /dev/null
+++ b/tests/test_faux_series.py
@@ -0,0 +1,28 @@
+import pytest
+
+from dropmate_py import parser
+
+SAMPLE_FULL_HEADER = "serial_number,uid,battery,device_health,firmware_version,log_timestamp,log_altitude,total_flights,flights_over_18kft,recorded_flights,flight_index,start_time_utc,end_time_utc,start_barometric_altitude_msl_ft,end_barometric_altitude_msl_ft,dropmate_internal_time_utc,last_scanned_time_utc,scan_device_type,scan_device_os,dropmate_app_version"
+SAMPLE_DATA_LINE = "0,E002270067A94C18,Good,good,5.1,true,true,3,0,3,1,2023-04-20T14:45:45Z,2023-04-20T14:47:37Z,1591,262,2023-04-20T19:16:38Z,2023-04-20T19:34:04.547Z,SM S901U1,31,1.5.16"
+
+SAMPLE_FULL_HEADER_COL_IDX = parser.ColumnIndices.from_header(SAMPLE_FULL_HEADER)
+
+
+def test_faux_series_getter() -> None:
+    ds = parser.FauxSeries(SAMPLE_DATA_LINE.split(","), SAMPLE_FULL_HEADER_COL_IDX)
+    assert ds["uid"] == "E002270067A94C18"
+
+
+SAMPLE_SHORT_HEADER = "serial_number,uid,battery,log_timestamp,log_altitude,total_flights,prior_flights,flights_over_18kft,recorded_flights,flight_index,start_time_utc,end_time_utc,start_barometric_altitude_msl_ft,end_barometric_altitude_msl_ft"
+SAMPLE_SHORT_DATA_LINE = (
+    "5,E00227006796B05F,Good,on,on,7,0,0,7,1,2023-Apr-20T14-48-53Z,2023-Apr-20T14-56-07Z,5364,1444"
+)
+
+SAMPLE_SHORT_HEADER_COL_IDX = parser.ColumnIndices.from_header(SAMPLE_SHORT_HEADER)
+
+
+def test_faux_series_getter_old_log_raises() -> None:
+    ds = parser.FauxSeries(SAMPLE_SHORT_DATA_LINE.split(","), SAMPLE_SHORT_HEADER_COL_IDX)
+
+    with pytest.raises(KeyError):
+        _ = ds["device_health"]
diff --git a/tests/test_log_objects.py b/tests/test_log_objects.py
new file mode 100644
index 0000000..7405cc6
--- /dev/null
+++ b/tests/test_log_objects.py
@@ -0,0 +1,154 @@
+import datetime as dt
+from dataclasses import fields
+from functools import partial
+from pathlib import Path
+from textwrap import dedent
+
+import pytest
+
+from dropmate_py import parser
+
+DROP_RECORD_P = partial(
+    parser.DropRecord,
+    serial_number="cereal",
+    battery=parser.Health.GOOD,
+    device_health=parser.Health.GOOD,
+    firmware_version=5.1,
+    end_time_utc=dt.datetime(
+        year=2023, month=4, day=20, hour=11, minute=30, second=0, tzinfo=dt.timezone.utc
+    ),
+    start_barometric_altitude_msl_ft=1000,
+    end_barometric_altitude_msl_ft=0,
+    dropmate_internal_time_utc=dt.datetime(
+        year=2023, month=4, day=20, hour=12, minute=30, second=0, tzinfo=dt.timezone.utc
+    ),
+    last_scanned_time_utc=dt.datetime(
+        year=2023, month=4, day=20, hour=12, minute=30, second=0, tzinfo=dt.timezone.utc
+    ),
+)
+
+SAMPLE_FULL_HEADER = "serial_number,uid,battery,device_health,firmware_version,log_timestamp,log_altitude,total_flights,flights_over_18kft,recorded_flights,flight_index,start_time_utc,end_time_utc,start_barometric_altitude_msl_ft,end_barometric_altitude_msl_ft,dropmate_internal_time_utc,last_scanned_time_utc,scan_device_type,scan_device_os,dropmate_app_version"
+SAMPLE_DATA_LINE = "cereal,ABC123,Good,good,5.1,true,true,3,0,3,1,2023-04-20T11:00:00Z,2023-04-20T11:30:00Z,1000,0,2023-04-20T12:30:00Z,2023-04-20T12:30:00Z,SM S901U1,31,1.5.16"
+
+SAMPLE_FULL_HEADER_COL_IDX = parser.ColumnIndices.from_header(SAMPLE_FULL_HEADER)
+
+
+def test_droprecord_from_raw() -> None:
+    truth_log = DROP_RECORD_P(
+        uid="ABC123",
+        start_time_utc=dt.datetime(
+            year=2023, month=4, day=20, hour=11, minute=00, second=0, tzinfo=dt.timezone.utc
+        ),
+    )
+
+    log = parser.DropRecord.from_raw(SAMPLE_DATA_LINE, SAMPLE_FULL_HEADER_COL_IDX)
+
+    for i in fields(truth_log):
+        left = getattr(log, i.name)
+        right = getattr(truth_log, i.name)
+        assert left == right, f"Mismatch for field {i.name}"
+
+
+LOG_EQUALITY_TEST_CASES = (
+    (
+        DROP_RECORD_P(
+            uid="ABC123",
+            start_time_utc=dt.datetime(
+                year=2023, month=4, day=20, hour=11, minute=00, second=0, tzinfo=dt.timezone.utc
+            ),
+        ),
+        DROP_RECORD_P(
+            uid="ABC123",
+            start_time_utc=dt.datetime(
+                year=2023, month=4, day=20, hour=11, minute=00, second=0, tzinfo=dt.timezone.utc
+            ),
+        ),
+        True,
+    ),
+    (
+        DROP_RECORD_P(
+            uid="ABC123",
+            start_time_utc=dt.datetime(
+                year=2023, month=4, day=20, hour=11, minute=00, second=0, tzinfo=dt.timezone.utc
+            ),
+        ),
+        DROP_RECORD_P(
+            uid="ABC123",
+            start_time_utc=dt.datetime(
+                year=2023, month=4, day=21, hour=11, minute=00, second=0, tzinfo=dt.timezone.utc
+            ),
+        ),
+        False,
+    ),
+    (
+        DROP_RECORD_P(
+            uid="ABC123",
+            start_time_utc=dt.datetime(
+                year=2023, month=4, day=20, hour=11, minute=00, second=0, tzinfo=dt.timezone.utc
+            ),
+        ),
+        DROP_RECORD_P(
+            uid="ABC1234",
+            start_time_utc=dt.datetime(
+                year=2023, month=4, day=20, hour=11, minute=00, second=0, tzinfo=dt.timezone.utc
+            ),
+        ),
+        False,
+    ),
+)
+
+
+@pytest.mark.parametrize(("left", "right", "truth_eq"), LOG_EQUALITY_TEST_CASES)
+def test_drop_log_equality(
+    left: parser.DropRecord, right: parser.DropRecord, truth_eq: bool
+) -> None:
+    assert (left == right) == truth_eq
+
+
+def test_drop_log_equality_non_droplog_is_unequal() -> None:
+    log = DROP_RECORD_P(
+        uid="ABC123",
+        start_time_utc=dt.datetime(
+            year=2023, month=4, day=20, hour=11, minute=00, second=0, tzinfo=dt.timezone.utc
+        ),
+    )
+
+    assert (log == "foo") is False
+    assert (log != "foo") is True
+
+
+SAMPLE_CONSOLIDATED_LOG = dedent(
+    """\
+    serial_number,uid,battery,device_health,firmware_version,log_timestamp,log_altitude,total_flights,flights_over_18kft,recorded_flights,flight_index,start_time_utc,end_time_utc,start_barometric_altitude_msl_ft,end_barometric_altitude_msl_ft,dropmate_internal_time_utc,last_scanned_time_utc,scan_device_type,scan_device_os,dropmate_app_version
+    cereal,A1,Good,good,5.1,true,true,3,0,3,1,2023-04-20T11:00:00Z,2023-04-20T11:30:00Z,1000,0,2023-04-20T12:30:00Z,2023-04-20T12:30:00Z,SM S901U1,31,1.5.16
+    cereal,A1,Good,good,5.1,true,true,3,0,3,3,2023-04-20T11:00:00Z,2023-04-20T11:30:00Z,1000,0,2023-04-20T12:30:00Z,2023-04-20T12:30:00Z,SM S901U1,31,1.5.16
+    cereal,A2,Good,good,5.1,true,true,3,0,3,1,2023-04-20T11:00:00Z,2023-04-20T11:30:00Z,1000,0,2023-04-20T12:30:00Z,2023-04-20T12:30:00Z,SM S901U1,31,1.5.16
+    cereal,A2,Good,good,5.1,true,true,3,0,3,3,2023-04-20T11:00:00Z,2023-04-20T11:30:00Z,1000,0,2023-04-20T12:30:00Z,2023-04-20T12:30:00Z,SM S901U1,31,1.5.16
+    cereal,A3,Good,good,5.1,true,true,2,0,2,1,2023-04-20T11:00:00Z,2023-04-20T11:30:00Z,1000,0,2023-04-20T12:30:00Z,2023-04-20T12:30:00Z,SM S901U1,31,1.5.16
+    """
+)
+
+
+def test_log_line_parse() -> None:
+    log_lines = SAMPLE_CONSOLIDATED_LOG.splitlines()
+    parsed_records = parser._parse_raw_log(log_lines)
+
+    assert len(parsed_records) == 5
+
+
+def test_group_by_uid() -> None:
+    log_lines = SAMPLE_CONSOLIDATED_LOG.splitlines()
+    parsed_records = parser._parse_raw_log(log_lines)
+
+    grouped_records = parser._group_by_uid(parsed_records)
+    assert len(grouped_records) == 3
+    assert [rec.uid for rec in grouped_records] == ["A1", "A2", "A3"]
+
+
+def test_file_parse_pipeline(tmp_path: Path) -> None:
+    log_file = tmp_path / "compiled.CSV"
+    log_file.write_text(SAMPLE_CONSOLIDATED_LOG)
+
+    grouped_records = parser.log_parse_pipeline(log_file)
+    assert len(grouped_records) == 3
+    assert [rec.uid for rec in grouped_records] == ["A1", "A2", "A3"]