diff --git a/poetry.lock b/poetry.lock index e0e008a..5a0ad39 100644 --- a/poetry.lock +++ b/poetry.lock @@ -865,13 +865,18 @@ setuptools = "*" [[package]] name = "hightime" -version = "0.2.2" +version = "0.3.0-dev0" description = "Hightime Python API" optional = false -python-versions = "*" -files = [ - {file = "hightime-0.2.2-py3-none-any.whl", hash = "sha256:5109a449bb3a75dbf305147777de71634c91b943d47cfbee18ed2f34a8307e0b"}, -] +python-versions = "^3.9" +files = [] +develop = false + +[package.source] +type = "git" +url = "https://github.com/ni/hightime.git" +reference = "HEAD" +resolved_reference = "56506783601505cb075fe9aa670eee92cac01f54" [[package]] name = "idna" @@ -2872,4 +2877,4 @@ watchmedo = ["PyYAML (>=3.10)"] [metadata] lock-version = "2.0" python-versions = ">=3.9,<4.0,!=3.9.7" -content-hash = "4d6ea1ed746a31a8780b057475409256690f3db3e5c27d8345d19b11703aa03c" +content-hash = "ad2491b853963ec0501a51fc5c2754991650928ee489ccab6c0b2ba836963971" diff --git a/pyproject.toml b/pyproject.toml index 573dc8e..9cc9902 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -13,7 +13,7 @@ protobuf = {version=">=4.21"} ni-measurement-plugin-sdk = {version=">=2.3"} typing-extensions = ">=4.13.2" streamlit = ">=1.24" -nitypes = {version=">=0.1.0dev1", allow-prereleases=true} +nitypes = {version=">=0.1.0dev2", allow-prereleases=true} debugpy = ">=1.8.1" [tool.poetry.group.dev.dependencies] @@ -30,6 +30,8 @@ pyright = { version = ">=1.1.400", extras = ["nodejs"] } pytest = ">=7.2" pytest-cov = ">=4.0" pytest-mock = ">=3.0" +# Use an unreleased version of hightime for testing. +hightime = { git = "https://github.com/ni/hightime.git" } [tool.poetry.group.codegen.dependencies] grpcio-tools = [ diff --git a/src/nipanel/converters/protobuf_types.py b/src/nipanel/converters/protobuf_types.py index 0b5aa70..ebdae61 100644 --- a/src/nipanel/converters/protobuf_types.py +++ b/src/nipanel/converters/protobuf_types.py @@ -1,9 +1,29 @@ """Classes to convert between measurement specific protobuf types and containers.""" +import collections.abc +import datetime as dt from typing import Type, Union +import hightime as ht +import nitypes.bintime as bt +import numpy as np from ni.protobuf.types import scalar_pb2 +from ni_measurement_plugin_sdk_service._internal.stubs.ni.protobuf.types.precision_timestamp_pb2 import ( + PrecisionTimestamp, +) +from ni_measurement_plugin_sdk_service._internal.stubs.ni.protobuf.types.waveform_pb2 import ( + DoubleAnalogWaveform, + WaveformAttributeValue, +) from nitypes.scalar import Scalar +from nitypes.time import convert_datetime +from nitypes.waveform import ( + AnalogWaveform, + ExtendedPropertyDictionary, + ExtendedPropertyValue, + NoneScaleMode, + Timing, +) from typing_extensions import TypeAlias from nipanel.converters import Converter @@ -17,6 +37,133 @@ } +class DoubleAnalogWaveformConverter(Converter[AnalogWaveform[np.float64], DoubleAnalogWaveform]): + """A converter for AnalogWaveform types with scaled data (double).""" + + def __init__(self) -> None: + """Initialize a DoubleAnalogWaveformConverter object.""" + self._pt_converter = PrecisionTimestampConverter() + + @property + def python_typename(self) -> str: + """The Python type that this converter handles.""" + return AnalogWaveform.__name__ + + @property + def protobuf_message(self) -> Type[DoubleAnalogWaveform]: + """The type-specific protobuf message for the Python type.""" + return DoubleAnalogWaveform + + def to_protobuf_message(self, python_value: AnalogWaveform[np.float64]) -> DoubleAnalogWaveform: + """Convert the Python AnalogWaveform to a protobuf DoubleAnalogWaveform.""" + if python_value.timing.has_timestamp: + bin_datetime = convert_datetime(bt.DateTime, python_value.timing.start_time) + precision_timestamp = self._pt_converter.to_protobuf_message(bin_datetime) + else: + precision_timestamp = None + + if python_value.timing.has_sample_interval: + time_interval = python_value.timing.sample_interval.total_seconds() + else: + time_interval = 0 + + attributes = self._extended_properties_to_attributes(python_value.extended_properties) + + return self.protobuf_message( + t0=precision_timestamp, + dt=time_interval, + y_data=python_value.scaled_data, + attributes=attributes, + ) + + def _extended_properties_to_attributes( + self, + extended_properties: ExtendedPropertyDictionary, + ) -> collections.abc.Mapping[str, WaveformAttributeValue]: + return {key: self._value_to_attribute(value) for key, value in extended_properties.items()} + + def _value_to_attribute(self, value: ExtendedPropertyValue) -> WaveformAttributeValue: + attr_value = WaveformAttributeValue() + if isinstance(value, bool): + attr_value.bool_value = value + elif isinstance(value, int): + attr_value.integer_value = value + elif isinstance(value, float): + attr_value.double_value = value + elif isinstance(value, str): + attr_value.string_value = value + else: + raise TypeError(f"Unexpected type for extended property value {type(value)}") + + return attr_value + + def to_python_value(self, protobuf_message: DoubleAnalogWaveform) -> AnalogWaveform[np.float64]: + """Convert the protobuf DoubleAnalogWaveform to a Python AnalogWaveform.""" + # Declare timing to accept both bintime and dt.datetime to satisfy mypy. + timing: Timing[bt.DateTime | dt.datetime] + if not protobuf_message.dt and not protobuf_message.HasField("t0"): + # If both dt and t0 are unset, use Timing.empty. + timing = Timing.empty + else: + # Timestamp + pt_converter = PrecisionTimestampConverter() + bin_datetime = pt_converter.to_python_value(protobuf_message.t0) + + # Sample Interval + if not protobuf_message.dt: + timing = Timing.create_with_no_interval(timestamp=bin_datetime) + else: + sample_interval = ht.timedelta(seconds=protobuf_message.dt) + timing = Timing.create_with_regular_interval( + sample_interval=sample_interval, + timestamp=bin_datetime, + ) + + extended_properties = {} + for key, value in protobuf_message.attributes.items(): + attr_type = value.WhichOneof("attribute") + extended_properties[key] = getattr(value, str(attr_type)) + + data_array = np.array(protobuf_message.y_data) + return AnalogWaveform( + sample_count=data_array.size, + dtype=np.float64, + raw_data=data_array, + start_index=0, + capacity=data_array.size, + extended_properties=extended_properties, + copy_extended_properties=True, + timing=timing, + scale_mode=NoneScaleMode(), + ) + + +class PrecisionTimestampConverter(Converter[bt.DateTime, PrecisionTimestamp]): + """A converter for bintime.DateTime types.""" + + @property + def python_typename(self) -> str: + """The Python type that this converter handles.""" + return bt.DateTime.__name__ + + @property + def protobuf_message(self) -> Type[PrecisionTimestamp]: + """The type-specific protobuf message for the Python type.""" + return PrecisionTimestamp + + def to_protobuf_message(self, python_value: bt.DateTime) -> PrecisionTimestamp: + """Convert the Python DateTime to a protobuf PrecisionTimestamp.""" + seconds, fractional_seconds = python_value.to_tuple() + return self.protobuf_message(seconds=seconds, fractional_seconds=fractional_seconds) + + def to_python_value(self, protobuf_message: PrecisionTimestamp) -> bt.DateTime: + """Convert the protobuf PrecisionTimestamp to a Python DateTime.""" + time_value_tuple = bt.TimeValueTuple( + protobuf_message.seconds, protobuf_message.fractional_seconds + ) + return bt.DateTime.from_tuple(time_value_tuple) + + class ScalarConverter(Converter[Scalar[_AnyScalarType], scalar_pb2.ScalarData]): """A converter for Scalar objects.""" diff --git a/tests/unit/test_protobuf_type_conversion.py b/tests/unit/test_protobuf_type_conversion.py index 5dc841f..b0de750 100644 --- a/tests/unit/test_protobuf_type_conversion.py +++ b/tests/unit/test_protobuf_type_conversion.py @@ -1,12 +1,165 @@ +import datetime as dt + +import numpy import pytest from ni.protobuf.types.scalar_pb2 import ScalarData +from ni_measurement_plugin_sdk_service._internal.stubs.ni.protobuf.types.waveform_pb2 import ( + DoubleAnalogWaveform, + WaveformAttributeValue, +) +from nitypes.bintime import DateTime from nitypes.scalar import Scalar +from nitypes.waveform import AnalogWaveform, NoneScaleMode, SampleIntervalMode, Timing + +from nipanel.converters.protobuf_types import ( + DoubleAnalogWaveformConverter, + PrecisionTimestampConverter, + ScalarConverter, +) + + +# ======================================================== +# AnalogWaveform to DoubleAnalogWaveform +# ======================================================== +def test___default_analog_waveform___convert___valid_protobuf() -> None: + analog_waveform = AnalogWaveform() + + converter = DoubleAnalogWaveformConverter() + dbl_analog_waveform = converter.to_protobuf_message(analog_waveform) + + assert not dbl_analog_waveform.attributes + assert dbl_analog_waveform.dt == 0 + assert not dbl_analog_waveform.HasField("t0") + assert list(dbl_analog_waveform.y_data) == [] + + +def test___analog_waveform_samples_only___convert___valid_protobuf() -> None: + analog_waveform = AnalogWaveform(5) + + converter = DoubleAnalogWaveformConverter() + dbl_analog_waveform = converter.to_protobuf_message(analog_waveform) + + assert list(dbl_analog_waveform.y_data) == [0.0, 0.0, 0.0, 0.0, 0.0] + + +def test___analog_waveform_non_zero_samples___convert___valid_protobuf() -> None: + analog_waveform = AnalogWaveform.from_array_1d(numpy.array([1.0, 2.0, 3.0])) + + converter = DoubleAnalogWaveformConverter() + dbl_analog_waveform = converter.to_protobuf_message(analog_waveform) + + assert list(dbl_analog_waveform.y_data) == [1.0, 2.0, 3.0] + + +def test___analog_waveform_with_extended_properties___convert___valid_protobuf() -> None: + analog_waveform = AnalogWaveform() + analog_waveform.channel_name = "Dev1/ai0" + analog_waveform.unit_description = "Volts" + + converter = DoubleAnalogWaveformConverter() + dbl_analog_waveform = converter.to_protobuf_message(analog_waveform) + + assert dbl_analog_waveform.attributes["NI_ChannelName"].string_value == "Dev1/ai0" + assert dbl_analog_waveform.attributes["NI_UnitDescription"].string_value == "Volts" + + +def test___analog_waveform_with_standard_timing___convert___valid_protobuf() -> None: + analog_waveform = AnalogWaveform.from_array_1d(numpy.array([1.0, 2.0, 3.0])) + t0_dt = dt.datetime(2000, 12, 1, tzinfo=dt.timezone.utc) + analog_waveform.timing = Timing.create_with_regular_interval( + sample_interval=dt.timedelta(milliseconds=1000), + timestamp=t0_dt, + ) + + converter = DoubleAnalogWaveformConverter() + dbl_analog_waveform = converter.to_protobuf_message(analog_waveform) + + assert dbl_analog_waveform.dt == 1.0 + bin_dt = DateTime(t0_dt) + pt_converter = PrecisionTimestampConverter() + converted_t0 = pt_converter.to_protobuf_message(bin_dt) + assert dbl_analog_waveform.t0 == converted_t0 + + +# ======================================================== +# DoubleAnalogWaveform to AnalogWaveform +# ======================================================== +def test___default_dbl_analog_wfm___convert___valid_python_object() -> None: + dbl_analog_wfm = DoubleAnalogWaveform() + + converter = DoubleAnalogWaveformConverter() + analog_waveform = converter.to_python_value(dbl_analog_wfm) + + assert not analog_waveform.extended_properties + assert analog_waveform.timing == Timing.empty + assert analog_waveform.scaled_data.size == 0 + assert analog_waveform.scale_mode == NoneScaleMode() + + +def test___dbl_analog_wfm_with_y_data___convert___valid_python_object() -> None: + dbl_analog_wfm = DoubleAnalogWaveform(y_data=[1.0, 2.0, 3.0]) + + converter = DoubleAnalogWaveformConverter() + analog_waveform = converter.to_python_value(dbl_analog_wfm) + + assert list(analog_waveform.scaled_data) == [1.0, 2.0, 3.0] + + +def test___dbl_analog_wfm_with_attributes___convert___valid_python_object() -> None: + attributes = { + "NI_ChannelName": WaveformAttributeValue(string_value="Dev1/ai0"), + "NI_UnitDescription": WaveformAttributeValue(string_value="Volts"), + } + dbl_analog_wfm = DoubleAnalogWaveform(attributes=attributes) + + converter = DoubleAnalogWaveformConverter() + analog_waveform = converter.to_python_value(dbl_analog_wfm) + + assert analog_waveform.channel_name == "Dev1/ai0" + assert analog_waveform.unit_description == "Volts" + + +def test___dbl_analog_wfm_with_timing___convert___valid_python_object() -> None: + t0_dt = DateTime(2020, 5, 5, tzinfo=dt.timezone.utc) + pt_converter = PrecisionTimestampConverter() + t0_pt = pt_converter.to_protobuf_message(t0_dt) + dbl_analog_wfm = DoubleAnalogWaveform(t0=t0_pt, dt=0.1, y_data=[1.0, 2.0, 3.0]) + + converter = DoubleAnalogWaveformConverter() + analog_waveform = converter.to_python_value(dbl_analog_wfm) + + assert analog_waveform.timing.start_time == t0_dt._to_datetime_datetime() + assert analog_waveform.timing.sample_interval == dt.timedelta(seconds=0.1) + assert analog_waveform.timing.sample_interval_mode == SampleIntervalMode.REGULAR + + +def test___dbl_analog_wfm_with_timing_no_t0___convert___valid_python_object() -> None: + dbl_analog_wfm = DoubleAnalogWaveform(dt=0.1, y_data=[1.0, 2.0, 3.0]) + + converter = DoubleAnalogWaveformConverter() + analog_waveform = converter.to_python_value(dbl_analog_wfm) + + assert analog_waveform.timing.start_time == dt.datetime(1904, 1, 1, tzinfo=dt.timezone.utc) + assert analog_waveform.timing.sample_interval == dt.timedelta(seconds=0.1) + assert analog_waveform.timing.sample_interval_mode == SampleIntervalMode.REGULAR + + +def test___dbl_analog_wfm_with_timing_no_dt___convert___valid_python_object() -> None: + t0_dt = DateTime(2020, 5, 5, tzinfo=dt.timezone.utc) + pt_converter = PrecisionTimestampConverter() + t0_pt = pt_converter.to_protobuf_message(t0_dt) + dbl_analog_wfm = DoubleAnalogWaveform(t0=t0_pt, y_data=[1.0, 2.0, 3.0]) + + converter = DoubleAnalogWaveformConverter() + analog_waveform = converter.to_python_value(dbl_analog_wfm) -from nipanel.converters.protobuf_types import ScalarConverter + assert analog_waveform.timing.start_time == t0_dt._to_datetime_datetime() + assert not analog_waveform.timing.has_sample_interval + assert analog_waveform.timing.sample_interval_mode == SampleIntervalMode.NONE # ======================================================== -# Protobuf to Python +# Scalar: Protobuf to Python # ======================================================== def test___bool_scalar_protobuf___convert___valid_bool_scalar() -> None: protobuf_value = ScalarData() @@ -84,7 +237,7 @@ def test___scalar_protobuf_units_unset___convert___python_units_blank() -> None: # ======================================================== -# Python to Protobuf +# Scalar: Python to Protobuf # ======================================================== def test___bool_scalar___convert___valid_bool_scalar_protobuf() -> None: python_value = Scalar(True, "volts")