Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 45 additions & 0 deletions examples/async_node/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
# Async Node samples

The following samples are based off the ROS2 documented [tutorials](https://docs.ros.org/en/humble/Tutorials.html).

## Usage

*All scripts assume terminals are open in the root of this repository and have installed `rclpy_async`*

A simple publisher and subscriber:
```sh
# Terminal 1:
python3 -m examples.async_node.subscriber

# Terminal 2:
python3 -m examples.async_node.publisher
```

A simple service and client:
```sh
# Terminal 1:
python3 -m examples.async_node.service

# Terminal 2:
python3 -m examples.async_node.client 1 2
```

Using parameters in a class:
```sh
# Start the node
python3 -m examples.async_node.param
# [Optionally] start the node with the parameter as something other that "world" by adding:
python3 -m examples.async_node.param --ros-args -p my_parameter:=world2

# In another terminal set the parameter again
ros2 param set /minimal_param_node my_parameter earth
```

An action server and client:
```sh
# Terminal 1:
python3 -m examples.async_node.action_server

# Terminal 2:
python3 -m examples.async_node.action_client
```
31 changes: 31 additions & 0 deletions examples/async_node/action_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
from typing import Callable

import anyio
import rclpy
from example_interfaces.action import Fibonacci

import rclpy_async
from rclpy_async import AsyncNode

node = AsyncNode("fibonacci_action_client")


async def main():
rclpy.init()
node.initialize()

async with rclpy_async.start_executor() as xtor:
xtor.add_node(node)

with rclpy_async.action_client(node, Fibonacci, "fibonacci") as action_client:
print_feedback: Callable[[Fibonacci.Impl.FeedbackMessage], None] = (
lambda msg: node.get_logger().info(
f"Fibonacci feedback: {msg.feedback}"
)
)
result = await action_client(Fibonacci.Goal(order=10), print_feedback)
node.get_logger().info(f"Fibonacci result: {result}")


if __name__ == "__main__":
anyio.run(main)
44 changes: 44 additions & 0 deletions examples/async_node/action_server.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
import anyio
import rclpy
from example_interfaces.action import Fibonacci
from rclpy.action.server import ServerGoalHandle

import rclpy_async
from rclpy_async import AsyncNode, async_run

node = AsyncNode("fibonacci_action_server_node")


@node.action(Fibonacci, "fibonacci")
async def execute_callback(goal_handle: ServerGoalHandle):
node.get_logger().info("Executing goal...")

feedback_msg = Fibonacci.Feedback()
feedback_msg.sequence = [0, 1]

for i in range(1, goal_handle.request.order):
feedback_msg.sequence.append(
feedback_msg.sequence[i] + feedback_msg.sequence[i - 1]
)
node.get_logger().info("Feedback: {0}".format(feedback_msg.sequence))
goal_handle.publish_feedback(feedback_msg)
await anyio.sleep(1)

goal_handle.succeed()

result = Fibonacci.Result()
result.sequence = feedback_msg.sequence
return result


async def main():
rclpy.init()
node.initialize()

async with rclpy_async.start_executor() as xtor:
xtor.add_node(node)
await async_run(node)


if __name__ == "__main__":
anyio.run(main)
35 changes: 35 additions & 0 deletions examples/async_node/client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
import sys

import anyio
import rclpy
from example_interfaces.srv import AddTwoInts

import rclpy_async
from rclpy_async import AsyncNode

node = AsyncNode("minimal_client_async")


async def main():
rclpy.init()
node.initialize()

async with rclpy_async.start_executor() as executor:
executor.add_node(node)
with rclpy_async.service_client(
node,
AddTwoInts,
"add_two_ints",
) as cli:
req = AddTwoInts.Request()
req.a = int(sys.argv[1])
req.b = int(sys.argv[2])
response = await cli(req)
node.get_logger().info(
"Result of add_two_ints: for %d + %d = %d"
% (int(sys.argv[1]), int(sys.argv[2]), response.sum)
)


if __name__ == "__main__":
anyio.run(main)
43 changes: 43 additions & 0 deletions examples/async_node/param.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
import anyio
import rclpy
from rclpy.parameter import Parameter

import rclpy_async
from rclpy_async import AsyncNode, async_run

node = AsyncNode("minimal_param_node")


@node.timer(
lambda: node.get_parameter("timer_period").get_parameter_value().double_value
)
async def timer_callback():
node.get_logger().info(
f"Hello {node.get_parameter('my_parameter').get_parameter_value().string_value}!"
)
node.set_parameters(
[
Parameter("my_parameter", Parameter.Type.STRING, "world"),
Parameter(
"timer_period",
Parameter.Type.DOUBLE,
node.get_parameter("timer_period").get_parameter_value().double_value,
),
]
)


async def main():
rclpy.init()
node.initialize()
node.declare_parameters(
namespace="", parameters=[("my_parameter", "world"), ("timer_period", 2.0)]
)

async with rclpy_async.start_executor() as xtor:
xtor.add_node(node)
await async_run(node)


if __name__ == "__main__":
anyio.run(main)
31 changes: 31 additions & 0 deletions examples/async_node/publisher.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
import anyio
import rclpy
import rclpy_async
from rclpy_async import AsyncNode, async_run
from std_msgs.msg import String

node = AsyncNode("minimal_publisher")


@node.timer(0.5)
async def timer_callback():
msg = String(data="Hello World: %d" % node.state.i)
node.state.publisher_.publish(msg)
node.get_logger().info('Publishing: "%s"' % msg.data)
node.state.i += 1


async def main():
rclpy.init()
node.initialize()

node.state.i = 0
node.state.publisher_ = node.create_publisher(String, "topic", 10)

async with rclpy_async.start_executor() as xtor:
xtor.add_node(node)
await async_run(node)


if __name__ == "__main__":
anyio.run(main)
29 changes: 29 additions & 0 deletions examples/async_node/service.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
import anyio
import rclpy
from example_interfaces.srv import AddTwoInts

import rclpy_async
from rclpy_async import AsyncNode, async_run

node = AsyncNode("minimal_service")


@node.service(AddTwoInts, "add_two_ints")
async def add_two_ints_callback(request, response):
response.sum = request.a + request.b
node.get_logger().info("Incoming request \ta: %d b: %d" % (request.a, request.b))

return response


async def main():
rclpy.init()
node.initialize()

async with rclpy_async.start_executor() as xtor:
xtor.add_node(node)
await async_run(node)


if __name__ == "__main__":
anyio.run(main)
30 changes: 30 additions & 0 deletions examples/async_node/subscriber.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
import anyio
import rclpy
from std_msgs.msg import String

import rclpy_async
from rclpy_async import AsyncNode, BackpressureHandlerSpec

node = AsyncNode("minimal_subscriber")


@node.subscription(
String,
"topic",
backpressure_handler=BackpressureHandlerSpec(max_queue_size=5, drop_oldest=True),
)
async def listener_callback(msg: String):
node.get_logger().info('I heard: "%s"' % msg.data)


async def main():
rclpy.init()
node.initialize()

async with rclpy_async.start_executor() as xtor:
xtor.add_node(node)
await rclpy_async.async_run(node)


if __name__ == "__main__":
anyio.run(main)
6 changes: 6 additions & 0 deletions src/rclpy_async/__init__.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
from importlib.metadata import version

from rclpy_async._async_node import BackpressureHandlerSpec
from rclpy_async.action_client import action_client
from rclpy_async.action_server import action_server
from rclpy_async.async_executor import start_executor
from rclpy_async.async_node import AsyncNode
from rclpy_async.async_node import run as async_run
from rclpy_async.service_client import service_client
from rclpy_async.utilities import (
future_result,
Expand All @@ -21,4 +24,7 @@
"service_client",
"action_client",
"action_server",
"async_run",
"AsyncNode",
"BackpressureHandlerSpec",
]
17 changes: 17 additions & 0 deletions src/rclpy_async/_async_node/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
from .action_handler_spec import ActionHandlerSpec
from .backpressure_handler_spec import BackpressureHandlerSpec
from .node_proto import NodeProto
from .service_handler_spec import ServiceHandlerSpec
from .state import State
from .timer_handler_spec import TimerHandlerSpec
from .topic_handler_spec import TopicHandlerSpec

__all__ = [
"ActionHandlerSpec",
"BackpressureHandlerSpec",
"NodeProto",
"ServiceHandlerSpec",
"State",
"TimerHandlerSpec",
"TopicHandlerSpec",
]
23 changes: 23 additions & 0 deletions src/rclpy_async/_async_node/_base_handler_spec.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
from abc import ABC
from dataclasses import dataclass
from typing import Any, Awaitable, Callable, Generic, Optional, TypeVar, ParamSpec

from .backpressure_handler_spec import BackpressureHandlerSpec

PInput = ParamSpec("PInput")
TOutput = TypeVar("TOutput")


@dataclass
class BaseHandlerSpec(ABC, Generic[PInput, TOutput]):
"""Base Handler spec for registering ROS callbacks

Args:
ABC (_type_): Abstract Base Class
Generic (_type_): Generic class
"""

backpressure_handler: Optional[BackpressureHandlerSpec]
kwargs: dict[str, Any]

async_fn: Callable[PInput, Awaitable[TOutput]]
20 changes: 20 additions & 0 deletions src/rclpy_async/_async_node/action_handler_spec.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
from dataclasses import dataclass
from typing import Any

from rclpy.action.server import ServerGoalHandle

from ._base_handler_spec import BaseHandlerSpec


@dataclass
class ActionHandlerSpec(BaseHandlerSpec[[ServerGoalHandle], Any]):
"""Specification for a ROS action handler

async_fn signature: (goal: ServerGoalHandle) -> Awaitable[Any]

Args:
BaseHandlerSpec (_type_): Inherits from BaseHandlerSpec
"""

action_type: type
action_name: str
9 changes: 9 additions & 0 deletions src/rclpy_async/_async_node/backpressure_handler_spec.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
from dataclasses import dataclass


@dataclass
class BackpressureHandlerSpec:
"""Specification for a backpressure handler"""

max_queue_size: int = 0
drop_oldest: bool = True
Loading