diff --git a/src/beamlime/core/service.py b/src/beamlime/core/service.py index 3fc1d2bb6..1df02f430 100644 --- a/src/beamlime/core/service.py +++ b/src/beamlime/core/service.py @@ -137,15 +137,18 @@ def _run_loop(self) -> None: remaining = max(0.0, self._poll_interval - elapsed) if remaining > 0: time.sleep(remaining) - except RuntimeError: + except Exception: self._logger.exception("Error in service loop") - self.stop() + self._running = False + # Send a signal to the main thread to unblock it + if threading.current_thread() is not threading.main_thread(): + os.kill(os.getpid(), signal.SIGINT) finally: self._logger.info("Service loop stopped") def _stop_impl(self) -> None: """Stop the service gracefully""" - if self._thread: + if self._thread and self._thread is not threading.current_thread(): self._thread.join() @staticmethod diff --git a/src/beamlime/kafka/message_adapter.py b/src/beamlime/kafka/message_adapter.py index d029a6f74..e9be63580 100644 --- a/src/beamlime/kafka/message_adapter.py +++ b/src/beamlime/kafka/message_adapter.py @@ -1,5 +1,6 @@ # SPDX-License-Identifier: BSD-3-Clause # Copyright (c) 2025 Scipp contributors (https://github.com/scipp) +import logging from dataclasses import dataclass, replace from typing import Any, Generic, Protocol, TypeVar @@ -265,7 +266,13 @@ class AdaptingMessageSource(MessageSource[U]): Wraps a source of messages and adapts them to a different type. """ - def __init__(self, source: MessageSource[T], adapter: MessageAdapter[T, U]): + def __init__( + self, + source: MessageSource[T], + adapter: MessageAdapter[T, U], + logger: logging.Logger | None = None, + ): + self._logger = logger or logging.getLogger(__name__) self._source = source self._adapter = adapter @@ -276,7 +283,14 @@ def get_messages(self) -> list[U]: try: adapted.append(self._adapter.adapt(msg)) except streaming_data_types.exceptions.WrongSchemaException: # noqa: PERF203 - pass + self._logger.warning('Message %s has an unknown schema. Skipping.', msg) + except Exception as e: + self._logger.exception('Error adapting message %s: %s', msg, e) + # Raise so service will shut down. In production we may want to simply + # skip the message, but we need more information about the possible + # errors to make that decision. For now, we raise to avoid silent + # failures. + raise return adapted def close(self) -> None: