From ac6362a5a37cdac288c4a2894b7f4da549770dc8 Mon Sep 17 00:00:00 2001 From: Jan Luebbe Date: Thu, 20 Nov 2025 10:55:13 +0100 Subject: [PATCH] remote/coordinator: prohibit multiple exporters with the same name Since the switch to gRPC, we only check that we don't have a connection from the same peer, but not with the same name, which would break assumptions elsewhere in the code and confuse users. Fix it by raising an exception in this case and shutting down both sides of the ExporterStream. Fixes: #1774 Signed-off-by: Jan Luebbe --- labgrid/remote/coordinator.py | 29 +++++++++++++++++++++++++++++ 1 file changed, 29 insertions(+) diff --git a/labgrid/remote/coordinator.py b/labgrid/remote/coordinator.py index 5bddb089e..a63a21482 100644 --- a/labgrid/remote/coordinator.py +++ b/labgrid/remote/coordinator.py @@ -401,6 +401,8 @@ async def ExporterStream(self, request_iterator, context): command_queue = asyncio.Queue() pending_commands = [] + startup_done = asyncio.Event() + out_msg = labgrid_coordinator_pb2.ExporterOutMessage() out_msg.hello.version = labgrid_version() yield out_msg @@ -420,10 +422,15 @@ async def request_task(): elif kind == "startup": version = in_msg.startup.version name = in_msg.startup.name + if existing := self.get_exporter_by_name(name): + raise ExporterError( + f"exporter with name '{name}' is already connected from {existing.peer}" + ) session = self.exporters[peer] = ExporterSession(self, peer, name, command_queue, version) logging.debug("Exporters: %s", self.exporters) logging.debug("Received startup from %s with %s", name, version) asyncio.current_task().set_name(f"exporter-{peer}-rx/started-{name}") + startup_done.set() elif kind == "resource": logging.debug("Received resource from %s with %s", name, in_msg.resource) action, _ = session.set_resource( @@ -439,10 +446,32 @@ async def request_task(): logging.debug("exporter request_task done: %s", context.done()) except Exception: logging.exception("error in exporter message handler") + raise asyncio.current_task().set_name(f"exporter-{peer}-tx") running_request_task = self.loop.create_task(request_task(), name=f"exporter-{peer}-rx/init") + startup_done_task = self.loop.create_task(startup_done.wait()) + done, _ = await asyncio.wait( + {startup_done_task, running_request_task}, + timeout=3, + return_when=asyncio.FIRST_COMPLETED, + ) + # clean up event task + startup_done.set() + await startup_done_task + if running_request_task in done: + # we probably had an exception during startup + try: + await running_request_task + except ExporterError as e: + await context.abort(grpc.StatusCode.ALREADY_EXISTS, f"startup failed: {e}") + raise + elif startup_done_task in done: + await startup_done_task + else: + raise ExporterError(f"exporter connection from {peer} timed out during startup") + try: async for cmd in queue_as_aiter(command_queue): logging.debug("exporter cmd %s", cmd)