2 changes: 1 addition & 1 deletion src/viam/module/module.py
@@ -2,9 +2,9 @@
from inspect import iscoroutinefunction
from threading import Lock
from typing import List, Mapping, Optional, Sequence, Tuple
from typing_extensions import Self

from grpclib.utils import _service_name
from typing_extensions import Self

from viam import logging
from viam.components.component_base import ComponentBase
6 changes: 3 additions & 3 deletions src/viam/services/mlmodel/service.py
@@ -3,12 +3,12 @@
from viam.errors import ResourceNotFoundError
from viam.proto.service.mlmodel import InferRequest, InferResponse, MetadataRequest, MetadataResponse, MLModelServiceBase
from viam.resource.rpc_service_base import ResourceRPCServiceBase
from viam.utils import dict_to_struct
from viam.utils import dict_to_struct, struct_to_dict

from .mlmodel import MLModel


class MLModelRPCService(MLModelServiceBase, ResourceRPCServiceBase):
class MLModelRPCService(MLModelServiceBase, ResourceRPCServiceBase[MLModel]):
"""
gRPC service for a ML Model service
"""
@@ -24,7 +24,7 @@ async def Infer(self, stream: Stream[InferRequest, InferResponse]) -> None:
except ResourceNotFoundError as e:
raise e.grpc_error
timeout = stream.deadline.time_remaining() if stream.deadline else None
output_data = await mlmodel.infer(input_data=request.input_data, timeout=timeout)
output_data = await mlmodel.infer(input_data=struct_to_dict(request.input_data), timeout=timeout)
response = InferResponse(output_data=dict_to_struct(output_data))
await stream.send_message(response)

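For reference, a minimal sketch of the Struct round trip that `Infer` now relies on, assuming `dict_to_struct` and `struct_to_dict` in `viam.utils` are inverse conversions between plain Python mappings and `google.protobuf.Struct`; the payload values here are illustrative only.

```python
from viam.utils import dict_to_struct, struct_to_dict

# Client side: a plain dict is packed into the protobuf Struct carried by InferRequest.
input_data = {"threshold": 0.5, "labels": ["cat", "dog"]}
as_struct = dict_to_struct(input_data)

# Service side: the Struct is unpacked before calling MLModel.infer, so
# implementations receive a regular Python mapping instead of a protobuf message.
as_dict = struct_to_dict(as_struct)
assert as_dict["threshold"] == 0.5
assert list(as_dict["labels"]) == ["cat", "dog"]
```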
6 changes: 6 additions & 0 deletions src/viam/services/vision/__init__.py
@@ -1,7 +1,13 @@
from viam.resource.registry import Registry, ResourceRegistration
from viam.services.vision.service import VisionRPCService
from .client import Classification, Detection, VisionClient
from .vision import Vision

__all__ = [
"Classification",
"Detection",
"VisionClient",
"Vision",
]

Registry.register_subtype(ResourceRegistration(Vision, VisionRPCService, lambda name, channel: VisionClient(name, channel)))
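The lambda in this registration is what builds a client once a channel to the service exists. A hypothetical sketch of the same construction done by hand, assuming a gRPC endpoint serving the Vision API is reachable at `localhost:8080` (host, port, and the resource name are placeholders; a real connection would normally go through the SDK's robot client rather than a raw channel):

```python
import asyncio

from grpclib.client import Channel

from viam.services.vision import VisionClient


async def main():
    channel = Channel("localhost", 8080)  # placeholder endpoint
    vision = VisionClient("my_vision", channel)  # mirrors the registration lambda above
    detections = await vision.get_detections_from_camera("my_camera")
    print(detections)
    channel.close()


asyncio.run(main())
```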
81 changes: 6 additions & 75 deletions src/viam/services/vision/client.py
@@ -1,5 +1,5 @@
from io import BytesIO
from typing import Any, Final, List, Mapping, Optional, Union
from typing import Any, List, Mapping, Optional, Union

from grpclib.client import Channel

@@ -22,37 +22,27 @@
VisionServiceStub,
)
from viam.resource.rpc_client_base import ReconfigurableResourceRPCClientBase
from viam.resource.types import RESOURCE_NAMESPACE_RDK, RESOURCE_TYPE_SERVICE, Subtype
from viam.services.service_client_base import ServiceClientBase
from viam.utils import ValueTypes, dict_to_struct, struct_to_dict

from .vision import Vision

class VisionClient(ServiceClientBase, ReconfigurableResourceRPCClientBase):

class VisionClient(Vision, ReconfigurableResourceRPCClientBase):
"""
Connect to the Vision service, which allows you to access various computer vision algorithms
(like detection, segmentation, tracking, etc) that usually only require a camera or image input.
"""

SUBTYPE: Final = Subtype(RESOURCE_NAMESPACE_RDK, RESOURCE_TYPE_SERVICE, "vision")
client: VisionServiceStub

def __init__(self, name: str, channel: Channel):
super().__init__(name, channel)
super().__init__(name)
self.channel = channel
self.client = VisionServiceStub(channel)

async def get_detections_from_camera(
self, camera_name: str, *, extra: Optional[Mapping[str, Any]] = None, timeout: Optional[float] = None
) -> List[Detection]:
"""Get a list of detections in the next image given a camera and a detector

Args:
camera_name (str): The name of the camera to use for detection

Returns:
List[viam.proto.service.vision.Detection]: A list of 2D bounding boxes, their labels, and the
confidence score of the labels, around the found objects in the next 2D image
from the given camera, with the given detector applied to it.
"""
if extra is None:
extra = {}
request = GetDetectionsFromCameraRequest(name=self.name, camera_name=camera_name, extra=dict_to_struct(extra))
@@ -66,16 +56,6 @@ async def get_detections(
extra: Optional[Mapping[str, Any]] = None,
timeout: Optional[float] = None,
) -> List[Detection]:
"""Get a list of detections in the given image using the specified detector

Args:
image (Image): The image to get detections from

Returns:
List[viam.proto.service.vision.Detection]: A list of 2D bounding boxes, their labels, and the
confidence score of the labels, around the found objects in the next 2D image
from the given camera, with the given detector applied to it.
"""
if extra is None:
extra = {}
mime_type = CameraMimeType.JPEG
@@ -101,15 +81,6 @@ async def get_classifications_from_camera(
extra: Optional[Mapping[str, Any]] = None,
timeout: Optional[float] = None,
) -> List[Classification]:
"""Get a list of classifications in the next image given a camera and a classifier

Args:
camera_name (str): The name of the camera to use for detection
count (int): The number of classifications desired

returns:
List[viam.proto.service.vision.Classification]: The list of Classifications
"""
if extra is None:
extra = {}
request = GetClassificationsFromCameraRequest(name=self.name, camera_name=camera_name, n=count, extra=dict_to_struct(extra))
@@ -124,14 +95,6 @@ async def get_classifications(
extra: Optional[Mapping[str, Any]] = None,
timeout: Optional[float] = None,
) -> List[Classification]:
"""Get a list of detections in the given image using the specified detector

Args:
image (Image): The image to get detections from

Returns:
List[viam.proto.service.vision.Classification]: The list of Classifications
"""
if extra is None:
extra = {}
mime_type = CameraMimeType.JPEG
@@ -153,30 +116,6 @@ async def get_classifications(
async def get_object_point_clouds(
self, camera_name: str, *, extra: Optional[Mapping[str, Any]] = None, timeout: Optional[float] = None
) -> List[PointCloudObject]:
"""
Returns a list of the 3D point cloud objects and associated metadata in the latest
picture obtained from the specified 3D camera (using the specified segmenter).

To deserialize the returned information into a numpy array, use the Open3D library.
::

import numpy as np
import open3d as o3d

object_point_clouds = await vision.get_object_point_clouds(camera_name, segmenter_name)

# write the first object point cloud into a temporary file
with open("/tmp/pointcloud_data.pcd", "wb") as f:
f.write(object_point_clouds[0].point_cloud)
pcd = o3d.io.read_point_cloud("/tmp/pointcloud_data.pcd")
points = np.asarray(pcd.points)

Args:
camera_name (str): The name of the camera

Returns:
List[viam.proto.common.PointCloudObject]: The pointcloud objects with metadata
"""
if extra is None:
extra = {}
request = GetObjectPointCloudsRequest(
@@ -189,14 +128,6 @@ async def get_object_point_clouds(
return list(response.objects)

async def do_command(self, command: Mapping[str, ValueTypes], *, timeout: Optional[float] = None) -> Mapping[str, ValueTypes]:
"""Send/receive arbitrary commands

Args:
command (Dict[str, ValueTypes]): The command to execute

Returns:
Dict[str, ValueTypes]: Result of the executed command
"""
request = DoCommandRequest(name=self.name, command=dict_to_struct(command))
response: DoCommandResponse = await self.client.DoCommand(request, timeout=timeout)
return struct_to_dict(response.result)
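A short usage sketch for the client methods shown above, assuming an already-connected `VisionClient` is passed in; the image path, `extra` payload, and command contents are placeholders:

```python
from PIL import Image

from viam.services.vision import VisionClient


async def run_examples(vision: VisionClient):
    # Detections on an explicit image; the client JPEG-encodes PIL images before sending.
    image = Image.open("example.jpg")  # placeholder path
    detections = await vision.get_detections(image, extra={"min_confidence": 0.5})

    # Top-3 classifications from a named camera.
    classifications = await vision.get_classifications_from_camera("my_camera", 3)

    # Arbitrary, implementation-specific commands round-trip through protobuf Structs.
    result = await vision.do_command({"command": "status"})

    return detections, classifications, result
```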
106 changes: 106 additions & 0 deletions src/viam/services/vision/service.py
@@ -0,0 +1,106 @@
from io import BytesIO

from grpclib.server import Stream
from PIL import Image

from viam.media.video import LIBRARY_SUPPORTED_FORMATS, CameraMimeType, RawImage
from viam.proto.common import DoCommandRequest, DoCommandResponse
from viam.proto.service.vision import (
GetClassificationsFromCameraRequest,
GetClassificationsFromCameraResponse,
GetClassificationsRequest,
GetClassificationsResponse,
GetDetectionsFromCameraRequest,
GetDetectionsFromCameraResponse,
GetDetectionsRequest,
GetDetectionsResponse,
GetObjectPointCloudsRequest,
GetObjectPointCloudsResponse,
VisionServiceBase,
)
from viam.resource.rpc_service_base import ResourceRPCServiceBase
from viam.utils import dict_to_struct, struct_to_dict

from .vision import Vision


class VisionRPCService(VisionServiceBase, ResourceRPCServiceBase[Vision]):
"""
gRPC service for a Vision service
"""

RESOURCE_TYPE = Vision

async def GetDetectionsFromCamera(self, stream: Stream[GetDetectionsFromCameraRequest, GetDetectionsFromCameraResponse]) -> None:
request = await stream.recv_message()
assert request is not None
vision = self.get_resource(request.name)
extra = struct_to_dict(request.extra)
timeout = stream.deadline.time_remaining() if stream.deadline else None
result = await vision.get_detections_from_camera(request.camera_name, extra=extra, timeout=timeout)
response = GetDetectionsFromCameraResponse(detections=result)
await stream.send_message(response)

async def GetDetections(self, stream: Stream[GetDetectionsRequest, GetDetectionsResponse]) -> None:
request = await stream.recv_message()
assert request is not None
vision = self.get_resource(request.name)
extra = struct_to_dict(request.extra)
timeout = stream.deadline.time_remaining() if stream.deadline else None

mime_type, is_lazy = CameraMimeType.from_lazy(request.mime_type)
if is_lazy or not (CameraMimeType.is_supported(mime_type)):
image = RawImage(request.image, request.mime_type)
else:
image = Image.open(BytesIO(request.image), formats=LIBRARY_SUPPORTED_FORMATS)

result = await vision.get_detections(image, extra=extra, timeout=timeout)
response = GetDetectionsResponse(detections=result)
await stream.send_message(response)

async def GetClassificationsFromCamera(
self, stream: Stream[GetClassificationsFromCameraRequest, GetClassificationsFromCameraResponse]
) -> None:
request = await stream.recv_message()
assert request is not None
vision = self.get_resource(request.name)
extra = struct_to_dict(request.extra)
timeout = stream.deadline.time_remaining() if stream.deadline else None
result = await vision.get_classifications_from_camera(request.camera_name, request.n, extra=extra, timeout=timeout)
response = GetClassificationsFromCameraResponse(classifications=result)
await stream.send_message(response)

async def GetClassifications(self, stream: Stream[GetClassificationsRequest, GetClassificationsResponse]) -> None:
request = await stream.recv_message()
assert request is not None
vision = self.get_resource(request.name)
extra = struct_to_dict(request.extra)
timeout = stream.deadline.time_remaining() if stream.deadline else None

mime_type, is_lazy = CameraMimeType.from_lazy(request.mime_type)
if is_lazy or not (CameraMimeType.is_supported(mime_type)):
image = RawImage(request.image, request.mime_type)
else:
image = Image.open(BytesIO(request.image), formats=LIBRARY_SUPPORTED_FORMATS)

result = await vision.get_classifications(image, request.n, extra=extra, timeout=timeout)
response = GetClassificationsResponse(classifications=result)
await stream.send_message(response)

async def GetObjectPointClouds(self, stream: Stream[GetObjectPointCloudsRequest, GetObjectPointCloudsResponse]) -> None:
request = await stream.recv_message()
assert request is not None
vision = self.get_resource(request.name)
extra = struct_to_dict(request.extra)
timeout = stream.deadline.time_remaining() if stream.deadline else None
result = await vision.get_object_point_clouds(request.camera_name, extra=extra, timeout=timeout)
response = GetObjectPointCloudsResponse(mime_type=CameraMimeType.PCD.value, objects=result)
await stream.send_message(response)

async def DoCommand(self, stream: Stream[DoCommandRequest, DoCommandResponse]) -> None:
request = await stream.recv_message()
assert request is not None
vision = self.get_resource(request.name)
timeout = stream.deadline.time_remaining() if stream.deadline else None
result = await vision.do_command(struct_to_dict(request.command), timeout=timeout)
await stream.send_message(DoCommandResponse(result=dict_to_struct(result)))
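This service resolves the named `Vision` resource and forwards each RPC to it. Below is a hypothetical do-nothing implementation showing the methods the service calls; the signatures follow the calls above and the client code, while the class name, constructor use, and return values are illustrative only:

```python
from typing import Any, List, Mapping, Optional, Union

from PIL import Image

from viam.media.video import RawImage
from viam.proto.common import PointCloudObject
from viam.services.vision import Classification, Detection, Vision
from viam.utils import ValueTypes


class StubVision(Vision):
    """Placeholder Vision resource; a real implementation would run a detector, classifier, or segmenter."""

    async def get_detections_from_camera(
        self, camera_name: str, *, extra: Optional[Mapping[str, Any]] = None, timeout: Optional[float] = None
    ) -> List[Detection]:
        return []

    async def get_detections(
        self, image: Union[Image.Image, RawImage], *, extra: Optional[Mapping[str, Any]] = None, timeout: Optional[float] = None
    ) -> List[Detection]:
        return []

    async def get_classifications_from_camera(
        self, camera_name: str, count: int, *, extra: Optional[Mapping[str, Any]] = None, timeout: Optional[float] = None
    ) -> List[Classification]:
        return []

    async def get_classifications(
        self, image: Union[Image.Image, RawImage], count: int, *, extra: Optional[Mapping[str, Any]] = None, timeout: Optional[float] = None
    ) -> List[Classification]:
        return []

    async def get_object_point_clouds(
        self, camera_name: str, *, extra: Optional[Mapping[str, Any]] = None, timeout: Optional[float] = None
    ) -> List[PointCloudObject]:
        return []

    async def do_command(self, command: Mapping[str, ValueTypes], *, timeout: Optional[float] = None) -> Mapping[str, ValueTypes]:
        # Echo the command back; real implementations handle model-specific commands here.
        return {"echo": dict(command)}
```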