Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 4 additions & 9 deletions docs/examples/example.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -1007,16 +1007,11 @@
"\n",
"from viam.proto.app.data import Filter, CaptureInterval, TagsFilter\n",
"\n",
"left_motor_filter = Filter(\n",
"left_motor_filter = data_client.create_filter(\n",
" component_name=\"left_motor\",\n",
" interval=CaptureInterval(\n",
" # datetime_to_timestamp() converts a datetime to a Timestamp.\n",
" start=DataClient.datetime_to_timestamp(datetime(2023, 6, 5, 11)),\n",
" end=DataClient.datetime_to_timestamp(datetime(2023, 6, 5, 13, 30))\n",
" ),\n",
" tags_filter=TagsFilter(\n",
" tags=[\"speed_test\"]\n",
" )\n",
" start_time=datetime(2023, 6, 5, 11),\n",
" end_time=datetime(2023, 6, 5, 13, 30),\n",
" tags=[\"speed_test_run\"]\n",
")\n",
"\n",
"data = await data_client.tabular_data_by_filter(filter=left_motor_filter)\n",
Expand Down
191 changes: 129 additions & 62 deletions src/viam/app/data/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
BinaryID,
BoundingBoxLabelsByFilterRequest,
BoundingBoxLabelsByFilterResponse,
CaptureInterval,
DataRequest,
DataServiceStub,
DeleteBinaryDataByFilterRequest,
Expand All @@ -36,6 +37,7 @@
TabularDataByFilterResponse,
TagsByFilterRequest,
TagsByFilterResponse,
TagsFilter
)
from viam.proto.app.datasync import (
DataCaptureUploadRequest,
Expand Down Expand Up @@ -183,7 +185,7 @@ async def delete_tabular_data_by_filter(self, filter: Optional[Filter]) -> int:
"""Filter and delete tabular data.

Args:
filter (viam.proto.app.data.Filter): Optional `Filter` specifying tabular data to delete. Not passing a `Filter` will lead to
filter (viam.proto.app.data.Filter): Optional `Filter` specifying tabular data to delete. Passing an empty `Filter` will lead to
all data being deleted. Exercise caution when using this option.
"""
filter = filter if filter else Filter()
Expand All @@ -195,8 +197,8 @@ async def delete_binary_data_by_filter(self, filter: Optional[Filter]) -> int:
"""Filter and delete binary data.

Args:
filter (viam.proto.app.data.Filter): Optional `Filter` specifying binary data to delete. Not passing a `Filter` will lead to all
data being deleted. Exercise caution when using this option.
filter (viam.proto.app.data.Filter): Optional `Filter` specifying binary data to delete. Passing an empty `Filter` will lead to
all data being deleted. Exercise caution when using this option.
"""
filter = filter if filter else Filter()
request = DeleteBinaryDataByFilterRequest(filter=filter)
Expand Down Expand Up @@ -327,40 +329,41 @@ async def bounding_box_labels_by_filter(self, filter: Optional[Filter] = None) -

async def binary_data_capture_upload(
self,
binary_data: bytes,
part_id: str,
component_type: str,
component_name: str,
method_name: str,
method_parameters: Optional[Mapping[str, Any]],
tags: Optional[List[str]],
data_request_times: Optional[Tuple[Optional[datetime], Optional[datetime]]],
binary_data: bytes,
method_parameters: Optional[Mapping[str, Any]] = None,
tags: Optional[List[str]] = None,
data_request_times: Optional[Tuple[datetime, datetime]] = None,
) -> None:
"""Upload binary sensor data.

Sync binary data collected on a robot through a specific component (e.g., a motor) along with the relevant metadata with
app.viam.com. Binary data can be found under the "Files" tab in Data on app.viam.com.

Args:
binary_data (bytes): The data to be uploaded, respresented in bytes.
part_id (str): Part ID of the component used to capture the data.
component_type (str): Type of the component used to capture the data (e.g., "movement_sensor").
component_name (str): Name of the component used to capture the data.
method_name (str): Name of the method used to capture the data.
method_parameters (Optional[Mapping[str, Any]]): Optional dictionary of method parameters. No longer in active use.
tags (Optional[List[str]]): Optional list of tags to allow for tag-based data filtering when retrieving data.
data_request_times (Optional[Tuple[datetime.datetime, datetime.datetime]]): Optional tuple containing `datetime`s denoting the
times this data was requested[0] and received[1] by the appropriate sensor.
binary_data (bytes): The data to be uploaded, respresented in bytes.
data_request_times (Optional[Tuple[datetime.datetime, datetime.datetime]]): Optional tuple containing `datetime`s objects
denoting the times this data was requested[0] and received[1] by the appropriate sensor.

Raises:
GRPCError: If an invalid part ID is passed.
"""
sensor_contents = SensorData(
metadata=SensorMetadata(
time_requested=(
self.datetime_to_timestamp(data_request_times[0]) if data_request_times and data_request_times[0] else None
),
time_received=(self.datetime_to_timestamp(data_request_times[1]) if data_request_times and data_request_times[1] else None),
metadata=(
SensorMetadata(
time_requested=self.datetime_to_timestamp(data_request_times[0]) if data_request_times[0] else None,
time_received=self.datetime_to_timestamp(data_request_times[1]) if data_request_times[1] else None
)
if data_request_times else None
),
struct=None, # Used for tabular data.
binary=binary_data,
Expand All @@ -372,45 +375,47 @@ async def binary_data_capture_upload(
method_name=method_name,
type=DataType.DATA_TYPE_BINARY_SENSOR,
file_name=None, # Not used in app.
method_parameters=method_parameters if method_parameters else None,
method_parameters=method_parameters,
file_extension=None, # Will be stored as empty string "".
tags=tags if tags else None,
tags=tags,
)
_: DataCaptureUploadResponse = await self._data_capture_upload(metadata=metadata, sensor_contents=[sensor_contents])

async def tabular_data_capture_upload(
self,
tabular_data: List[Mapping[str, Any]],
part_id: str,
component_type: str,
component_name: str,
method_name: str,
method_parameters: Optional[Mapping[str, Any]],
tags: Optional[List[str]],
data_request_times: Optional[List[Tuple[Optional[datetime], Optional[datetime]]]],
tabular_data: List[Mapping[str, Any]],
method_parameters: Optional[Mapping[str, Any]] = None,
tags: Optional[List[str]] = None,
data_request_times: Optional[List[Tuple[datetime, datetime]]] = None,
) -> None:
"""Upload tabular sensor data.

Sync tabular data collected on a robot through a specific component (e.g., a motor) along with the relevant metadata with
app.viam.com. Tabular data can be found under the "Sensors" tab in Data on app.viam.com.

Args:
tabular_data (List[Mapping[str, Any]]): List of the data to be uploaded, represented tabularly as a collection of dictionaries.
part_id (str): Part ID of the component used to capture the data.
component_type (str): Type of the component used to capture the data (e.g., "movement_sensor").
component_name (str): Name of the component used to capture the data.
method_name (str): Name of the method used to capture the data.
method_parameters (Optional[Mapping[str, Any]]): Optional dictionary of method parameters. No longer in active use.
tags (Optional[List[str]]): Optional list of tags to allow for tag-based data filtering when retrieving data.
data_request_times (Optional[List[Tuple[datetime.datetime, datetime.datetime]]]): Optional list of tuples, each containing
`datetime`s denoting the times this data was requested[0] and received[1] by the appropriate sensor.
tabular_data (List[Mapping[str, Any]]): List of the data to be uploaded, represented tabularly as a collection of dictionaries.
`datetime` objects denoting the times this data was requested[0] and received[1] by the appropriate sensor.


Passing a list of tabular data and Timestamps with length n > 1 will result in n datapoints being uploaded, all tied to the same
metadata.

Raises:
GRPCError: If an invalid part ID is passed.
AssertionError: If a list of `Timestamp`s is provided and its length does not match the length of the list of tabular data.
AssertionError: If a list of `Timestamp` objects is provided and its length does not match the length of the list of tabular
data.
"""
sensor_contents = [None] * len(tabular_data)
if data_request_times:
Expand All @@ -420,15 +425,15 @@ async def tabular_data_capture_upload(
s = Struct()
s.update(tabular_data[i])
sensor_contents[i] = SensorData(
metadata=SensorMetadata(
time_requested=(
self.datetime_to_timestamp(data_request_times[i][0]) if data_request_times and data_request_times[i][0] else None
),
time_received=(
self.datetime_to_timestamp(data_request_times[i][1]) if data_request_times and data_request_times[i][1] else None
),
),
struct=s,
metadata=(
SensorMetadata(
time_requested=self.datetime_to_timestamp(data_request_times[i][0]) if data_request_times[i][0] else None,
time_received=self.datetime_to_timestamp(data_request_times[i][1]) if data_request_times[i][1] else None
)
if data_request_times[i] else None
)
if data_request_times else None,
struct=s
)

metadata = UploadMetadata(
Expand All @@ -438,9 +443,9 @@ async def tabular_data_capture_upload(
method_name=method_name,
type=DataType.DATA_TYPE_TABULAR_SENSOR,
file_name=None, # Not used in app.
method_parameters=method_parameters if method_parameters else None,
method_parameters=method_parameters,
file_extension=None, # Will be stored as empty string "".
tags=tags if tags else None,
tags=tags
)
_: DataCaptureUploadResponse = await self._data_capture_upload(metadata=metadata, sensor_contents=sensor_contents)

Expand All @@ -452,14 +457,14 @@ async def _data_capture_upload(self, metadata: UploadMetadata, sensor_contents:
async def file_upload(
self,
part_id: str,
component_type: Optional[str],
component_name: Optional[str],
method_name: Optional[str],
file_name: Optional[str],
method_parameters: Optional[Mapping[str, Any]],
file_extension: Optional[str],
tags: Optional[List[str]],
data: Optional[bytes],
component_type: Optional[str] = None,
component_name: Optional[str] = None,
method_name: Optional[str] = None,
file_name: Optional[str] = None,
method_parameters: Optional[Mapping[str, Any]] = None,
file_extension: Optional[str] = None,
tags: Optional[List[str]] = None,
data: Optional[bytes] = None,
) -> None:
"""Upload arbitrary file data.

Expand All @@ -483,39 +488,40 @@ async def file_upload(
"""
metadata = UploadMetadata(
part_id=part_id,
component_type=component_type if component_type else None,
component_name=component_name if component_name else None,
method_name=method_name if method_name else None,
component_type=component_type,
component_name=component_name,
method_name=method_name,
type=DataType.DATA_TYPE_FILE,
file_name=file_name if file_name else None,
method_parameters=method_parameters if method_parameters else None,
file_extension=file_extension if file_extension else None,
tags=tags if tags else None,
file_name=file_name,
method_parameters=method_parameters,
file_extension=file_extension,
tags=tags,
)
_: FileUploadResponse = await self._file_upload(metadata=metadata, file_contents=FileData(data=data))

async def file_upload_from_path(
self,
part_id: str,
component_type: Optional[str],
component_name: Optional[str],
method_name: Optional[str],
method_parameters: Optional[Mapping[str, Any]],
tags: Optional[List[str]],
filepath: str,
part_id: str,
component_type: Optional[str] = None,
component_name: Optional[str] = None,
method_name: Optional[str] = None,
method_parameters: Optional[Mapping[str, Any]] = None,
tags: Optional[List[str]] = None
) -> None:
"""Upload arbitrary file data.

Sync file data that may be stored on a robot along with the relevant metadata to app.viam.com.

Args:
filepath (str): Absolute filepath of file to be uploaded.
part_id (str): Part ID of the component associated with the file.
component_type (Optional[str]): Optional type of the component associated with the file (e.g., "movement_sensor").
component_name (Optional[str]): Optional name of the component associated with the file.
method_name (Optional[str]): Optional name of the method associated with the file.
method_parameters (Optional[str]): Optional dictionary of the method parameters. No longer in active use.
tags (Optional[List[str]]): Optional list of tags to allow for tag-based filtering when retrieving data.
filepath (str): Absolute filepath of file to be uploaded.


Raises:
GRPCError: If an invalid part ID is passed.
Expand All @@ -530,14 +536,14 @@ async def file_upload_from_path(

metadata = UploadMetadata(
part_id=part_id,
component_type=component_type if component_type else None,
component_name=component_name if component_name else None,
method_name=method_name if method_name else None,
component_type=component_type,
component_name=component_name,
method_name=method_name,
type=DataType.DATA_TYPE_FILE,
file_name=file_name,
method_parameters=method_parameters if method_parameters else None,
method_parameters=method_parameters,
file_extension=file_extension,
tags=tags if tags else None,
tags=tags,
)
_: FileUploadResponse = await self._file_upload(metadata=metadata, file_contents=FileData(data=data))

Expand All @@ -563,3 +569,64 @@ def datetime_to_timestamp(dt: datetime) -> Timestamp:
timestamp = Timestamp()
timestamp.FromDatetime(dt)
return timestamp

@staticmethod
def create_filter(
component_name: Optional[str] = None,
component_type: Optional[str] = None,
method: Optional[str] = None,
robot_name: Optional[str] = None,
robot_id: Optional[str] = None,
part_name: Optional[str] = None,
part_id: Optional[str] = None,
location_ids: Optional[List[str]] = None,
organization_ids: Optional[List[str]] = None,
mime_type: Optional[List[str]] = None,
start_time: Optional[datetime] = None,
end_time: Optional[datetime] = None,
tags: Optional[List[str]] = None,
bbox_labels: Optional[List[str]] = None
) -> Filter:
"""Create a `Filter`.

Args:
component_name (Optional[str]): Optional name of the component that captured the data being filtered (e.g., "left_motor").
component_type (Optional[str]): Optional type of the componenet that captured the data being filtered (e.g., "motor").
method (Optional[str]): Optional name of the method used to capture the data being filtered (e.g., "IsPowered").
robot_name (Optional[str]): Optional name of the robot associated with the data being filtered (e.g., "viam_rover_1").
robot_id (Optional[str]): Optional ID of the robot associated with the data being filtered.
part_name (Optional[str]): Optional name of the system part associated with the data being filtered (e.g., "viam_rover_1-main").
part_id (Optional[str]): Optional ID of the system part associated with the data being filtered.
location_ids (Optional[List[str]]): Optional list of location IDs associated with the data being filtered.
organization_ids (Optional[List[str]]): Optional list of organization IDs associated with the data being filtered.
mime_type (Optional[List[str]]): Optional mime type of data being filtered (e.g., "image/png").
start_time (Optional[datetime.datetime]): Optional start time of an interval to filter data by.
end_time (Optional[datetime.datetime]): Optional end time of an interval to filter data by.
tags (Optional[List[str]]): Optional list of tags attached to the data being filtered (e.g., ["test"]).
bbox_labels (Optional[List[str]]): Optional list of bounding box labels attached to the data being filtered (e.g., ["square",
"circle"]).

Returns:
viam.proto.app.data.Filter: The `Filter` object.
"""
return Filter(
component_name=component_name,
component_type=component_type,
method=method,
robot_name=robot_name,
robot_id=robot_id,
part_name=part_name,
part_id=part_id,
location_ids=location_ids,
organization_ids=organization_ids,
mime_type=mime_type,
interval=(
CaptureInterval(
start=DataClient.datetime_to_timestamp(start_time) if start_time else None,
end=DataClient.datetime_to_timestamp(end_time) if end_time else None
)
)
if start_time and end_time else None,
tags_filter=TagsFilter(tags=tags),
bbox_labels=bbox_labels
)
Loading