Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 23 additions & 17 deletions src/beamlime/kafka/message_adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,15 +80,6 @@ def adapt(self, message: T) -> U:
pass


class IdentityAdapter(MessageAdapter[T, T]):
"""
An adapter that does not change the message.
"""

def adapt(self, message: T) -> T:
return message


class KafkaAdapter(MessageAdapter[KafkaMessage, Message[T]]):
"""
Base class for Kafka adapters.
Expand All @@ -114,7 +105,7 @@ def get_stream_id(self, topic: str, source_name: str) -> StreamId:
class KafkaToEv44Adapter(KafkaAdapter[Message[eventdata_ev44.EventData]]):
def adapt(self, message: KafkaMessage) -> Message[eventdata_ev44.EventData]:
ev44 = eventdata_ev44.deserialise_ev44(message.value())
stream = StreamId(kind=self._stream_kind, name=ev44.source_name)
stream = self.get_stream_id(topic=message.topic(), source_name=ev44.source_name)
# A fallback, useful in particular for testing so serialized data can be reused.
if ev44.reference_time.size > 0:
timestamp = ev44.reference_time[-1]
Expand All @@ -126,7 +117,7 @@ def adapt(self, message: KafkaMessage) -> Message[eventdata_ev44.EventData]:
class KafkaToDa00Adapter(KafkaAdapter[Message[list[dataarray_da00.Variable]]]):
def adapt(self, message: KafkaMessage) -> Message[list[dataarray_da00.Variable]]:
da00 = dataarray_da00.deserialise_da00(message.value())
key = StreamId(kind=self._stream_kind, name=da00.source_name)
key = self.get_stream_id(topic=message.topic(), source_name=da00.source_name)
timestamp = da00.timestamp_ns
return Message(timestamp=timestamp, stream=key, value=da00.data)

Expand All @@ -137,7 +128,9 @@ def __init__(self, *, stream_lut: StreamLUT | None = None):

def adapt(self, message: KafkaMessage) -> Message[logdata_f144.ExtractedLogData]:
log_data = logdata_f144.deserialise_f144(message.value())
key = StreamId(kind=StreamKind.LOG, name=log_data.source_name)
key = self.get_stream_id(
topic=message.topic(), source_name=log_data.source_name
)
timestamp = log_data.timestamp_unix_ns
return Message(timestamp=timestamp, stream=key, value=log_data)

Expand Down Expand Up @@ -304,10 +297,26 @@ def __init__(
source: MessageSource[T],
adapter: MessageAdapter[T, U],
logger: logging.Logger | None = None,
raise_on_error: bool = True,
):
"""
Parameters
----------
source
The source of messages to adapt.
adapter
The adapter to use.
logger
Logger to use for logging errors.
raise_on_error
If True, exceptions during adaptation will be re-raised. If False,
they will be logged and the message will be skipped. Messages with unknown
schemas will always be skipped.
"""
self._logger = logger or logging.getLogger(__name__)
self._source = source
self._adapter = adapter
self._raise_on_error = raise_on_error

def get_messages(self) -> list[U]:
raw_messages = self._source.get_messages()
Expand All @@ -319,11 +328,8 @@ def get_messages(self) -> list[U]:
self._logger.warning('Message %s has an unknown schema. Skipping.', msg)
except Exception as e:
self._logger.exception('Error adapting message %s: %s', msg, e)
# Raise so service will shut down. In production we may want to simply
# skip the message, but we need more information about the possible
# errors to make that decision. For now, we raise to avoid silent
# failures.
raise
if self._raise_on_error:
raise
return adapted

def close(self) -> None:
Expand Down
Loading