From 4608d75092b1d0cb707c849daebbbc44b277283d Mon Sep 17 00:00:00 2001 From: Simon Heybrock Date: Fri, 21 Mar 2025 14:28:52 +0100 Subject: [PATCH 01/14] First working version of handler-based config mechanism --- .../config/defaults/monitor_data.yaml | 3 +- src/beamlime/core/handler.py | 3 ++ src/beamlime/core/processor.py | 8 ++-- src/beamlime/kafka/helpers.py | 7 ++++ src/beamlime/kafka/message_adapter.py | 12 ++++++ src/beamlime/service_factory.py | 20 +++------- src/beamlime/services/fake_monitors.py | 6 ++- src/beamlime/services/monitor_data.py | 38 +++++++++++++------ 8 files changed, 64 insertions(+), 33 deletions(-) diff --git a/src/beamlime/config/defaults/monitor_data.yaml b/src/beamlime/config/defaults/monitor_data.yaml index affe4a5d8..63ac154c3 100644 --- a/src/beamlime/config/defaults/monitor_data.yaml +++ b/src/beamlime/config/defaults/monitor_data.yaml @@ -1,2 +1,3 @@ topics: - - beam_monitor \ No newline at end of file + - beam_monitor + - beamlime_commands \ No newline at end of file diff --git a/src/beamlime/core/handler.py b/src/beamlime/core/handler.py index 6c976c024..93660809d 100644 --- a/src/beamlime/core/handler.py +++ b/src/beamlime/core/handler.py @@ -89,6 +89,9 @@ def __init__(self, *, factory: HandlerFactory[Tin, Tout]): def __len__(self) -> int: return sum(1 for handler in self._handlers.values() if handler is not None) + def register_handler(self, key: MessageKey, handler: Handler[Tin, Tout]) -> None: + self._handlers[key] = handler + def get(self, key: MessageKey) -> Handler[Tin, Tout] | None: if key not in self._handlers: self._handlers[key] = self._factory.make_handler(key) diff --git a/src/beamlime/core/processor.py b/src/beamlime/core/processor.py index 255f39509..82f6a9a96 100644 --- a/src/beamlime/core/processor.py +++ b/src/beamlime/core/processor.py @@ -1,12 +1,12 @@ # SPDX-License-Identifier: BSD-3-Clause -# Copyright (c) 2024 Scipp contributors (https://github.com/scipp) +# Copyright (c) 2025 Scipp contributors (https://github.com/scipp) from __future__ import annotations import logging from collections import defaultdict from typing import Generic, Protocol -from .handler import HandlerFactory, HandlerRegistry +from .handler import HandlerRegistry from .message import MessageSink, MessageSource, Tin, Tout @@ -32,12 +32,12 @@ def __init__( logger: logging.Logger | None = None, source: MessageSource[Tin], sink: MessageSink[Tout], - handler_factory: HandlerFactory[Tin, Tout], + handler_registry: HandlerRegistry[Tin, Tout], ) -> None: self._logger = logger or logging.getLogger(__name__) self._source = source self._sink = sink - self._handler_registry = HandlerRegistry(factory=handler_factory) + self._handler_registry = handler_registry def process(self) -> None: messages = self._source.get_messages() diff --git a/src/beamlime/kafka/helpers.py b/src/beamlime/kafka/helpers.py index 9bf001b9c..9fc871d62 100644 --- a/src/beamlime/kafka/helpers.py +++ b/src/beamlime/kafka/helpers.py @@ -41,6 +41,13 @@ def motion_topic(instrument: str) -> str: return topic_for_instrument(topic='motion', instrument=instrument) +def beamlime_command_topic(instrument: str) -> str: + """ + Return the topic name for the beamlime command data of an instrument. + """ + return topic_for_instrument(topic='beamlime_commands', instrument=instrument) + + def source_name(device: str, signal: str) -> str: """ Return the source name for a given device and signal. diff --git a/src/beamlime/kafka/message_adapter.py b/src/beamlime/kafka/message_adapter.py index e4381abb2..87971e8dc 100644 --- a/src/beamlime/kafka/message_adapter.py +++ b/src/beamlime/kafka/message_adapter.py @@ -185,6 +185,18 @@ def adapt( return replace(message, value=da00_to_scipp(message.value)) +class BeamlimeCommandsAdapter(MessageAdapter[KafkaMessage, Message[Any]]): + """ + Adapts a Kafka message to a Beamlime command message. + """ + + def adapt(self, message: KafkaMessage) -> Message[Any]: + key = MessageKey(topic=message.topic(), source_name='') + legacy_key = message.key().decode('utf-8') # See 286 + value = message.value() + return Message(key=key, timestamp=0, value={'key': legacy_key, 'value': value}) + + class ChainedAdapter(MessageAdapter[T, V]): """ Chains two adapters together. diff --git a/src/beamlime/service_factory.py b/src/beamlime/service_factory.py index d3c16e908..ca765be20 100644 --- a/src/beamlime/service_factory.py +++ b/src/beamlime/service_factory.py @@ -3,11 +3,10 @@ from __future__ import annotations import logging -from collections.abc import Callable from typing import Generic, TypeVar -from .core import ConfigSubscriber, MessageSink, StreamProcessor -from .core.handler import Config, HandlerFactory +from .core import MessageSink, StreamProcessor +from .core.handler import HandlerRegistry from .core.service import Service from .kafka.message_adapter import AdaptingMessageSource, MessageAdapter from .kafka.source import KafkaConsumer, KafkaMessageSource @@ -25,29 +24,22 @@ def __init__( name: str, log_level: int = logging.INFO, adapter: MessageAdapter[Traw, Tin], - handler_factory_cls: Callable[[Config], HandlerFactory[Tin, Tout]], + handler_registry: HandlerRegistry[Tin, Tout], ): self._name = f'{instrument}_{name}' self._log_level = log_level self._adapter = adapter - self._handler_factory_cls = handler_factory_cls + self._handler_registry = handler_registry - def build( - self, - control_consumer: KafkaConsumer, - consumer: KafkaConsumer, - sink: MessageSink[Tout], - ) -> Service: - config_subscriber = ConfigSubscriber(consumer=control_consumer, config={}) + def build(self, consumer: KafkaConsumer, sink: MessageSink[Tout]) -> Service: processor = StreamProcessor( source=AdaptingMessageSource( source=KafkaMessageSource(consumer=consumer), adapter=self._adapter ), sink=sink, - handler_factory=self._handler_factory_cls(config=config_subscriber), + handler_registry=self._handler_registry, ) return Service( - children=[config_subscriber], processor=processor, name=self._name, log_level=self._log_level, diff --git a/src/beamlime/services/fake_monitors.py b/src/beamlime/services/fake_monitors.py index 503575b33..0fd46c057 100644 --- a/src/beamlime/services/fake_monitors.py +++ b/src/beamlime/services/fake_monitors.py @@ -19,7 +19,7 @@ ) from beamlime.config import config_names from beamlime.config.config_loader import load_config -from beamlime.core.handler import CommonHandlerFactory +from beamlime.core.handler import CommonHandlerFactory, HandlerRegistry from beamlime.kafka.helpers import beam_monitor_topic from beamlime.kafka.message_adapter import AdaptingMessageSource, MessageAdapter from beamlime.kafka.sink import ( @@ -129,7 +129,9 @@ def run_service( processor = StreamProcessor( source=source, sink=KafkaSink(kafka_config=kafka_config, serializer=serializer), - handler_factory=CommonHandlerFactory(config={}, handler_cls=IdentityHandler), + handler_registry=HandlerRegistry( + factory=CommonHandlerFactory(config={}, handler_cls=IdentityHandler) + ), ) service = Service(processor=processor, name=service_name, log_level=log_level) service.start() diff --git a/src/beamlime/services/monitor_data.py b/src/beamlime/services/monitor_data.py index 0d622e9c6..53448fff3 100644 --- a/src/beamlime/services/monitor_data.py +++ b/src/beamlime/services/monitor_data.py @@ -8,13 +8,18 @@ from beamlime import CommonHandlerFactory, Service from beamlime.config import config_names from beamlime.config.config_loader import load_config +from beamlime.core.handler import HandlerRegistry +from beamlime.handlers.config_handler import ConfigHandler from beamlime.handlers.monitor_data_handler import create_monitor_data_handler from beamlime.kafka import consumer as kafka_consumer +from beamlime.kafka.helpers import beam_monitor_topic, beamlime_command_topic from beamlime.kafka.message_adapter import ( + BeamlimeCommandsAdapter, ChainedAdapter, Da00ToScippAdapter, KafkaToDa00Adapter, KafkaToMonitorEventsAdapter, + RouteByTopicAdapter, RoutingAdapter, ) from beamlime.kafka.sink import KafkaSink @@ -33,28 +38,42 @@ def setup_arg_parser() -> argparse.ArgumentParser: return parser -def make_monitor_data_adapter() -> RoutingAdapter: - return RoutingAdapter( +def make_monitor_data_adapter(instrument: str) -> RoutingAdapter: + monitors = RoutingAdapter( routes={ 'ev44': KafkaToMonitorEventsAdapter(), 'da00': ChainedAdapter( first=KafkaToDa00Adapter(), second=Da00ToScippAdapter() ), + # TODO adapter for control messages } ) + return RouteByTopicAdapter( + routes={ + beam_monitor_topic(instrument): monitors, + beamlime_command_topic(instrument): BeamlimeCommandsAdapter(), + }, + ) def make_monitor_service_builder( *, instrument: str, log_level: int = logging.INFO ) -> DataServiceBuilder: + config = {} + config_handler = ConfigHandler(config=config) + handler_factory = CommonHandlerFactory( + handler_cls=create_monitor_data_handler, config=config + ) + handler_registry = HandlerRegistry(factory=handler_factory) + handler_registry.register_handler( + ConfigHandler.message_key(instrument), config_handler + ) return DataServiceBuilder( instrument=instrument, name='monitor_data', log_level=log_level, - adapter=make_monitor_data_adapter(), - handler_factory_cls=CommonHandlerFactory.from_handler( - create_monitor_data_handler - ), + adapter=make_monitor_data_adapter(instrument=instrument), + handler_registry=handler_registry, ) @@ -77,9 +96,6 @@ def run_service( builder = make_monitor_service_builder(instrument=instrument, log_level=log_level) with ExitStack() as stack: - control_consumer = stack.enter_context( - kafka_consumer.make_control_consumer(instrument=instrument) - ) consumer = stack.enter_context( kafka_consumer.make_consumer_from_config( topics=config['topics'], @@ -88,9 +104,7 @@ def run_service( group='monitor_data', ) ) - service = builder.build( - control_consumer=control_consumer, consumer=consumer, sink=sink - ) + service = builder.build(consumer=consumer, sink=sink) service.start() From dbc2e987bc04d83c632788f0befaf3e9350344ef Mon Sep 17 00:00:00 2001 From: Simon Heybrock Date: Fri, 21 Mar 2025 14:41:55 +0100 Subject: [PATCH 02/14] Revert to correct control topic config --- src/beamlime/config/defaults/monitor_data.yaml | 3 +-- src/beamlime/kafka/source.py | 15 +++++++++++++++ src/beamlime/services/monitor_data.py | 7 ++++++- 3 files changed, 22 insertions(+), 3 deletions(-) diff --git a/src/beamlime/config/defaults/monitor_data.yaml b/src/beamlime/config/defaults/monitor_data.yaml index 63ac154c3..affe4a5d8 100644 --- a/src/beamlime/config/defaults/monitor_data.yaml +++ b/src/beamlime/config/defaults/monitor_data.yaml @@ -1,3 +1,2 @@ topics: - - beam_monitor - - beamlime_commands \ No newline at end of file + - beam_monitor \ No newline at end of file diff --git a/src/beamlime/kafka/source.py b/src/beamlime/kafka/source.py index f19d4cbee..7d23098c7 100644 --- a/src/beamlime/kafka/source.py +++ b/src/beamlime/kafka/source.py @@ -14,6 +14,21 @@ def close(self) -> None: pass +class MultiConsumer: + def __init__(self, consumers): + self._consumers = consumers + + def consume(self, num_messages: int, timeout: float) -> list[KafkaMessage]: + messages = [] + for consumer in self._consumers: + messages.extend(consumer.consume(num_messages, timeout)) + return messages + + def close(self) -> None: + for consumer in self._consumers: + consumer.close() + + class KafkaMessageSource(MessageSource[KafkaMessage]): """ Message source for messages from Kafka. diff --git a/src/beamlime/services/monitor_data.py b/src/beamlime/services/monitor_data.py index 53448fff3..1b895614e 100644 --- a/src/beamlime/services/monitor_data.py +++ b/src/beamlime/services/monitor_data.py @@ -23,6 +23,7 @@ RoutingAdapter, ) from beamlime.kafka.sink import KafkaSink +from beamlime.kafka.source import MultiConsumer from beamlime.service_factory import DataServiceBuilder from beamlime.sinks import PlotToPngSink @@ -96,7 +97,10 @@ def run_service( builder = make_monitor_service_builder(instrument=instrument, log_level=log_level) with ExitStack() as stack: - consumer = stack.enter_context( + control_consumer = stack.enter_context( + kafka_consumer.make_control_consumer(instrument=instrument) + ) + data_consumer = stack.enter_context( kafka_consumer.make_consumer_from_config( topics=config['topics'], config={**consumer_config, **kafka_upstream_config}, @@ -104,6 +108,7 @@ def run_service( group='monitor_data', ) ) + consumer = MultiConsumer([control_consumer, data_consumer]) service = builder.build(consumer=consumer, sink=sink) service.start() From 68087f1658e041c34f2a4baa7c7866f705e761b0 Mon Sep 17 00:00:00 2001 From: Simon Heybrock Date: Mon, 24 Mar 2025 07:53:19 +0100 Subject: [PATCH 03/14] Improve service builder --- src/beamlime/kafka/message_adapter.py | 9 +++++++ src/beamlime/service_factory.py | 28 +++++++++++--------- src/beamlime/services/data_reduction.py | 2 +- src/beamlime/services/detector_data.py | 2 +- src/beamlime/services/fake_monitors.py | 34 ++++++++++++------------- src/beamlime/services/monitor_data.py | 13 ++++------ src/beamlime/services/timeseries.py | 2 +- tests/services/data_service_test.py | 2 +- tests/services/detector_data_test.py | 8 +++--- tests/services/monitor_data_test.py | 8 +++--- tests/services/timeseries_test.py | 4 +-- 11 files changed, 60 insertions(+), 52 deletions(-) diff --git a/src/beamlime/kafka/message_adapter.py b/src/beamlime/kafka/message_adapter.py index 87971e8dc..3ae2a6ebf 100644 --- a/src/beamlime/kafka/message_adapter.py +++ b/src/beamlime/kafka/message_adapter.py @@ -69,6 +69,15 @@ def adapt(self, message: T) -> U: pass +class IdentityAdapter(MessageAdapter[T, T]): + """ + An adapter that does not change the message. + """ + + def adapt(self, message: T) -> T: + return message + + class KafkaToEv44Adapter( MessageAdapter[KafkaMessage, Message[eventdata_ev44.EventData]] ): diff --git a/src/beamlime/service_factory.py b/src/beamlime/service_factory.py index ca765be20..ab9ca5549 100644 --- a/src/beamlime/service_factory.py +++ b/src/beamlime/service_factory.py @@ -6,7 +6,8 @@ from typing import Generic, TypeVar from .core import MessageSink, StreamProcessor -from .core.handler import HandlerRegistry +from .core.handler import HandlerFactory, HandlerRegistry +from .core.message import MessageKey, MessageSource from .core.service import Service from .kafka.message_adapter import AdaptingMessageSource, MessageAdapter from .kafka.source import KafkaConsumer, KafkaMessageSource @@ -24,23 +25,26 @@ def __init__( name: str, log_level: int = logging.INFO, adapter: MessageAdapter[Traw, Tin], - handler_registry: HandlerRegistry[Tin, Tout], + handler_factory: HandlerFactory[Tin, Tout], ): self._name = f'{instrument}_{name}' self._log_level = log_level self._adapter = adapter - self._handler_registry = handler_registry + self._handler_registry = HandlerRegistry(factory=handler_factory) - def build(self, consumer: KafkaConsumer, sink: MessageSink[Tout]) -> Service: + def add_handler(self, key: MessageKey, handler: HandlerFactory[Tin, Tout]) -> None: + """Add specific handler to use for given key, instead of using the factory.""" + self._handler_registry.register_handler(key=key, handler=handler) + + def from_consumer( + self, consumer: KafkaConsumer, sink: MessageSink[Tout] + ) -> Service: + return self.from_source(source=KafkaMessageSource(consumer=consumer), sink=sink) + + def from_source(self, source: MessageSource, sink: MessageSink[Tout]) -> Service: processor = StreamProcessor( - source=AdaptingMessageSource( - source=KafkaMessageSource(consumer=consumer), adapter=self._adapter - ), + source=AdaptingMessageSource(source=source, adapter=self._adapter), sink=sink, handler_registry=self._handler_registry, ) - return Service( - processor=processor, - name=self._name, - log_level=self._log_level, - ) + return Service(processor=processor, name=self._name, log_level=self._log_level) diff --git a/src/beamlime/services/data_reduction.py b/src/beamlime/services/data_reduction.py index 6f073ddaa..c19de57c3 100644 --- a/src/beamlime/services/data_reduction.py +++ b/src/beamlime/services/data_reduction.py @@ -103,7 +103,7 @@ def run_service( group='data_reduction', ) ) - service = builder.build( + service = builder.from_consumer( control_consumer=control_consumer, consumer=consumer, sink=sink ) service.start() diff --git a/src/beamlime/services/detector_data.py b/src/beamlime/services/detector_data.py index 3f0b72249..307126d74 100644 --- a/src/beamlime/services/detector_data.py +++ b/src/beamlime/services/detector_data.py @@ -82,7 +82,7 @@ def run_service( group='detector_data', ) ) - service = builder.build( + service = builder.from_consumer( control_consumer=control_consumer, consumer=consumer, sink=sink ) service.start() diff --git a/src/beamlime/services/fake_monitors.py b/src/beamlime/services/fake_monitors.py index 0fd46c057..2dffb916c 100644 --- a/src/beamlime/services/fake_monitors.py +++ b/src/beamlime/services/fake_monitors.py @@ -1,5 +1,5 @@ # SPDX-License-Identifier: BSD-3-Clause -# Copyright (c) 2024 Scipp contributors (https://github.com/scipp) +# Copyright (c) 2025 Scipp contributors (https://github.com/scipp) import logging import time from dataclasses import replace @@ -15,18 +15,18 @@ MessageKey, MessageSource, Service, - StreamProcessor, ) from beamlime.config import config_names from beamlime.config.config_loader import load_config -from beamlime.core.handler import CommonHandlerFactory, HandlerRegistry +from beamlime.core.handler import CommonHandlerFactory from beamlime.kafka.helpers import beam_monitor_topic -from beamlime.kafka.message_adapter import AdaptingMessageSource, MessageAdapter +from beamlime.kafka.message_adapter import IdentityAdapter, MessageAdapter from beamlime.kafka.sink import ( KafkaSink, SerializationError, serialize_dataarray_to_da00, ) +from beamlime.service_factory import DataServiceBuilder class FakeMonitorSource(MessageSource[sc.Variable]): @@ -113,27 +113,25 @@ def serialize_variable_to_monitor_ev44(msg: Message[sc.Variable]) -> bytes: def run_service( *, instrument: str, mode: Literal['ev44', 'da00'], log_level: int = logging.INFO ) -> NoReturn: - service_name = f'{instrument}_fake_{mode}_producer' kafka_config = load_config(namespace=config_names.kafka_upstream) + source = FakeMonitorSource(instrument=instrument) if mode == 'ev44': - source = FakeMonitorSource(instrument=instrument) + adapter = IdentityAdapter() serializer = serialize_variable_to_monitor_ev44 else: - source = AdaptingMessageSource( - source=FakeMonitorSource(instrument=instrument), - adapter=EventsToHistogramAdapter( - toa=sc.linspace('toa', 0, 71_000_000, num=100, unit='ns') - ), + adapter = EventsToHistogramAdapter( + toa=sc.linspace('toa', 0, 71_000_000, num=100, unit='ns') ) serializer = serialize_dataarray_to_da00 - processor = StreamProcessor( - source=source, - sink=KafkaSink(kafka_config=kafka_config, serializer=serializer), - handler_registry=HandlerRegistry( - factory=CommonHandlerFactory(config={}, handler_cls=IdentityHandler) - ), + builder = DataServiceBuilder( + instrument=instrument, + name=f'fake_{mode}_producer', + log_level=log_level, + adapter=adapter, + handler_factory=CommonHandlerFactory(config={}, handler_cls=IdentityHandler), ) - service = Service(processor=processor, name=service_name, log_level=log_level) + sink = KafkaSink(kafka_config=kafka_config, serializer=serializer) + service = builder.from_source(source=source, sink=sink) service.start() diff --git a/src/beamlime/services/monitor_data.py b/src/beamlime/services/monitor_data.py index 1b895614e..05dd7ee45 100644 --- a/src/beamlime/services/monitor_data.py +++ b/src/beamlime/services/monitor_data.py @@ -8,7 +8,6 @@ from beamlime import CommonHandlerFactory, Service from beamlime.config import config_names from beamlime.config.config_loader import load_config -from beamlime.core.handler import HandlerRegistry from beamlime.handlers.config_handler import ConfigHandler from beamlime.handlers.monitor_data_handler import create_monitor_data_handler from beamlime.kafka import consumer as kafka_consumer @@ -65,17 +64,15 @@ def make_monitor_service_builder( handler_factory = CommonHandlerFactory( handler_cls=create_monitor_data_handler, config=config ) - handler_registry = HandlerRegistry(factory=handler_factory) - handler_registry.register_handler( - ConfigHandler.message_key(instrument), config_handler - ) - return DataServiceBuilder( + builder = DataServiceBuilder( instrument=instrument, name='monitor_data', log_level=log_level, adapter=make_monitor_data_adapter(instrument=instrument), - handler_registry=handler_registry, + handler_factory=handler_factory, ) + builder.add_handler(ConfigHandler.message_key(instrument), config_handler) + return builder def run_service( @@ -109,7 +106,7 @@ def run_service( ) ) consumer = MultiConsumer([control_consumer, data_consumer]) - service = builder.build(consumer=consumer, sink=sink) + service = builder.from_consumer(consumer=consumer, sink=sink) service.start() diff --git a/src/beamlime/services/timeseries.py b/src/beamlime/services/timeseries.py index f7c8dca36..f2cf1425a 100644 --- a/src/beamlime/services/timeseries.py +++ b/src/beamlime/services/timeseries.py @@ -88,7 +88,7 @@ def run_service( group='timeseries', ) ) - service = builder.build( + service = builder.from_consumer( control_consumer=control_consumer, consumer=consumer, sink=sink ) service.start() diff --git a/tests/services/data_service_test.py b/tests/services/data_service_test.py index f781384a7..010793c34 100644 --- a/tests/services/data_service_test.py +++ b/tests/services/data_service_test.py @@ -71,7 +71,7 @@ def test_basics() -> None: ) sink = FakeMessageSink() consumer = IntConsumer() - service = builder.build( + service = builder.from_consumer( control_consumer=EmptyConsumer(), consumer=consumer, sink=sink ) service.start(blocking=False) diff --git a/tests/services/detector_data_test.py b/tests/services/detector_data_test.py index 28a8eaa6c..eb7a941ba 100644 --- a/tests/services/detector_data_test.py +++ b/tests/services/detector_data_test.py @@ -124,7 +124,7 @@ def test_performance(benchmark, instrument: str, events_per_message: int) -> Non # It is thus always returning messages quickly, which shifts the balance in the # services to a different place than in reality. builder = make_detector_service_builder(instrument=instrument) - service = builder.build( + service = builder.from_consumer( control_consumer=EmptyConsumer(), consumer=EmptyConsumer(), sink=FakeMessageSink(), @@ -136,7 +136,7 @@ def test_performance(benchmark, instrument: str, events_per_message: int) -> Non events_per_message=events_per_message, max_events=50_000_000, ) - service = builder.build( + service = builder.from_consumer( control_consumer=EmptyConsumer(), consumer=consumer, sink=sink ) service.start(blocking=False) @@ -148,7 +148,7 @@ def test_performance(benchmark, instrument: str, events_per_message: int) -> Non @pytest.mark.parametrize('instrument', available_instruments()) def test_detector_data_service(instrument: str) -> None: builder = make_detector_service_builder(instrument=instrument) - service = builder.build( + service = builder.from_consumer( control_consumer=EmptyConsumer(), consumer=EmptyConsumer(), sink=FakeMessageSink(), @@ -157,7 +157,7 @@ def test_detector_data_service(instrument: str) -> None: consumer = Ev44Consumer( instrument=instrument, events_per_message=100, max_events=10_000 ) - service = builder.build( + service = builder.from_consumer( control_consumer=EmptyConsumer(), consumer=consumer, sink=UnrollingSinkAdapter(sink), diff --git a/tests/services/monitor_data_test.py b/tests/services/monitor_data_test.py index 3d8d2ac93..fb6b0c7f6 100644 --- a/tests/services/monitor_data_test.py +++ b/tests/services/monitor_data_test.py @@ -107,7 +107,7 @@ def test_performance(benchmark, num_sources: int, events_per_message: int) -> No # It is this always returning messages quickly, which shifts the balance in the # services to a different place than in reality. builder = make_monitor_service_builder(instrument='dummy') - service = builder.build( + service = builder.from_consumer( control_consumer=EmptyConsumer(), consumer=EmptyConsumer(), sink=FakeMessageSink(), @@ -119,7 +119,7 @@ def test_performance(benchmark, num_sources: int, events_per_message: int) -> No events_per_message=events_per_message, max_events=50_000_000, ) - service = builder.build( + service = builder.from_consumer( control_consumer=EmptyConsumer(), consumer=consumer, sink=sink ) service.start(blocking=False) @@ -129,7 +129,7 @@ def test_performance(benchmark, num_sources: int, events_per_message: int) -> No def test_monitor_data_service() -> None: builder = make_monitor_service_builder(instrument='dummy') - service = builder.build( + service = builder.from_consumer( control_consumer=EmptyConsumer(), consumer=EmptyConsumer(), sink=FakeMessageSink(), @@ -137,7 +137,7 @@ def test_monitor_data_service() -> None: sink = FakeMessageSink() consumer = Ev44Consumer(num_sources=2, events_per_message=100, max_events=10_000) - service = builder.build( + service = builder.from_consumer( control_consumer=EmptyConsumer(), consumer=consumer, sink=sink ) service.start(blocking=False) diff --git a/tests/services/timeseries_test.py b/tests/services/timeseries_test.py index decc30d89..454ef85ac 100644 --- a/tests/services/timeseries_test.py +++ b/tests/services/timeseries_test.py @@ -164,7 +164,7 @@ def test_timeseries_service(instrument: str) -> None: source_names=source_names, ) - service = builder.build( + service = builder.from_consumer( control_consumer=EmptyConsumer(), consumer=consumer, sink=UnrollingSinkAdapter(sink), @@ -219,7 +219,7 @@ def test_timeseries_accumulation() -> None: source_names=['detector_rotation'], # Use just one source for simplicity ) - service = builder.build( + service = builder.from_consumer( control_consumer=EmptyConsumer(), consumer=consumer, sink=UnrollingSinkAdapter(sink), From 1025490c3d6dd28a8450db4391f9331dbfe9ad76 Mon Sep 17 00:00:00 2001 From: Simon Heybrock Date: Mon, 24 Mar 2025 08:01:15 +0100 Subject: [PATCH 04/14] Update fake_detectors --- src/beamlime/service_factory.py | 10 +++++++--- src/beamlime/services/fake_detectors.py | 24 ++++++++++-------------- src/beamlime/services/fake_monitors.py | 11 ++++++----- 3 files changed, 23 insertions(+), 22 deletions(-) diff --git a/src/beamlime/service_factory.py b/src/beamlime/service_factory.py index ab9ca5549..2f3f5ab91 100644 --- a/src/beamlime/service_factory.py +++ b/src/beamlime/service_factory.py @@ -9,7 +9,11 @@ from .core.handler import HandlerFactory, HandlerRegistry from .core.message import MessageKey, MessageSource from .core.service import Service -from .kafka.message_adapter import AdaptingMessageSource, MessageAdapter +from .kafka.message_adapter import ( + AdaptingMessageSource, + IdentityAdapter, + MessageAdapter, +) from .kafka.source import KafkaConsumer, KafkaMessageSource Traw = TypeVar("Traw") @@ -24,12 +28,12 @@ def __init__( instrument: str, name: str, log_level: int = logging.INFO, - adapter: MessageAdapter[Traw, Tin], + adapter: MessageAdapter[Traw, Tin] | None = None, handler_factory: HandlerFactory[Tin, Tout], ): self._name = f'{instrument}_{name}' self._log_level = log_level - self._adapter = adapter + self._adapter = adapter or IdentityAdapter() self._handler_registry = HandlerRegistry(factory=handler_factory) def add_handler(self, key: MessageKey, handler: HandlerFactory[Tin, Tout]) -> None: diff --git a/src/beamlime/services/fake_detectors.py b/src/beamlime/services/fake_detectors.py index db056427e..89b24f9e1 100644 --- a/src/beamlime/services/fake_detectors.py +++ b/src/beamlime/services/fake_detectors.py @@ -11,19 +11,13 @@ import scipp as sc from streaming_data_types import eventdata_ev44 -from beamlime import ( - Handler, - Message, - MessageKey, - MessageSource, - Service, - StreamProcessor, -) +from beamlime import Handler, Message, MessageKey, MessageSource, Service from beamlime.config import config_names from beamlime.config.config_loader import load_config from beamlime.core.handler import CommonHandlerFactory from beamlime.kafka.helpers import detector_topic from beamlime.kafka.sink import KafkaSink, SerializationError +from beamlime.service_factory import DataServiceBuilder # Configure detectors to fake for each instrument # Values as of January 2025. These may change if the detector configuration changes. @@ -173,16 +167,18 @@ def serialize_detector_events_to_ev44( def run_service(*, instrument: str, log_level: int = logging.INFO) -> NoReturn: - service_name = f'{instrument}_fake_producer' kafka_config = load_config(namespace=config_names.kafka_upstream) - source = FakeDetectorSource(instrument=instrument) serializer = serialize_detector_events_to_ev44 - processor = StreamProcessor( - source=source, - sink=KafkaSink(kafka_config=kafka_config, serializer=serializer), + builder = DataServiceBuilder( + instrument=instrument, + name='fake_producer', + log_level=log_level, handler_factory=CommonHandlerFactory(config={}, handler_cls=IdentityHandler), ) - service = Service(processor=processor, name=service_name, log_level=log_level) + service = builder.from_source( + source=FakeDetectorSource(instrument=instrument), + sink=KafkaSink(kafka_config=kafka_config, serializer=serializer), + ) service.start() diff --git a/src/beamlime/services/fake_monitors.py b/src/beamlime/services/fake_monitors.py index 2dffb916c..cc3b827b6 100644 --- a/src/beamlime/services/fake_monitors.py +++ b/src/beamlime/services/fake_monitors.py @@ -20,7 +20,7 @@ from beamlime.config.config_loader import load_config from beamlime.core.handler import CommonHandlerFactory from beamlime.kafka.helpers import beam_monitor_topic -from beamlime.kafka.message_adapter import IdentityAdapter, MessageAdapter +from beamlime.kafka.message_adapter import MessageAdapter from beamlime.kafka.sink import ( KafkaSink, SerializationError, @@ -114,9 +114,8 @@ def run_service( *, instrument: str, mode: Literal['ev44', 'da00'], log_level: int = logging.INFO ) -> NoReturn: kafka_config = load_config(namespace=config_names.kafka_upstream) - source = FakeMonitorSource(instrument=instrument) if mode == 'ev44': - adapter = IdentityAdapter() + adapter = None serializer = serialize_variable_to_monitor_ev44 else: adapter = EventsToHistogramAdapter( @@ -130,8 +129,10 @@ def run_service( adapter=adapter, handler_factory=CommonHandlerFactory(config={}, handler_cls=IdentityHandler), ) - sink = KafkaSink(kafka_config=kafka_config, serializer=serializer) - service = builder.from_source(source=source, sink=sink) + service = builder.from_source( + source=FakeMonitorSource(instrument=instrument), + sink=KafkaSink(kafka_config=kafka_config, serializer=serializer), + ) service.start() From afe24b0559219fb906820fcd8da90aba3e2f1ed8 Mon Sep 17 00:00:00 2001 From: Simon Heybrock Date: Mon, 24 Mar 2025 08:04:34 +0100 Subject: [PATCH 05/14] Update fake_logdata --- src/beamlime/services/fake_logdata.py | 24 ++++++++++-------------- 1 file changed, 10 insertions(+), 14 deletions(-) diff --git a/src/beamlime/services/fake_logdata.py b/src/beamlime/services/fake_logdata.py index 1387c40f6..9c0d78f99 100644 --- a/src/beamlime/services/fake_logdata.py +++ b/src/beamlime/services/fake_logdata.py @@ -6,19 +6,13 @@ import numpy as np import scipp as sc -from beamlime import ( - Handler, - Message, - MessageKey, - MessageSource, - Service, - StreamProcessor, -) +from beamlime import Handler, Message, MessageKey, MessageSource, Service from beamlime.config import config_names from beamlime.config.config_loader import load_config from beamlime.core.handler import CommonHandlerFactory from beamlime.kafka.helpers import motion_topic from beamlime.kafka.sink import KafkaSink, serialize_dataarray_to_f144 +from beamlime.service_factory import DataServiceBuilder def _make_ramp(size: int) -> sc.DataArray: @@ -131,16 +125,18 @@ def handle(self, messages: list[Message[T]]) -> list[Message[T]]: def run_service(*, instrument: str, log_level: int = logging.INFO) -> NoReturn: - service_name = f'{instrument}_fake_f144_producer' kafka_config = load_config(namespace=config_names.kafka_upstream) - source = FakeLogdataSource(instrument=instrument) serializer = serialize_dataarray_to_f144 - processor = StreamProcessor( - source=source, - sink=KafkaSink(kafka_config=kafka_config, serializer=serializer), + builder = DataServiceBuilder( + instrument=instrument, + name='fake_f144_producer', + log_level=log_level, handler_factory=CommonHandlerFactory(config={}, handler_cls=IdentityHandler), ) - service = Service(processor=processor, name=service_name, log_level=log_level) + service = builder.from_source( + source=FakeLogdataSource(instrument=instrument), + sink=KafkaSink(kafka_config=kafka_config, serializer=serializer), + ) service.start() From 5376269665d00e32f7a03be6c08b4efae3d30b95 Mon Sep 17 00:00:00 2001 From: Simon Heybrock Date: Mon, 24 Mar 2025 08:14:17 +0100 Subject: [PATCH 06/14] Refactory detector data service --- src/beamlime/services/detector_data.py | 42 ++++++++++++++++++-------- src/beamlime/services/monitor_data.py | 1 - 2 files changed, 29 insertions(+), 14 deletions(-) diff --git a/src/beamlime/services/detector_data.py b/src/beamlime/services/detector_data.py index 307126d74..325b8b225 100644 --- a/src/beamlime/services/detector_data.py +++ b/src/beamlime/services/detector_data.py @@ -5,20 +5,24 @@ import argparse import logging from contextlib import ExitStack -from functools import partial from typing import Literal, NoReturn from beamlime import Service from beamlime.config import config_names from beamlime.config.config_loader import load_config +from beamlime.handlers.config_handler import ConfigHandler from beamlime.handlers.detector_data_handler import DetectorHandlerFactory from beamlime.kafka import consumer as kafka_consumer +from beamlime.kafka.helpers import beamlime_command_topic, detector_topic from beamlime.kafka.message_adapter import ( + BeamlimeCommandsAdapter, ChainedAdapter, Ev44ToDetectorEventsAdapter, KafkaToEv44Adapter, + RouteByTopicAdapter, ) from beamlime.kafka.sink import KafkaSink, UnrollingSinkAdapter +from beamlime.kafka.source import MultiConsumer from beamlime.service_factory import DataServiceBuilder from beamlime.sinks import PlotToPngSink @@ -34,21 +38,34 @@ def setup_arg_parser() -> argparse.ArgumentParser: return parser -def make_detector_service_builder( - *, instrument: str, log_level: int = logging.INFO -) -> DataServiceBuilder: - adapter = ChainedAdapter( +def make_detector_data_adapter(instrument: str) -> RouteByTopicAdapter: + detectors = ChainedAdapter( first=KafkaToEv44Adapter(), second=Ev44ToDetectorEventsAdapter(merge_detectors=instrument == 'bifrost'), ) - handler_factory_cls = partial(DetectorHandlerFactory, instrument=instrument) - return DataServiceBuilder( + return RouteByTopicAdapter( + routes={ + detector_topic(instrument): detectors, + beamlime_command_topic(instrument): BeamlimeCommandsAdapter(), + }, + ) + + +def make_detector_service_builder( + *, instrument: str, log_level: int = logging.INFO +) -> DataServiceBuilder: + config = {} + config_handler = ConfigHandler(config=config) + handler_factory = DetectorHandlerFactory(instrument=instrument, config=config) + builder = DataServiceBuilder( instrument=instrument, name='detector_data', log_level=log_level, - adapter=adapter, - handler_factory_cls=handler_factory_cls, + adapter=make_detector_data_adapter(instrument=instrument), + handler_factory=handler_factory, ) + builder.add_handler(ConfigHandler.message_key(instrument), config_handler) + return builder def run_service( @@ -74,7 +91,7 @@ def run_service( control_consumer = stack.enter_context( kafka_consumer.make_control_consumer(instrument=instrument) ) - consumer = stack.enter_context( + data_consumer = stack.enter_context( kafka_consumer.make_consumer_from_config( topics=config['topics'], config={**consumer_config, **kafka_upstream_config}, @@ -82,9 +99,8 @@ def run_service( group='detector_data', ) ) - service = builder.from_consumer( - control_consumer=control_consumer, consumer=consumer, sink=sink - ) + consumer = MultiConsumer([control_consumer, data_consumer]) + service = builder.from_consumer(consumer=consumer, sink=sink) service.start() diff --git a/src/beamlime/services/monitor_data.py b/src/beamlime/services/monitor_data.py index 05dd7ee45..32ed9de83 100644 --- a/src/beamlime/services/monitor_data.py +++ b/src/beamlime/services/monitor_data.py @@ -45,7 +45,6 @@ def make_monitor_data_adapter(instrument: str) -> RoutingAdapter: 'da00': ChainedAdapter( first=KafkaToDa00Adapter(), second=Da00ToScippAdapter() ), - # TODO adapter for control messages } ) return RouteByTopicAdapter( From cc3ec4aefd01e0c014f44157128b6deec6f183cb Mon Sep 17 00:00:00 2001 From: Simon Heybrock Date: Mon, 24 Mar 2025 09:06:12 +0100 Subject: [PATCH 07/14] Update timeseries service, support serializing datetime64 --- src/beamlime/kafka/scipp_da00_compat.py | 12 +++++++++ src/beamlime/services/timeseries.py | 34 ++++++++++++++++--------- 2 files changed, 34 insertions(+), 12 deletions(-) diff --git a/src/beamlime/kafka/scipp_da00_compat.py b/src/beamlime/kafka/scipp_da00_compat.py index 5c4c6ce76..454d59df5 100644 --- a/src/beamlime/kafka/scipp_da00_compat.py +++ b/src/beamlime/kafka/scipp_da00_compat.py @@ -29,6 +29,15 @@ def da00_to_scipp( def _to_da00_variable(name: str, var: sc.Variable) -> dataarray_da00.Variable: + if var.dtype == sc.DType.datetime64: + timedelta = var - sc.epoch(unit=var.unit) + return dataarray_da00.Variable( + name=name, + data=timedelta.values, + axes=list(var.dims), + shape=var.shape, + unit=f'datetime64[{var.unit}]', + ) return dataarray_da00.Variable( name=name, data=var.values, @@ -39,4 +48,7 @@ def _to_da00_variable(name: str, var: sc.Variable) -> dataarray_da00.Variable: def _to_scipp_variable(var: dataarray_da00.Variable) -> sc.Variable: + if var.unit.startswith('datetime64'): + unit = var.unit.split('[')[1].rstrip(']') + return sc.epoch(unit=unit) + sc.array(dims=var.axes, values=var.data, unit=unit) return sc.array(dims=var.axes, values=var.data, unit=var.unit) diff --git a/src/beamlime/services/timeseries.py b/src/beamlime/services/timeseries.py index f2cf1425a..62fa494c7 100644 --- a/src/beamlime/services/timeseries.py +++ b/src/beamlime/services/timeseries.py @@ -6,7 +6,6 @@ import logging from collections.abc import Mapping from contextlib import ExitStack -from functools import partial from typing import Any, Literal, NoReturn from beamlime import Service @@ -14,12 +13,16 @@ from beamlime.config.config_loader import load_config from beamlime.handlers.timeseries_handler import LogdataHandlerFactory from beamlime.kafka import consumer as kafka_consumer +from beamlime.kafka.helpers import beamlime_command_topic, motion_topic from beamlime.kafka.message_adapter import ( + BeamlimeCommandsAdapter, ChainedAdapter, F144ToLogDataAdapter, KafkaToF144Adapter, + RouteByTopicAdapter, ) from beamlime.kafka.sink import KafkaSink, UnrollingSinkAdapter +from beamlime.kafka.source import MultiConsumer from beamlime.service_factory import DataServiceBuilder from beamlime.sinks import PlotToPngSink @@ -35,24 +38,32 @@ def setup_arg_parser() -> argparse.ArgumentParser: return parser +def make_timeseries_adapter(instrument: str) -> RouteByTopicAdapter: + return RouteByTopicAdapter( + routes={ + motion_topic(instrument): ChainedAdapter( + first=KafkaToF144Adapter(), second=F144ToLogDataAdapter() + ), + beamlime_command_topic(instrument): BeamlimeCommandsAdapter(), + }, + ) + + def make_timeseries_service_builder( *, instrument: str, log_level: int = logging.INFO, attribute_registry: Mapping[str, Mapping[str, Any]] | None = None, ) -> DataServiceBuilder: - adapter = ChainedAdapter(first=KafkaToF144Adapter(), second=F144ToLogDataAdapter()) - handler_factory_cls = partial( - LogdataHandlerFactory, - instrument=instrument, - attribute_registry=attribute_registry, + handler_factory = LogdataHandlerFactory( + instrument=instrument, attribute_registry=attribute_registry, config={} ) return DataServiceBuilder( instrument=instrument, name='timeseries', log_level=log_level, - adapter=adapter, - handler_factory_cls=handler_factory_cls, + adapter=make_timeseries_adapter(instrument=instrument), + handler_factory=handler_factory, ) @@ -80,7 +91,7 @@ def run_service( control_consumer = stack.enter_context( kafka_consumer.make_control_consumer(instrument=instrument) ) - consumer = stack.enter_context( + data_consumer = stack.enter_context( kafka_consumer.make_consumer_from_config( topics=('motion',), config={**consumer_config, **kafka_upstream_config}, @@ -88,9 +99,8 @@ def run_service( group='timeseries', ) ) - service = builder.from_consumer( - control_consumer=control_consumer, consumer=consumer, sink=sink - ) + consumer = MultiConsumer([control_consumer, data_consumer]) + service = builder.from_consumer(consumer=consumer, sink=sink) service.start() From 098023f84c96be991599fa88b66c1b22ca5d158b Mon Sep 17 00:00:00 2001 From: Simon Heybrock Date: Mon, 24 Mar 2025 09:15:33 +0100 Subject: [PATCH 08/14] Update data-reduction service --- src/beamlime/services/data_reduction.py | 34 ++++++++++++++++--------- 1 file changed, 22 insertions(+), 12 deletions(-) diff --git a/src/beamlime/services/data_reduction.py b/src/beamlime/services/data_reduction.py index c19de57c3..a67b717de 100644 --- a/src/beamlime/services/data_reduction.py +++ b/src/beamlime/services/data_reduction.py @@ -5,16 +5,23 @@ import argparse import logging from contextlib import ExitStack -from functools import partial from typing import Literal, NoReturn from beamlime import Service from beamlime.config import config_names from beamlime.config.config_loader import load_config +from beamlime.config.raw_detectors import get_config +from beamlime.handlers.config_handler import ConfigHandler from beamlime.handlers.data_reduction_handler import ReductionHandlerFactory from beamlime.kafka import consumer as kafka_consumer -from beamlime.kafka.helpers import beam_monitor_topic, detector_topic, motion_topic +from beamlime.kafka.helpers import ( + beam_monitor_topic, + beamlime_command_topic, + detector_topic, + motion_topic, +) from beamlime.kafka.message_adapter import ( + BeamlimeCommandsAdapter, ChainedAdapter, Ev44ToDetectorEventsAdapter, Ev44ToMonitorEventsAdapter, @@ -24,11 +31,10 @@ RouteByTopicAdapter, ) from beamlime.kafka.sink import KafkaSink, UnrollingSinkAdapter +from beamlime.kafka.source import MultiConsumer from beamlime.service_factory import DataServiceBuilder from beamlime.sinks import PlotToPngSink -from ..config.raw_detectors import get_config - def setup_arg_parser() -> argparse.ArgumentParser: parser = Service.setup_arg_parser(description='Data Reduction Service') @@ -58,18 +64,23 @@ def make_reduction_service_builder( motion_topic(instrument): ChainedAdapter( first=KafkaToF144Adapter(), second=F144ToLogDataAdapter() ), + beamlime_command_topic(instrument): BeamlimeCommandsAdapter(), } ) - handler_factory_cls = partial( - ReductionHandlerFactory, instrument_config=get_config(instrument) + config = {} + config_handler = ConfigHandler(config=config) + handler_factory = ReductionHandlerFactory( + instrument_config=get_config(instrument), config=config ) - return DataServiceBuilder( + builder = DataServiceBuilder( instrument=instrument, name='data_reduction', log_level=log_level, adapter=adapter, - handler_factory_cls=handler_factory_cls, + handler_factory=handler_factory, ) + builder.add_handler(ConfigHandler.message_key(instrument), config_handler) + return builder def run_service( @@ -95,7 +106,7 @@ def run_service( control_consumer = stack.enter_context( kafka_consumer.make_control_consumer(instrument=instrument) ) - consumer = stack.enter_context( + data_consumer = stack.enter_context( kafka_consumer.make_consumer_from_config( topics=config['topics'], config={**consumer_config, **kafka_upstream_config}, @@ -103,9 +114,8 @@ def run_service( group='data_reduction', ) ) - service = builder.from_consumer( - control_consumer=control_consumer, consumer=consumer, sink=sink - ) + consumer = MultiConsumer([control_consumer, data_consumer]) + service = builder.from_consumer(consumer=consumer, sink=sink) service.start() From a928d1da280918ee6cc5d49e9f6391734ded7628 Mon Sep 17 00:00:00 2001 From: Simon Heybrock Date: Mon, 24 Mar 2025 09:28:08 +0100 Subject: [PATCH 09/14] Update tests --- tests/processor_test.py | 9 ++++++--- tests/services/data_service_test.py | 14 ++------------ tests/services/detector_data_test.py | 27 +++++++-------------------- tests/services/monitor_data_test.py | 24 ++++++------------------ tests/services/timeseries_test.py | 20 ++------------------ 5 files changed, 23 insertions(+), 71 deletions(-) diff --git a/tests/processor_test.py b/tests/processor_test.py index f3676f93f..5e377f540 100644 --- a/tests/processor_test.py +++ b/tests/processor_test.py @@ -40,7 +40,8 @@ def test_consumes_and_produces_messages() -> None: ) sink = FakeMessageSink() factory = CommonHandlerFactory(config=config, handler_cls=ValueToStringHandler) - processor = StreamProcessor(source=source, sink=sink, handler_factory=factory) + registry = HandlerRegistry(factory=factory) + processor = StreamProcessor(source=source, sink=sink, handler_registry=registry) processor.process() assert len(sink.messages) == 0 processor.process() @@ -107,7 +108,8 @@ def test_processor_skips_keys_without_handlers(): factory = SelectiveHandlerFactory( config=config, handler_cls=ValueToStringHandler, allowed_keys=['allowed'] ) - processor = StreamProcessor(source=source, sink=sink, handler_factory=factory) + registry = HandlerRegistry(factory=factory) + processor = StreamProcessor(source=source, sink=sink, handler_registry=registry) processor.process() @@ -135,7 +137,8 @@ def test_processor_with_mixed_handlers(): factory = SelectiveHandlerFactory( config=config, handler_cls=ValueToStringHandler, allowed_keys=['allowed'] ) - processor = StreamProcessor(source=source, sink=sink, handler_factory=factory) + registry = HandlerRegistry(factory=factory) + processor = StreamProcessor(source=source, sink=sink, handler_registry=registry) # First batch: one allowed message processor.process() diff --git a/tests/services/data_service_test.py b/tests/services/data_service_test.py index 010793c34..24f328ccf 100644 --- a/tests/services/data_service_test.py +++ b/tests/services/data_service_test.py @@ -32,14 +32,6 @@ def handle(self, messages: list[Message[int]]) -> list[Message[int]]: return messages -class EmptyConsumer(KafkaConsumer): - def consume(self, num_messages: int, timeout: float) -> list[KafkaMessage]: - return [] - - def close(self) -> None: - pass - - class IntConsumer(KafkaConsumer): def __init__(self) -> None: self._values = [11, 22, 33, 44] @@ -67,13 +59,11 @@ def test_basics() -> None: instrument='instrument', name='name', adapter=ForwardingAdapter(), - handler_factory_cls=CommonHandlerFactory.from_handler(ForwardingHandler), + handler_factory=CommonHandlerFactory(handler_cls=ForwardingHandler, config={}), ) sink = FakeMessageSink() consumer = IntConsumer() - service = builder.from_consumer( - control_consumer=EmptyConsumer(), consumer=consumer, sink=sink - ) + service = builder.from_consumer(consumer=consumer, sink=sink) service.start(blocking=False) while not consumer.at_end: time.sleep(0.1) diff --git a/tests/services/detector_data_test.py b/tests/services/detector_data_test.py index eb7a941ba..b7eceef1b 100644 --- a/tests/services/detector_data_test.py +++ b/tests/services/detector_data_test.py @@ -9,7 +9,7 @@ from beamlime.config.raw_detectors import available_instruments, get_config from beamlime.fakes import FakeMessageSink -from beamlime.kafka.helpers import source_name +from beamlime.kafka.helpers import detector_topic, source_name from beamlime.kafka.message_adapter import FakeKafkaMessage, KafkaMessage from beamlime.kafka.sink import UnrollingSinkAdapter from beamlime.kafka.source import KafkaConsumer @@ -32,6 +32,7 @@ def __init__( events_per_message: int = 1_000, max_events: int = 1_000_000, ) -> None: + self._topic = detector_topic(instrument=instrument) self._detector_config = detector_config[instrument] self._events_per_message = events_per_message self._max_events = max_events @@ -98,7 +99,7 @@ def consume(self, num_messages: int, timeout: float) -> list[KafkaMessage]: messages = [ FakeKafkaMessage( value=self._content[(self._current + msg) % len(self._content)], - topic="dummy", + topic=self._topic, timestamp=self._make_timestamp(), ) for msg in range(messages_to_produce) @@ -124,11 +125,7 @@ def test_performance(benchmark, instrument: str, events_per_message: int) -> Non # It is thus always returning messages quickly, which shifts the balance in the # services to a different place than in reality. builder = make_detector_service_builder(instrument=instrument) - service = builder.from_consumer( - control_consumer=EmptyConsumer(), - consumer=EmptyConsumer(), - sink=FakeMessageSink(), - ) + service = builder.from_consumer(consumer=EmptyConsumer(), sink=FakeMessageSink()) sink = FakeMessageSink() consumer = Ev44Consumer( @@ -136,9 +133,7 @@ def test_performance(benchmark, instrument: str, events_per_message: int) -> Non events_per_message=events_per_message, max_events=50_000_000, ) - service = builder.from_consumer( - control_consumer=EmptyConsumer(), consumer=consumer, sink=sink - ) + service = builder.from_consumer(consumer=consumer, sink=sink) service.start(blocking=False) benchmark(start_and_wait_for_completion, consumer=consumer) service.stop() @@ -148,20 +143,12 @@ def test_performance(benchmark, instrument: str, events_per_message: int) -> Non @pytest.mark.parametrize('instrument', available_instruments()) def test_detector_data_service(instrument: str) -> None: builder = make_detector_service_builder(instrument=instrument) - service = builder.from_consumer( - control_consumer=EmptyConsumer(), - consumer=EmptyConsumer(), - sink=FakeMessageSink(), - ) + service = builder.from_consumer(consumer=EmptyConsumer(), sink=FakeMessageSink()) sink = FakeMessageSink() consumer = Ev44Consumer( instrument=instrument, events_per_message=100, max_events=10_000 ) - service = builder.from_consumer( - control_consumer=EmptyConsumer(), - consumer=consumer, - sink=UnrollingSinkAdapter(sink), - ) + service = builder.from_consumer(consumer=consumer, sink=UnrollingSinkAdapter(sink)) service.start(blocking=False) start_and_wait_for_completion(consumer=consumer) service.stop() diff --git a/tests/services/monitor_data_test.py b/tests/services/monitor_data_test.py index fb6b0c7f6..f03d39d28 100644 --- a/tests/services/monitor_data_test.py +++ b/tests/services/monitor_data_test.py @@ -8,7 +8,7 @@ from streaming_data_types import eventdata_ev44 from beamlime.fakes import FakeMessageSink -from beamlime.kafka.helpers import source_name +from beamlime.kafka.helpers import beam_monitor_topic, source_name from beamlime.kafka.message_adapter import FakeKafkaMessage, KafkaMessage from beamlime.kafka.source import KafkaConsumer from beamlime.services.monitor_data import make_monitor_service_builder @@ -82,7 +82,7 @@ def consume(self, num_messages: int, timeout: float) -> list[KafkaMessage]: messages = [ FakeKafkaMessage( value=self._content[msg % self._num_sources], - topic="dummy", + topic=beam_monitor_topic("dummy"), timestamp=self._make_timestamp(), ) for msg in range(num_messages) @@ -107,11 +107,7 @@ def test_performance(benchmark, num_sources: int, events_per_message: int) -> No # It is this always returning messages quickly, which shifts the balance in the # services to a different place than in reality. builder = make_monitor_service_builder(instrument='dummy') - service = builder.from_consumer( - control_consumer=EmptyConsumer(), - consumer=EmptyConsumer(), - sink=FakeMessageSink(), - ) + service = builder.from_consumer(consumer=EmptyConsumer(), sink=FakeMessageSink()) sink = FakeMessageSink() consumer = Ev44Consumer( @@ -119,9 +115,7 @@ def test_performance(benchmark, num_sources: int, events_per_message: int) -> No events_per_message=events_per_message, max_events=50_000_000, ) - service = builder.from_consumer( - control_consumer=EmptyConsumer(), consumer=consumer, sink=sink - ) + service = builder.from_consumer(consumer=consumer, sink=sink) service.start(blocking=False) benchmark(start_and_wait_for_completion, consumer=consumer) service.stop() @@ -129,17 +123,11 @@ def test_performance(benchmark, num_sources: int, events_per_message: int) -> No def test_monitor_data_service() -> None: builder = make_monitor_service_builder(instrument='dummy') - service = builder.from_consumer( - control_consumer=EmptyConsumer(), - consumer=EmptyConsumer(), - sink=FakeMessageSink(), - ) + service = builder.from_consumer(consumer=EmptyConsumer(), sink=FakeMessageSink()) sink = FakeMessageSink() consumer = Ev44Consumer(num_sources=2, events_per_message=100, max_events=10_000) - service = builder.from_consumer( - control_consumer=EmptyConsumer(), consumer=consumer, sink=sink - ) + service = builder.from_consumer(consumer=consumer, sink=sink) service.start(blocking=False) start_and_wait_for_completion(consumer=consumer) source_names = [msg.key.source_name for msg in sink.messages] diff --git a/tests/services/timeseries_test.py b/tests/services/timeseries_test.py index 454ef85ac..2154c893c 100644 --- a/tests/services/timeseries_test.py +++ b/tests/services/timeseries_test.py @@ -18,14 +18,6 @@ from beamlime.services.timeseries import make_timeseries_service_builder -class EmptyConsumer(KafkaConsumer): - def consume(self, num_messages: int, timeout: float) -> list[KafkaMessage]: - return [] - - def close(self) -> None: - pass - - class F144Consumer(KafkaConsumer): def __init__( self, @@ -164,11 +156,7 @@ def test_timeseries_service(instrument: str) -> None: source_names=source_names, ) - service = builder.from_consumer( - control_consumer=EmptyConsumer(), - consumer=consumer, - sink=UnrollingSinkAdapter(sink), - ) + service = builder.from_consumer(consumer=consumer, sink=UnrollingSinkAdapter(sink)) service.start(blocking=False) start_and_wait_for_completion(consumer=consumer) @@ -219,11 +207,7 @@ def test_timeseries_accumulation() -> None: source_names=['detector_rotation'], # Use just one source for simplicity ) - service = builder.from_consumer( - control_consumer=EmptyConsumer(), - consumer=consumer, - sink=UnrollingSinkAdapter(sink), - ) + service = builder.from_consumer(consumer=consumer, sink=UnrollingSinkAdapter(sink)) service.start(blocking=False) start_and_wait_for_completion(consumer=consumer) From d90f5fbd11cddb3f5adc1d84e9d9884df0e64993 Mon Sep 17 00:00:00 2001 From: Simon Heybrock Date: Mon, 24 Mar 2025 09:29:47 +0100 Subject: [PATCH 10/14] Remove child handling for class Service --- src/beamlime/core/service.py | 6 ------ 1 file changed, 6 deletions(-) diff --git a/src/beamlime/core/service.py b/src/beamlime/core/service.py index f3f1e1c3f..53bfff687 100644 --- a/src/beamlime/core/service.py +++ b/src/beamlime/core/service.py @@ -100,7 +100,6 @@ class Service(ServiceBase): def __init__( self, *, - children: list[StartStoppable] | None = None, processor: Processor, name: str | None = None, log_level: int = logging.INFO, @@ -108,14 +107,11 @@ def __init__( ): super().__init__(name=name, log_level=log_level) self._poll_interval = poll_interval - self._children = children or [] self._processor = processor self._thread: threading.Thread | None = None def _start_impl(self) -> None: """Start the service and block until stopped""" - for child in self._children: - child.start() self._thread = threading.Thread(target=self._run_loop) self._thread.start() @@ -147,8 +143,6 @@ def _stop_impl(self) -> None: """Stop the service gracefully""" if self._thread: self._thread.join() - for child in self._children: - child.stop() @staticmethod def setup_arg_parser(description: str) -> argparse.ArgumentParser: From d57e09a40132014555fb953623de6a4e4686ffd2 Mon Sep 17 00:00:00 2001 From: Simon Heybrock Date: Mon, 24 Mar 2025 09:31:10 +0100 Subject: [PATCH 11/14] Remove class ConfigSubscriber Replaced by handler-based config update mechanism --- src/beamlime/__init__.py | 2 - src/beamlime/core/__init__.py | 2 - src/beamlime/core/config_subscriber.py | 71 -------------------------- 3 files changed, 75 deletions(-) delete mode 100644 src/beamlime/core/config_subscriber.py diff --git a/src/beamlime/__init__.py b/src/beamlime/__init__.py index 28dd48a44..7c0aacc4a 100644 --- a/src/beamlime/__init__.py +++ b/src/beamlime/__init__.py @@ -13,7 +13,6 @@ from .core import ( CommonHandlerFactory, - ConfigSubscriber, Handler, Message, MessageKey, @@ -29,7 +28,6 @@ __all__ = [ "CommonHandlerFactory", - "ConfigSubscriber", "Handler", "LiveWorkflow", "Message", diff --git a/src/beamlime/core/__init__.py b/src/beamlime/core/__init__.py index 4478d91ff..7abe0228f 100644 --- a/src/beamlime/core/__init__.py +++ b/src/beamlime/core/__init__.py @@ -1,6 +1,5 @@ # SPDX-License-Identifier: BSD-3-Clause # Copyright (c) 2024 Scipp contributors (https://github.com/scipp) -from .config_subscriber import ConfigSubscriber from .handler import CommonHandlerFactory, Handler from .message import Message, MessageKey, MessageSink, MessageSource, compact_messages from .processor import Processor, StreamProcessor @@ -8,7 +7,6 @@ __all__ = [ 'CommonHandlerFactory', - 'ConfigSubscriber', 'Handler', 'Message', 'MessageKey', diff --git a/src/beamlime/core/config_subscriber.py b/src/beamlime/core/config_subscriber.py deleted file mode 100644 index 41f755189..000000000 --- a/src/beamlime/core/config_subscriber.py +++ /dev/null @@ -1,71 +0,0 @@ -# SPDX-License-Identifier: BSD-3-Clause -# Copyright (c) 2024 Scipp contributors (https://github.com/scipp) -import json -import logging -import threading -import time -from typing import Any - -from ..kafka.message_adapter import KafkaMessage -from ..kafka.source import KafkaConsumer - - -class ConfigSubscriber: - def __init__( - self, - *, - consumer: KafkaConsumer, - config: dict[str, Any] | None = None, - logger: logging.Logger | None = None, - ): - # Generate unique group id using service name and random suffix, to ensure all - # instances of the service get the same messages. - self._config = config or {} - self._logger = logger or logging.getLogger(__name__) - self._consumer = consumer - self._running = False - self._thread: threading.Thread | None = None - - def get(self, key: str, default=None): - return self._config.get(key, default) - - def start(self): - self._running = True - self._thread = threading.Thread(target=self._run_loop) - self._thread.start() - - def _process_message(self, message: KafkaMessage | None) -> None: - if message is None: - return - if message.error(): - self._logger.error('Consumer error: %s', message.error()) - return - key = message.key().decode('utf-8') - value = json.loads(message.value().decode('utf-8')) - self._logger.info( - 'Updating config: %s = %s at %s', key, value, message.timestamp() - ) - self._config[key] = value - - def _run_loop(self): - try: - while self._running: - # We previously relied on `consume` blocking unti the timeout, but this - # proved problematic in tests where fakes were in use, since it caused - # idle spinning consuming CPU. We now use a shorter timeout and sleep, - # even though this may never be a problem with a real Kafka consumer. - time.sleep(0.01) - messages = self._consumer.consume(num_messages=100, timeout=0.05) - for msg in messages: - self._process_message(msg) - except RuntimeError as e: - self._logger.exception("Error ConfigSubscriber loop failed: %s", e) - self.stop() - finally: - self._logger.info("ConfigSubscriber loop stopped") - - def stop(self): - self._running = False - if self._thread: - self._thread.join() - self._consumer.close() From 4923e48b202cfd50f0b4cc11200be0b0f9070b20 Mon Sep 17 00:00:00 2001 From: Simon Heybrock Date: Mon, 24 Mar 2025 09:45:09 +0100 Subject: [PATCH 12/14] Docstring --- src/beamlime/kafka/source.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/src/beamlime/kafka/source.py b/src/beamlime/kafka/source.py index 7d23098c7..9754b9068 100644 --- a/src/beamlime/kafka/source.py +++ b/src/beamlime/kafka/source.py @@ -15,6 +15,14 @@ def close(self) -> None: class MultiConsumer: + """ + Message source for multiple Kafka consumers. + + This class allows for consuming messages from multiple Kafka consumers with + different configuration. In particular, we need to use different topic offsets for + data topics vs. config/command topics. + """ + def __init__(self, consumers): self._consumers = consumers From d0cbf0f1b14f42fc083558449931ed375266271a Mon Sep 17 00:00:00 2001 From: Simon Heybrock Date: Mon, 24 Mar 2025 11:56:30 +0100 Subject: [PATCH 13/14] Add missing file --- src/beamlime/handlers/config_handler.py | 62 +++++++++++++++++++++++++ 1 file changed, 62 insertions(+) create mode 100644 src/beamlime/handlers/config_handler.py diff --git a/src/beamlime/handlers/config_handler.py b/src/beamlime/handlers/config_handler.py new file mode 100644 index 000000000..be43ca180 --- /dev/null +++ b/src/beamlime/handlers/config_handler.py @@ -0,0 +1,62 @@ +# SPDX-License-Identifier: BSD-3-Clause +# Copyright (c) 2025 Scipp contributors (https://github.com/scipp) +import json +import logging + +from ..core.handler import Config, Handler +from ..core.message import Message, MessageKey +from ..kafka.helpers import beamlime_command_topic + + +class ConfigHandler(Handler[bytes, None]): + """ + Handler for configuration messages. + + This handler processes configuration messages and updates the configuration + dictionary accordingly. It is used by StreamProcessor to handle configuration + updates received from a Kafka topic. + + Parameters + ---------- + logger + Logger to use + config + Configuration object to update + """ + + @staticmethod + def message_key(instrument: str) -> MessageKey: + return MessageKey(topic=beamlime_command_topic(instrument), source_name='') + + def __init__(self, *, logger: logging.Logger | None = None, config: Config): + super().__init__(logger=logger, config=config) + self._store = config + + def get(self, key: str, default=None): + return self._store.get(key, default) + + def handle(self, messages: list[Message[bytes]]) -> list[Message[None]]: + """ + Process configuration messages and update the configuration. + + Parameters + ---------- + messages: + List of messages containing configuration updates + + Returns + ------- + : + Empty list as this handler doesn't produce output messages + """ + for message in messages: + try: + key = message.value['key'] + value = json.loads(message.value['value'].decode('utf-8')) + self._logger.info( + 'Updating config: %s = %s at %s', key, value, message.timestamp + ) + self._store[key] = value + except Exception as e: # noqa: PERF203 + self._logger.error('Error processing config message: %s', e) + return [] From df7514a64c9f5674ce3cfd6680b225855f7e35a2 Mon Sep 17 00:00:00 2001 From: Simon Heybrock Date: Wed, 26 Mar 2025 10:26:17 +0100 Subject: [PATCH 14/14] Use timestamp from Kafka --- src/beamlime/kafka/message_adapter.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/beamlime/kafka/message_adapter.py b/src/beamlime/kafka/message_adapter.py index 3ae2a6ebf..6aa1e77d5 100644 --- a/src/beamlime/kafka/message_adapter.py +++ b/src/beamlime/kafka/message_adapter.py @@ -201,9 +201,12 @@ class BeamlimeCommandsAdapter(MessageAdapter[KafkaMessage, Message[Any]]): def adapt(self, message: KafkaMessage) -> Message[Any]: key = MessageKey(topic=message.topic(), source_name='') + timestamp = message.timestamp()[1] legacy_key = message.key().decode('utf-8') # See 286 value = message.value() - return Message(key=key, timestamp=0, value={'key': legacy_key, 'value': value}) + return Message( + key=key, timestamp=timestamp, value={'key': legacy_key, 'value': value} + ) class ChainedAdapter(MessageAdapter[T, V]):