From 21567a439f496ced3c2ced8b8753f8610b220c24 Mon Sep 17 00:00:00 2001 From: Ethan Rodkin Date: Fri, 4 Aug 2023 16:15:43 -0400 Subject: [PATCH 1/5] add data request metadata --- docs/examples/_data_server.py | 143 ++++++++++++++++++++++++++++++++++ src/viam/app/data_client.py | 59 ++++++++++++-- tests/mocks/services.py | 26 +++++-- tests/test_data_client.py | 77 ++++++++++++------ 4 files changed, 267 insertions(+), 38 deletions(-) create mode 100644 docs/examples/_data_server.py diff --git a/docs/examples/_data_server.py b/docs/examples/_data_server.py new file mode 100644 index 000000000..277efcf80 --- /dev/null +++ b/docs/examples/_data_server.py @@ -0,0 +1,143 @@ +import asyncio + +from grpclib.utils import graceful_exit +from grpclib.server import Server, Stream +from google.protobuf.struct_pb2 import Struct + +from viam.proto.app.data import ( + AddBoundingBoxToImageByIDResponse, + AddBoundingBoxToImageByIDRequest, + AddTagsToBinaryDataByFilterRequest, + AddTagsToBinaryDataByFilterResponse, + AddTagsToBinaryDataByIDsRequest, + AddTagsToBinaryDataByIDsResponse, + BinaryDataByFilterRequest, + BinaryDataByFilterResponse, + BinaryDataByIDsRequest, + BinaryDataByIDsResponse, + BoundingBoxLabelsByFilterRequest, + BoundingBoxLabelsByFilterResponse, + DataServiceBase, + DeleteBinaryDataByFilterRequest, + DeleteBinaryDataByFilterResponse, + DeleteBinaryDataByIDsRequest, + DeleteBinaryDataByIDsResponse, + DeleteTabularDataByFilterRequest, + DeleteTabularDataByFilterResponse, + RemoveBoundingBoxFromImageByIDResponse, + RemoveBoundingBoxFromImageByIDRequest, + RemoveTagsFromBinaryDataByFilterRequest, + RemoveTagsFromBinaryDataByFilterResponse, + RemoveTagsFromBinaryDataByIDsRequest, + RemoveTagsFromBinaryDataByIDsResponse, + TabularData, + TabularDataByFilterRequest, + TabularDataByFilterResponse, + TagsByFilterRequest, + TagsByFilterResponse, +) +from viam.proto.app.datasync import ( + DataCaptureUploadRequest, + DataCaptureUploadResponse, + DataSyncServiceBase, + FileUploadRequest, + FileUploadResponse, +) + + +class MockData(DataServiceBase): + def __init__(self): + self.tabular_data_requested = False + self.tabular_response = [{"PowerPct": 0, "IsPowered": False}, {"PowerPct": 0, "IsPowered": False}, {"Position": 0}] + + async def TabularDataByFilter(self, stream: [TabularDataByFilterRequest, TabularDataByFilterResponse]) -> None: + if self.tabular_data_requested: + await stream.send_message(TabularDataByFilterResponse()) + return + self.tabular_data_requested = True + _ = await stream.recv_message() + n = len(self.tabular_response) + tabular_structs = [None] * n + tabular_metadata = [] + for i in range(n): + tabular_data, tabular_md = self.tabular_response[i] + s = Struct() + s.update(tabular_data) + tabular_structs[i] = s + tabular_metadata.append(tabular_md) + await stream.send_message(TabularDataByFilterResponse( + data=[TabularData(data=struct, metadata_index=idx) for idx, struct in enumerate(tabular_structs)], + metadata=tabular_metadata) + ) + + async def BinaryDataByFilter(self, stream: Stream[BinaryDataByFilterRequest, BinaryDataByFilterResponse]) -> None: + pass + + async def BinaryDataByIDs(self, stream: Stream[BinaryDataByIDsRequest, BinaryDataByIDsResponse]) -> None: + pass + + async def DeleteTabularDataByFilter(self, stream: Stream[DeleteTabularDataByFilterRequest, DeleteTabularDataByFilterResponse]) -> None: + pass + + async def DeleteBinaryDataByFilter(self, stream: Stream[DeleteBinaryDataByFilterRequest, DeleteBinaryDataByFilterResponse]) -> None: + pass + + async def DeleteBinaryDataByIDs(self, stream: Stream[DeleteBinaryDataByIDsRequest, DeleteBinaryDataByIDsResponse]) -> None: + pass + + async def AddTagsToBinaryDataByIDs(self, stream: Stream[AddTagsToBinaryDataByIDsRequest, AddTagsToBinaryDataByIDsResponse]) -> None: + pass + + async def AddTagsToBinaryDataByFilter( + self, + stream: Stream[AddTagsToBinaryDataByFilterRequest, AddTagsToBinaryDataByFilterResponse] + ) -> None: + pass + + async def RemoveTagsFromBinaryDataByIDs( + self, + stream: Stream[RemoveTagsFromBinaryDataByIDsRequest, RemoveTagsFromBinaryDataByIDsResponse] + ) -> None: + pass + + async def RemoveTagsFromBinaryDataByFilter( + self, + stream: Stream[RemoveTagsFromBinaryDataByFilterRequest, RemoveTagsFromBinaryDataByFilterResponse] + ) -> None: + pass + + async def TagsByFilter(self, stream: Stream[TagsByFilterRequest, TagsByFilterResponse]) -> None: + pass + + async def AddBoundingBoxToImageByID( + self, + stream: Stream[AddBoundingBoxToImageByIDRequest, AddBoundingBoxToImageByIDResponse] + ) -> None: + pass + + async def RemoveBoundingBoxFromImageByID( + self, + stream: Stream[RemoveBoundingBoxFromImageByIDRequest, RemoveBoundingBoxFromImageByIDResponse] + ) -> None: + pass + + async def BoundingBoxLabelsByFilter(self, stream: Stream[BoundingBoxLabelsByFilterRequest, BoundingBoxLabelsByFilterResponse]) -> None: + pass + + +class MockDataSync(DataSyncServiceBase): + async def DataCaptureUpload(self, stream: Stream[DataCaptureUploadRequest, DataCaptureUploadResponse]) -> None: + await stream.send_message(DataCaptureUploadResponse()) + + async def FileUpload(self, stream: Stream[FileUploadRequest, FileUploadResponse]) -> None: + pass + + +async def main(*, host: str = '127.0.0.1', port: int = 9092) -> None: + data_server = Server([MockData(), MockDataSync()]) + with graceful_exit([data_server]): + await data_server.start(host, port) + await data_server.wait_closed() + +if __name__ == '__main__': + asyncio.run(main()) diff --git a/src/viam/app/data_client.py b/src/viam/app/data_client.py index 24b72d2f4..0f7d85fe6 100644 --- a/src/viam/app/data_client.py +++ b/src/viam/app/data_client.py @@ -14,9 +14,11 @@ BinaryDataByIDsRequest, BinaryDataByIDsResponse, BinaryID, + BinaryMetadata, BoundingBoxLabelsByFilterRequest, BoundingBoxLabelsByFilterResponse, CaptureInterval, + CaptureMetadata, DataRequest, DataServiceStub, DeleteBinaryDataByFilterRequest, @@ -60,6 +62,46 @@ class DataClient: `ViamClient`. """ + class TabularData: + """Class representing a piece of tabular data and associated metadata. + + Args: + data (Mapping[str, Any]): the requested data. + metadata (viam.proto.app.data.CaptureMetadata): the metadata from the request. + time_requested (datetime): the time the data request was sent. + time_received (datetime): the time the requested data was received. + """ + + def __init__(self, data: Mapping[str, Any], metadata: CaptureMetadata, time_requested: datetime, time_received: datetime) -> None: + self._data = data + self._metadata = metadata + self._time_requested = time_requested + self._time_received = time_received + + def __str__(self) -> str: + return f"{self._data}\n{self._metadata}Time requested: {self._time_requested}\nTime received: {self._time_received}\n" + + def __repr__(self) -> str: + return self.__str__() + + class BinaryData: + """Class representing a piece of binary data and associated metadata. + + Args: + data (Mapping[str, Any]): the requested data. + metadata (viam.proto.app.data.BinaryMetadata): the metadata from the request. + """ + + def __init__(self, data: Mapping[str, Any], metadata: BinaryMetadata) -> None: + self._data = data + self._metadata = metadata + + def __str__(self) -> str: + return f"{self._data}\n{self._metadata}" + + def __repr__(self) -> str: + return self.__str__() + def __init__(self, channel: Channel, metadata: Mapping[str, str]): """Create a `DataClient` that maintains a connection to app. @@ -79,7 +121,7 @@ async def tabular_data_by_filter( self, filter: Optional[Filter] = None, dest: Optional[str] = None, - ) -> List[Mapping[str, Any]]: + ) -> List[TabularData]: """Filter and download tabular data. Args: @@ -102,7 +144,12 @@ async def tabular_data_by_filter( response: TabularDataByFilterResponse = await self._data_client.TabularDataByFilter(request, metadata=self._metadata) if not response.data or len(response.data) == 0: break - data += [struct_to_dict(struct.data) for struct in response.data] + data += [DataClient.TabularData( + struct_to_dict(struct.data), + response.metadata[struct.metadata_index], + struct.time_requested.ToDatetime(), + struct.time_received.ToDatetime(), + ) for struct in response.data] last = response.last if dest: @@ -117,7 +164,7 @@ async def binary_data_by_filter( self, filter: Optional[Filter] = None, dest: Optional[str] = None, - ) -> List[bytes]: + ) -> List[BinaryData]: """Filter and download binary data. Args: @@ -139,7 +186,7 @@ async def binary_data_by_filter( response: BinaryDataByFilterResponse = await self._data_client.BinaryDataByFilter(request, metadata=self._metadata) if not response.data or len(response.data) == 0: break - data += [data.binary for data in response.data] + data += [DataClient.BinaryData(data.binary, data.metadata) for data in response.data] last = response.last if dest: @@ -155,7 +202,7 @@ async def binary_data_by_ids( self, binary_ids: List[BinaryID], dest: Optional[str] = None, - ) -> List[bytes]: + ) -> List[Tuple[bytes, BinaryMetadata]]: """Filter and download binary data. Args: @@ -176,7 +223,7 @@ async def binary_data_by_ids( file.write(f"{response.data}") except Exception as e: LOGGER.error(f"Failed to write binary data to file {dest}", exc_info=e) - return [binary_data.binary for binary_data in response.data] + return [(binary_data.binary, binary_data.metadata) for binary_data in response.data] async def delete_tabular_data_by_filter(self, filter: Optional[Filter]) -> int: """Filter and delete tabular data. diff --git a/tests/mocks/services.py b/tests/mocks/services.py index 9eaf30449..205dd86bb 100644 --- a/tests/mocks/services.py +++ b/tests/mocks/services.py @@ -1,4 +1,4 @@ -from typing import Any, Dict, List, Mapping, Optional, Union +from typing import Any, Dict, List, Mapping, Optional, Tuple, Union from google.protobuf.struct_pb2 import Struct from grpclib.server import Stream @@ -138,8 +138,10 @@ BinaryDataByFilterResponse, BinaryDataByIDsRequest, BinaryDataByIDsResponse, + BinaryMetadata, BoundingBoxLabelsByFilterRequest, BoundingBoxLabelsByFilterResponse, + CaptureMetadata, DataServiceBase, DeleteBinaryDataByFilterRequest, DeleteBinaryDataByFilterResponse, @@ -483,8 +485,8 @@ async def do_command(self, command: Mapping[str, ValueTypes], *, timeout: Option class MockData(DataServiceBase): def __init__( self, - tabular_response: List[Mapping[str, Any]], - binary_response: List[bytes], + tabular_response: List[Tuple[Mapping[str, Any], CaptureMetadata]], + binary_response: List[Tuple[bytes, BinaryMetadata]], delete_remove_response: int, tags_response: List[str], bbox_labels_response: List[str], @@ -506,9 +508,15 @@ async def TabularDataByFilter(self, stream: Stream[TabularDataByFilterRequest, T self.filter = request.data_request.filter n = len(self.tabular_response) tabular_response_structs = [Struct()] * n + tabular_metadata = [] for i in range(n): - tabular_response_structs[i].update(self.tabular_response[i]) - await stream.send_message(TabularDataByFilterResponse(data=[TabularData(data=struct) for struct in tabular_response_structs])) + tabular_data, tabular_md = self.tabular_response[i] + tabular_response_structs[i].update(tabular_data) + tabular_metadata.append(tabular_md) + await stream.send_message(TabularDataByFilterResponse( + data=[TabularData(data=struct, metadata_index=idx) for idx, struct in enumerate(tabular_response_structs)], + metadata=tabular_metadata) + ) self.was_tabular_data_requested = True async def BinaryDataByFilter(self, stream: Stream[BinaryDataByFilterRequest, BinaryDataByFilterResponse]) -> None: @@ -518,14 +526,18 @@ async def BinaryDataByFilter(self, stream: Stream[BinaryDataByFilterRequest, Bin await stream.send_message(BinaryDataByFilterResponse()) return self.filter = request.data_request.filter - await stream.send_message(BinaryDataByFilterResponse(data=[BinaryData(binary=binary_data) for binary_data in self.binary_response])) + await stream.send_message(BinaryDataByFilterResponse( + data=[BinaryData(binary=binary_data, metadata=binary_metadata) for (binary_data, binary_metadata) in self.binary_response]) + ) self.was_binary_data_requested = True async def BinaryDataByIDs(self, stream: Stream[BinaryDataByIDsRequest, BinaryDataByIDsResponse]) -> None: request = await stream.recv_message() assert request is not None self.binary_ids = request.binary_ids - await stream.send_message(BinaryDataByIDsResponse(data=[BinaryData(binary=binary_data) for binary_data in self.binary_response])) + await stream.send_message(BinaryDataByIDsResponse( + data=[BinaryData(binary=binary_data, metadata=binary_metadata) for (binary_data, binary_metadata) in self.binary_response]) + ) async def DeleteTabularDataByFilter(self, stream: Stream[DeleteTabularDataByFilterRequest, DeleteTabularDataByFilterResponse]) -> None: request = await stream.recv_message() diff --git a/tests/test_data_client.py b/tests/test_data_client.py index 47198e391..4af19b996 100644 --- a/tests/test_data_client.py +++ b/tests/test_data_client.py @@ -7,10 +7,14 @@ from viam.app.data_client import DataClient from viam.proto.app.data import ( - Filter, + Annotations, BinaryID, + BinaryMetadata, + BoundingBox, CaptureInterval, - TagsFilter + CaptureMetadata, + Filter, + TagsFilter, ) from .mocks.services import MockData @@ -26,13 +30,17 @@ LOCATION_IDS = [LOCATION_ID] ORG_ID = "organization_id" ORG_IDS = [ORG_ID] -MIME_TYPES = ["mime_type"] +MIME_TYPE = "mime_type" +MIME_TYPES = [MIME_TYPE] +URI = "some.robot.uri" START_DATETIME = datetime(2001, 1, 1, 1, 1, 1) END_DATETIME = datetime(2001, 1, 1, 1, 1, 1) SECONDS_START = 978310861 NANOS_START = 0 SECONDS_END = 978310861 NANOS_END = 0 +START_TS = Timestamp(seconds=SECONDS_START, nanos=NANOS_START) +END_TS = Timestamp(seconds=SECONDS_END, nanos=NANOS_END) TAGS = ["tag"] BBOX_LABELS = ["bbox_label"] FILTER = Filter( @@ -47,14 +55,8 @@ organization_ids=ORG_IDS, mime_type=MIME_TYPES, interval=CaptureInterval( - start=Timestamp( - seconds=SECONDS_START, - nanos=NANOS_START, - ), - end=Timestamp( - seconds=SECONDS_END, - nanos=NANOS_END - ) + start=START_TS, + end=END_TS ), tags_filter=TagsFilter( tags=TAGS @@ -68,25 +70,50 @@ location_id=LOCATION_ID )] BINARY_DATA = b'binary_data' -TIMESTAMPS = [( - Timestamp( - seconds=SECONDS_START, - nanos=NANOS_START - ), - Timestamp( - seconds=SECONDS_END, - nanos=NANOS_END - ) -)] -TABULAR_DATA = [{"key": "value"}] +TIMESTAMPS = [(START_TS, END_TS)] FILE_NAME = "file_name" FILE_EXT = "file_extension" +BBOX_LABEL = "bbox_label" +BBOX_LABELS_RESPONSE = [BBOX_LABEL] +BBOX = BoundingBox( + id="id", + label=BBOX_LABEL, + x_min_normalized=0, + y_min_normalized=1, + x_max_normalized=2, + y_max_normalized=3, +) +BBOXES = [BBOX] +TABULAR_DATA = {"key": "value"} +TABULAR_METADATA = CaptureMetadata( + organization_id=ORG_ID, + location_id=LOCATION_ID, + robot_name=ROBOT_NAME, + robot_id=ROBOT_ID, + part_name=PART_NAME, + part_id=PART_ID, + component_type=COMPONENT_TYPE, + component_name=COMPONENT_NAME, + method_name=METHOD, + method_parameters={}, + tags=TAGS, + mime_type=MIME_TYPE, +) +BINARY_METADATA = BinaryMetadata( + id="id", + capture_metadata=TABULAR_METADATA, + time_requested=START_TS, + time_received=END_TS, + file_name=FILE_NAME, + file_ext=FILE_EXT, + uri=URI, + annotations=Annotations(bboxes=BBOXES), +) -TABULAR_RESPONSE = TABULAR_DATA -BINARY_RESPONSE = [BINARY_DATA] +TABULAR_RESPONSE = [(TABULAR_DATA, TABULAR_METADATA)] +BINARY_RESPONSE = [(BINARY_DATA, BINARY_METADATA)] DELETE_REMOVE_RESPONSE = 1 TAGS_RESPONSE = ["tag"] -BBOX_LABELS_RESPONSE = ["bbox_label"] AUTH_TOKEN = "auth_token" DATA_SERVICE_METADATA = {"authorization": f"Bearer {AUTH_TOKEN}"} From 7b333a69d07650d41afb08c9982ba6c3ef495227 Mon Sep 17 00:00:00 2001 From: Ethan Rodkin Date: Fri, 4 Aug 2023 16:45:03 -0400 Subject: [PATCH 2/5] refactor docs example --- docs/examples/_data_server.py | 143 ---------------------------------- docs/examples/_server.py | 32 +++++++- 2 files changed, 29 insertions(+), 146 deletions(-) delete mode 100644 docs/examples/_data_server.py diff --git a/docs/examples/_data_server.py b/docs/examples/_data_server.py deleted file mode 100644 index 277efcf80..000000000 --- a/docs/examples/_data_server.py +++ /dev/null @@ -1,143 +0,0 @@ -import asyncio - -from grpclib.utils import graceful_exit -from grpclib.server import Server, Stream -from google.protobuf.struct_pb2 import Struct - -from viam.proto.app.data import ( - AddBoundingBoxToImageByIDResponse, - AddBoundingBoxToImageByIDRequest, - AddTagsToBinaryDataByFilterRequest, - AddTagsToBinaryDataByFilterResponse, - AddTagsToBinaryDataByIDsRequest, - AddTagsToBinaryDataByIDsResponse, - BinaryDataByFilterRequest, - BinaryDataByFilterResponse, - BinaryDataByIDsRequest, - BinaryDataByIDsResponse, - BoundingBoxLabelsByFilterRequest, - BoundingBoxLabelsByFilterResponse, - DataServiceBase, - DeleteBinaryDataByFilterRequest, - DeleteBinaryDataByFilterResponse, - DeleteBinaryDataByIDsRequest, - DeleteBinaryDataByIDsResponse, - DeleteTabularDataByFilterRequest, - DeleteTabularDataByFilterResponse, - RemoveBoundingBoxFromImageByIDResponse, - RemoveBoundingBoxFromImageByIDRequest, - RemoveTagsFromBinaryDataByFilterRequest, - RemoveTagsFromBinaryDataByFilterResponse, - RemoveTagsFromBinaryDataByIDsRequest, - RemoveTagsFromBinaryDataByIDsResponse, - TabularData, - TabularDataByFilterRequest, - TabularDataByFilterResponse, - TagsByFilterRequest, - TagsByFilterResponse, -) -from viam.proto.app.datasync import ( - DataCaptureUploadRequest, - DataCaptureUploadResponse, - DataSyncServiceBase, - FileUploadRequest, - FileUploadResponse, -) - - -class MockData(DataServiceBase): - def __init__(self): - self.tabular_data_requested = False - self.tabular_response = [{"PowerPct": 0, "IsPowered": False}, {"PowerPct": 0, "IsPowered": False}, {"Position": 0}] - - async def TabularDataByFilter(self, stream: [TabularDataByFilterRequest, TabularDataByFilterResponse]) -> None: - if self.tabular_data_requested: - await stream.send_message(TabularDataByFilterResponse()) - return - self.tabular_data_requested = True - _ = await stream.recv_message() - n = len(self.tabular_response) - tabular_structs = [None] * n - tabular_metadata = [] - for i in range(n): - tabular_data, tabular_md = self.tabular_response[i] - s = Struct() - s.update(tabular_data) - tabular_structs[i] = s - tabular_metadata.append(tabular_md) - await stream.send_message(TabularDataByFilterResponse( - data=[TabularData(data=struct, metadata_index=idx) for idx, struct in enumerate(tabular_structs)], - metadata=tabular_metadata) - ) - - async def BinaryDataByFilter(self, stream: Stream[BinaryDataByFilterRequest, BinaryDataByFilterResponse]) -> None: - pass - - async def BinaryDataByIDs(self, stream: Stream[BinaryDataByIDsRequest, BinaryDataByIDsResponse]) -> None: - pass - - async def DeleteTabularDataByFilter(self, stream: Stream[DeleteTabularDataByFilterRequest, DeleteTabularDataByFilterResponse]) -> None: - pass - - async def DeleteBinaryDataByFilter(self, stream: Stream[DeleteBinaryDataByFilterRequest, DeleteBinaryDataByFilterResponse]) -> None: - pass - - async def DeleteBinaryDataByIDs(self, stream: Stream[DeleteBinaryDataByIDsRequest, DeleteBinaryDataByIDsResponse]) -> None: - pass - - async def AddTagsToBinaryDataByIDs(self, stream: Stream[AddTagsToBinaryDataByIDsRequest, AddTagsToBinaryDataByIDsResponse]) -> None: - pass - - async def AddTagsToBinaryDataByFilter( - self, - stream: Stream[AddTagsToBinaryDataByFilterRequest, AddTagsToBinaryDataByFilterResponse] - ) -> None: - pass - - async def RemoveTagsFromBinaryDataByIDs( - self, - stream: Stream[RemoveTagsFromBinaryDataByIDsRequest, RemoveTagsFromBinaryDataByIDsResponse] - ) -> None: - pass - - async def RemoveTagsFromBinaryDataByFilter( - self, - stream: Stream[RemoveTagsFromBinaryDataByFilterRequest, RemoveTagsFromBinaryDataByFilterResponse] - ) -> None: - pass - - async def TagsByFilter(self, stream: Stream[TagsByFilterRequest, TagsByFilterResponse]) -> None: - pass - - async def AddBoundingBoxToImageByID( - self, - stream: Stream[AddBoundingBoxToImageByIDRequest, AddBoundingBoxToImageByIDResponse] - ) -> None: - pass - - async def RemoveBoundingBoxFromImageByID( - self, - stream: Stream[RemoveBoundingBoxFromImageByIDRequest, RemoveBoundingBoxFromImageByIDResponse] - ) -> None: - pass - - async def BoundingBoxLabelsByFilter(self, stream: Stream[BoundingBoxLabelsByFilterRequest, BoundingBoxLabelsByFilterResponse]) -> None: - pass - - -class MockDataSync(DataSyncServiceBase): - async def DataCaptureUpload(self, stream: Stream[DataCaptureUploadRequest, DataCaptureUploadResponse]) -> None: - await stream.send_message(DataCaptureUploadResponse()) - - async def FileUpload(self, stream: Stream[FileUploadRequest, FileUploadResponse]) -> None: - pass - - -async def main(*, host: str = '127.0.0.1', port: int = 9092) -> None: - data_server = Server([MockData(), MockDataSync()]) - with graceful_exit([data_server]): - await data_server.start(host, port) - await data_server.wait_closed() - -if __name__ == '__main__': - asyncio.run(main()) diff --git a/docs/examples/_server.py b/docs/examples/_server.py index 12b370445..a4d85b5df 100644 --- a/docs/examples/_server.py +++ b/docs/examples/_server.py @@ -1,10 +1,12 @@ import asyncio +from datetime import datetime from grpclib.utils import graceful_exit from grpclib.server import Server, Stream from google.protobuf.struct_pb2 import Struct, Value from google.protobuf.timestamp_pb2 import Timestamp +from viam.app.data_client import DataClient from viam.utils import dict_to_struct, value_to_primitive from viam.proto.app.data import ( AddBoundingBoxToImageByIDResponse, @@ -19,6 +21,7 @@ BinaryDataByIDsResponse, BoundingBoxLabelsByFilterRequest, BoundingBoxLabelsByFilterResponse, + CaptureMetadata, DataServiceBase, DeleteBinaryDataByFilterRequest, DeleteBinaryDataByFilterResponse, @@ -169,7 +172,26 @@ class MockData(DataServiceBase): def __init__(self): self.tabular_data_requested = False - self.tabular_response = [{"PowerPct": 0, "IsPowered": False}, {"PowerPct": 0, "IsPowered": False}, {"Position": 0}] + self.tabular_response = [ + DataClient.TabularData( + {"PowerPct": 0, "IsPowered": False}, + CaptureMetadata(method_name="IsPowered"), + datetime(2022, 1, 1, 1, 1, 1), + datetime(2022, 12, 31, 23, 59, 59), + ), + DataClient.TabularData( + {"PowerPct": 0, "IsPowered": False}, + CaptureMetadata(location_id="loc-id"), + datetime(2023, 1, 2), + datetime(2023, 3, 4) + ), + DataClient.TabularData( + {"Position": 0}, + CaptureMetadata(), + datetime(2023, 5, 6), + datetime(2023, 7, 8), + ), + ] async def TabularDataByFilter(self, stream: Stream[TabularDataByFilterRequest, TabularDataByFilterResponse]) -> None: if self.tabular_data_requested: @@ -180,8 +202,12 @@ async def TabularDataByFilter(self, stream: Stream[TabularDataByFilterRequest, T n = len(self.tabular_response) tabular_structs = [Struct()] * n for i in range(n): - tabular_structs[i].update(self.tabular_response[i]) - await stream.send_message(TabularDataByFilterResponse(data=[TabularData(data=struct) for struct in tabular_structs])) + tabular_structs[i].update(self.tabular_response[i]._data) + await stream.send_message(TabularDataByFilterResponse( + data=[TabularData(data=struct) for struct in tabular_structs], + metadata=[d._metadata for d in self.tabular_response], + ) + ) async def BinaryDataByFilter(self, stream: Stream[BinaryDataByFilterRequest, BinaryDataByFilterResponse]) -> None: pass From a0ed6dd6b075102911d0b0aa2d74b9fec62b1b02 Mon Sep 17 00:00:00 2001 From: Ethan Rodkin Date: Mon, 7 Aug 2023 11:16:22 -0400 Subject: [PATCH 3/5] fix tests, cleanup --- src/viam/app/data_client.py | 20 +++++++++--------- tests/mocks/services.py | 32 ++++++++++++++++------------- tests/test_data_client.py | 41 ++++++++++--------------------------- 3 files changed, 39 insertions(+), 54 deletions(-) diff --git a/src/viam/app/data_client.py b/src/viam/app/data_client.py index 0f7d85fe6..3aa30622a 100644 --- a/src/viam/app/data_client.py +++ b/src/viam/app/data_client.py @@ -81,26 +81,26 @@ def __init__(self, data: Mapping[str, Any], metadata: CaptureMetadata, time_requ def __str__(self) -> str: return f"{self._data}\n{self._metadata}Time requested: {self._time_requested}\nTime received: {self._time_received}\n" - def __repr__(self) -> str: - return self.__str__() + def __eq__(self, other: "TabularData") -> bool: + return str(self) == str(other) class BinaryData: """Class representing a piece of binary data and associated metadata. Args: - data (Mapping[str, Any]): the requested data. + data (bytes): the requested data. metadata (viam.proto.app.data.BinaryMetadata): the metadata from the request. """ - def __init__(self, data: Mapping[str, Any], metadata: BinaryMetadata) -> None: + def __init__(self, data: bytes, metadata: BinaryMetadata) -> None: self._data = data self._metadata = metadata def __str__(self) -> str: return f"{self._data}\n{self._metadata}" - def __repr__(self) -> str: - return self.__str__() + def __eq__(self, other: "BinaryData") -> bool: + return str(self) == str(other) def __init__(self, channel: Channel, metadata: Mapping[str, str]): """Create a `DataClient` that maintains a connection to app. @@ -155,7 +155,7 @@ async def tabular_data_by_filter( if dest: try: file = open(dest, "w") - file.write(f"{data}") + file.write(f"{[str(d) for d in data]}") except Exception as e: LOGGER.error(f"Failed to write tabular data to file {dest}", exc_info=e) return data @@ -192,7 +192,7 @@ async def binary_data_by_filter( if dest: try: file = open(dest, "w") - file.write(f"{data}") + file.write(f"{[str(d) for d in data]}") except Exception as e: LOGGER.error(f"Failed to write binary data to file {dest}", exc_info=e) @@ -202,7 +202,7 @@ async def binary_data_by_ids( self, binary_ids: List[BinaryID], dest: Optional[str] = None, - ) -> List[Tuple[bytes, BinaryMetadata]]: + ) -> List[BinaryData]: """Filter and download binary data. Args: @@ -223,7 +223,7 @@ async def binary_data_by_ids( file.write(f"{response.data}") except Exception as e: LOGGER.error(f"Failed to write binary data to file {dest}", exc_info=e) - return [(binary_data.binary, binary_data.metadata) for binary_data in response.data] + return [DataClient.BinaryData(data.binary, data.metadata) for data in response.data] async def delete_tabular_data_by_filter(self, filter: Optional[Filter]) -> int: """Filter and delete tabular data. diff --git a/tests/mocks/services.py b/tests/mocks/services.py index 205dd86bb..4d8c8b635 100644 --- a/tests/mocks/services.py +++ b/tests/mocks/services.py @@ -193,11 +193,12 @@ SensorsServiceBase, ) from viam.proto.service.vision import Classification, Detection +from viam.app.data_client import DataClient from viam.services.mlmodel import File, LabelType, Metadata, MLModel, TensorInfo from viam.services.navigation import Navigation from viam.services.slam import SLAM from viam.services.vision import Vision -from viam.utils import ValueTypes, struct_to_dict +from viam.utils import ValueTypes, datetime_to_timestamp, dict_to_struct, struct_to_dict class MockVision(Vision): @@ -485,8 +486,8 @@ async def do_command(self, command: Mapping[str, ValueTypes], *, timeout: Option class MockData(DataServiceBase): def __init__( self, - tabular_response: List[Tuple[Mapping[str, Any], CaptureMetadata]], - binary_response: List[Tuple[bytes, BinaryMetadata]], + tabular_response: List[DataClient.TabularData], + binary_response: List[DataClient.BinaryData], delete_remove_response: int, tags_response: List[str], bbox_labels_response: List[str], @@ -506,16 +507,19 @@ async def TabularDataByFilter(self, stream: Stream[TabularDataByFilterRequest, T await stream.send_message(TabularDataByFilterResponse(data=None)) return self.filter = request.data_request.filter - n = len(self.tabular_response) - tabular_response_structs = [Struct()] * n - tabular_metadata = [] - for i in range(n): - tabular_data, tabular_md = self.tabular_response[i] - tabular_response_structs[i].update(tabular_data) - tabular_metadata.append(tabular_md) + tabular_response_structs = [] + tabular_metadata = [data._metadata for data in self.tabular_response] + for idx, tabular_data in enumerate(self.tabular_response): + tabular_response_structs.append( + TabularData( + data=dict_to_struct(tabular_data._data), + metadata_index=idx, + time_requested=datetime_to_timestamp(tabular_data._time_requested), + time_received=datetime_to_timestamp(tabular_data._time_received) + ) + ) await stream.send_message(TabularDataByFilterResponse( - data=[TabularData(data=struct, metadata_index=idx) for idx, struct in enumerate(tabular_response_structs)], - metadata=tabular_metadata) + data=tabular_response_structs, metadata=tabular_metadata) ) self.was_tabular_data_requested = True @@ -527,7 +531,7 @@ async def BinaryDataByFilter(self, stream: Stream[BinaryDataByFilterRequest, Bin return self.filter = request.data_request.filter await stream.send_message(BinaryDataByFilterResponse( - data=[BinaryData(binary=binary_data, metadata=binary_metadata) for (binary_data, binary_metadata) in self.binary_response]) + data=[BinaryData(binary=data._data, metadata=data._metadata) for data in self.binary_response]) ) self.was_binary_data_requested = True @@ -536,7 +540,7 @@ async def BinaryDataByIDs(self, stream: Stream[BinaryDataByIDsRequest, BinaryDat assert request is not None self.binary_ids = request.binary_ids await stream.send_message(BinaryDataByIDsResponse( - data=[BinaryData(binary=binary_data, metadata=binary_metadata) for (binary_data, binary_metadata) in self.binary_response]) + data=[BinaryData(binary=data._data, metadata=data._metadata) for data in self.binary_response]) ) async def DeleteTabularDataByFilter(self, stream: Stream[DeleteTabularDataByFilterRequest, DeleteTabularDataByFilterResponse]) -> None: diff --git a/tests/test_data_client.py b/tests/test_data_client.py index 4af19b996..49dfd6305 100644 --- a/tests/test_data_client.py +++ b/tests/test_data_client.py @@ -33,17 +33,17 @@ MIME_TYPE = "mime_type" MIME_TYPES = [MIME_TYPE] URI = "some.robot.uri" -START_DATETIME = datetime(2001, 1, 1, 1, 1, 1) -END_DATETIME = datetime(2001, 1, 1, 1, 1, 1) SECONDS_START = 978310861 NANOS_START = 0 SECONDS_END = 978310861 NANOS_END = 0 START_TS = Timestamp(seconds=SECONDS_START, nanos=NANOS_START) END_TS = Timestamp(seconds=SECONDS_END, nanos=NANOS_END) +START_DATETIME = START_TS.ToDatetime() +END_DATETIME = END_TS.ToDatetime() TAGS = ["tag"] BBOX_LABELS = ["bbox_label"] -FILTER = Filter( +FILTER = DataClient.create_filter( component_name=COMPONENT_NAME, component_type=COMPONENT_TYPE, method=METHOD, @@ -54,15 +54,12 @@ location_ids=LOCATION_IDS, organization_ids=ORG_IDS, mime_type=MIME_TYPES, - interval=CaptureInterval( - start=START_TS, - end=END_TS - ), - tags_filter=TagsFilter( - tags=TAGS - ), - bbox_labels=BBOX_LABELS, + start_time=START_DATETIME, + end_time=END_DATETIME, + tags=TAGS, + bbox_labels=BBOX_LABELS ) + FILE_ID = "file_id" BINARY_IDS = [BinaryID( file_id=FILE_ID, @@ -70,7 +67,6 @@ location_id=LOCATION_ID )] BINARY_DATA = b'binary_data' -TIMESTAMPS = [(START_TS, END_TS)] FILE_NAME = "file_name" FILE_EXT = "file_extension" BBOX_LABEL = "bbox_label" @@ -110,8 +106,8 @@ annotations=Annotations(bboxes=BBOXES), ) -TABULAR_RESPONSE = [(TABULAR_DATA, TABULAR_METADATA)] -BINARY_RESPONSE = [(BINARY_DATA, BINARY_METADATA)] +TABULAR_RESPONSE = [DataClient.TabularData(TABULAR_DATA, TABULAR_METADATA, START_DATETIME, END_DATETIME)] +BINARY_RESPONSE = [DataClient.BinaryData(BINARY_DATA, BINARY_METADATA)] DELETE_REMOVE_RESPONSE = 1 TAGS_RESPONSE = ["tag"] @@ -135,22 +131,7 @@ class TestClient: async def test_tabular_data_by_filter(self, service: MockData): async with ChannelFor([service]) as channel: client = DataClient(channel, DATA_SERVICE_METADATA) - tabular_data = await client.tabular_data_by_filter(filter=client.create_filter( - component_name=COMPONENT_NAME, - component_type=COMPONENT_TYPE, - method=METHOD, - robot_name=ROBOT_NAME, - robot_id=ROBOT_ID, - part_name=PART_NAME, - part_id=PART_ID, - location_ids=LOCATION_IDS, - organization_ids=ORG_IDS, - mime_type=MIME_TYPES, - start_time=START_DATETIME, - end_time=START_DATETIME, - tags=TAGS, - bbox_labels=BBOX_LABELS - )) + tabular_data = await client.tabular_data_by_filter(filter=FILTER) assert tabular_data == TABULAR_RESPONSE self.assert_filter(filter=service.filter) From e483fc097925c3d67088698c7e4d2ba005a2a054 Mon Sep 17 00:00:00 2001 From: Ethan Rodkin Date: Mon, 7 Aug 2023 12:24:19 -0400 Subject: [PATCH 4/5] make data params public --- docs/examples/_server.py | 19 +++++++++++++------ src/viam/app/data_client.py | 24 ++++++++++++++++-------- tests/mocks/services.py | 12 ++++++------ 3 files changed, 35 insertions(+), 20 deletions(-) diff --git a/docs/examples/_server.py b/docs/examples/_server.py index a4d85b5df..7d2e1c3a7 100644 --- a/docs/examples/_server.py +++ b/docs/examples/_server.py @@ -7,7 +7,7 @@ from google.protobuf.timestamp_pb2 import Timestamp from viam.app.data_client import DataClient -from viam.utils import dict_to_struct, value_to_primitive +from viam.utils import datetime_to_timestamp, dict_to_struct, value_to_primitive from viam.proto.app.data import ( AddBoundingBoxToImageByIDResponse, AddBoundingBoxToImageByIDRequest, @@ -199,13 +199,20 @@ async def TabularDataByFilter(self, stream: Stream[TabularDataByFilterRequest, T return self.tabular_data_requested = True _ = await stream.recv_message() - n = len(self.tabular_response) - tabular_structs = [Struct()] * n - for i in range(n): - tabular_structs[i].update(self.tabular_response[i]._data) + tabular_structs = [] + tabular_metadata = [data.metadata for data in self.tabular_response] + for idx, tabular_data in enumerate(self.tabular_response): + tabular_structs.append( + TabularData( + data=dict_to_struct(tabular_data.data), + metadata_index=idx, + time_requested=datetime_to_timestamp(tabular_data.time_requested), + time_received=datetime_to_timestamp(tabular_data.time_received) + ) + ) await stream.send_message(TabularDataByFilterResponse( data=[TabularData(data=struct) for struct in tabular_structs], - metadata=[d._metadata for d in self.tabular_response], + metadata=tabular_metadata, ) ) diff --git a/src/viam/app/data_client.py b/src/viam/app/data_client.py index 3aa30622a..8cd20faa2 100644 --- a/src/viam/app/data_client.py +++ b/src/viam/app/data_client.py @@ -73,13 +73,18 @@ class TabularData: """ def __init__(self, data: Mapping[str, Any], metadata: CaptureMetadata, time_requested: datetime, time_received: datetime) -> None: - self._data = data - self._metadata = metadata - self._time_requested = time_requested - self._time_received = time_received + self.data = data + self.metadata = metadata + self.time_requested = time_requested + self.time_received = time_received + + data: Mapping[str, Any] + metadata: CaptureMetadata + time_requested: datetime + time_received: datetime def __str__(self) -> str: - return f"{self._data}\n{self._metadata}Time requested: {self._time_requested}\nTime received: {self._time_received}\n" + return f"{self.data}\n{self.metadata}Time requested: {self.time_requested}\nTime received: {self.time_received}\n" def __eq__(self, other: "TabularData") -> bool: return str(self) == str(other) @@ -93,11 +98,14 @@ class BinaryData: """ def __init__(self, data: bytes, metadata: BinaryMetadata) -> None: - self._data = data - self._metadata = metadata + self.data = data + self.metadata = metadata + + data: bytes + metadata: BinaryMetadata def __str__(self) -> str: - return f"{self._data}\n{self._metadata}" + return f"{self.data}\n{self.metadata}" def __eq__(self, other: "BinaryData") -> bool: return str(self) == str(other) diff --git a/tests/mocks/services.py b/tests/mocks/services.py index 4d8c8b635..e6a8fbeff 100644 --- a/tests/mocks/services.py +++ b/tests/mocks/services.py @@ -508,14 +508,14 @@ async def TabularDataByFilter(self, stream: Stream[TabularDataByFilterRequest, T return self.filter = request.data_request.filter tabular_response_structs = [] - tabular_metadata = [data._metadata for data in self.tabular_response] + tabular_metadata = [data.metadata for data in self.tabular_response] for idx, tabular_data in enumerate(self.tabular_response): tabular_response_structs.append( TabularData( - data=dict_to_struct(tabular_data._data), + data=dict_to_struct(tabular_data.data), metadata_index=idx, - time_requested=datetime_to_timestamp(tabular_data._time_requested), - time_received=datetime_to_timestamp(tabular_data._time_received) + time_requested=datetime_to_timestamp(tabular_data.time_requested), + time_received=datetime_to_timestamp(tabular_data.time_received) ) ) await stream.send_message(TabularDataByFilterResponse( @@ -531,7 +531,7 @@ async def BinaryDataByFilter(self, stream: Stream[BinaryDataByFilterRequest, Bin return self.filter = request.data_request.filter await stream.send_message(BinaryDataByFilterResponse( - data=[BinaryData(binary=data._data, metadata=data._metadata) for data in self.binary_response]) + data=[BinaryData(binary=data.data, metadata=data.metadata) for data in self.binary_response]) ) self.was_binary_data_requested = True @@ -540,7 +540,7 @@ async def BinaryDataByIDs(self, stream: Stream[BinaryDataByIDsRequest, BinaryDat assert request is not None self.binary_ids = request.binary_ids await stream.send_message(BinaryDataByIDsResponse( - data=[BinaryData(binary=data._data, metadata=data._metadata) for data in self.binary_response]) + data=[BinaryData(binary=data.data, metadata=data.metadata) for data in self.binary_response]) ) async def DeleteTabularDataByFilter(self, stream: Stream[DeleteTabularDataByFilterRequest, DeleteTabularDataByFilterResponse]) -> None: From 3d07d2950feee653bc929c1839f8ae5a5a5e8bdc Mon Sep 17 00:00:00 2001 From: Ethan Rodkin Date: Mon, 7 Aug 2023 13:07:05 -0400 Subject: [PATCH 5/5] make linter happy --- docs/examples/_server.py | 3 +-- src/viam/app/data_client.py | 4 ++-- tests/mocks/services.py | 5 +---- tests/test_data_client.py | 3 --- 4 files changed, 4 insertions(+), 11 deletions(-) diff --git a/docs/examples/_server.py b/docs/examples/_server.py index 7d2e1c3a7..83ea23b62 100644 --- a/docs/examples/_server.py +++ b/docs/examples/_server.py @@ -211,8 +211,7 @@ async def TabularDataByFilter(self, stream: Stream[TabularDataByFilterRequest, T ) ) await stream.send_message(TabularDataByFilterResponse( - data=[TabularData(data=struct) for struct in tabular_structs], - metadata=tabular_metadata, + data=tabular_structs, metadata=tabular_metadata, ) ) diff --git a/src/viam/app/data_client.py b/src/viam/app/data_client.py index 8cd20faa2..8d796e08f 100644 --- a/src/viam/app/data_client.py +++ b/src/viam/app/data_client.py @@ -86,7 +86,7 @@ def __init__(self, data: Mapping[str, Any], metadata: CaptureMetadata, time_requ def __str__(self) -> str: return f"{self.data}\n{self.metadata}Time requested: {self.time_requested}\nTime received: {self.time_received}\n" - def __eq__(self, other: "TabularData") -> bool: + def __eq__(self, other: "DataClient.TabularData") -> bool: return str(self) == str(other) class BinaryData: @@ -107,7 +107,7 @@ def __init__(self, data: bytes, metadata: BinaryMetadata) -> None: def __str__(self) -> str: return f"{self.data}\n{self.metadata}" - def __eq__(self, other: "BinaryData") -> bool: + def __eq__(self, other: "DataClient.BinaryData") -> bool: return str(self) == str(other) def __init__(self, channel: Channel, metadata: Mapping[str, str]): diff --git a/tests/mocks/services.py b/tests/mocks/services.py index e6a8fbeff..8f273b4b5 100644 --- a/tests/mocks/services.py +++ b/tests/mocks/services.py @@ -1,6 +1,5 @@ -from typing import Any, Dict, List, Mapping, Optional, Tuple, Union +from typing import Any, Dict, List, Mapping, Optional, Union -from google.protobuf.struct_pb2 import Struct from grpclib.server import Stream from PIL import Image @@ -138,10 +137,8 @@ BinaryDataByFilterResponse, BinaryDataByIDsRequest, BinaryDataByIDsResponse, - BinaryMetadata, BoundingBoxLabelsByFilterRequest, BoundingBoxLabelsByFilterResponse, - CaptureMetadata, DataServiceBase, DeleteBinaryDataByFilterRequest, DeleteBinaryDataByFilterResponse, diff --git a/tests/test_data_client.py b/tests/test_data_client.py index 49dfd6305..d1e1c991c 100644 --- a/tests/test_data_client.py +++ b/tests/test_data_client.py @@ -1,5 +1,4 @@ import pytest -from datetime import datetime from typing import List from grpclib.testing import ChannelFor @@ -11,10 +10,8 @@ BinaryID, BinaryMetadata, BoundingBox, - CaptureInterval, CaptureMetadata, Filter, - TagsFilter, ) from .mocks.services import MockData