diff --git a/src/viam/components/arm/service.py b/src/viam/components/arm/service.py index 77661b8ce..56e903daf 100644 --- a/src/viam/components/arm/service.py +++ b/src/viam/components/arm/service.py @@ -109,7 +109,7 @@ async def GetKinematics(self, stream: Stream[GetKinematicsRequest, GetKinematics assert request is not None arm = self.get_resource(request.name) timeout = stream.deadline.time_remaining() if stream.deadline else None - format, kinematics_data = await arm.get_kinematics(extra=struct_to_dict(request.extra), timeout=timeout) + format, kinematics_data = await arm.get_kinematics(extra=struct_to_dict(request.extra), timeout=timeout, metadata=stream.metadata) response = GetKinematicsResponse(format=format, kinematics_data=kinematics_data) await stream.send_message(response) diff --git a/src/viam/components/gantry/client.py b/src/viam/components/gantry/client.py index 525a5ae05..1d300bbf9 100644 --- a/src/viam/components/gantry/client.py +++ b/src/viam/components/gantry/client.py @@ -1,8 +1,15 @@ -from typing import Any, Dict, List, Mapping, Optional +from typing import Any, Dict, List, Mapping, Optional, Tuple from grpclib.client import Channel -from viam.proto.common import DoCommandRequest, DoCommandResponse, Geometry +from viam.proto.common import ( + DoCommandRequest, + DoCommandResponse, + Geometry, + GetKinematicsRequest, + GetKinematicsResponse, + KinematicsFileFormat, +) from viam.proto.component.gantry import ( GantryServiceStub, GetLengthsRequest, @@ -110,6 +117,14 @@ async def do_command( response: DoCommandResponse = await self.client.DoCommand(request, timeout=timeout, metadata=md) return struct_to_dict(response.result) + async def get_kinematics( + self, *, extra: Optional[Dict[str, Any]] = None, timeout: Optional[float] = None, **kwargs + ) -> Tuple[KinematicsFileFormat.ValueType, bytes]: + md = kwargs.get("metadata", self.Metadata()).proto + request = GetKinematicsRequest(name=self.name, extra=dict_to_struct(extra)) + response: GetKinematicsResponse = await self.client.GetKinematics(request, timeout=timeout, metadata=md) + return (response.format, response.kinematics_data) + async def get_geometries(self, *, extra: Optional[Dict[str, Any]] = None, timeout: Optional[float] = None, **kwargs) -> List[Geometry]: md = kwargs.get("metadata", self.Metadata()) return await get_geometries(self.client, self.name, extra, timeout, md) diff --git a/src/viam/components/gantry/gantry.py b/src/viam/components/gantry/gantry.py index 865b09e91..6d552fbe5 100644 --- a/src/viam/components/gantry/gantry.py +++ b/src/viam/components/gantry/gantry.py @@ -1,6 +1,7 @@ import abc -from typing import Any, Dict, Final, List, Optional +from typing import Any, Dict, Final, List, Optional, Tuple +from viam.components.arm import KinematicsFileFormat from viam.resource.types import API, RESOURCE_NAMESPACE_RDK, RESOURCE_TYPE_COMPONENT from ..component_base import ComponentBase @@ -154,3 +155,33 @@ async def is_moving(self) -> bool: For more information, see `Gantry component `_. """ ... + + @abc.abstractmethod + async def get_kinematics( + self, *, extra: Optional[Dict[str, Any]] = None, timeout: Optional[float] = None, **kwargs + ) -> Tuple[KinematicsFileFormat.ValueType, bytes]: + """ + Get the kinematics information associated with the gantry. + + :: + + my_gantry = Gantry.from_robot(robot=machine, name="my_gantry") + + # Get the kinematics information associated with the gantry. + kinematics = await my_gantry.get_kinematics() + + # Get the format of the kinematics file. + k_file = kinematics[0] + + # Get the byte contents of the file. + k_bytes = kinematics[1] + + Returns: + Tuple[KinematicsFileFormat.ValueType, bytes]: A tuple containing two values; the first [0] value represents the format of the + file, either in URDF format (``KinematicsFileFormat.KINEMATICS_FILE_FORMAT_URDF``) or + Viam's kinematic parameter format (spatial vector algebra) (``KinematicsFileFormat.KINEMATICS_FILE_FORMAT_SVA``), + and the second [1] value represents the byte contents of the file. + + For more information, see `Arm component `_. + """ + ... diff --git a/src/viam/components/gantry/service.py b/src/viam/components/gantry/service.py index 8828f3496..7036a0517 100644 --- a/src/viam/components/gantry/service.py +++ b/src/viam/components/gantry/service.py @@ -1,8 +1,14 @@ from grpclib.server import Stream -from viam.proto.common import DoCommandRequest, DoCommandResponse, GetGeometriesRequest, GetGeometriesResponse +from viam.proto.common import ( + DoCommandRequest, + DoCommandResponse, + GetGeometriesRequest, + GetGeometriesResponse, + GetKinematicsRequest, + GetKinematicsResponse, +) from viam.proto.component.gantry import ( - GantryServiceBase, GetLengthsRequest, GetLengthsResponse, GetPositionRequest, @@ -15,6 +21,7 @@ MoveToPositionResponse, StopRequest, StopResponse, + UnimplementedGantryServiceBase, ) from viam.resource.rpc_service_base import ResourceRPCServiceBase from viam.utils import dict_to_struct, struct_to_dict @@ -22,7 +29,7 @@ from .gantry import Gantry -class GantryRPCService(GantryServiceBase, ResourceRPCServiceBase[Gantry]): +class GantryRPCService(UnimplementedGantryServiceBase, ResourceRPCServiceBase[Gantry]): """ gRPC Service for a Gantry """ @@ -103,11 +110,20 @@ async def DoCommand(self, stream: Stream[DoCommandRequest, DoCommandResponse]) - response = DoCommandResponse(result=dict_to_struct(result)) await stream.send_message(response) + async def GetKinematics(self, stream: Stream[GetKinematicsRequest, GetKinematicsResponse]) -> None: + request = await stream.recv_message() + assert request is not None + gantry = self.get_resource(request.name) + timeout = stream.deadline.time_remaining() if stream.deadline else None + format, data = await gantry.get_kinematics(extra=struct_to_dict(request.extra), timeout=timeout, metadata=stream.metadata) + response = GetKinematicsResponse(format=format, kinematics_data=data) + await stream.send_message(response) + async def GetGeometries(self, stream: Stream[GetGeometriesRequest, GetGeometriesResponse]) -> None: request = await stream.recv_message() assert request is not None - arm = self.get_resource(request.name) + gantry = self.get_resource(request.name) timeout = stream.deadline.time_remaining() if stream.deadline else None - geometries = await arm.get_geometries(extra=struct_to_dict(request.extra), timeout=timeout) + geometries = await gantry.get_geometries(extra=struct_to_dict(request.extra), timeout=timeout) response = GetGeometriesResponse(geometries=geometries) await stream.send_message(response) diff --git a/tests/mocks/components.py b/tests/mocks/components.py index 1fbae9b16..6a06d661b 100644 --- a/tests/mocks/components.py +++ b/tests/mocks/components.py @@ -556,6 +556,7 @@ def __init__(self, name: str, position: List[float], lengths: List[float]): self.position = position self.lengths = lengths self.is_stopped = True + self.kinematics = (KinematicsFileFormat.KINEMATICS_FILE_FORMAT_SVA, b"\x00\x01\x02") self.extra = None self.homed = True self.speeds = Optional[List[float]] @@ -602,6 +603,13 @@ async def stop(self, *, extra: Optional[Dict[str, Any]] = None, timeout: Optiona async def is_moving(self) -> bool: return not self.is_stopped + async def get_kinematics( + self, *, extra: Optional[Dict[str, Any]] = None, timeout: Optional[float] = None, **kwargs + ) -> Tuple[KinematicsFileFormat.ValueType, bytes]: + self.extra = extra + self.timeout = timeout + return self.kinematics + async def get_geometries(self, *, extra: Optional[Dict[str, Any]] = None, timeout: Optional[float] = None) -> List[Geometry]: self.extra = extra self.timeout = timeout diff --git a/tests/test_gantry.py b/tests/test_gantry.py index 249f28cd8..728f1926a 100644 --- a/tests/test_gantry.py +++ b/tests/test_gantry.py @@ -2,7 +2,14 @@ from viam.components.gantry import GantryClient from viam.components.gantry.service import GantryRPCService -from viam.proto.common import DoCommandRequest, DoCommandResponse, GetGeometriesRequest, GetGeometriesResponse +from viam.proto.common import ( + DoCommandRequest, + DoCommandResponse, + GetGeometriesRequest, + GetGeometriesResponse, + GetKinematicsRequest, + GetKinematicsResponse, +) from viam.proto.component.gantry import ( GantryServiceStub, GetLengthsRequest, @@ -64,6 +71,11 @@ async def test_extra(self): await self.gantry.move_to_position([1, 2, 3], [4, 5, 6], extra=extra) assert self.gantry.extra == extra + async def test_get_kinematics(self): + format, data = await self.gantry.get_kinematics() + assert format == self.gantry.kinematics[0] + assert data == self.gantry.kinematics[1] + async def test_timeout(self): assert self.gantry.timeout is None @@ -160,6 +172,15 @@ async def test_do(self): result = struct_to_dict(response.result) assert result == {"command": command} + async def test_get_kinematics(self): + async with ChannelFor([self.service]) as channel: + client = GantryServiceStub(channel) + request = GetKinematicsRequest(name=self.gantry.name) + response: GetKinematicsResponse = await client.GetKinematics(request, timeout=1.1) + assert response.format == self.gantry.kinematics[0] + assert response.kinematics_data == self.gantry.kinematics[1] + assert self.gantry.timeout == loose_approx(1.1) + async def test_get_geometries(self): async with ChannelFor([self.service]) as channel: client = GantryServiceStub(channel) @@ -234,6 +255,14 @@ async def test_extra(self): await client.move_to_position([1, 2, 3], [4, 5, 6], extra=extra) assert self.gantry.extra == extra + async def test_get_kinematics(self): + async with ChannelFor([self.service]) as channel: + client = GantryClient(self.gantry.name, channel) + format, data = await client.get_kinematics(timeout=1.1) + assert format == self.gantry.kinematics[0] + assert data == self.gantry.kinematics[1] + assert self.gantry.timeout == loose_approx(1.1) + async def test_get_geometries(self): async with ChannelFor([self.service]) as channel: client = GantryClient(self.gantry.name, channel)