46 changes: 39 additions & 7 deletions docs/examples/_server.py
@@ -1,11 +1,13 @@
import asyncio
from datetime import datetime

from grpclib.utils import graceful_exit
from grpclib.server import Server, Stream
from google.protobuf.struct_pb2 import Struct, Value
from google.protobuf.timestamp_pb2 import Timestamp

from viam.utils import dict_to_struct, value_to_primitive
from viam.app.data_client import DataClient
from viam.utils import datetime_to_timestamp, dict_to_struct, value_to_primitive
from viam.proto.app.data import (
AddBoundingBoxToImageByIDResponse,
AddBoundingBoxToImageByIDRequest,
@@ -19,6 +21,7 @@
BinaryDataByIDsResponse,
BoundingBoxLabelsByFilterRequest,
BoundingBoxLabelsByFilterResponse,
CaptureMetadata,
DataServiceBase,
DeleteBinaryDataByFilterRequest,
DeleteBinaryDataByFilterResponse,
@@ -169,19 +172,48 @@
class MockData(DataServiceBase):
def __init__(self):
self.tabular_data_requested = False
self.tabular_response = [{"PowerPct": 0, "IsPowered": False}, {"PowerPct": 0, "IsPowered": False}, {"Position": 0}]
self.tabular_response = [
DataClient.TabularData(
Member:
(q) In general, what is the "data" being collected? Any data the robot is giving? And what is binary data vs tabular data?

Member Author:
Great question! The data is currently just all of the data being collected/stored by the robot in app. I don't think I could give a very good answer on the differences between binary and tabular data. I think tabular is going to be mapped data in a lot of ways (e.g., a mapping of function call to output), while binary data is data that's just stored as bytes (e.g., raw files, or a photo). The data team could definitely give a better answer though! (A short usage sketch follows this code block.)

{"PowerPct": 0, "IsPowered": False},
CaptureMetadata(method_name="IsPowered"),
datetime(2022, 1, 1, 1, 1, 1),
datetime(2022, 12, 31, 23, 59, 59),
),
DataClient.TabularData(
{"PowerPct": 0, "IsPowered": False},
CaptureMetadata(location_id="loc-id"),
datetime(2023, 1, 2),
datetime(2023, 3, 4)
),
DataClient.TabularData(
{"Position": 0},
CaptureMetadata(),
datetime(2023, 5, 6),
datetime(2023, 7, 8),
),
]
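
For a concrete sense of the tabular/binary distinction discussed above, here is a rough usage sketch against the DataClient API added in this PR; the data_client variable and the sample values are illustrative assumptions, not code from the diff.

# Illustrative sketch only -- "data_client" is an assumed, already-connected DataClient.
tabular = await data_client.tabular_data_by_filter()   # -> List[DataClient.TabularData]
readings = tabular[0].data                              # mapped values, e.g. {"PowerPct": 0, "IsPowered": False}
binary = await data_client.binary_data_by_filter()      # -> List[DataClient.BinaryData]
raw_bytes = binary[0].data                              # raw bytes, e.g. a captured image file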

async def TabularDataByFilter(self, stream: Stream[TabularDataByFilterRequest, TabularDataByFilterResponse]) -> None:
if self.tabular_data_requested:
await stream.send_message(TabularDataByFilterResponse())
return
self.tabular_data_requested = True
_ = await stream.recv_message()
n = len(self.tabular_response)
tabular_structs = [Struct()] * n
for i in range(n):
tabular_structs[i].update(self.tabular_response[i])
await stream.send_message(TabularDataByFilterResponse(data=[TabularData(data=struct) for struct in tabular_structs]))
tabular_structs = []
tabular_metadata = [data.metadata for data in self.tabular_response]
for idx, tabular_data in enumerate(self.tabular_response):
Member Author:
I opted against a list comprehension here because I thought it was a bit cluttered and awkward to read. Not a strong preference; happy to revert to a list comprehension if others prefer it. (A sketch of the comprehension version follows the loop below.)

tabular_structs.append(
TabularData(
data=dict_to_struct(tabular_data.data),
metadata_index=idx,
time_requested=datetime_to_timestamp(tabular_data.time_requested),
time_received=datetime_to_timestamp(tabular_data.time_received)
)
)
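
For comparison, the list-comprehension version discussed in the comment above would look roughly like this (a sketch using the same names as the loop, not code from the PR):

# Hypothetical alternative to the loop above; the behavior is intended to be identical.
tabular_structs = [
    TabularData(
        data=dict_to_struct(d.data),
        metadata_index=i,
        time_requested=datetime_to_timestamp(d.time_requested),
        time_received=datetime_to_timestamp(d.time_received),
    )
    for i, d in enumerate(self.tabular_response)
]
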
await stream.send_message(TabularDataByFilterResponse(
Member:
(q) Where is this message being sent?

Member Author:
Good question! The message is being sent back along the channel where the request was sent, so basically back to the calling client. (See the client-side sketch after this call.)

data=tabular_structs, metadata=tabular_metadata,
)
)
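
For reference on the comment above: the receiving end of this send_message call is the client stub call already present in data_client.py. A minimal sketch of that round trip, assuming the surrounding DataClient context (variable names are illustrative):

# Sketch: the client issues the request and reads the mock server's response off the same stream.
response: TabularDataByFilterResponse = await self._data_client.TabularDataByFilter(request, metadata=self._metadata)
for item in response.data:
    print(item.metadata_index, item.time_requested.ToDatetime())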

async def BinaryDataByFilter(self, stream: Stream[BinaryDataByFilterRequest, BinaryDataByFilterResponse]) -> None:
pass
71 changes: 63 additions & 8 deletions src/viam/app/data_client.py
@@ -14,9 +14,11 @@
BinaryDataByIDsRequest,
BinaryDataByIDsResponse,
BinaryID,
BinaryMetadata,
BoundingBoxLabelsByFilterRequest,
BoundingBoxLabelsByFilterResponse,
CaptureInterval,
CaptureMetadata,
DataRequest,
DataServiceStub,
DeleteBinaryDataByFilterRequest,
@@ -60,6 +62,54 @@ class DataClient:
`ViamClient`.
"""

class TabularData:
Member Author:
Returning the actual proto type for tabular data requests seemed like a bad idea because it's awkward to navigate and has a struct (which we wanted to get rid of). But time_requested and time_received are their own fields, and I thought returning a 4-tuple would be super ugly. So, I created a class to represent meaningful tabular data, and created an analogous class for binary data for consistency's sake.

Happy to revisit the type question if there's pushback against creating these shadow classes. (A short usage sketch follows these class definitions.)

"""Class representing a piece of tabular data and associated metadata.

Args:
data (Mapping[str, Any]): the requested data.
metadata (viam.proto.app.data.CaptureMetadata): the metadata from the request.
time_requested (datetime): the time the data request was sent.
time_received (datetime): the time the requested data was received.
"""

def __init__(self, data: Mapping[str, Any], metadata: CaptureMetadata, time_requested: datetime, time_received: datetime) -> None:
self.data = data
self.metadata = metadata
self.time_requested = time_requested
self.time_received = time_received

data: Mapping[str, Any]
metadata: CaptureMetadata
time_requested: datetime
time_received: datetime

def __str__(self) -> str:
return f"{self.data}\n{self.metadata}Time requested: {self.time_requested}\nTime received: {self.time_received}\n"

def __eq__(self, other: "DataClient.TabularData") -> bool:
return str(self) == str(other)

class BinaryData:
"""Class representing a piece of binary data and associated metadata.

Args:
data (bytes): the requested data.
metadata (viam.proto.app.data.BinaryMetadata): the metadata from the request.
"""

def __init__(self, data: bytes, metadata: BinaryMetadata) -> None:
self.data = data
self.metadata = metadata

data: bytes
metadata: BinaryMetadata

def __str__(self) -> str:
return f"{self.data}\n{self.metadata}"

def __eq__(self, other: "DataClient.BinaryData") -> bool:
return str(self) == str(other)
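
As a rough usage sketch of these wrapper classes (illustrative only; data_client and my_filter are assumed to exist), callers work with plain attributes instead of navigating the proto Struct or unpacking a 4-tuple:

tabular = await data_client.tabular_data_by_filter(filter=my_filter)
for d in tabular:
    # d.data is a plain mapping, d.metadata is the CaptureMetadata proto, and the timestamps are datetimes.
    print(d.data.get("PowerPct"), d.metadata.method_name, d.time_requested, d.time_received)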

def __init__(self, channel: Channel, metadata: Mapping[str, str]):
"""Create a `DataClient` that maintains a connection to app.

@@ -79,7 +129,7 @@ async def tabular_data_by_filter(
self,
filter: Optional[Filter] = None,
dest: Optional[str] = None,
) -> List[Mapping[str, Any]]:
) -> List[TabularData]:
"""Filter and download tabular data.

Args:
@@ -102,13 +152,18 @@ async def tabular_data_by_filter(
response: TabularDataByFilterResponse = await self._data_client.TabularDataByFilter(request, metadata=self._metadata)
if not response.data or len(response.data) == 0:
break
data += [struct_to_dict(struct.data) for struct in response.data]
data += [DataClient.TabularData(
struct_to_dict(struct.data),
response.metadata[struct.metadata_index],
struct.time_requested.ToDatetime(),
struct.time_received.ToDatetime(),
) for struct in response.data]
last = response.last

if dest:
try:
file = open(dest, "w")
file.write(f"{data}")
file.write(f"{[str(d) for d in data]}")
except Exception as e:
LOGGER.error(f"Failed to write tabular data to file {dest}", exc_info=e)
return data
@@ -117,7 +172,7 @@ async def binary_data_by_filter(
self,
filter: Optional[Filter] = None,
dest: Optional[str] = None,
) -> List[bytes]:
) -> List[BinaryData]:
"""Filter and download binary data.

Args:
@@ -139,13 +194,13 @@ async def binary_data_by_filter(
response: BinaryDataByFilterResponse = await self._data_client.BinaryDataByFilter(request, metadata=self._metadata)
if not response.data or len(response.data) == 0:
break
data += [data.binary for data in response.data]
data += [DataClient.BinaryData(data.binary, data.metadata) for data in response.data]
last = response.last

if dest:
try:
file = open(dest, "w")
file.write(f"{data}")
file.write(f"{[str(d) for d in data]}")
except Exception as e:
LOGGER.error(f"Failed to write binary data to file {dest}", exc_info=e)

@@ -155,7 +210,7 @@ async def binary_data_by_ids(
self,
binary_ids: List[BinaryID],
dest: Optional[str] = None,
) -> List[bytes]:
) -> List[BinaryData]:
"""Filter and download binary data.

Args:
@@ -176,7 +231,7 @@ async def binary_data_by_ids(
file.write(f"{response.data}")
except Exception as e:
LOGGER.error(f"Failed to write binary data to file {dest}", exc_info=e)
return [binary_data.binary for binary_data in response.data]
return [DataClient.BinaryData(data.binary, data.metadata) for data in response.data]

async def delete_tabular_data_by_filter(self, filter: Optional[Filter]) -> int:
"""Filter and delete tabular data.
35 changes: 24 additions & 11 deletions tests/mocks/services.py
@@ -1,6 +1,5 @@
from typing import Any, Dict, List, Mapping, Optional, Union

from google.protobuf.struct_pb2 import Struct
from grpclib.server import Stream
from PIL import Image

@@ -191,11 +190,12 @@
SensorsServiceBase,
)
from viam.proto.service.vision import Classification, Detection
from viam.app.data_client import DataClient
from viam.services.mlmodel import File, LabelType, Metadata, MLModel, TensorInfo
from viam.services.navigation import Navigation
from viam.services.slam import SLAM
from viam.services.vision import Vision
from viam.utils import ValueTypes, struct_to_dict
from viam.utils import ValueTypes, datetime_to_timestamp, dict_to_struct, struct_to_dict


class MockVision(Vision):
@@ -483,8 +483,8 @@ async def do_command(self, command: Mapping[str, ValueTypes], *, timeout: Option
class MockData(DataServiceBase):
def __init__(
self,
tabular_response: List[Mapping[str, Any]],
binary_response: List[bytes],
tabular_response: List[DataClient.TabularData],
binary_response: List[DataClient.BinaryData],
delete_remove_response: int,
tags_response: List[str],
bbox_labels_response: List[str],
@@ -504,11 +504,20 @@ async def TabularDataByFilter(self, stream: Stream[TabularDataByFilterRequest, T
await stream.send_message(TabularDataByFilterResponse(data=None))
return
self.filter = request.data_request.filter
n = len(self.tabular_response)
tabular_response_structs = [Struct()] * n
for i in range(n):
tabular_response_structs[i].update(self.tabular_response[i])
await stream.send_message(TabularDataByFilterResponse(data=[TabularData(data=struct) for struct in tabular_response_structs]))
tabular_response_structs = []
tabular_metadata = [data.metadata for data in self.tabular_response]
for idx, tabular_data in enumerate(self.tabular_response):
Member Author:
See the above comment re: use of a list comprehension here.

tabular_response_structs.append(
TabularData(
data=dict_to_struct(tabular_data.data),
metadata_index=idx,
time_requested=datetime_to_timestamp(tabular_data.time_requested),
time_received=datetime_to_timestamp(tabular_data.time_received)
)
)
await stream.send_message(TabularDataByFilterResponse(
data=tabular_response_structs, metadata=tabular_metadata)
)
self.was_tabular_data_requested = True

async def BinaryDataByFilter(self, stream: Stream[BinaryDataByFilterRequest, BinaryDataByFilterResponse]) -> None:
@@ -518,14 +527,18 @@ async def BinaryDataByFilter(self, stream: Stream[BinaryDataByFilterRequest, Bin
await stream.send_message(BinaryDataByFilterResponse())
return
self.filter = request.data_request.filter
await stream.send_message(BinaryDataByFilterResponse(data=[BinaryData(binary=binary_data) for binary_data in self.binary_response]))
await stream.send_message(BinaryDataByFilterResponse(
data=[BinaryData(binary=data.data, metadata=data.metadata) for data in self.binary_response])
)
self.was_binary_data_requested = True

async def BinaryDataByIDs(self, stream: Stream[BinaryDataByIDsRequest, BinaryDataByIDsResponse]) -> None:
request = await stream.recv_message()
assert request is not None
self.binary_ids = request.binary_ids
await stream.send_message(BinaryDataByIDsResponse(data=[BinaryData(binary=binary_data) for binary_data in self.binary_response]))
await stream.send_message(BinaryDataByIDsResponse(
data=[BinaryData(binary=data.data, metadata=data.metadata) for data in self.binary_response])
)

async def DeleteTabularDataByFilter(self, stream: Stream[DeleteTabularDataByFilterRequest, DeleteTabularDataByFilterResponse]) -> None:
request = await stream.recv_message()