Skip to content

Commit

Permalink
Refactor client for multiple requests (#170)
Browse files Browse the repository at this point in the history
* Refactor client for multiple requests

Eliminates Response thread
Removes wait_for_future()
Adds call_async() that returns a Future instance
call() uses call_async() internally

* Add spin_until_future_complete
  • Loading branch information
sloretz committed Mar 27, 2018
1 parent 50ccfd8 commit 0c2ee4c
Show file tree
Hide file tree
Showing 4 changed files with 109 additions and 48 deletions.
20 changes: 20 additions & 0 deletions rclpy/rclpy/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,3 +54,23 @@ def spin(node):
executor.spin_once()
finally:
executor.shutdown()


def spin_until_future_complete(node, future):
"""
Execute work until the future is complete.
Callbacks and other work will be executed in a SingleThreadedExecutor until future.done()
returns True or rclpy is shutdown.
:param future: The future object to wait on.
:type future: rclpy.task.Future
"""
# imported locally to avoid loading extensions on module import
from rclpy.executors import SingleThreadedExecutor
executor = SingleThreadedExecutor()
try:
executor.add_node(node)
executor.spin_until_future_complete(future)
finally:
executor.shutdown()
101 changes: 58 additions & 43 deletions rclpy/rclpy/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,41 +16,10 @@
import time

from rclpy.impl.implementation_singleton import rclpy_implementation as _rclpy
from rclpy.task import Future
import rclpy.utilities


class ResponseThread(threading.Thread):

def __init__(self, client):
threading.Thread.__init__(self)
self.client = client
self.wait_set = _rclpy.rclpy_get_zero_initialized_wait_set()
_rclpy.rclpy_wait_set_init(self.wait_set, 0, 1, 0, 1, 0)
_rclpy.rclpy_wait_set_clear_entities('client', self.wait_set)
_rclpy.rclpy_wait_set_add_entity(
'client', self.wait_set, self.client.client_handle)

def run(self):
[sigint_gc, sigint_gc_handle] = _rclpy.rclpy_get_sigint_guard_condition()
_rclpy.rclpy_wait_set_add_entity('guard_condition', self.wait_set, sigint_gc)

_rclpy.rclpy_wait(self.wait_set, -1)

guard_condition_ready_list = \
_rclpy.rclpy_get_ready_entities('guard_condition', self.wait_set)

# destroying here to make sure we dont call shutdown before cleaning up
_rclpy.rclpy_destroy_entity(sigint_gc)
if sigint_gc_handle in guard_condition_ready_list:
rclpy.utilities.shutdown()
return
seq, response = _rclpy.rclpy_take_response(
self.client.client_handle,
self.client.srv_type.Response)
if seq is not None and seq == self.client.sequence_number:
self.client.response = response


class Client:
def __init__(
self, node_handle, client_handle, client_pointer,
Expand All @@ -61,15 +30,68 @@ def __init__(
self.srv_type = srv_type
self.srv_name = srv_name
self.qos_profile = qos_profile
self.sequence_number = 0
self.response = None
# Key is a sequence number, value is an instance of a Future
self._pending_requests = {}
self.callback_group = callback_group
# True when the callback is ready to fire but has not been "taken" by an executor
self._executor_event = False

def call(self, req):
self.response = None
self.sequence_number = _rclpy.rclpy_send_request(self.client_handle, req)
"""
Make a service request and wait for the result.
Do not call this method in a callback or a deadlock may occur.
:param req: The service request
:return: The service response
"""
event = threading.Event()

def unblock(future):
nonlocal event
event.set()

future = self.call_async(req)
future.add_done_callback(unblock)

event.wait()
if future.exception() is not None:
raise future.exception()
return future.result()

def remove_pending_request(self, future):
"""
Remove a future from the list of pending requests.
This prevents a future from receiving a request and executing its done callbacks.
:param future: a future returned from :meth:`call_async`
:type future: rclpy.task.Future
"""
for seq, req_future in self._pending_requests.items():
if future == req_future:
try:
del self._pending_requests[seq]
except KeyError:
pass
break

def call_async(self, req):
"""
Make a service request and asyncronously get the result.
:return: a Future instance that completes when the request does
:rtype: :class:`rclpy.task.Future` instance
"""
sequence_number = _rclpy.rclpy_send_request(self.client_handle, req)
if sequence_number in self._pending_requests:
raise RuntimeError('Sequence (%r) conflicts with pending request' % sequence_number)

future = Future()
self._pending_requests[sequence_number] = future

future.add_done_callback(self.remove_pending_request)

return future

def service_is_ready(self):
return _rclpy.rclpy_service_server_is_available(self.node_handle, self.client_handle)
Expand All @@ -85,10 +107,3 @@ def wait_for_service(self, timeout_sec=None):
timeout_sec -= sleep_time

return self.service_is_ready()

# TODO(mikaelarguedas) this function can only be used if nobody is spinning
# need to be updated once guard_conditions are supported
def wait_for_future(self):
thread1 = ResponseThread(self)
thread1.start()
thread1.join()
19 changes: 14 additions & 5 deletions rclpy/rclpy/executors.py
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,11 @@ def spin(self):
while ok():
self.spin_once()

def spin_until_future_complete(self, future):
"""Execute until a given future is done."""
while ok() and not future.done():
self.spin_once()

def spin_once(self, timeout_sec=None):
"""
Wait for and execute a single callback.
Expand Down Expand Up @@ -222,11 +227,15 @@ def _take_client(self, client):

async def _execute_client(self, client, seq_and_response):
sequence, response = seq_and_response
if sequence is not None and sequence == client.sequence_number:
# clients spawn their own thread to wait for a response in the
# wait_for_future function. Users can either use this mechanism or monitor
# the content of client.response to check if a response has been received
client.response = response
if sequence is not None:
try:
future = client._pending_requests[sequence]
except IndexError:
# The request was cancelled
pass
else:
future._set_executor(self)
future.set_result(response)

def _take_service(self, srv):
request_and_header = _rclpy.rclpy_take_request(
Expand Down
17 changes: 17 additions & 0 deletions rclpy/test/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,23 @@ def test_wait_for_service_exists(self):
self.node.destroy_client(cli)
self.node.destroy_service(srv)

def test_concurrent_calls_to_service(self):
cli = self.node.create_client(GetParameters, 'get/parameters')
srv = self.node.create_service(
GetParameters, 'get/parameters',
lambda request, response: response)
try:
self.assertTrue(cli.wait_for_service(timeout_sec=20))
future1 = cli.call_async(GetParameters.Request())
future2 = cli.call_async(GetParameters.Request())
rclpy.spin_until_future_complete(self.node, future1)
rclpy.spin_until_future_complete(self.node, future2)
self.assertTrue(future1.result() is not None)
self.assertTrue(future2.result() is not None)
finally:
self.node.destroy_client(cli)
self.node.destroy_service(srv)


if __name__ == '__main__':
unittest.main()

0 comments on commit 0c2ee4c

Please sign in to comment.