Skip to content

Commit

Permalink
remote/client: explicitly cancel the streaming call when stopping the client
Browse files Browse the repository at this point in the history

Otherwise messages can arrive after the loop is stopped, which triggers
exceptions.

Signed-off-by: Jan Luebbe <jlu@pengutronix.de>
  • Loading branch information
jluebbe committed Jun 18, 2024
1 parent e9b4a1d commit 5f14b2c
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 12 deletions.
37 changes: 26 additions & 11 deletions labgrid/remote/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,12 +75,13 @@ def getuser(self):

def __attrs_post_init__(self):
"""Actions which are executed if a connection is successfully opened."""
self.failed = asyncio.Event()
self.stopping = asyncio.Event()

self.channel = grpc.aio.insecure_channel(self.url)
self.stub = labgrid_coordinator_pb2_grpc.CoordinatorStub(self.channel)

self.out_queue = asyncio.Queue()
self.stream_call = None
self.pump_task = None
self.sync_id = itertools.count(start=1)
self.sync_events = {}
Expand All @@ -101,9 +102,19 @@ async def start(self):
msg.subscribe.all_resources = True
self.out_queue.put_nowait(msg)
await self.sync_with_coordinator()
if self.failed.is_set():
if self.stopping.is_set():
raise ServerError("failed to connect to coordinator")

async def stop(self):
self.out_queue.put_nowait(None) # let the sender side exit gracefully
if self.stream_call:
self.stream_call.cancel()
try:
await self.pump_task
except asyncio.CancelledError:
pass
self.cancel_pending_syncs()

async def sync_with_coordinator(self):
id = next(self.sync_id)
event = self.sync_events[id] = asyncio.Event()
Expand All @@ -112,14 +123,14 @@ async def sync_with_coordinator(self):
logging.info("sending sync %s", id)
self.out_queue.put_nowait(msg)
await event.wait()
if self.failed.is_set():
if self.stopping.is_set():
logging.info("sync %s failed", id)
else:
logging.info("received sync %s", id)
return not self.failed.is_set()
return not self.stopping.is_set()

def cancel_pending_syncs(self):
assert self.failed.is_set() # only call when something has gone wrong
assert self.stopping.is_set() # only call when something has gone wrong
while True:
try:
id, event = self.sync_events.popitem()
Expand All @@ -131,7 +142,8 @@ def cancel_pending_syncs(self):
async def message_pump(self):
got_message = False
try:
async for out_msg in self.stub.ClientStream(queue_as_aiter(self.out_queue)):
self.stream_call = call = self.stub.ClientStream(queue_as_aiter(self.out_queue))
async for out_msg in call:
out_msg: labgrid_coordinator_pb2.ClientOutMessage
got_message = True
logging.debug("out_msg from coordinator: %s", out_msg)
Expand Down Expand Up @@ -165,9 +177,6 @@ async def message_pump(self):
event = self.sync_events.pop(out_msg.sync.id)
event.set()
except grpc.aio.AioRpcError as e:
self.failed.set()
self.out_queue.put_nowait(None) # let the sender side exit gracefully
self.cancel_pending_syncs()
if e.code() == grpc.StatusCode.UNAVAILABLE:
if got_message:
logging.error("coordinator became unavailable: %s", e.details())
Expand All @@ -176,8 +185,11 @@ async def message_pump(self):
else:
logging.exception("unexpected grpc error in coordinator message pump task")
except Exception:
self.failed.set()
logging.exception("error in coordinator message pump task")
finally:
self.stopping.set()
self.out_queue.put_nowait(None) # let the sender side exit gracefully
self.cancel_pending_syncs()

async def on_resource_changed(self, exporter, group_name, resource_name, resource):
group = self.resources.setdefault(exporter,
Expand Down Expand Up @@ -224,7 +236,7 @@ async def on_place_deleted(self, name: str):

async def do_monitor(self):
self.monitor = True
await self.failed.wait()
await self.stopping.wait()

async def complete(self):
if self.args.type == 'resources':
Expand Down Expand Up @@ -2030,6 +2042,9 @@ def main():
else:
args.func(session)
finally:
logging.debug('Stopping session')
session.loop.run_until_complete(session.stop())
logging.debug('Stopping loop')
session.loop.close()
except (NoResourceFoundError, NoDriverFoundError, InvalidConfigError) as e:
if args.debug:
Expand Down
7 changes: 6 additions & 1 deletion labgrid/remote/common.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import asyncio
import socket
import time
import enum
Expand Down Expand Up @@ -476,7 +477,11 @@ def from_pb2(cls, pb2: labgrid_coordinator_pb2.Reservation):
async def queue_as_aiter(q):
try:
while True:
item = await q.get()
try:
item = await q.get()
except asyncio.CancelledError:
# gRPC doesn't like to receive exceptions from the request_iterator
return
if item is None:
return
yield item
Expand Down

0 comments on commit 5f14b2c

Please sign in to comment.