Skip to content

Commit 73f91d4

Browse files
authored
RSDK-4313 - Include data request metadata (#378)
1 parent bb24300 commit 73f91d4

File tree

4 files changed

+183
-78
lines changed

4 files changed

+183
-78
lines changed

docs/examples/_server.py

Lines changed: 39 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,13 @@
11
import asyncio
2+
from datetime import datetime
23

34
from grpclib.utils import graceful_exit
45
from grpclib.server import Server, Stream
56
from google.protobuf.struct_pb2 import Struct, Value
67
from google.protobuf.timestamp_pb2 import Timestamp
78

8-
from viam.utils import dict_to_struct, value_to_primitive
9+
from viam.app.data_client import DataClient
10+
from viam.utils import datetime_to_timestamp, dict_to_struct, value_to_primitive
911
from viam.proto.app.data import (
1012
AddBoundingBoxToImageByIDResponse,
1113
AddBoundingBoxToImageByIDRequest,
@@ -19,6 +21,7 @@
1921
BinaryDataByIDsResponse,
2022
BoundingBoxLabelsByFilterRequest,
2123
BoundingBoxLabelsByFilterResponse,
24+
CaptureMetadata,
2225
DataServiceBase,
2326
DeleteBinaryDataByFilterRequest,
2427
DeleteBinaryDataByFilterResponse,
@@ -169,19 +172,48 @@
169172
class MockData(DataServiceBase):
170173
def __init__(self):
171174
self.tabular_data_requested = False
172-
self.tabular_response = [{"PowerPct": 0, "IsPowered": False}, {"PowerPct": 0, "IsPowered": False}, {"Position": 0}]
175+
self.tabular_response = [
176+
DataClient.TabularData(
177+
{"PowerPct": 0, "IsPowered": False},
178+
CaptureMetadata(method_name="IsPowered"),
179+
datetime(2022, 1, 1, 1, 1, 1),
180+
datetime(2022, 12, 31, 23, 59, 59),
181+
),
182+
DataClient.TabularData(
183+
{"PowerPct": 0, "IsPowered": False},
184+
CaptureMetadata(location_id="loc-id"),
185+
datetime(2023, 1, 2),
186+
datetime(2023, 3, 4)
187+
),
188+
DataClient.TabularData(
189+
{"Position": 0},
190+
CaptureMetadata(),
191+
datetime(2023, 5, 6),
192+
datetime(2023, 7, 8),
193+
),
194+
]
173195

174196
async def TabularDataByFilter(self, stream: Stream[TabularDataByFilterRequest, TabularDataByFilterResponse]) -> None:
175197
if self.tabular_data_requested:
176198
await stream.send_message(TabularDataByFilterResponse())
177199
return
178200
self.tabular_data_requested = True
179201
_ = await stream.recv_message()
180-
n = len(self.tabular_response)
181-
tabular_structs = [Struct()] * n
182-
for i in range(n):
183-
tabular_structs[i].update(self.tabular_response[i])
184-
await stream.send_message(TabularDataByFilterResponse(data=[TabularData(data=struct) for struct in tabular_structs]))
202+
tabular_structs = []
203+
tabular_metadata = [data.metadata for data in self.tabular_response]
204+
for idx, tabular_data in enumerate(self.tabular_response):
205+
tabular_structs.append(
206+
TabularData(
207+
data=dict_to_struct(tabular_data.data),
208+
metadata_index=idx,
209+
time_requested=datetime_to_timestamp(tabular_data.time_requested),
210+
time_received=datetime_to_timestamp(tabular_data.time_received)
211+
)
212+
)
213+
await stream.send_message(TabularDataByFilterResponse(
214+
data=tabular_structs, metadata=tabular_metadata,
215+
)
216+
)
185217

186218
async def BinaryDataByFilter(self, stream: Stream[BinaryDataByFilterRequest, BinaryDataByFilterResponse]) -> None:
187219
pass

src/viam/app/data_client.py

Lines changed: 63 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,11 @@
1414
BinaryDataByIDsRequest,
1515
BinaryDataByIDsResponse,
1616
BinaryID,
17+
BinaryMetadata,
1718
BoundingBoxLabelsByFilterRequest,
1819
BoundingBoxLabelsByFilterResponse,
1920
CaptureInterval,
21+
CaptureMetadata,
2022
DataRequest,
2123
DataServiceStub,
2224
DeleteBinaryDataByFilterRequest,
@@ -60,6 +62,54 @@ class DataClient:
6062
`ViamClient`.
6163
"""
6264

65+
class TabularData:
66+
"""Class representing a piece of tabular data and associated metadata.
67+
68+
Args:
69+
data (Mapping[str, Any]): the requested data.
70+
metadata (viam.proto.app.data.CaptureMetadata): the metadata from the request.
71+
time_requested (datetime): the time the data request was sent.
72+
time_received (datetime): the time the requested data was received.
73+
"""
74+
75+
def __init__(self, data: Mapping[str, Any], metadata: CaptureMetadata, time_requested: datetime, time_received: datetime) -> None:
76+
self.data = data
77+
self.metadata = metadata
78+
self.time_requested = time_requested
79+
self.time_received = time_received
80+
81+
data: Mapping[str, Any]
82+
metadata: CaptureMetadata
83+
time_requested: datetime
84+
time_received: datetime
85+
86+
def __str__(self) -> str:
87+
return f"{self.data}\n{self.metadata}Time requested: {self.time_requested}\nTime received: {self.time_received}\n"
88+
89+
def __eq__(self, other: "DataClient.TabularData") -> bool:
90+
return str(self) == str(other)
91+
92+
class BinaryData:
93+
"""Class representing a piece of binary data and associated metadata.
94+
95+
Args:
96+
data (bytes): the requested data.
97+
metadata (viam.proto.app.data.BinaryMetadata): the metadata from the request.
98+
"""
99+
100+
def __init__(self, data: bytes, metadata: BinaryMetadata) -> None:
101+
self.data = data
102+
self.metadata = metadata
103+
104+
data: bytes
105+
metadata: BinaryMetadata
106+
107+
def __str__(self) -> str:
108+
return f"{self.data}\n{self.metadata}"
109+
110+
def __eq__(self, other: "DataClient.BinaryData") -> bool:
111+
return str(self) == str(other)
112+
63113
def __init__(self, channel: Channel, metadata: Mapping[str, str]):
64114
"""Create a `DataClient` that maintains a connection to app.
65115
@@ -79,7 +129,7 @@ async def tabular_data_by_filter(
79129
self,
80130
filter: Optional[Filter] = None,
81131
dest: Optional[str] = None,
82-
) -> List[Mapping[str, Any]]:
132+
) -> List[TabularData]:
83133
"""Filter and download tabular data.
84134
85135
Args:
@@ -102,13 +152,18 @@ async def tabular_data_by_filter(
102152
response: TabularDataByFilterResponse = await self._data_client.TabularDataByFilter(request, metadata=self._metadata)
103153
if not response.data or len(response.data) == 0:
104154
break
105-
data += [struct_to_dict(struct.data) for struct in response.data]
155+
data += [DataClient.TabularData(
156+
struct_to_dict(struct.data),
157+
response.metadata[struct.metadata_index],
158+
struct.time_requested.ToDatetime(),
159+
struct.time_received.ToDatetime(),
160+
) for struct in response.data]
106161
last = response.last
107162

108163
if dest:
109164
try:
110165
file = open(dest, "w")
111-
file.write(f"{data}")
166+
file.write(f"{[str(d) for d in data]}")
112167
except Exception as e:
113168
LOGGER.error(f"Failed to write tabular data to file {dest}", exc_info=e)
114169
return data
@@ -117,7 +172,7 @@ async def binary_data_by_filter(
117172
self,
118173
filter: Optional[Filter] = None,
119174
dest: Optional[str] = None,
120-
) -> List[bytes]:
175+
) -> List[BinaryData]:
121176
"""Filter and download binary data.
122177
123178
Args:
@@ -139,13 +194,13 @@ async def binary_data_by_filter(
139194
response: BinaryDataByFilterResponse = await self._data_client.BinaryDataByFilter(request, metadata=self._metadata)
140195
if not response.data or len(response.data) == 0:
141196
break
142-
data += [data.binary for data in response.data]
197+
data += [DataClient.BinaryData(data.binary, data.metadata) for data in response.data]
143198
last = response.last
144199

145200
if dest:
146201
try:
147202
file = open(dest, "w")
148-
file.write(f"{data}")
203+
file.write(f"{[str(d) for d in data]}")
149204
except Exception as e:
150205
LOGGER.error(f"Failed to write binary data to file {dest}", exc_info=e)
151206

@@ -155,7 +210,7 @@ async def binary_data_by_ids(
155210
self,
156211
binary_ids: List[BinaryID],
157212
dest: Optional[str] = None,
158-
) -> List[bytes]:
213+
) -> List[BinaryData]:
159214
"""Filter and download binary data.
160215
161216
Args:
@@ -176,7 +231,7 @@ async def binary_data_by_ids(
176231
file.write(f"{response.data}")
177232
except Exception as e:
178233
LOGGER.error(f"Failed to write binary data to file {dest}", exc_info=e)
179-
return [binary_data.binary for binary_data in response.data]
234+
return [DataClient.BinaryData(data.binary, data.metadata) for data in response.data]
180235

181236
async def delete_tabular_data_by_filter(self, filter: Optional[Filter]) -> int:
182237
"""Filter and delete tabular data.

tests/mocks/services.py

Lines changed: 24 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
from typing import Any, Dict, List, Mapping, Optional, Union
22

3-
from google.protobuf.struct_pb2 import Struct
43
from grpclib.server import Stream
54
from PIL import Image
65

@@ -191,11 +190,12 @@
191190
SensorsServiceBase,
192191
)
193192
from viam.proto.service.vision import Classification, Detection
193+
from viam.app.data_client import DataClient
194194
from viam.services.mlmodel import File, LabelType, Metadata, MLModel, TensorInfo
195195
from viam.services.navigation import Navigation
196196
from viam.services.slam import SLAM
197197
from viam.services.vision import Vision
198-
from viam.utils import ValueTypes, struct_to_dict
198+
from viam.utils import ValueTypes, datetime_to_timestamp, dict_to_struct, struct_to_dict
199199

