From 0c2ee4cca785dd70ca9a629f42716393b59d5062 Mon Sep 17 00:00:00 2001 From: Shane Loretz Date: Mon, 26 Mar 2018 17:56:45 -0700 Subject: [PATCH] Refactor client for multiple requests (#170) * Refactor client for multiple requests Eliminates Response thread Removes wait_for_future() Adds call_async() that returns a Future instance call() uses call_async() internally * Add spin_until_future_complete --- rclpy/rclpy/__init__.py | 20 ++++++++ rclpy/rclpy/client.py | 101 ++++++++++++++++++++++---------------- rclpy/rclpy/executors.py | 19 +++++-- rclpy/test/test_client.py | 17 +++++++ 4 files changed, 109 insertions(+), 48 deletions(-) diff --git a/rclpy/rclpy/__init__.py b/rclpy/rclpy/__init__.py index 0569e8228..0cf2d4461 100644 --- a/rclpy/rclpy/__init__.py +++ b/rclpy/rclpy/__init__.py @@ -54,3 +54,23 @@ def spin(node): executor.spin_once() finally: executor.shutdown() + + +def spin_until_future_complete(node, future): + """ + Execute work until the future is complete. + + Callbacks and other work will be executed in a SingleThreadedExecutor until future.done() + returns True or rclpy is shutdown. + + :param future: The future object to wait on. + :type future: rclpy.task.Future + """ + # imported locally to avoid loading extensions on module import + from rclpy.executors import SingleThreadedExecutor + executor = SingleThreadedExecutor() + try: + executor.add_node(node) + executor.spin_until_future_complete(future) + finally: + executor.shutdown() diff --git a/rclpy/rclpy/client.py b/rclpy/rclpy/client.py index bd1d5dbc4..9eab09f63 100644 --- a/rclpy/rclpy/client.py +++ b/rclpy/rclpy/client.py @@ -16,41 +16,10 @@ import time from rclpy.impl.implementation_singleton import rclpy_implementation as _rclpy +from rclpy.task import Future import rclpy.utilities -class ResponseThread(threading.Thread): - - def __init__(self, client): - threading.Thread.__init__(self) - self.client = client - self.wait_set = _rclpy.rclpy_get_zero_initialized_wait_set() - _rclpy.rclpy_wait_set_init(self.wait_set, 0, 1, 0, 1, 0) - _rclpy.rclpy_wait_set_clear_entities('client', self.wait_set) - _rclpy.rclpy_wait_set_add_entity( - 'client', self.wait_set, self.client.client_handle) - - def run(self): - [sigint_gc, sigint_gc_handle] = _rclpy.rclpy_get_sigint_guard_condition() - _rclpy.rclpy_wait_set_add_entity('guard_condition', self.wait_set, sigint_gc) - - _rclpy.rclpy_wait(self.wait_set, -1) - - guard_condition_ready_list = \ - _rclpy.rclpy_get_ready_entities('guard_condition', self.wait_set) - - # destroying here to make sure we dont call shutdown before cleaning up - _rclpy.rclpy_destroy_entity(sigint_gc) - if sigint_gc_handle in guard_condition_ready_list: - rclpy.utilities.shutdown() - return - seq, response = _rclpy.rclpy_take_response( - self.client.client_handle, - self.client.srv_type.Response) - if seq is not None and seq == self.client.sequence_number: - self.client.response = response - - class Client: def __init__( self, node_handle, client_handle, client_pointer, @@ -61,15 +30,68 @@ def __init__( self.srv_type = srv_type self.srv_name = srv_name self.qos_profile = qos_profile - self.sequence_number = 0 - self.response = None + # Key is a sequence number, value is an instance of a Future + self._pending_requests = {} self.callback_group = callback_group # True when the callback is ready to fire but has not been "taken" by an executor self._executor_event = False def call(self, req): - self.response = None - self.sequence_number = _rclpy.rclpy_send_request(self.client_handle, req) + """ + Make a service request and wait for the result. + + Do not call this method in a callback or a deadlock may occur. + + :param req: The service request + :return: The service response + """ + event = threading.Event() + + def unblock(future): + nonlocal event + event.set() + + future = self.call_async(req) + future.add_done_callback(unblock) + + event.wait() + if future.exception() is not None: + raise future.exception() + return future.result() + + def remove_pending_request(self, future): + """ + Remove a future from the list of pending requests. + + This prevents a future from receiving a request and executing its done callbacks. + :param future: a future returned from :meth:`call_async` + :type future: rclpy.task.Future + """ + for seq, req_future in self._pending_requests.items(): + if future == req_future: + try: + del self._pending_requests[seq] + except KeyError: + pass + break + + def call_async(self, req): + """ + Make a service request and asyncronously get the result. + + :return: a Future instance that completes when the request does + :rtype: :class:`rclpy.task.Future` instance + """ + sequence_number = _rclpy.rclpy_send_request(self.client_handle, req) + if sequence_number in self._pending_requests: + raise RuntimeError('Sequence (%r) conflicts with pending request' % sequence_number) + + future = Future() + self._pending_requests[sequence_number] = future + + future.add_done_callback(self.remove_pending_request) + + return future def service_is_ready(self): return _rclpy.rclpy_service_server_is_available(self.node_handle, self.client_handle) @@ -85,10 +107,3 @@ def wait_for_service(self, timeout_sec=None): timeout_sec -= sleep_time return self.service_is_ready() - - # TODO(mikaelarguedas) this function can only be used if nobody is spinning - # need to be updated once guard_conditions are supported - def wait_for_future(self): - thread1 = ResponseThread(self) - thread1.start() - thread1.join() diff --git a/rclpy/rclpy/executors.py b/rclpy/rclpy/executors.py index 0dd46638d..a14f8f3be 100644 --- a/rclpy/rclpy/executors.py +++ b/rclpy/rclpy/executors.py @@ -191,6 +191,11 @@ def spin(self): while ok(): self.spin_once() + def spin_until_future_complete(self, future): + """Execute until a given future is done.""" + while ok() and not future.done(): + self.spin_once() + def spin_once(self, timeout_sec=None): """ Wait for and execute a single callback. @@ -222,11 +227,15 @@ def _take_client(self, client): async def _execute_client(self, client, seq_and_response): sequence, response = seq_and_response - if sequence is not None and sequence == client.sequence_number: - # clients spawn their own thread to wait for a response in the - # wait_for_future function. Users can either use this mechanism or monitor - # the content of client.response to check if a response has been received - client.response = response + if sequence is not None: + try: + future = client._pending_requests[sequence] + except IndexError: + # The request was cancelled + pass + else: + future._set_executor(self) + future.set_result(response) def _take_service(self, srv): request_and_header = _rclpy.rclpy_take_request( diff --git a/rclpy/test/test_client.py b/rclpy/test/test_client.py index 97e88b855..c00320797 100644 --- a/rclpy/test/test_client.py +++ b/rclpy/test/test_client.py @@ -70,6 +70,23 @@ def test_wait_for_service_exists(self): self.node.destroy_client(cli) self.node.destroy_service(srv) + def test_concurrent_calls_to_service(self): + cli = self.node.create_client(GetParameters, 'get/parameters') + srv = self.node.create_service( + GetParameters, 'get/parameters', + lambda request, response: response) + try: + self.assertTrue(cli.wait_for_service(timeout_sec=20)) + future1 = cli.call_async(GetParameters.Request()) + future2 = cli.call_async(GetParameters.Request()) + rclpy.spin_until_future_complete(self.node, future1) + rclpy.spin_until_future_complete(self.node, future2) + self.assertTrue(future1.result() is not None) + self.assertTrue(future2.result() is not None) + finally: + self.node.destroy_client(cli) + self.node.destroy_service(srv) + if __name__ == '__main__': unittest.main()