diff --git a/src/viam/app/data_client.py b/src/viam/app/data_client.py index 63357c2b8..b615c43c1 100644 --- a/src/viam/app/data_client.py +++ b/src/viam/app/data_client.py @@ -40,6 +40,7 @@ TagsByFilterResponse, ) from viam.proto.app.datasync import ( + DataCaptureUploadMetadata, DataCaptureUploadRequest, DataCaptureUploadResponse, DataSyncServiceStub, @@ -49,6 +50,8 @@ FileUploadResponse, SensorData, SensorMetadata, + StreamingDataCaptureUploadRequest, + StreamingDataCaptureUploadResponse, UploadMetadata, ) from viam.utils import create_filter, datetime_to_timestamp, struct_to_dict @@ -466,8 +469,8 @@ async def binary_data_capture_upload( sensor_contents = SensorData( metadata=( SensorMetadata( - time_requested=datetime_to_timestamp(data_request_times[0]) if data_request_times[0] else None, - time_received=datetime_to_timestamp(data_request_times[1]) if data_request_times[1] else None, + time_requested=datetime_to_timestamp(data_request_times[0]) if data_request_times else None, + time_received=datetime_to_timestamp(data_request_times[1]) if data_request_times else None, ) if data_request_times else None @@ -537,8 +540,8 @@ async def tabular_data_capture_upload( SensorData( metadata=( SensorMetadata( - time_requested=datetime_to_timestamp(data_request_times[idx][0]) if data_request_times[idx][0] else None, - time_received=datetime_to_timestamp(data_request_times[idx][1]) if data_request_times[idx][1] else None, + time_requested=datetime_to_timestamp(data_request_times[idx][0]) if data_request_times else None, + time_received=datetime_to_timestamp(data_request_times[idx][1]) if data_request_times else None, ) if data_request_times[idx] else None @@ -566,6 +569,63 @@ async def _data_capture_upload(self, metadata: UploadMetadata, sensor_contents: response: DataCaptureUploadResponse = await self._data_sync_client.DataCaptureUpload(request, metadata=self._metadata) return response + async def streaming_data_capture_upload( + self, + data: bytes, + part_id: str, + file_ext: str, + component_type: Optional[str] = None, + component_name: Optional[str] = None, + method_name: Optional[str] = None, + method_parameters: Optional[Mapping[str, Any]] = None, + data_request_times: Optional[Tuple[datetime, datetime]] = None, + tags: Optional[List[str]] = None, + ) -> str: + """Uploads the metadata and contents of streaming binary data. + + Args: + data (bytes): the data to be uploaded. + part_id (str): Part ID of the resource associated with the file. + file_ext (str): file extension type for the data. required for determining MIME type. + component_type (Optional[str]): Optional type of the component associated with the file (e.g., "movement_sensor"). + component_name (Optional[str]): Optional name of the component associated with the file. + method_name (Optional[str]): Optional name of the method associated with the file. + method_parameters (Optional[str]): Optional dictionary of the method parameters. No longer in active use. + data_request_times (Optional[Tuple[datetime.datetime, datetime.datetime]]): Optional tuple containing `datetime`s objects + denoting the times this data was requested[0] by the robot and received[1] from the appropriate sensor. + tags (Optional[List[str]]): Optional list of tags to allow for tag-based filtering when retrieving data. + + Raises: + GRPCError: If an invalid part ID is passed. + + Returns: + str: the file_id of the uploaded data. + """ + + upload_metadata = UploadMetadata( + part_id=part_id, + component_type=component_type if component_type else "", + component_name=component_name if component_name else "", + method_name=method_name if method_name else "", + method_parameters=method_parameters, + type=DataType.DATA_TYPE_BINARY_SENSOR, + file_extension=file_ext if file_ext[0] == "." else f".{file_ext}", + tags=tags, + ) + sensor_metadata = SensorMetadata( + time_requested=datetime_to_timestamp(data_request_times[0]) if data_request_times else None, + time_received=datetime_to_timestamp(data_request_times[1]) if data_request_times else None, + ) + metadata = DataCaptureUploadMetadata(upload_metadata=upload_metadata, sensor_metadata=sensor_metadata) + request_metadata = StreamingDataCaptureUploadRequest(metadata=metadata) + async with self._data_sync_client.StreamingDataCaptureUpload.open(metadata=self._metadata) as stream: + await stream.send_message(request_metadata) + await stream.send_message(StreamingDataCaptureUploadRequest(data=data), end=True) + response: StreamingDataCaptureUploadResponse = await stream.recv_message() + if not response: + await stream.recv_trailing_metadata() # causes us to throw appropriate gRPC error + return response.file_id + async def file_upload( self, part_id: str, diff --git a/tests/mocks/services.py b/tests/mocks/services.py index fc55e8b85..b8813e77d 100644 --- a/tests/mocks/services.py +++ b/tests/mocks/services.py @@ -800,7 +800,13 @@ async def FileUpload(self, stream: Stream[FileUploadRequest, FileUploadResponse] async def StreamingDataCaptureUpload( self, stream: Stream[StreamingDataCaptureUploadRequest, StreamingDataCaptureUploadResponse] ) -> None: - raise NotImplementedError() + request_metadata = await stream.recv_message() + assert request_metadata is not None + self.metadata = request_metadata.metadata.upload_metadata + request_data_contents = await stream.recv_message() + assert request_data_contents is not None + self.binary_data = request_data_contents.data + await stream.send_message(StreamingDataCaptureUploadResponse(file_id=self.file_upload_response)) class MockMLTraining(MLTrainingServiceBase): diff --git a/tests/test_data_sync_client.py b/tests/test_data_sync_client.py index 719917c6b..fd874ce45 100644 --- a/tests/test_data_sync_client.py +++ b/tests/test_data_sync_client.py @@ -133,6 +133,26 @@ async def test_file_upload_from_path(self, service: MockDataSync, tmp_path): assert service.metadata.file_extension == FILE_EXT assert service.binary_data == BINARY_DATA + @pytest.mark.asyncio + async def test_streaming_data_capture_upload(self, service: MockDataSync): + async with ChannelFor([service]) as channel: + client = DataClient(channel, DATA_SERVICE_METADATA) + file_id = await client.streaming_data_capture_upload( + data=BINARY_DATA, + part_id=PART_ID, + file_ext=FILE_EXT, + component_type=COMPONENT_TYPE, + component_name=COMPONENT_NAME, + method_name=METHOD_NAME, + method_parameters=METHOD_PARAMETERS, + data_request_times=DATETIMES, + tags=TAGS, + ) + assert file_id == FILE_UPLOAD_RESPONSE + self.assert_metadata(service.metadata) + assert service.metadata.file_extension == FILE_EXT + assert service.binary_data == BINARY_DATA + def assert_sensor_contents(self, sensor_contents: List[SensorData], is_binary: bool): for idx, sensor_content in enumerate(sensor_contents): assert sensor_content.metadata.time_requested.seconds == TIMESTAMPS[0].seconds