diff --git a/labgrid/remote/coordinator.py b/labgrid/remote/coordinator.py index 5bddb089e..a63a21482 100644 --- a/labgrid/remote/coordinator.py +++ b/labgrid/remote/coordinator.py @@ -401,6 +401,8 @@ async def ExporterStream(self, request_iterator, context): command_queue = asyncio.Queue() pending_commands = [] + startup_done = asyncio.Event() + out_msg = labgrid_coordinator_pb2.ExporterOutMessage() out_msg.hello.version = labgrid_version() yield out_msg @@ -420,10 +422,15 @@ async def request_task(): elif kind == "startup": version = in_msg.startup.version name = in_msg.startup.name + if existing := self.get_exporter_by_name(name): + raise ExporterError( + f"exporter with name '{name}' is already connected from {existing.peer}" + ) session = self.exporters[peer] = ExporterSession(self, peer, name, command_queue, version) logging.debug("Exporters: %s", self.exporters) logging.debug("Received startup from %s with %s", name, version) asyncio.current_task().set_name(f"exporter-{peer}-rx/started-{name}") + startup_done.set() elif kind == "resource": logging.debug("Received resource from %s with %s", name, in_msg.resource) action, _ = session.set_resource( @@ -439,10 +446,32 @@ async def request_task(): logging.debug("exporter request_task done: %s", context.done()) except Exception: logging.exception("error in exporter message handler") + raise asyncio.current_task().set_name(f"exporter-{peer}-tx") running_request_task = self.loop.create_task(request_task(), name=f"exporter-{peer}-rx/init") + startup_done_task = self.loop.create_task(startup_done.wait()) + done, _ = await asyncio.wait( + {startup_done_task, running_request_task}, + timeout=3, + return_when=asyncio.FIRST_COMPLETED, + ) + # clean up event task + startup_done.set() + await startup_done_task + if running_request_task in done: + # we probably had an exception during startup + try: + await running_request_task + except ExporterError as e: + await context.abort(grpc.StatusCode.ALREADY_EXISTS, f"startup failed: {e}") + raise + elif startup_done_task in done: + await startup_done_task + else: + raise ExporterError(f"exporter connection from {peer} timed out during startup") + try: async for cmd in queue_as_aiter(command_queue): logging.debug("exporter cmd %s", cmd)