diff --git a/src/beamlime/__init__.py b/src/beamlime/__init__.py index 28dd48a44..7c0aacc4a 100644 --- a/src/beamlime/__init__.py +++ b/src/beamlime/__init__.py @@ -13,7 +13,6 @@ from .core import ( CommonHandlerFactory, - ConfigSubscriber, Handler, Message, MessageKey, @@ -29,7 +28,6 @@ __all__ = [ "CommonHandlerFactory", - "ConfigSubscriber", "Handler", "LiveWorkflow", "Message", diff --git a/src/beamlime/core/__init__.py b/src/beamlime/core/__init__.py index 4478d91ff..7abe0228f 100644 --- a/src/beamlime/core/__init__.py +++ b/src/beamlime/core/__init__.py @@ -1,6 +1,5 @@ # SPDX-License-Identifier: BSD-3-Clause # Copyright (c) 2024 Scipp contributors (https://github.com/scipp) -from .config_subscriber import ConfigSubscriber from .handler import CommonHandlerFactory, Handler from .message import Message, MessageKey, MessageSink, MessageSource, compact_messages from .processor import Processor, StreamProcessor @@ -8,7 +7,6 @@ __all__ = [ 'CommonHandlerFactory', - 'ConfigSubscriber', 'Handler', 'Message', 'MessageKey', diff --git a/src/beamlime/core/config_subscriber.py b/src/beamlime/core/config_subscriber.py deleted file mode 100644 index 41f755189..000000000 --- a/src/beamlime/core/config_subscriber.py +++ /dev/null @@ -1,71 +0,0 @@ -# SPDX-License-Identifier: BSD-3-Clause -# Copyright (c) 2024 Scipp contributors (https://github.com/scipp) -import json -import logging -import threading -import time -from typing import Any - -from ..kafka.message_adapter import KafkaMessage -from ..kafka.source import KafkaConsumer - - -class ConfigSubscriber: - def __init__( - self, - *, - consumer: KafkaConsumer, - config: dict[str, Any] | None = None, - logger: logging.Logger | None = None, - ): - # Generate unique group id using service name and random suffix, to ensure all - # instances of the service get the same messages. - self._config = config or {} - self._logger = logger or logging.getLogger(__name__) - self._consumer = consumer - self._running = False - self._thread: threading.Thread | None = None - - def get(self, key: str, default=None): - return self._config.get(key, default) - - def start(self): - self._running = True - self._thread = threading.Thread(target=self._run_loop) - self._thread.start() - - def _process_message(self, message: KafkaMessage | None) -> None: - if message is None: - return - if message.error(): - self._logger.error('Consumer error: %s', message.error()) - return - key = message.key().decode('utf-8') - value = json.loads(message.value().decode('utf-8')) - self._logger.info( - 'Updating config: %s = %s at %s', key, value, message.timestamp() - ) - self._config[key] = value - - def _run_loop(self): - try: - while self._running: - # We previously relied on `consume` blocking unti the timeout, but this - # proved problematic in tests where fakes were in use, since it caused - # idle spinning consuming CPU. We now use a shorter timeout and sleep, - # even though this may never be a problem with a real Kafka consumer. 
- time.sleep(0.01) - messages = self._consumer.consume(num_messages=100, timeout=0.05) - for msg in messages: - self._process_message(msg) - except RuntimeError as e: - self._logger.exception("Error ConfigSubscriber loop failed: %s", e) - self.stop() - finally: - self._logger.info("ConfigSubscriber loop stopped") - - def stop(self): - self._running = False - if self._thread: - self._thread.join() - self._consumer.close() diff --git a/src/beamlime/core/handler.py b/src/beamlime/core/handler.py index 6c976c024..93660809d 100644 --- a/src/beamlime/core/handler.py +++ b/src/beamlime/core/handler.py @@ -89,6 +89,9 @@ def __init__(self, *, factory: HandlerFactory[Tin, Tout]): def __len__(self) -> int: return sum(1 for handler in self._handlers.values() if handler is not None) + def register_handler(self, key: MessageKey, handler: Handler[Tin, Tout]) -> None: + self._handlers[key] = handler + def get(self, key: MessageKey) -> Handler[Tin, Tout] | None: if key not in self._handlers: self._handlers[key] = self._factory.make_handler(key) diff --git a/src/beamlime/core/processor.py b/src/beamlime/core/processor.py index 255f39509..82f6a9a96 100644 --- a/src/beamlime/core/processor.py +++ b/src/beamlime/core/processor.py @@ -1,12 +1,12 @@ # SPDX-License-Identifier: BSD-3-Clause -# Copyright (c) 2024 Scipp contributors (https://github.com/scipp) +# Copyright (c) 2025 Scipp contributors (https://github.com/scipp) from __future__ import annotations import logging from collections import defaultdict from typing import Generic, Protocol -from .handler import HandlerFactory, HandlerRegistry +from .handler import HandlerRegistry from .message import MessageSink, MessageSource, Tin, Tout @@ -32,12 +32,12 @@ def __init__( logger: logging.Logger | None = None, source: MessageSource[Tin], sink: MessageSink[Tout], - handler_factory: HandlerFactory[Tin, Tout], + handler_registry: HandlerRegistry[Tin, Tout], ) -> None: self._logger = logger or logging.getLogger(__name__) self._source = source self._sink = sink - self._handler_registry = HandlerRegistry(factory=handler_factory) + self._handler_registry = handler_registry def process(self) -> None: messages = self._source.get_messages() diff --git a/src/beamlime/core/service.py b/src/beamlime/core/service.py index f3f1e1c3f..53bfff687 100644 --- a/src/beamlime/core/service.py +++ b/src/beamlime/core/service.py @@ -100,7 +100,6 @@ class Service(ServiceBase): def __init__( self, *, - children: list[StartStoppable] | None = None, processor: Processor, name: str | None = None, log_level: int = logging.INFO, @@ -108,14 +107,11 @@ def __init__( ): super().__init__(name=name, log_level=log_level) self._poll_interval = poll_interval - self._children = children or [] self._processor = processor self._thread: threading.Thread | None = None def _start_impl(self) -> None: """Start the service and block until stopped""" - for child in self._children: - child.start() self._thread = threading.Thread(target=self._run_loop) self._thread.start() @@ -147,8 +143,6 @@ def _stop_impl(self) -> None: """Stop the service gracefully""" if self._thread: self._thread.join() - for child in self._children: - child.stop() @staticmethod def setup_arg_parser(description: str) -> argparse.ArgumentParser: diff --git a/src/beamlime/handlers/config_handler.py b/src/beamlime/handlers/config_handler.py new file mode 100644 index 000000000..be43ca180 --- /dev/null +++ b/src/beamlime/handlers/config_handler.py @@ -0,0 +1,62 @@ +# SPDX-License-Identifier: BSD-3-Clause +# Copyright (c) 2025 Scipp contributors 
(https://github.com/scipp) +import json +import logging + +from ..core.handler import Config, Handler +from ..core.message import Message, MessageKey +from ..kafka.helpers import beamlime_command_topic + + +class ConfigHandler(Handler[bytes, None]): + """ + Handler for configuration messages. + + This handler processes configuration messages and updates the configuration + dictionary accordingly. It is used by StreamProcessor to handle configuration + updates received from a Kafka topic. + + Parameters + ---------- + logger + Logger to use + config + Configuration object to update + """ + + @staticmethod + def message_key(instrument: str) -> MessageKey: + return MessageKey(topic=beamlime_command_topic(instrument), source_name='') + + def __init__(self, *, logger: logging.Logger | None = None, config: Config): + super().__init__(logger=logger, config=config) + self._store = config + + def get(self, key: str, default=None): + return self._store.get(key, default) + + def handle(self, messages: list[Message[bytes]]) -> list[Message[None]]: + """ + Process configuration messages and update the configuration. + + Parameters + ---------- + messages: + List of messages containing configuration updates + + Returns + ------- + : + Empty list as this handler doesn't produce output messages + """ + for message in messages: + try: + key = message.value['key'] + value = json.loads(message.value['value'].decode('utf-8')) + self._logger.info( + 'Updating config: %s = %s at %s', key, value, message.timestamp + ) + self._store[key] = value + except Exception as e: # noqa: PERF203 + self._logger.error('Error processing config message: %s', e) + return [] diff --git a/src/beamlime/kafka/helpers.py b/src/beamlime/kafka/helpers.py index 9bf001b9c..9fc871d62 100644 --- a/src/beamlime/kafka/helpers.py +++ b/src/beamlime/kafka/helpers.py @@ -41,6 +41,13 @@ def motion_topic(instrument: str) -> str: return topic_for_instrument(topic='motion', instrument=instrument) +def beamlime_command_topic(instrument: str) -> str: + """ + Return the topic name for the beamlime command data of an instrument. + """ + return topic_for_instrument(topic='beamlime_commands', instrument=instrument) + + def source_name(device: str, signal: str) -> str: """ Return the source name for a given device and signal. diff --git a/src/beamlime/kafka/message_adapter.py b/src/beamlime/kafka/message_adapter.py index e4381abb2..6aa1e77d5 100644 --- a/src/beamlime/kafka/message_adapter.py +++ b/src/beamlime/kafka/message_adapter.py @@ -69,6 +69,15 @@ def adapt(self, message: T) -> U: pass +class IdentityAdapter(MessageAdapter[T, T]): + """ + An adapter that does not change the message. + """ + + def adapt(self, message: T) -> T: + return message + + class KafkaToEv44Adapter( MessageAdapter[KafkaMessage, Message[eventdata_ev44.EventData]] ): @@ -185,6 +194,21 @@ def adapt( return replace(message, value=da00_to_scipp(message.value)) +class BeamlimeCommandsAdapter(MessageAdapter[KafkaMessage, Message[Any]]): + """ + Adapts a Kafka message to a Beamlime command message. + """ + + def adapt(self, message: KafkaMessage) -> Message[Any]: + key = MessageKey(topic=message.topic(), source_name='') + timestamp = message.timestamp()[1] + legacy_key = message.key().decode('utf-8') # See 286 + value = message.value() + return Message( + key=key, timestamp=timestamp, value={'key': legacy_key, 'value': value} + ) + + class ChainedAdapter(MessageAdapter[T, V]): """ Chains two adapters together. 
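The `BeamlimeCommandsAdapter` plus the new `ConfigHandler` (above) replace the background-threaded `ConfigSubscriber`: config updates now flow through the same adapt/handle pipeline as data. A minimal sketch of that round trip, using a hypothetical `FakeCommandMessage` stand-in for the confluent-kafka message (topic string and config key are illustrative; real code uses `beamlime_command_topic(instrument)`):

```python
import json


class FakeCommandMessage:
    """Hypothetical stand-in mimicking the confluent-kafka message interface."""

    def __init__(self, topic: str, key: bytes, value: bytes, ts: int) -> None:
        self._topic, self._key, self._value, self._ts = topic, key, value, ts

    def topic(self) -> str:
        return self._topic

    def key(self) -> bytes:
        return self._key

    def value(self) -> bytes:
        return self._value

    def timestamp(self) -> tuple[int, int]:
        # confluent-kafka returns (timestamp_type, timestamp); the adapter
        # keeps the second element.
        return (1, self._ts)


msg = FakeCommandMessage(
    topic='dummy_beamlime_commands',  # placeholder topic name
    key=b'update_every',  # illustrative config key
    value=json.dumps(1.5).encode('utf-8'),
    ts=1234,
)

# Adaptation step, as in BeamlimeCommandsAdapter.adapt: wrap key and raw value.
adapted = {'key': msg.key().decode('utf-8'), 'value': msg.value()}

# Handling step, as in ConfigHandler.handle: decode JSON and update the store.
store: dict = {}
store[adapted['key']] = json.loads(adapted['value'].decode('utf-8'))
assert store == {'update_every': 1.5}
```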
diff --git a/src/beamlime/kafka/scipp_da00_compat.py b/src/beamlime/kafka/scipp_da00_compat.py index 5c4c6ce76..454d59df5 100644 --- a/src/beamlime/kafka/scipp_da00_compat.py +++ b/src/beamlime/kafka/scipp_da00_compat.py @@ -29,6 +29,15 @@ def da00_to_scipp( def _to_da00_variable(name: str, var: sc.Variable) -> dataarray_da00.Variable: + if var.dtype == sc.DType.datetime64: + timedelta = var - sc.epoch(unit=var.unit) + return dataarray_da00.Variable( + name=name, + data=timedelta.values, + axes=list(var.dims), + shape=var.shape, + unit=f'datetime64[{var.unit}]', + ) return dataarray_da00.Variable( name=name, data=var.values, @@ -39,4 +48,7 @@ def _to_da00_variable(name: str, var: sc.Variable) -> dataarray_da00.Variable: def _to_scipp_variable(var: dataarray_da00.Variable) -> sc.Variable: + if var.unit.startswith('datetime64'): + unit = var.unit.split('[')[1].rstrip(']') + return sc.epoch(unit=unit) + sc.array(dims=var.axes, values=var.data, unit=unit) return sc.array(dims=var.axes, values=var.data, unit=var.unit) diff --git a/src/beamlime/kafka/source.py b/src/beamlime/kafka/source.py index f19d4cbee..9754b9068 100644 --- a/src/beamlime/kafka/source.py +++ b/src/beamlime/kafka/source.py @@ -14,6 +14,29 @@ def close(self) -> None: pass +class MultiConsumer: + """ + Message source for multiple Kafka consumers. + + This class allows for consuming messages from multiple Kafka consumers with + different configurations. In particular, we need to use different topic offsets + for data topics vs. config/command topics. + """ + + def __init__(self, consumers: list[KafkaConsumer]) -> None: + self._consumers = consumers + + def consume(self, num_messages: int, timeout: float) -> list[KafkaMessage]: + messages = [] + for consumer in self._consumers: + messages.extend(consumer.consume(num_messages, timeout)) + return messages + + def close(self) -> None: + for consumer in self._consumers: + consumer.close() + + class KafkaMessageSource(MessageSource[KafkaMessage]): """ Message source for messages from Kafka. 
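`MultiConsumer` simply concatenates the batches from all wrapped consumers, which lets the control consumer and the data consumer keep independent offset configurations while `StreamProcessor` sees a single stream. A quick sketch with a hypothetical list-backed stand-in consumer:

```python
from beamlime.kafka.source import MultiConsumer


class ListConsumer:
    """Hypothetical stand-in that replays a fixed list of messages."""

    def __init__(self, messages: list) -> None:
        self._messages = messages

    def consume(self, num_messages: int, timeout: float) -> list:
        batch = self._messages[:num_messages]
        self._messages = self._messages[num_messages:]
        return batch

    def close(self) -> None:
        pass


control = ListConsumer(['config-update'])
data = ListConsumer(['ev44-batch-1', 'ev44-batch-2'])
multi = MultiConsumer([control, data])

# One poll drains both sources into a single combined batch:
assert multi.consume(num_messages=10, timeout=0.05) == [
    'config-update',
    'ev44-batch-1',
    'ev44-batch-2',
]
multi.close()
```

Note that `num_messages` is applied per consumer, so a combined batch may contain up to `len(consumers) * num_messages` messages; that seems acceptable here since `StreamProcessor` iterates over whatever it receives.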
diff --git a/src/beamlime/service_factory.py b/src/beamlime/service_factory.py index d3c16e908..2f3f5ab91 100644 --- a/src/beamlime/service_factory.py +++ b/src/beamlime/service_factory.py @@ -3,13 +3,17 @@ from __future__ import annotations import logging -from collections.abc import Callable from typing import Generic, TypeVar -from .core import ConfigSubscriber, MessageSink, StreamProcessor -from .core.handler import Config, HandlerFactory +from .core import MessageSink, StreamProcessor +from .core.handler import Handler, HandlerFactory, HandlerRegistry +from .core.message import MessageKey, MessageSource from .core.service import Service -from .kafka.message_adapter import AdaptingMessageSource, MessageAdapter +from .kafka.message_adapter import ( + AdaptingMessageSource, + IdentityAdapter, + MessageAdapter, +) from .kafka.source import KafkaConsumer, KafkaMessageSource Traw = TypeVar("Traw") @@ -24,31 +28,27 @@ def __init__( self, instrument: str, name: str, log_level: int = logging.INFO, - adapter: MessageAdapter[Traw, Tin], - handler_factory_cls: Callable[[Config], HandlerFactory[Tin, Tout]], + adapter: MessageAdapter[Traw, Tin] | None = None, + handler_factory: HandlerFactory[Tin, Tout], ): self._name = f'{instrument}_{name}' self._log_level = log_level - self._adapter = adapter - self._handler_factory_cls = handler_factory_cls + self._adapter = adapter or IdentityAdapter() + self._handler_registry = HandlerRegistry(factory=handler_factory) - def build( - self, - control_consumer: KafkaConsumer, - consumer: KafkaConsumer, - sink: MessageSink[Tout], + def add_handler(self, key: MessageKey, handler: Handler[Tin, Tout]) -> None: + """Add a specific handler for the given key, instead of using the factory.""" + self._handler_registry.register_handler(key=key, handler=handler) + + def from_consumer( + self, consumer: KafkaConsumer, sink: MessageSink[Tout] ) -> Service: - config_subscriber = ConfigSubscriber(consumer=control_consumer, config={}) + return self.from_source(source=KafkaMessageSource(consumer=consumer), sink=sink) + + def from_source(self, source: MessageSource, sink: MessageSink[Tout]) -> Service: processor = StreamProcessor( - source=AdaptingMessageSource( - source=KafkaMessageSource(consumer=consumer), adapter=self._adapter - ), + source=AdaptingMessageSource(source=source, adapter=self._adapter), sink=sink, - handler_factory=self._handler_factory_cls(config=config_subscriber), - ) - return Service( - children=[config_subscriber], - processor=processor, - name=self._name, - log_level=self._log_level, + handler_registry=self._handler_registry, ) + return Service(processor=processor, name=self._name, log_level=self._log_level) diff --git a/src/beamlime/services/data_reduction.py b/src/beamlime/services/data_reduction.py index 6f073ddaa..a67b717de 100644 --- a/src/beamlime/services/data_reduction.py +++ b/src/beamlime/services/data_reduction.py @@ -5,16 +5,23 @@ import argparse import logging from contextlib import ExitStack -from functools import partial from typing import Literal, NoReturn from beamlime import Service from beamlime.config import config_names from beamlime.config.config_loader import load_config +from beamlime.config.raw_detectors import get_config +from beamlime.handlers.config_handler import ConfigHandler from beamlime.handlers.data_reduction_handler import ReductionHandlerFactory from beamlime.kafka import consumer as kafka_consumer -from beamlime.kafka.helpers import beam_monitor_topic, detector_topic, motion_topic +from beamlime.kafka.helpers import ( 
beam_monitor_topic, + beamlime_command_topic, + detector_topic, + motion_topic, +) from beamlime.kafka.message_adapter import ( + BeamlimeCommandsAdapter, ChainedAdapter, Ev44ToDetectorEventsAdapter, Ev44ToMonitorEventsAdapter, @@ -24,11 +31,10 @@ RouteByTopicAdapter, ) from beamlime.kafka.sink import KafkaSink, UnrollingSinkAdapter +from beamlime.kafka.source import MultiConsumer from beamlime.service_factory import DataServiceBuilder from beamlime.sinks import PlotToPngSink -from ..config.raw_detectors import get_config - def setup_arg_parser() -> argparse.ArgumentParser: parser = Service.setup_arg_parser(description='Data Reduction Service') @@ -58,18 +64,23 @@ def make_reduction_service_builder( motion_topic(instrument): ChainedAdapter( first=KafkaToF144Adapter(), second=F144ToLogDataAdapter() ), + beamlime_command_topic(instrument): BeamlimeCommandsAdapter(), } ) - handler_factory_cls = partial( - ReductionHandlerFactory, instrument_config=get_config(instrument) + config = {} + config_handler = ConfigHandler(config=config) + handler_factory = ReductionHandlerFactory( + instrument_config=get_config(instrument), config=config ) - return DataServiceBuilder( + builder = DataServiceBuilder( instrument=instrument, name='data_reduction', log_level=log_level, adapter=adapter, - handler_factory_cls=handler_factory_cls, + handler_factory=handler_factory, ) + builder.add_handler(ConfigHandler.message_key(instrument), config_handler) + return builder def run_service( @@ -95,7 +106,7 @@ def run_service( control_consumer = stack.enter_context( kafka_consumer.make_control_consumer(instrument=instrument) ) - consumer = stack.enter_context( + data_consumer = stack.enter_context( kafka_consumer.make_consumer_from_config( topics=config['topics'], config={**consumer_config, **kafka_upstream_config}, @@ -103,9 +114,8 @@ def run_service( group='data_reduction', ) ) - service = builder.build( - control_consumer=control_consumer, consumer=consumer, sink=sink - ) + consumer = MultiConsumer([control_consumer, data_consumer]) + service = builder.from_consumer(consumer=consumer, sink=sink) service.start() diff --git a/src/beamlime/services/detector_data.py b/src/beamlime/services/detector_data.py index 3f0b72249..325b8b225 100644 --- a/src/beamlime/services/detector_data.py +++ b/src/beamlime/services/detector_data.py @@ -5,20 +5,24 @@ import argparse import logging from contextlib import ExitStack -from functools import partial from typing import Literal, NoReturn from beamlime import Service from beamlime.config import config_names from beamlime.config.config_loader import load_config +from beamlime.handlers.config_handler import ConfigHandler from beamlime.handlers.detector_data_handler import DetectorHandlerFactory from beamlime.kafka import consumer as kafka_consumer +from beamlime.kafka.helpers import beamlime_command_topic, detector_topic from beamlime.kafka.message_adapter import ( + BeamlimeCommandsAdapter, ChainedAdapter, Ev44ToDetectorEventsAdapter, KafkaToEv44Adapter, + RouteByTopicAdapter, ) from beamlime.kafka.sink import KafkaSink, UnrollingSinkAdapter +from beamlime.kafka.source import MultiConsumer from beamlime.service_factory import DataServiceBuilder from beamlime.sinks import PlotToPngSink @@ -34,21 +38,34 @@ def setup_arg_parser() -> argparse.ArgumentParser: return parser -def make_detector_service_builder( - *, instrument: str, log_level: int = logging.INFO -) -> DataServiceBuilder: - adapter = ChainedAdapter( +def make_detector_data_adapter(instrument: str) -> RouteByTopicAdapter: + 
detectors = ChainedAdapter( first=KafkaToEv44Adapter(), second=Ev44ToDetectorEventsAdapter(merge_detectors=instrument == 'bifrost'), ) - handler_factory_cls = partial(DetectorHandlerFactory, instrument=instrument) - return DataServiceBuilder( + return RouteByTopicAdapter( + routes={ + detector_topic(instrument): detectors, + beamlime_command_topic(instrument): BeamlimeCommandsAdapter(), + }, + ) + + +def make_detector_service_builder( + *, instrument: str, log_level: int = logging.INFO +) -> DataServiceBuilder: + config = {} + config_handler = ConfigHandler(config=config) + handler_factory = DetectorHandlerFactory(instrument=instrument, config=config) + builder = DataServiceBuilder( instrument=instrument, name='detector_data', log_level=log_level, - adapter=adapter, - handler_factory_cls=handler_factory_cls, + adapter=make_detector_data_adapter(instrument=instrument), + handler_factory=handler_factory, ) + builder.add_handler(ConfigHandler.message_key(instrument), config_handler) + return builder def run_service( @@ -74,7 +91,7 @@ def run_service( control_consumer = stack.enter_context( kafka_consumer.make_control_consumer(instrument=instrument) ) - consumer = stack.enter_context( + data_consumer = stack.enter_context( kafka_consumer.make_consumer_from_config( topics=config['topics'], config={**consumer_config, **kafka_upstream_config}, @@ -82,9 +99,8 @@ def run_service( group='detector_data', ) ) - service = builder.build( - control_consumer=control_consumer, consumer=consumer, sink=sink - ) + consumer = MultiConsumer([control_consumer, data_consumer]) + service = builder.from_consumer(consumer=consumer, sink=sink) service.start() diff --git a/src/beamlime/services/fake_detectors.py b/src/beamlime/services/fake_detectors.py index db056427e..89b24f9e1 100644 --- a/src/beamlime/services/fake_detectors.py +++ b/src/beamlime/services/fake_detectors.py @@ -11,19 +11,13 @@ import scipp as sc from streaming_data_types import eventdata_ev44 -from beamlime import ( - Handler, - Message, - MessageKey, - MessageSource, - Service, - StreamProcessor, -) +from beamlime import Handler, Message, MessageKey, MessageSource, Service from beamlime.config import config_names from beamlime.config.config_loader import load_config from beamlime.core.handler import CommonHandlerFactory from beamlime.kafka.helpers import detector_topic from beamlime.kafka.sink import KafkaSink, SerializationError +from beamlime.service_factory import DataServiceBuilder # Configure detectors to fake for each instrument # Values as of January 2025. These may change if the detector configuration changes. 
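This wiring (repeated for `monitor_data.py` and `timeseries.py` below) leans on the new `HandlerRegistry.register_handler`: a handler registered up front takes precedence over the factory, because `HandlerRegistry.get` consults the factory only for keys it has not seen yet. A sketch, assuming `get` returns the stored handler (the factory below is a hypothetical stub):

```python
from beamlime.core.handler import HandlerRegistry
from beamlime.handlers.config_handler import ConfigHandler


class NeverCalledFactory:
    """Hypothetical stub; real services pass e.g. DetectorHandlerFactory."""

    def make_handler(self, key):
        raise AssertionError('factory must not be consulted for registered keys')


registry = HandlerRegistry(factory=NeverCalledFactory())
config_handler = ConfigHandler(config={})
key = ConfigHandler.message_key('dummy')  # MessageKey for the command topic
registry.register_handler(key=key, handler=config_handler)

# Lookup returns the pre-registered handler without touching the factory:
assert registry.get(key) is config_handler
```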
@@ -173,16 +167,18 @@ def serialize_detector_events_to_ev44( def run_service(*, instrument: str, log_level: int = logging.INFO) -> NoReturn: - service_name = f'{instrument}_fake_producer' kafka_config = load_config(namespace=config_names.kafka_upstream) - source = FakeDetectorSource(instrument=instrument) serializer = serialize_detector_events_to_ev44 - processor = StreamProcessor( - source=source, - sink=KafkaSink(kafka_config=kafka_config, serializer=serializer), + builder = DataServiceBuilder( + instrument=instrument, + name='fake_producer', + log_level=log_level, handler_factory=CommonHandlerFactory(config={}, handler_cls=IdentityHandler), ) - service = Service(processor=processor, name=service_name, log_level=log_level) + service = builder.from_source( + source=FakeDetectorSource(instrument=instrument), + sink=KafkaSink(kafka_config=kafka_config, serializer=serializer), + ) service.start() diff --git a/src/beamlime/services/fake_logdata.py b/src/beamlime/services/fake_logdata.py index 1387c40f6..9c0d78f99 100644 --- a/src/beamlime/services/fake_logdata.py +++ b/src/beamlime/services/fake_logdata.py @@ -6,19 +6,13 @@ import numpy as np import scipp as sc -from beamlime import ( - Handler, - Message, - MessageKey, - MessageSource, - Service, - StreamProcessor, -) +from beamlime import Handler, Message, MessageKey, MessageSource, Service from beamlime.config import config_names from beamlime.config.config_loader import load_config from beamlime.core.handler import CommonHandlerFactory from beamlime.kafka.helpers import motion_topic from beamlime.kafka.sink import KafkaSink, serialize_dataarray_to_f144 +from beamlime.service_factory import DataServiceBuilder def _make_ramp(size: int) -> sc.DataArray: @@ -131,16 +125,18 @@ def handle(self, messages: list[Message[T]]) -> list[Message[T]]: def run_service(*, instrument: str, log_level: int = logging.INFO) -> NoReturn: - service_name = f'{instrument}_fake_f144_producer' kafka_config = load_config(namespace=config_names.kafka_upstream) - source = FakeLogdataSource(instrument=instrument) serializer = serialize_dataarray_to_f144 - processor = StreamProcessor( - source=source, - sink=KafkaSink(kafka_config=kafka_config, serializer=serializer), + builder = DataServiceBuilder( + instrument=instrument, + name='fake_f144_producer', + log_level=log_level, handler_factory=CommonHandlerFactory(config={}, handler_cls=IdentityHandler), ) - service = Service(processor=processor, name=service_name, log_level=log_level) + service = builder.from_source( + source=FakeLogdataSource(instrument=instrument), + sink=KafkaSink(kafka_config=kafka_config, serializer=serializer), + ) service.start() diff --git a/src/beamlime/services/fake_monitors.py b/src/beamlime/services/fake_monitors.py index 503575b33..cc3b827b6 100644 --- a/src/beamlime/services/fake_monitors.py +++ b/src/beamlime/services/fake_monitors.py @@ -1,5 +1,5 @@ # SPDX-License-Identifier: BSD-3-Clause -# Copyright (c) 2024 Scipp contributors (https://github.com/scipp) +# Copyright (c) 2025 Scipp contributors (https://github.com/scipp) import logging import time from dataclasses import replace @@ -15,18 +15,18 @@ MessageKey, MessageSource, Service, - StreamProcessor, ) from beamlime.config import config_names from beamlime.config.config_loader import load_config from beamlime.core.handler import CommonHandlerFactory from beamlime.kafka.helpers import beam_monitor_topic -from beamlime.kafka.message_adapter import AdaptingMessageSource, MessageAdapter +from beamlime.kafka.message_adapter import 
MessageAdapter from beamlime.kafka.sink import ( KafkaSink, SerializationError, serialize_dataarray_to_da00, ) +from beamlime.service_factory import DataServiceBuilder class FakeMonitorSource(MessageSource[sc.Variable]): @@ -113,25 +113,26 @@ def serialize_variable_to_monitor_ev44(msg: Message[sc.Variable]) -> bytes: def run_service( *, instrument: str, mode: Literal['ev44', 'da00'], log_level: int = logging.INFO ) -> NoReturn: - service_name = f'{instrument}_fake_{mode}_producer' kafka_config = load_config(namespace=config_names.kafka_upstream) if mode == 'ev44': - source = FakeMonitorSource(instrument=instrument) + adapter = None serializer = serialize_variable_to_monitor_ev44 else: - source = AdaptingMessageSource( - source=FakeMonitorSource(instrument=instrument), - adapter=EventsToHistogramAdapter( - toa=sc.linspace('toa', 0, 71_000_000, num=100, unit='ns') - ), + adapter = EventsToHistogramAdapter( + toa=sc.linspace('toa', 0, 71_000_000, num=100, unit='ns') ) serializer = serialize_dataarray_to_da00 - processor = StreamProcessor( - source=source, - sink=KafkaSink(kafka_config=kafka_config, serializer=serializer), + builder = DataServiceBuilder( + instrument=instrument, + name=f'fake_{mode}_producer', + log_level=log_level, + adapter=adapter, handler_factory=CommonHandlerFactory(config={}, handler_cls=IdentityHandler), ) - service = Service(processor=processor, name=service_name, log_level=log_level) + service = builder.from_source( + source=FakeMonitorSource(instrument=instrument), + sink=KafkaSink(kafka_config=kafka_config, serializer=serializer), + ) service.start() diff --git a/src/beamlime/services/monitor_data.py b/src/beamlime/services/monitor_data.py index 0d622e9c6..32ed9de83 100644 --- a/src/beamlime/services/monitor_data.py +++ b/src/beamlime/services/monitor_data.py @@ -8,16 +8,21 @@ from beamlime import CommonHandlerFactory, Service from beamlime.config import config_names from beamlime.config.config_loader import load_config +from beamlime.handlers.config_handler import ConfigHandler from beamlime.handlers.monitor_data_handler import create_monitor_data_handler from beamlime.kafka import consumer as kafka_consumer +from beamlime.kafka.helpers import beam_monitor_topic, beamlime_command_topic from beamlime.kafka.message_adapter import ( + BeamlimeCommandsAdapter, ChainedAdapter, Da00ToScippAdapter, KafkaToDa00Adapter, KafkaToMonitorEventsAdapter, + RouteByTopicAdapter, RoutingAdapter, ) from beamlime.kafka.sink import KafkaSink +from beamlime.kafka.source import MultiConsumer from beamlime.service_factory import DataServiceBuilder from beamlime.sinks import PlotToPngSink @@ -33,8 +38,8 @@ def setup_arg_parser() -> argparse.ArgumentParser: return parser -def make_monitor_data_adapter() -> RoutingAdapter: - return RoutingAdapter( +def make_monitor_data_adapter(instrument: str) -> RoutingAdapter: + monitors = RoutingAdapter( routes={ 'ev44': KafkaToMonitorEventsAdapter(), 'da00': ChainedAdapter( @@ -42,20 +47,31 @@ def make_monitor_data_adapter() -> RoutingAdapter: ), } ) + return RouteByTopicAdapter( + routes={ + beam_monitor_topic(instrument): monitors, + beamlime_command_topic(instrument): BeamlimeCommandsAdapter(), + }, + ) def make_monitor_service_builder( *, instrument: str, log_level: int = logging.INFO ) -> DataServiceBuilder: - return DataServiceBuilder( + config = {} + config_handler = ConfigHandler(config=config) + handler_factory = CommonHandlerFactory( + handler_cls=create_monitor_data_handler, config=config + ) + builder = DataServiceBuilder( 
instrument=instrument, name='monitor_data', log_level=log_level, - adapter=make_monitor_data_adapter(), - handler_factory_cls=CommonHandlerFactory.from_handler( - create_monitor_data_handler - ), + adapter=make_monitor_data_adapter(instrument=instrument), + handler_factory=handler_factory, ) + builder.add_handler(ConfigHandler.message_key(instrument), config_handler) + return builder def run_service( @@ -80,7 +96,7 @@ def run_service( control_consumer = stack.enter_context( kafka_consumer.make_control_consumer(instrument=instrument) ) - consumer = stack.enter_context( + data_consumer = stack.enter_context( kafka_consumer.make_consumer_from_config( topics=config['topics'], config={**consumer_config, **kafka_upstream_config}, @@ -88,9 +104,8 @@ def run_service( group='monitor_data', ) ) - service = builder.build( - control_consumer=control_consumer, consumer=consumer, sink=sink - ) + consumer = MultiConsumer([control_consumer, data_consumer]) + service = builder.from_consumer(consumer=consumer, sink=sink) service.start() diff --git a/src/beamlime/services/timeseries.py b/src/beamlime/services/timeseries.py index f7c8dca36..62fa494c7 100644 --- a/src/beamlime/services/timeseries.py +++ b/src/beamlime/services/timeseries.py @@ -6,7 +6,6 @@ import logging from collections.abc import Mapping from contextlib import ExitStack -from functools import partial from typing import Any, Literal, NoReturn from beamlime import Service @@ -14,12 +13,16 @@ from beamlime.config.config_loader import load_config from beamlime.handlers.timeseries_handler import LogdataHandlerFactory from beamlime.kafka import consumer as kafka_consumer +from beamlime.kafka.helpers import beamlime_command_topic, motion_topic from beamlime.kafka.message_adapter import ( + BeamlimeCommandsAdapter, ChainedAdapter, F144ToLogDataAdapter, KafkaToF144Adapter, + RouteByTopicAdapter, ) from beamlime.kafka.sink import KafkaSink, UnrollingSinkAdapter +from beamlime.kafka.source import MultiConsumer from beamlime.service_factory import DataServiceBuilder from beamlime.sinks import PlotToPngSink @@ -35,24 +38,32 @@ def setup_arg_parser() -> argparse.ArgumentParser: return parser +def make_timeseries_adapter(instrument: str) -> RouteByTopicAdapter: + return RouteByTopicAdapter( + routes={ + motion_topic(instrument): ChainedAdapter( + first=KafkaToF144Adapter(), second=F144ToLogDataAdapter() + ), + beamlime_command_topic(instrument): BeamlimeCommandsAdapter(), + }, + ) + + def make_timeseries_service_builder( *, instrument: str, log_level: int = logging.INFO, attribute_registry: Mapping[str, Mapping[str, Any]] | None = None, ) -> DataServiceBuilder: - adapter = ChainedAdapter(first=KafkaToF144Adapter(), second=F144ToLogDataAdapter()) - handler_factory_cls = partial( - LogdataHandlerFactory, - instrument=instrument, - attribute_registry=attribute_registry, + handler_factory = LogdataHandlerFactory( + instrument=instrument, attribute_registry=attribute_registry, config={} ) return DataServiceBuilder( instrument=instrument, name='timeseries', log_level=log_level, - adapter=adapter, - handler_factory_cls=handler_factory_cls, + adapter=make_timeseries_adapter(instrument=instrument), + handler_factory=handler_factory, ) @@ -80,7 +91,7 @@ def run_service( control_consumer = stack.enter_context( kafka_consumer.make_control_consumer(instrument=instrument) ) - consumer = stack.enter_context( + data_consumer = stack.enter_context( kafka_consumer.make_consumer_from_config( topics=('motion',), config={**consumer_config, **kafka_upstream_config}, @@ -88,9 
+99,8 @@ def run_service( group='timeseries', ) ) - service = builder.build( - control_consumer=control_consumer, consumer=consumer, sink=sink - ) + consumer = MultiConsumer([control_consumer, data_consumer]) + service = builder.from_consumer(consumer=consumer, sink=sink) service.start() diff --git a/tests/processor_test.py b/tests/processor_test.py index f3676f93f..5e377f540 100644 --- a/tests/processor_test.py +++ b/tests/processor_test.py @@ -40,7 +40,8 @@ def test_consumes_and_produces_messages() -> None: ) sink = FakeMessageSink() factory = CommonHandlerFactory(config=config, handler_cls=ValueToStringHandler) - processor = StreamProcessor(source=source, sink=sink, handler_factory=factory) + registry = HandlerRegistry(factory=factory) + processor = StreamProcessor(source=source, sink=sink, handler_registry=registry) processor.process() assert len(sink.messages) == 0 processor.process() @@ -107,7 +108,8 @@ def test_processor_skips_keys_without_handlers(): factory = SelectiveHandlerFactory( config=config, handler_cls=ValueToStringHandler, allowed_keys=['allowed'] ) - processor = StreamProcessor(source=source, sink=sink, handler_factory=factory) + registry = HandlerRegistry(factory=factory) + processor = StreamProcessor(source=source, sink=sink, handler_registry=registry) processor.process() @@ -135,7 +137,8 @@ def test_processor_with_mixed_handlers(): factory = SelectiveHandlerFactory( config=config, handler_cls=ValueToStringHandler, allowed_keys=['allowed'] ) - processor = StreamProcessor(source=source, sink=sink, handler_factory=factory) + registry = HandlerRegistry(factory=factory) + processor = StreamProcessor(source=source, sink=sink, handler_registry=registry) # First batch: one allowed message processor.process() diff --git a/tests/services/data_service_test.py b/tests/services/data_service_test.py index f781384a7..24f328ccf 100644 --- a/tests/services/data_service_test.py +++ b/tests/services/data_service_test.py @@ -32,14 +32,6 @@ def handle(self, messages: list[Message[int]]) -> list[Message[int]]: return messages -class EmptyConsumer(KafkaConsumer): - def consume(self, num_messages: int, timeout: float) -> list[KafkaMessage]: - return [] - - def close(self) -> None: - pass - - class IntConsumer(KafkaConsumer): def __init__(self) -> None: self._values = [11, 22, 33, 44] @@ -67,13 +59,11 @@ def test_basics() -> None: instrument='instrument', name='name', adapter=ForwardingAdapter(), - handler_factory_cls=CommonHandlerFactory.from_handler(ForwardingHandler), + handler_factory=CommonHandlerFactory(handler_cls=ForwardingHandler, config={}), ) sink = FakeMessageSink() consumer = IntConsumer() - service = builder.build( - control_consumer=EmptyConsumer(), consumer=consumer, sink=sink - ) + service = builder.from_consumer(consumer=consumer, sink=sink) service.start(blocking=False) while not consumer.at_end: time.sleep(0.1) diff --git a/tests/services/detector_data_test.py b/tests/services/detector_data_test.py index 28a8eaa6c..b7eceef1b 100644 --- a/tests/services/detector_data_test.py +++ b/tests/services/detector_data_test.py @@ -9,7 +9,7 @@ from beamlime.config.raw_detectors import available_instruments, get_config from beamlime.fakes import FakeMessageSink -from beamlime.kafka.helpers import source_name +from beamlime.kafka.helpers import detector_topic, source_name from beamlime.kafka.message_adapter import FakeKafkaMessage, KafkaMessage from beamlime.kafka.sink import UnrollingSinkAdapter from beamlime.kafka.source import KafkaConsumer @@ -32,6 +32,7 @@ def __init__( 
events_per_message: int = 1_000, max_events: int = 1_000_000, ) -> None: + self._topic = detector_topic(instrument=instrument) self._detector_config = detector_config[instrument] self._events_per_message = events_per_message self._max_events = max_events @@ -98,7 +99,7 @@ def consume(self, num_messages: int, timeout: float) -> list[KafkaMessage]: messages = [ FakeKafkaMessage( value=self._content[(self._current + msg) % len(self._content)], - topic="dummy", + topic=self._topic, timestamp=self._make_timestamp(), ) for msg in range(messages_to_produce) @@ -124,11 +125,7 @@ def test_performance(benchmark, instrument: str, events_per_message: int) -> Non # It is thus always returning messages quickly, which shifts the balance in the # services to a different place than in reality. builder = make_detector_service_builder(instrument=instrument) - service = builder.build( - control_consumer=EmptyConsumer(), - consumer=EmptyConsumer(), - sink=FakeMessageSink(), - ) + service = builder.from_consumer(consumer=EmptyConsumer(), sink=FakeMessageSink()) sink = FakeMessageSink() consumer = Ev44Consumer( @@ -136,9 +133,7 @@ def test_performance(benchmark, instrument: str, events_per_message: int) -> Non events_per_message=events_per_message, max_events=50_000_000, ) - service = builder.build( - control_consumer=EmptyConsumer(), consumer=consumer, sink=sink - ) + service = builder.from_consumer(consumer=consumer, sink=sink) service.start(blocking=False) benchmark(start_and_wait_for_completion, consumer=consumer) service.stop() @@ -148,20 +143,12 @@ def test_performance(benchmark, instrument: str, events_per_message: int) -> Non @pytest.mark.parametrize('instrument', available_instruments()) def test_detector_data_service(instrument: str) -> None: builder = make_detector_service_builder(instrument=instrument) - service = builder.build( - control_consumer=EmptyConsumer(), - consumer=EmptyConsumer(), - sink=FakeMessageSink(), - ) + service = builder.from_consumer(consumer=EmptyConsumer(), sink=FakeMessageSink()) sink = FakeMessageSink() consumer = Ev44Consumer( instrument=instrument, events_per_message=100, max_events=10_000 ) - service = builder.build( - control_consumer=EmptyConsumer(), - consumer=consumer, - sink=UnrollingSinkAdapter(sink), - ) + service = builder.from_consumer(consumer=consumer, sink=UnrollingSinkAdapter(sink)) service.start(blocking=False) start_and_wait_for_completion(consumer=consumer) service.stop() diff --git a/tests/services/monitor_data_test.py b/tests/services/monitor_data_test.py index 3d8d2ac93..f03d39d28 100644 --- a/tests/services/monitor_data_test.py +++ b/tests/services/monitor_data_test.py @@ -8,7 +8,7 @@ from streaming_data_types import eventdata_ev44 from beamlime.fakes import FakeMessageSink -from beamlime.kafka.helpers import source_name +from beamlime.kafka.helpers import beam_monitor_topic, source_name from beamlime.kafka.message_adapter import FakeKafkaMessage, KafkaMessage from beamlime.kafka.source import KafkaConsumer from beamlime.services.monitor_data import make_monitor_service_builder @@ -82,7 +82,7 @@ def consume(self, num_messages: int, timeout: float) -> list[KafkaMessage]: messages = [ FakeKafkaMessage( value=self._content[msg % self._num_sources], - topic="dummy", + topic=beam_monitor_topic("dummy"), timestamp=self._make_timestamp(), ) for msg in range(num_messages) @@ -107,11 +107,7 @@ def test_performance(benchmark, num_sources: int, events_per_message: int) -> No # It is this always returning messages quickly, which shifts the balance in the # services 
to a different place than in reality. builder = make_monitor_service_builder(instrument='dummy') - service = builder.build( - control_consumer=EmptyConsumer(), - consumer=EmptyConsumer(), - sink=FakeMessageSink(), - ) + service = builder.from_consumer(consumer=EmptyConsumer(), sink=FakeMessageSink()) sink = FakeMessageSink() consumer = Ev44Consumer( @@ -119,9 +115,7 @@ def test_performance(benchmark, num_sources: int, events_per_message: int) -> No events_per_message=events_per_message, max_events=50_000_000, ) - service = builder.build( - control_consumer=EmptyConsumer(), consumer=consumer, sink=sink - ) + service = builder.from_consumer(consumer=consumer, sink=sink) service.start(blocking=False) benchmark(start_and_wait_for_completion, consumer=consumer) service.stop() @@ -129,17 +123,11 @@ def test_performance(benchmark, num_sources: int, events_per_message: int) -> No def test_monitor_data_service() -> None: builder = make_monitor_service_builder(instrument='dummy') - service = builder.build( - control_consumer=EmptyConsumer(), - consumer=EmptyConsumer(), - sink=FakeMessageSink(), - ) + service = builder.from_consumer(consumer=EmptyConsumer(), sink=FakeMessageSink()) sink = FakeMessageSink() consumer = Ev44Consumer(num_sources=2, events_per_message=100, max_events=10_000) - service = builder.build( - control_consumer=EmptyConsumer(), consumer=consumer, sink=sink - ) + service = builder.from_consumer(consumer=consumer, sink=sink) service.start(blocking=False) start_and_wait_for_completion(consumer=consumer) source_names = [msg.key.source_name for msg in sink.messages] diff --git a/tests/services/timeseries_test.py b/tests/services/timeseries_test.py index decc30d89..2154c893c 100644 --- a/tests/services/timeseries_test.py +++ b/tests/services/timeseries_test.py @@ -18,14 +18,6 @@ from beamlime.services.timeseries import make_timeseries_service_builder -class EmptyConsumer(KafkaConsumer): - def consume(self, num_messages: int, timeout: float) -> list[KafkaMessage]: - return [] - - def close(self) -> None: - pass - - class F144Consumer(KafkaConsumer): def __init__( self, @@ -164,11 +156,7 @@ def test_timeseries_service(instrument: str) -> None: source_names=source_names, ) - service = builder.build( - control_consumer=EmptyConsumer(), - consumer=consumer, - sink=UnrollingSinkAdapter(sink), - ) + service = builder.from_consumer(consumer=consumer, sink=UnrollingSinkAdapter(sink)) service.start(blocking=False) start_and_wait_for_completion(consumer=consumer) @@ -219,11 +207,7 @@ def test_timeseries_accumulation() -> None: source_names=['detector_rotation'], # Use just one source for simplicity ) - service = builder.build( - control_consumer=EmptyConsumer(), - consumer=consumer, - sink=UnrollingSinkAdapter(sink), - ) + service = builder.from_consumer(consumer=consumer, sink=UnrollingSinkAdapter(sink)) service.start(blocking=False) start_and_wait_for_completion(consumer=consumer)
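Finally, the new config path can be exercised without Kafka at all; a sketch in the style of the tests above (the `update_every` key is illustrative, not a config key defined by this PR):

```python
import json

from beamlime import Message
from beamlime.handlers.config_handler import ConfigHandler

config: dict = {}
handler = ConfigHandler(config=config)

# The payload mirrors what BeamlimeCommandsAdapter produces: the legacy Kafka
# key plus the raw JSON-encoded value.
message = Message(
    key=ConfigHandler.message_key('dummy'),
    timestamp=0,
    value={'key': 'update_every', 'value': json.dumps(1.5).encode('utf-8')},
)

assert handler.handle([message]) == []  # config updates emit no output messages
assert config['update_every'] == 1.5
assert handler.get('update_every') == 1.5
```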