Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions src/beamlime/core/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,15 +137,18 @@ def _run_loop(self) -> None:
remaining = max(0.0, self._poll_interval - elapsed)
if remaining > 0:
time.sleep(remaining)
except RuntimeError:
except Exception:
self._logger.exception("Error in service loop")
self.stop()
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Calling stop here didn't work, since it tried to stop itself.

self._running = False
# Send a signal to the main thread to unblock it
if threading.current_thread() is not threading.main_thread():
os.kill(os.getpid(), signal.SIGINT)
finally:
self._logger.info("Service loop stopped")

def _stop_impl(self) -> None:
"""Stop the service gracefully"""
if self._thread:
if self._thread and self._thread is not threading.current_thread():
self._thread.join()

@staticmethod
Expand Down
18 changes: 16 additions & 2 deletions src/beamlime/kafka/message_adapter.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# SPDX-License-Identifier: BSD-3-Clause
# Copyright (c) 2025 Scipp contributors (https://github.com/scipp)
import logging
from dataclasses import dataclass, replace
from typing import Any, Generic, Protocol, TypeVar

Expand Down Expand Up @@ -265,7 +266,13 @@ class AdaptingMessageSource(MessageSource[U]):
Wraps a source of messages and adapts them to a different type.
"""

def __init__(self, source: MessageSource[T], adapter: MessageAdapter[T, U]):
def __init__(
self,
source: MessageSource[T],
adapter: MessageAdapter[T, U],
logger: logging.Logger | None = None,
):
self._logger = logger or logging.getLogger(__name__)
self._source = source
self._adapter = adapter

Expand All @@ -276,7 +283,14 @@ def get_messages(self) -> list[U]:
try:
adapted.append(self._adapter.adapt(msg))
except streaming_data_types.exceptions.WrongSchemaException: # noqa: PERF203
pass
self._logger.warning('Message %s has an unknown schema. Skipping.', msg)
except Exception as e:
self._logger.exception('Error adapting message %s: %s', msg, e)
# Raise so service will shut down. In production we may want to simply
# skip the message, but we need more information about the possible
# errors to make that decision. For now, we raise to avoid silent
# failures.
raise
return adapted

def close(self) -> None:
Expand Down