200200

201201
class MockVision(Vision):
@@ -483,8 +483,8 @@ async def do_command(self, command: Mapping[str, ValueTypes], *, timeout: Option
483483
class MockData(DataServiceBase):
484484
def __init__(
485485
self,
486-
tabular_response: List[Mapping[str, Any]],
487-
binary_response: List[bytes],
486+
tabular_response: List[DataClient.TabularData],
487+
binary_response: List[DataClient.BinaryData],
488488
delete_remove_response: int,
489489
tags_response: List[str],
490490
bbox_labels_response: List[str],
@@ -504,11 +504,20 @@ async def TabularDataByFilter(self, stream: Stream[TabularDataByFilterRequest, T
504504
await stream.send_message(TabularDataByFilterResponse(data=None))
505505
return
506506
self.filter = request.data_request.filter
507-
n = len(self.tabular_response)
508-
tabular_response_structs = [Struct()] * n
509-
for i in range(n):
510-
tabular_response_structs[i].update(self.tabular_response[i])
511-
await stream.send_message(TabularDataByFilterResponse(data=[TabularData(data=struct) for struct in tabular_response_structs]))
507+
tabular_response_structs = []
508+
tabular_metadata = [data.metadata for data in self.tabular_response]
509+
for idx, tabular_data in enumerate(self.tabular_response):
510+
tabular_response_structs.append(
511+
TabularData(
512+
data=dict_to_struct(tabular_data.data),
513+
metadata_index=idx,
514+
time_requested=datetime_to_timestamp(tabular_data.time_requested),
515+
time_received=datetime_to_timestamp(tabular_data.time_received)
516+
)
517+
)
518+
await stream.send_message(TabularDataByFilterResponse(
519+
data=tabular_response_structs, metadata=tabular_metadata)
520+
)
512521
self.was_tabular_data_requested = True
513522

514523
async def BinaryDataByFilter(self, stream: Stream[BinaryDataByFilterRequest, BinaryDataByFilterResponse]) -> None:
@@ -518,14 +527,18 @@ async def BinaryDataByFilter(self, stream: Stream[BinaryDataByFilterRequest, Bin
518527
await stream.send_message(BinaryDataByFilterResponse())
519528
return
520529
self.filter = request.data_request.filter
521-
await stream.send_message(BinaryDataByFilterResponse(data=[BinaryData(binary=binary_data) for binary_data in self.binary_response]))
530+
await stream.send_message(BinaryDataByFilterResponse(
531+
data=[BinaryData(binary=data.data, metadata=data.metadata) for data in self.binary_response])
532+
)
522533
self.was_binary_data_requested = True
523534

524535
async def BinaryDataByIDs(self, stream: Stream[BinaryDataByIDsRequest, BinaryDataByIDsResponse]) -> None:
525536
request = await stream.recv_message()
526537
assert request is not None
527538
self.binary_ids = request.binary_ids
528-
await stream.send_message(BinaryDataByIDsResponse(data=[BinaryData(binary=binary_data) for binary_data in self.binary_response]))
539+
await stream.send_message(BinaryDataByIDsResponse(
540+
data=[BinaryData(binary=data.data, metadata=data.metadata) for data in self.binary_response])
541+
)
529542

530543
async def DeleteTabularDataByFilter(self, stream: Stream[DeleteTabularDataByFilterRequest, DeleteTabularDataByFilterResponse]) -> None:
531544
request = await stream.recv_message()

0 commit comments

Comments
 (0)