Skip to content

Commit

Permalink
Fix an inherent race in execution vs. destruction. (#1150) (#1256)
Browse files Browse the repository at this point in the history
* Fix an inherent race in execution vs. destruction.

The rclpy executor collects all of the entities in one
pass, then creates async tasks for each of the ready ones
and attempts to "take" and execute them.  If one of those entities
is destroyed after the collection but before we attempt
to "take" it, then we can end up attempting to __enter__
a Destroyable-derived class that has already been destroyed.
The Destroyable will then raise an InvalidHandle error.

Fix this by explicitly catching the InvalidHandle error
that can be raised in all of the Destroyable-derived entities.
If we do catch it, then we actually let the machinery
continue but tell things to just not execute; in a subsequent
executor iteration, the entity will be destroyed and
hence not looked at anymore.  This seems to fix the race
in my testing.

Signed-off-by: Chris Lalancette <clalancette@gmail.com>
Co-authored-by: Chris Lalancette <clalancette@gmail.com>
  • Loading branch information
Timple and clalancette committed Mar 20, 2024
1 parent 06d78fb commit a19180c
Showing 1 changed file with 102 additions and 63 deletions.
165 changes: 102 additions & 63 deletions rclpy/rclpy/executors.py
Expand Up @@ -345,83 +345,126 @@ def spin_once_until_future_complete(
raise NotImplementedError()

def _take_timer(self, tmr):
with tmr.handle:
tmr.handle.call_timer()
return ()
try:
with tmr.handle:
tmr.handle.call_timer()

async def _execute():
await await_or_execute(tmr.callback)
return _execute
except InvalidHandle:
# Timer is a Destroyable, which means that on __enter__ it can throw an
# InvalidHandle exception if the entity has already been destroyed. Handle that here
# by just returning an empty argument, which means we will skip doing any real work
# in _execute_timer below
pass

async def _execute_timer(self, tmr):
await await_or_execute(tmr.callback)
return None

def _take_subscription(self, sub):
with sub.handle:
msg_info = sub.handle.take_message(sub.msg_type, sub.raw)
if msg_info is not None:
try:
with sub.handle:
msg_info = sub.handle.take_message(sub.msg_type, sub.raw)
if msg_info is None:
return None

if sub._callback_type is Subscription.CallbackType.MessageOnly:
return (msg_info[0], )
msg_tuple = (msg_info[0], )
else:
return msg_info
return ()
msg_tuple = msg_info

async def _execute():
await await_or_execute(sub.callback, *msg_tuple)

return _execute
except InvalidHandle:
# Subscription is a Destroyable, which means that on __enter__ it can throw an
# InvalidHandle exception if the entity has already been destroyed. Handle that here
# by just returning an empty argument, which means we will skip doing any real work
# in _execute_subscription below
pass

async def _execute_subscription(self, sub, *args):
if args:
await await_or_execute(sub.callback, *args)
return None

def _take_client(self, client):
with client.handle:
return (client.handle.take_response(client.srv_type.Response), )
try:
with client.handle:
header_and_response = client.handle.take_response(client.srv_type.Response)

async def _execute_client(self, client, seq_and_response):
header, response = seq_and_response
if header is not None:
try:
sequence = header.request_id.sequence_number
future = client.get_pending_request(sequence)
except KeyError:
# The request was cancelled
pass
else:
future._set_executor(self)
future.set_result(response)
async def _execute():
header, response = header_and_response
if header is None:
return
try:
sequence = header.request_id.sequence_number
future = client.get_pending_request(sequence)
except KeyError:
# The request was cancelled
pass
else:
future._set_executor(self)
future.set_result(response)
return _execute

except InvalidHandle:
# Client is a Destroyable, which means that on __enter__ it can throw an
# InvalidHandle exception if the entity has already been destroyed. Handle that here
# by just returning an empty argument, which means we will skip doing any real work
# in _execute_client below
pass

return None

def _take_service(self, srv):
with srv.handle:
request_and_header = srv.handle.service_take_request(srv.srv_type.Request)
return (request_and_header, )

async def _execute_service(self, srv, request_and_header):
if request_and_header is None:
return
(request, header) = request_and_header
if request:
response = await await_or_execute(srv.callback, request, srv.srv_type.Response())
srv.send_response(response, header)
try:
with srv.handle:
request_and_header = srv.handle.service_take_request(srv.srv_type.Request)

async def _execute():
(request, header) = request_and_header
if header is None:
return

response = await await_or_execute(srv.callback, request, srv.srv_type.Response())
srv.send_response(response, header)
return _execute
except InvalidHandle:
# Service is a Destroyable, which means that on __enter__ it can throw an
# InvalidHandle exception if the entity has already been destroyed. Handle that here
# by just returning an empty argument, which means we will skip doing any real work
# in _execute_service below
pass

return None

def _take_guard_condition(self, gc):
gc._executor_triggered = False
return ()

async def _execute_guard_condition(self, gc):
await await_or_execute(gc.callback)
async def _execute():
await await_or_execute(gc.callback)
return _execute

def _take_waitable(self, waitable):
data = waitable.take_data()

async def _execute_waitable(self, waitable, data):
for future in waitable._futures:
future._set_executor(self)
await waitable.execute(data)
async def _execute():
for future in waitable._futures:
future._set_executor(self)
await waitable.execute(data)
return _execute

def _make_handler(
self,
entity: WaitableEntityType,
node: 'Node',
take_from_wait_list: Callable,
call_coroutine: Coroutine
) -> Task:
"""
Make a handler that performs work on an entity.
:param entity: An entity to wait on.
:param node: The node associated with the entity.
:param take_from_wait_list: Makes the entity to stop appearing in the wait list.
:param call_coroutine: Does the work the entity is ready for
"""
# Mark this so it doesn't get added back to the wait list
entity._executor_event = True
Expand All @@ -433,14 +476,17 @@ async def handler(entity, gc, is_shutdown, work_tracker):
gc.trigger()
return
with work_tracker:
arg = take_from_wait_list(entity)
# The take_from_wait_list method here is expected to return either an async def
# method or None if there is no work to do.
call_coroutine = take_from_wait_list(entity)

# Signal that this has been 'taken' and can be added back to the wait list
entity._executor_event = False
gc.trigger()

try:
await call_coroutine(entity, *arg)
if call_coroutine is not None:
await call_coroutine()
finally:
entity.callback_group.ending_execution(entity)
# Signal that work has been done so the next callback in a mutually exclusive
Expand Down Expand Up @@ -641,8 +687,7 @@ def _wait_for_ready_callbacks(
# Only check waitables that were added to the wait set
if wt in waitables and wt.is_ready(wait_set):
if wt.callback_group.can_execute(wt):
handler = self._make_handler(
wt, node, lambda e: (e.take_data(), ), self._execute_waitable)
handler = self._make_handler(wt, node, self._take_waitable)
yielded_work = True
yield handler, wt, node

Expand All @@ -653,41 +698,35 @@ def _wait_for_ready_callbacks(
# Check timer is ready to workaround rcl issue with cancelled timers
if tmr.handle.is_timer_ready():
if tmr.callback_group.can_execute(tmr):
handler = self._make_handler(
tmr, node, self._take_timer, self._execute_timer)
handler = self._make_handler(tmr, node, self._take_timer)
yielded_work = True
yield handler, tmr, node

for sub in node.subscriptions:
if sub.handle.pointer in subs_ready:
if sub.callback_group.can_execute(sub):
handler = self._make_handler(
sub, node, self._take_subscription, self._execute_subscription)
handler = self._make_handler(sub, node, self._take_subscription)
yielded_work = True
yield handler, sub, node

for gc in node.guards:
if gc._executor_triggered:
if gc.callback_group.can_execute(gc):
handler = self._make_handler(
gc, node, self._take_guard_condition,
self._execute_guard_condition)
handler = self._make_handler(gc, node, self._take_guard_condition)
yielded_work = True
yield handler, gc, node

for client in node.clients:
if client.handle.pointer in clients_ready:
if client.callback_group.can_execute(client):
handler = self._make_handler(
client, node, self._take_client, self._execute_client)
handler = self._make_handler(client, node, self._take_client)
yielded_work = True
yield handler, client, node

for srv in node.services:
if srv.handle.pointer in services_ready:
if srv.callback_group.can_execute(srv):
handler = self._make_handler(
srv, node, self._take_service, self._execute_service)
handler = self._make_handler(srv, node, self._take_service)
yielded_work = True
yield handler, srv, node

Expand Down

0 comments on commit a19180c

Please sign in to comment.