From a19180c238d4d97ed2b58868d8fb7fa3e3b621f2 Mon Sep 17 00:00:00 2001 From: Tim Clephas Date: Wed, 20 Mar 2024 12:58:07 +0100 Subject: [PATCH] Fix an inherent race in execution vs. destruction. (#1150) (#1256) * Fix an inherent race in execution vs. destruction. The rclpy executor collects all of the entities in one pass, then creates async tasks for each of the ready ones and attempts to "take" and execute them. If one of those entities is destroyed after the collection but before we attempt to "take" it, then we can end up attempting to __enter__ a Destroyable-derived class that has already been destroyed. The Destroyable will then raise an InvalidHandle error. Fix this by explicitly catching the InvalidHandle error that can be raised in all of the Destroyable-derived entities. If we do catch it, then we actually let the machinery continue but tell things to just not execute; in a subsequent executor iteration, the entity will be destroyed and hence not looked at anymore. This seems to fix the race in my testing. Signed-off-by: Chris Lalancette Co-authored-by: Chris Lalancette --- rclpy/rclpy/executors.py | 165 ++++++++++++++++++++++++--------------- 1 file changed, 102 insertions(+), 63 deletions(-) diff --git a/rclpy/rclpy/executors.py b/rclpy/rclpy/executors.py index e63cc34be..4cf1eb7e7 100644 --- a/rclpy/rclpy/executors.py +++ b/rclpy/rclpy/executors.py @@ -345,75 +345,119 @@ def spin_once_until_future_complete( raise NotImplementedError() def _take_timer(self, tmr): - with tmr.handle: - tmr.handle.call_timer() - return () + try: + with tmr.handle: + tmr.handle.call_timer() + + async def _execute(): + await await_or_execute(tmr.callback) + return _execute + except InvalidHandle: + # Timer is a Destroyable, which means that on __enter__ it can throw an + # InvalidHandle exception if the entity has already been destroyed. Handle that here + # by just returning an empty argument, which means we will skip doing any real work + # in _execute_timer below + pass - async def _execute_timer(self, tmr): - await await_or_execute(tmr.callback) + return None def _take_subscription(self, sub): - with sub.handle: - msg_info = sub.handle.take_message(sub.msg_type, sub.raw) - if msg_info is not None: + try: + with sub.handle: + msg_info = sub.handle.take_message(sub.msg_type, sub.raw) + if msg_info is None: + return None + if sub._callback_type is Subscription.CallbackType.MessageOnly: - return (msg_info[0], ) + msg_tuple = (msg_info[0], ) else: - return msg_info - return () + msg_tuple = msg_info + + async def _execute(): + await await_or_execute(sub.callback, *msg_tuple) + + return _execute + except InvalidHandle: + # Subscription is a Destroyable, which means that on __enter__ it can throw an + # InvalidHandle exception if the entity has already been destroyed. Handle that here + # by just returning an empty argument, which means we will skip doing any real work + # in _execute_subscription below + pass - async def _execute_subscription(self, sub, *args): - if args: - await await_or_execute(sub.callback, *args) + return None def _take_client(self, client): - with client.handle: - return (client.handle.take_response(client.srv_type.Response), ) + try: + with client.handle: + header_and_response = client.handle.take_response(client.srv_type.Response) - async def _execute_client(self, client, seq_and_response): - header, response = seq_and_response - if header is not None: - try: - sequence = header.request_id.sequence_number - future = client.get_pending_request(sequence) - except KeyError: - # The request was cancelled - pass - else: - future._set_executor(self) - future.set_result(response) + async def _execute(): + header, response = header_and_response + if header is None: + return + try: + sequence = header.request_id.sequence_number + future = client.get_pending_request(sequence) + except KeyError: + # The request was cancelled + pass + else: + future._set_executor(self) + future.set_result(response) + return _execute + + except InvalidHandle: + # Client is a Destroyable, which means that on __enter__ it can throw an + # InvalidHandle exception if the entity has already been destroyed. Handle that here + # by just returning an empty argument, which means we will skip doing any real work + # in _execute_client below + pass + + return None def _take_service(self, srv): - with srv.handle: - request_and_header = srv.handle.service_take_request(srv.srv_type.Request) - return (request_and_header, ) - - async def _execute_service(self, srv, request_and_header): - if request_and_header is None: - return - (request, header) = request_and_header - if request: - response = await await_or_execute(srv.callback, request, srv.srv_type.Response()) - srv.send_response(response, header) + try: + with srv.handle: + request_and_header = srv.handle.service_take_request(srv.srv_type.Request) + + async def _execute(): + (request, header) = request_and_header + if header is None: + return + + response = await await_or_execute(srv.callback, request, srv.srv_type.Response()) + srv.send_response(response, header) + return _execute + except InvalidHandle: + # Service is a Destroyable, which means that on __enter__ it can throw an + # InvalidHandle exception if the entity has already been destroyed. Handle that here + # by just returning an empty argument, which means we will skip doing any real work + # in _execute_service below + pass + + return None def _take_guard_condition(self, gc): gc._executor_triggered = False - return () - async def _execute_guard_condition(self, gc): - await await_or_execute(gc.callback) + async def _execute(): + await await_or_execute(gc.callback) + return _execute + + def _take_waitable(self, waitable): + data = waitable.take_data() - async def _execute_waitable(self, waitable, data): - for future in waitable._futures: - future._set_executor(self) - await waitable.execute(data) + async def _execute(): + for future in waitable._futures: + future._set_executor(self) + await waitable.execute(data) + return _execute def _make_handler( self, entity: WaitableEntityType, node: 'Node', take_from_wait_list: Callable, - call_coroutine: Coroutine ) -> Task: """ Make a handler that performs work on an entity. @@ -421,7 +465,6 @@ def _make_handler( :param entity: An entity to wait on. :param node: The node associated with the entity. :param take_from_wait_list: Makes the entity to stop appearing in the wait list. - :param call_coroutine: Does the work the entity is ready for """ # Mark this so it doesn't get added back to the wait list entity._executor_event = True @@ -433,14 +476,17 @@ async def handler(entity, gc, is_shutdown, work_tracker): gc.trigger() return with work_tracker: - arg = take_from_wait_list(entity) + # The take_from_wait_list method here is expected to return either an async def + # method or None if there is no work to do. + call_coroutine = take_from_wait_list(entity) # Signal that this has been 'taken' and can be added back to the wait list entity._executor_event = False gc.trigger() try: - await call_coroutine(entity, *arg) + if call_coroutine is not None: + await call_coroutine() finally: entity.callback_group.ending_execution(entity) # Signal that work has been done so the next callback in a mutually exclusive @@ -641,8 +687,7 @@ def _wait_for_ready_callbacks( # Only check waitables that were added to the wait set if wt in waitables and wt.is_ready(wait_set): if wt.callback_group.can_execute(wt): - handler = self._make_handler( - wt, node, lambda e: (e.take_data(), ), self._execute_waitable) + handler = self._make_handler(wt, node, self._take_waitable) yielded_work = True yield handler, wt, node @@ -653,41 +698,35 @@ def _wait_for_ready_callbacks( # Check timer is ready to workaround rcl issue with cancelled timers if tmr.handle.is_timer_ready(): if tmr.callback_group.can_execute(tmr): - handler = self._make_handler( - tmr, node, self._take_timer, self._execute_timer) + handler = self._make_handler(tmr, node, self._take_timer) yielded_work = True yield handler, tmr, node for sub in node.subscriptions: if sub.handle.pointer in subs_ready: if sub.callback_group.can_execute(sub): - handler = self._make_handler( - sub, node, self._take_subscription, self._execute_subscription) + handler = self._make_handler(sub, node, self._take_subscription) yielded_work = True yield handler, sub, node for gc in node.guards: if gc._executor_triggered: if gc.callback_group.can_execute(gc): - handler = self._make_handler( - gc, node, self._take_guard_condition, - self._execute_guard_condition) + handler = self._make_handler(gc, node, self._take_guard_condition) yielded_work = True yield handler, gc, node for client in node.clients: if client.handle.pointer in clients_ready: if client.callback_group.can_execute(client): - handler = self._make_handler( - client, node, self._take_client, self._execute_client) + handler = self._make_handler(client, node, self._take_client) yielded_work = True yield handler, client, node for srv in node.services: if srv.handle.pointer in services_ready: if srv.callback_group.can_execute(srv): - handler = self._make_handler( - srv, node, self._take_service, self._execute_service) + handler = self._make_handler(srv, node, self._take_service) yielded_work = True yield handler, srv, node