Merged
68 changes: 64 additions & 4 deletions src/viam/app/data_client.py
@@ -40,6 +40,7 @@
TagsByFilterResponse,
)
from viam.proto.app.datasync import (
DataCaptureUploadMetadata,
DataCaptureUploadRequest,
DataCaptureUploadResponse,
DataSyncServiceStub,
@@ -49,6 +50,8 @@
FileUploadResponse,
SensorData,
SensorMetadata,
StreamingDataCaptureUploadRequest,
StreamingDataCaptureUploadResponse,
UploadMetadata,
)
from viam.utils import create_filter, datetime_to_timestamp, struct_to_dict
@@ -466,8 +469,8 @@ async def binary_data_capture_upload(
sensor_contents = SensorData(
metadata=(
SensorMetadata(
- time_requested=datetime_to_timestamp(data_request_times[0]) if data_request_times[0] else None,
- time_received=datetime_to_timestamp(data_request_times[1]) if data_request_times[1] else None,
+ time_requested=datetime_to_timestamp(data_request_times[0]) if data_request_times else None,
+ time_received=datetime_to_timestamp(data_request_times[1]) if data_request_times else None,

Member Author: (flyby) This would have caused a runtime error in cases where the (ostensibly optional) tuple was None.

Member: What if data_request_times[0] and not data_request_times[1]? Same question below.

Member Author: data_request_times is annotated as an Optional[Tuple[datetime, datetime]], not a Tuple[Optional[datetime], Optional[datetime]], so the input should be either a tuple with two datetimes or None. The case you're describing is inconsistent with how we expect input, but I'm definitely open to discussing changing what we expect!

Member: Ah, no, I misunderstood the typing; that sounds right to me. Thanks for the context!
)
if data_request_times
else None
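For context, here is a minimal sketch of the guard pattern the fix adopts, assuming the annotated contract of Optional[Tuple[datetime, datetime]]; the helper name build_sensor_times is hypothetical, not part of the SDK:

```python
from datetime import datetime
from typing import Optional, Tuple

# Hypothetical helper illustrating the fixed guard: check the tuple itself
# for None rather than indexing into it, since the annotation is
# Optional[Tuple[datetime, datetime]] -- either both datetimes or None.
def build_sensor_times(
    data_request_times: Optional[Tuple[datetime, datetime]]
) -> Tuple[Optional[datetime], Optional[datetime]]:
    time_requested = data_request_times[0] if data_request_times else None
    time_received = data_request_times[1] if data_request_times else None
    return time_requested, time_received

# The pre-fix condition `if data_request_times[0]` would raise
# TypeError ("'NoneType' object is not subscriptable") when
# data_request_times is None; the fixed guard returns (None, None).
assert build_sensor_times(None) == (None, None)
now = datetime.now()
assert build_sensor_times((now, now)) == (now, now)
```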
@@ -537,8 +540,8 @@ async def tabular_data_capture_upload(
SensorData(
metadata=(
SensorMetadata(
- time_requested=datetime_to_timestamp(data_request_times[idx][0]) if data_request_times[idx][0] else None,
- time_received=datetime_to_timestamp(data_request_times[idx][1]) if data_request_times[idx][1] else None,
+ time_requested=datetime_to_timestamp(data_request_times[idx][0]) if data_request_times else None,
+ time_received=datetime_to_timestamp(data_request_times[idx][1]) if data_request_times else None,

Member Author: (flyby) This would have caused a runtime error in cases where the (ostensibly optional) list of tuples was None.
)
if data_request_times[idx]
else None
@@ -566,6 +569,63 @@ async def _data_capture_upload(self, metadata: UploadMetadata, sensor_contents:
response: DataCaptureUploadResponse = await self._data_sync_client.DataCaptureUpload(request, metadata=self._metadata)
return response

async def streaming_data_capture_upload(
self,
data: bytes,
part_id: str,
file_ext: str,
component_type: Optional[str] = None,
component_name: Optional[str] = None,
method_name: Optional[str] = None,
method_parameters: Optional[Mapping[str, Any]] = None,
data_request_times: Optional[Tuple[datetime, datetime]] = None,
tags: Optional[List[str]] = None,
) -> str:
"""Uploads the metadata and contents of streaming binary data.

Args:
data (bytes): The data to be uploaded.
part_id (str): Part ID of the resource associated with the file.
file_ext (str): File extension type for the data. Required for determining MIME type.
component_type (Optional[str]): Optional type of the component associated with the file (e.g., "movement_sensor").
component_name (Optional[str]): Optional name of the component associated with the file.
method_name (Optional[str]): Optional name of the method associated with the file.
method_parameters (Optional[Mapping[str, Any]]): Optional dictionary of the method parameters. No longer in active use.

Member: What does "No longer in active use" mean? Should we not even have this parameter in the SDK if it's "deprecated"?

Member Author: Yeah, good question! The "no longer in active use" language is cribbed from how method_parameters is annotated elsewhere in the SDK. I don't remember the details precisely, but I remember conversations with the data team being basically "We don't use this now, but we should still allow it to be passed because it does update on the backend."

Member: Cool; so sort of just short of deprecated. Seems fine to me, just wanted to call it out.

data_request_times (Optional[Tuple[datetime.datetime, datetime.datetime]]): Optional tuple containing datetime objects
    denoting the times this data was requested (index 0, by the robot) and received (index 1, from the appropriate sensor).
tags (Optional[List[str]]): Optional list of tags to allow for tag-based filtering when retrieving data.

Raises:
GRPCError: If an invalid part ID is passed.

Returns:
str: the file_id of the uploaded data.
"""

upload_metadata = UploadMetadata(
part_id=part_id,
component_type=component_type if component_type else "",
component_name=component_name if component_name else "",
method_name=method_name if method_name else "",
method_parameters=method_parameters,
type=DataType.DATA_TYPE_BINARY_SENSOR,
file_extension=file_ext if file_ext[0] == "." else f".{file_ext}",
tags=tags,
)
sensor_metadata = SensorMetadata(
time_requested=datetime_to_timestamp(data_request_times[0]) if data_request_times else None,
time_received=datetime_to_timestamp(data_request_times[1]) if data_request_times else None,
)
metadata = DataCaptureUploadMetadata(upload_metadata=upload_metadata, sensor_metadata=sensor_metadata)
request_metadata = StreamingDataCaptureUploadRequest(metadata=metadata)
async with self._data_sync_client.StreamingDataCaptureUpload.open(metadata=self._metadata) as stream:
await stream.send_message(request_metadata)
await stream.send_message(StreamingDataCaptureUploadRequest(data=data), end=True)
response: StreamingDataCaptureUploadResponse = await stream.recv_message()
if not response:
await stream.recv_trailing_metadata() # causes us to throw appropriate gRPC error
return response.file_id

async def file_upload(
self,
part_id: str,
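A minimal usage sketch for the new method, assuming an already-connected and authenticated DataClient; the part ID, component values, payload, and tags below are placeholders, not values from this PR:

```python
from datetime import datetime

from viam.app.data_client import DataClient


async def upload_example(data_client: DataClient) -> str:
    # Assumes `data_client` is already connected; every value below is a placeholder.
    file_id = await data_client.streaming_data_capture_upload(
        data=b"\x00" * (32 * 1024),            # arbitrary binary payload
        part_id="<PART_ID>",                   # placeholder part ID
        file_ext="bin",                        # a leading "." is added if missing
        component_type="camera",
        component_name="my-camera",
        data_request_times=(datetime.now(), datetime.now()),
        tags=["streaming-upload-example"],
    )
    return file_id  # file_id of the uploaded data
```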
8 changes: 7 additions & 1 deletion tests/mocks/services.py
@@ -800,7 +800,13 @@ async def FileUpload(self, stream: Stream[FileUploadRequest, FileUploadResponse]
async def StreamingDataCaptureUpload(
self, stream: Stream[StreamingDataCaptureUploadRequest, StreamingDataCaptureUploadResponse]
) -> None:
- raise NotImplementedError()
+ request_metadata = await stream.recv_message()
+ assert request_metadata is not None
+ self.metadata = request_metadata.metadata.upload_metadata
+ request_data_contents = await stream.recv_message()
+ assert request_data_contents is not None
+ self.binary_data = request_data_contents.data
+ await stream.send_message(StreamingDataCaptureUploadResponse(file_id=self.file_upload_response))
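Note that the mock consumes the stream in the same order the client sends it: the first message carries the DataCaptureUploadMetadata, the second carries the raw bytes, and the mock records both so the test below can assert against them before replying with a single file_id response.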


class MockMLTraining(MLTrainingServiceBase):
20 changes: 20 additions & 0 deletions tests/test_data_sync_client.py
@@ -133,6 +133,26 @@ async def test_file_upload_from_path(self, service: MockDataSync, tmp_path):
assert service.metadata.file_extension == FILE_EXT
assert service.binary_data == BINARY_DATA

@pytest.mark.asyncio
async def test_streaming_data_capture_upload(self, service: MockDataSync):
async with ChannelFor([service]) as channel:
client = DataClient(channel, DATA_SERVICE_METADATA)
file_id = await client.streaming_data_capture_upload(
data=BINARY_DATA,
part_id=PART_ID,
file_ext=FILE_EXT,
component_type=COMPONENT_TYPE,
component_name=COMPONENT_NAME,
method_name=METHOD_NAME,
method_parameters=METHOD_PARAMETERS,
data_request_times=DATETIMES,
tags=TAGS,
)
assert file_id == FILE_UPLOAD_RESPONSE
self.assert_metadata(service.metadata)
assert service.metadata.file_extension == FILE_EXT
assert service.binary_data == BINARY_DATA

def assert_sensor_contents(self, sensor_contents: List[SensorData], is_binary: bool):
for idx, sensor_content in enumerate(sensor_contents):
assert sensor_content.metadata.time_requested.seconds == TIMESTAMPS[0].seconds