From 44243460284f74d1906c3a7c743e5da0bae21652 Mon Sep 17 00:00:00 2001 From: Simon Heybrock Date: Wed, 19 Mar 2025 09:34:04 +0100 Subject: [PATCH 01/20] Extract class WorkflowManager, prep for further refactor --- src/beamlime/config/raw_detectors/bifrost.py | 4 +- .../handlers/data_reduction_handler.py | 98 ++------------ src/beamlime/handlers/workflow_manager.py | 124 ++++++++++++++++++ 3 files changed, 135 insertions(+), 91 deletions(-) create mode 100644 src/beamlime/handlers/workflow_manager.py diff --git a/src/beamlime/config/raw_detectors/bifrost.py b/src/beamlime/config/raw_detectors/bifrost.py index 6b3a923f0..62d2a875f 100644 --- a/src/beamlime/config/raw_detectors/bifrost.py +++ b/src/beamlime/config/raw_detectors/bifrost.py @@ -100,13 +100,15 @@ def _make_counts_per_angle( _reduction_workflow.insert(_make_spectrum_view) _reduction_workflow.insert(_make_counts_per_angle) +context_keys = (DetectorRotation,) + def _make_processor(): return StreamProcessor( _reduction_workflow, dynamic_keys=(NeXusData[NXdetector, SampleRun],), accumulators=(SpectrumView, CountsPerAngle), - context_keys=(DetectorRotation,), + context_keys=context_keys, target_keys=(SpectrumView, CountsPerAngle), ) diff --git a/src/beamlime/handlers/data_reduction_handler.py b/src/beamlime/handlers/data_reduction_handler.py index 6157cc064..38558810e 100644 --- a/src/beamlime/handlers/data_reduction_handler.py +++ b/src/beamlime/handlers/data_reduction_handler.py @@ -7,19 +7,12 @@ from typing import Any import scipp as sc -from ess.reduce.streaming import StreamProcessor -from sciline.typing import Key -from ..core.handler import ( - Accumulator, - Config, - Handler, - HandlerFactory, - PeriodicAccumulatingHandler, -) +from ..core.handler import Config, Handler, HandlerFactory, PeriodicAccumulatingHandler from ..core.message import Message, MessageKey from .accumulators import DetectorEvents, ToNXevent_data from .to_nx_log import ToNXlog +from .workflow_manager import WorkflowManager class NullHandler(Handler[Any, None]): @@ -43,10 +36,7 @@ def __init__( ) -> None: self._logger = logger or logging.getLogger(__name__) self._config = config - self._instrument_config = instrument_config - self._processors = self._instrument_config.make_stream_processors() - self._source_to_key = self._instrument_config.source_to_key - self._attrs_registry = self._instrument_config.f144_attribute_registry + self._workflow_manager = WorkflowManager(instrument_config=instrument_config) def _is_nxlog(self, key: MessageKey) -> bool: return key.topic.split('_', maxsplit=1)[1] in ('motion',) @@ -55,16 +45,17 @@ def make_handler( self, key: MessageKey ) -> Handler[DetectorEvents, sc.DataGroup[sc.DataArray]]: self._logger.info("Creating handler for %s", key) - wf_key = self._source_to_key.get(key.source_name) - if wf_key is None: + accumulator = self._workflow_manager.get_proxy_for_key(key.source_name) + if accumulator is None: self._logger.warning( "No workflow key found for source name %s, using null handler", key.source_name, ) return NullHandler(logger=self._logger, config=self._config) - if is_context := self._is_nxlog(key): - preprocessor = ToNXlog(attrs=self._attrs_registry[key.source_name]) + if self._is_nxlog(key): + attrs = self._workflow_manager.attrs_for_f144(key.source_name) + preprocessor = ToNXlog(attrs=attrs) else: preprocessor = ToNXevent_data() self._logger.info( @@ -73,82 +64,9 @@ def make_handler( key.source_name, ) - if (processor := self._processors.get(key.source_name)) is not None: - accumulator = 
StreamProcessorProxy(processor, key=wf_key) - self._logger.info( - "Source name %s is mapped to input %s of stream processor %s", - key.source_name, - wf_key, - key.source_name, - ) - else: - # Note the inefficiency here, of processing these sources in multiple - # workflows. This is typically once per detector. If monitors are large this - # can turn into a problem. At the same time, we want to keep flexible to - # allow for - # - # 1. Different workflows for different detector banks, e.g., for diffraction - # and SANS detectors. - # 2. Simple scaling, by processing different detectors on different nodes. - # - # Both could probably also be achieved with a non-duplicate processing of - # monitors, but we keep it simple until proven to be necessary. Note that - # an alternative would be to move some cost into the preprocessor, which - # could, e.g., histogram large monitors to reduce the duplicate cost in the - # stream processors. - accumulator = MultiplexingProxy( - list(self._processors.values()), key=wf_key, is_context=is_context - ) - self._logger.info( - "Source name %s is mapped to input %s in all stream processors", - key.source_name, - wf_key, - ) return PeriodicAccumulatingHandler( logger=self._logger, config=self._config, preprocessor=preprocessor, accumulators={f'reduced/{key.source_name}': accumulator}, ) - - -class MultiplexingProxy(Accumulator[sc.DataArray, sc.DataGroup[sc.DataArray]]): - def __init__( - self, stream_processors: list[StreamProcessor], key: Key, is_context: bool - ) -> None: - self._stream_processors = stream_processors - self._key = key - self._is_context = is_context - - def add(self, timestamp: int, data: sc.DataArray) -> None: - if self._is_context: - for stream_processor in self._stream_processors: - stream_processor.set_context({self._key: data}) - else: - for stream_processor in self._stream_processors: - stream_processor.accumulate({self._key: data}) - - def get(self) -> sc.DataGroup[sc.DataArray]: - return sc.DataGroup() - - def clear(self) -> None: - # Clearing would be ok, but should be redundant since the stream processors are - # cleared for each detector in the non-multiplexing proxies. 
- pass - - -class StreamProcessorProxy(Accumulator[sc.DataArray, sc.DataGroup[sc.DataArray]]): - def __init__(self, processor: StreamProcessor, *, key: type) -> None: - self._processor = processor - self._key = key - - def add(self, timestamp: int, data: sc.DataArray) -> None: - self._processor.accumulate({self._key: data}) - - def get(self) -> sc.DataGroup[sc.DataArray]: - return sc.DataGroup( - {str(key): val for key, val in self._processor.finalize().items()} - ) - - def clear(self) -> None: - self._processor.clear() diff --git a/src/beamlime/handlers/workflow_manager.py b/src/beamlime/handlers/workflow_manager.py new file mode 100644 index 000000000..5a2145466 --- /dev/null +++ b/src/beamlime/handlers/workflow_manager.py @@ -0,0 +1,124 @@ +# SPDX-License-Identifier: BSD-3-Clause +# Copyright (c) 2025 Scipp contributors (https://github.com/scipp) +from __future__ import annotations + +import logging +from types import ModuleType +from typing import Any + +import scipp as sc +from ess.reduce.streaming import StreamProcessor +from sciline.typing import Key + +from ..core.handler import Accumulator + + +class WorkflowManager: + def __init__( + self, + *, + instrument_config: ModuleType, + logger: logging.Logger | None = None, + ) -> None: + self._logger = logger or logging.getLogger(__name__) + self._instrument_config = instrument_config + self._source_to_key = instrument_config.source_to_key + self._processors = instrument_config.make_stream_processors() + self._attrs_registry = instrument_config.f144_attribute_registry + self._context_keys = instrument_config.context_keys + + def attrs_for_f144(self, source_name: str) -> dict[str, Any] | None: + """ + Get the attributes for a given source name. + + Parameters + ---------- + source_name: + The source name to get the attributes for. + Returns + ------- + : + The attributes for the given source name, or None if not found. + """ + return self._attrs_registry.get(source_name) + + def get_proxy_for_key( + self, source_name: str + ) -> MultiplexingProxy | StreamProcessorProxy | None: + wf_key = self._source_to_key.get(source_name) + if wf_key is None: + return None + is_context = wf_key in self._context_keys + if (processor := self._processors.get(source_name)) is not None: + self._logger.info( + "Source name %s is mapped to input %s of stream processor %s", + source_name, + wf_key, + source_name, + ) + return StreamProcessorProxy(processor, key=wf_key) + else: + # Note the inefficiency here, of processing these sources in multiple + # workflows. This is typically once per detector. If monitors are large this + # can turn into a problem. At the same time, we want to keep flexible to + # allow for + # + # 1. Different workflows for different detector banks, e.g., for diffraction + # and SANS detectors. + # 2. Simple scaling, by processing different detectors on different nodes. + # + # Both could probably also be achieved with a non-duplicate processing of + # monitors, but we keep it simple until proven to be necessary. Note that + # an alternative would be to move some cost into the preprocessor, which + # could, e.g., histogram large monitors to reduce the duplicate cost in the + # stream processors. 
+ self._logger.info( + "Source name %s is mapped to input %s in all stream processors", + source_name, + wf_key, + ) + return MultiplexingProxy( + list(self._processors.values()), key=wf_key, is_context=is_context + ) + + +class MultiplexingProxy(Accumulator[sc.DataArray, sc.DataGroup[sc.DataArray]]): + def __init__( + self, stream_processors: list[StreamProcessor], key: Key, is_context: bool + ) -> None: + self._stream_processors = stream_processors + self._key = key + self._is_context = is_context + + def add(self, timestamp: int, data: sc.DataArray) -> None: + if self._is_context: + for stream_processor in self._stream_processors: + stream_processor.set_context({self._key: data}) + else: + for stream_processor in self._stream_processors: + stream_processor.accumulate({self._key: data}) + + def get(self) -> sc.DataGroup[sc.DataArray]: + return sc.DataGroup() + + def clear(self) -> None: + # Clearing would be ok, but should be redundant since the stream processors are + # cleared for each detector in the non-multiplexing proxies. + pass + + +class StreamProcessorProxy(Accumulator[sc.DataArray, sc.DataGroup[sc.DataArray]]): + def __init__(self, processor: StreamProcessor, *, key: type) -> None: + self._processor = processor + self._key = key + + def add(self, timestamp: int, data: sc.DataArray) -> None: + self._processor.accumulate({self._key: data}) + + def get(self) -> sc.DataGroup[sc.DataArray]: + return sc.DataGroup( + {str(key): val for key, val in self._processor.finalize().items()} + ) + + def clear(self) -> None: + self._processor.clear() From f966c420e0a4efdc4139609ac6d409dce483413c Mon Sep 17 00:00:00 2001 From: Simon Heybrock Date: Wed, 19 Mar 2025 09:35:12 +0100 Subject: [PATCH 02/20] Rename method --- src/beamlime/handlers/data_reduction_handler.py | 2 +- src/beamlime/handlers/workflow_manager.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/beamlime/handlers/data_reduction_handler.py b/src/beamlime/handlers/data_reduction_handler.py index 38558810e..a1b62b908 100644 --- a/src/beamlime/handlers/data_reduction_handler.py +++ b/src/beamlime/handlers/data_reduction_handler.py @@ -45,7 +45,7 @@ def make_handler( self, key: MessageKey ) -> Handler[DetectorEvents, sc.DataGroup[sc.DataArray]]: self._logger.info("Creating handler for %s", key) - accumulator = self._workflow_manager.get_proxy_for_key(key.source_name) + accumulator = self._workflow_manager.get_accumulator(key.source_name) if accumulator is None: self._logger.warning( "No workflow key found for source name %s, using null handler", diff --git a/src/beamlime/handlers/workflow_manager.py b/src/beamlime/handlers/workflow_manager.py index 5a2145466..2dd6947d4 100644 --- a/src/beamlime/handlers/workflow_manager.py +++ b/src/beamlime/handlers/workflow_manager.py @@ -42,7 +42,7 @@ def attrs_for_f144(self, source_name: str) -> dict[str, Any] | None: """ return self._attrs_registry.get(source_name) - def get_proxy_for_key( + def get_accumulator( self, source_name: str ) -> MultiplexingProxy | StreamProcessorProxy | None: wf_key = self._source_to_key.get(source_name) From 0725e0c3c05719f9688d8289e2bc5f5f97be2bd1 Mon Sep 17 00:00:00 2001 From: Simon Heybrock Date: Wed, 19 Mar 2025 09:38:31 +0100 Subject: [PATCH 03/20] Cleanup logging code --- .../handlers/data_reduction_handler.py | 3 +++ src/beamlime/handlers/workflow_manager.py | 26 +++++-------------- 2 files changed, 10 insertions(+), 19 deletions(-) diff --git a/src/beamlime/handlers/data_reduction_handler.py 
b/src/beamlime/handlers/data_reduction_handler.py index a1b62b908..b87ebb46b 100644 --- a/src/beamlime/handlers/data_reduction_handler.py +++ b/src/beamlime/handlers/data_reduction_handler.py @@ -63,6 +63,9 @@ def make_handler( preprocessor.__class__.__name__, key.source_name, ) + self._logger.info( + "Accumulator %s is used for source name %s", accumulator, key.source_name + ) return PeriodicAccumulatingHandler( logger=self._logger, diff --git a/src/beamlime/handlers/workflow_manager.py b/src/beamlime/handlers/workflow_manager.py index 2dd6947d4..68e1cfc63 100644 --- a/src/beamlime/handlers/workflow_manager.py +++ b/src/beamlime/handlers/workflow_manager.py @@ -2,7 +2,6 @@ # Copyright (c) 2025 Scipp contributors (https://github.com/scipp) from __future__ import annotations -import logging from types import ModuleType from typing import Any @@ -14,13 +13,7 @@ class WorkflowManager: - def __init__( - self, - *, - instrument_config: ModuleType, - logger: logging.Logger | None = None, - ) -> None: - self._logger = logger or logging.getLogger(__name__) + def __init__(self, *, instrument_config: ModuleType) -> None: self._instrument_config = instrument_config self._source_to_key = instrument_config.source_to_key self._processors = instrument_config.make_stream_processors() @@ -50,12 +43,6 @@ def get_accumulator( return None is_context = wf_key in self._context_keys if (processor := self._processors.get(source_name)) is not None: - self._logger.info( - "Source name %s is mapped to input %s of stream processor %s", - source_name, - wf_key, - source_name, - ) return StreamProcessorProxy(processor, key=wf_key) else: # Note the inefficiency here, of processing these sources in multiple @@ -72,11 +59,6 @@ def get_accumulator( # an alternative would be to move some cost into the preprocessor, which # could, e.g., histogram large monitors to reduce the duplicate cost in the # stream processors. 
- self._logger.info( - "Source name %s is mapped to input %s in all stream processors", - source_name, - wf_key, - ) return MultiplexingProxy( list(self._processors.values()), key=wf_key, is_context=is_context ) @@ -90,6 +72,9 @@ def __init__( self._key = key self._is_context = is_context + def __repr__(self) -> str: + return f"{self.__class__.__name__}({self._key})" + def add(self, timestamp: int, data: sc.DataArray) -> None: if self._is_context: for stream_processor in self._stream_processors: @@ -112,6 +97,9 @@ def __init__(self, processor: StreamProcessor, *, key: type) -> None: self._processor = processor self._key = key + def __repr__(self) -> str: + return f"{self.__class__.__name__}({self._key})" + def add(self, timestamp: int, data: sc.DataArray) -> None: self._processor.accumulate({self._key: data}) From 37305e34c01490878e7cef8f0a71e54c6befc8a3 Mon Sep 17 00:00:00 2001 From: Simon Heybrock Date: Wed, 19 Mar 2025 09:45:39 +0100 Subject: [PATCH 04/20] Inject workflow manager --- src/beamlime/handlers/data_reduction_handler.py | 13 ++++--------- src/beamlime/services/data_reduction.py | 1 + 2 files changed, 5 insertions(+), 9 deletions(-) diff --git a/src/beamlime/handlers/data_reduction_handler.py b/src/beamlime/handlers/data_reduction_handler.py index b87ebb46b..3d265eabb 100644 --- a/src/beamlime/handlers/data_reduction_handler.py +++ b/src/beamlime/handlers/data_reduction_handler.py @@ -3,7 +3,6 @@ from __future__ import annotations import logging -from types import ModuleType from typing import Any import scipp as sc @@ -30,13 +29,13 @@ class ReductionHandlerFactory( def __init__( self, *, - instrument_config: ModuleType, + workflow_manager: WorkflowManager, logger: logging.Logger | None = None, config: Config, ) -> None: self._logger = logger or logging.getLogger(__name__) self._config = config - self._workflow_manager = WorkflowManager(instrument_config=instrument_config) + self._workflow_manager = workflow_manager def _is_nxlog(self, key: MessageKey) -> bool: return key.topic.split('_', maxsplit=1)[1] in ('motion',) @@ -59,13 +58,9 @@ def make_handler( else: preprocessor = ToNXevent_data() self._logger.info( - "Preprocessor %s is used for source name %s", - preprocessor.__class__.__name__, - key.source_name, - ) - self._logger.info( - "Accumulator %s is used for source name %s", accumulator, key.source_name + "%s using preprocessor %s", key.source_name, preprocessor.__class__.__name__ ) + self._logger.info("%s using accumulator %s", key.source_name, accumulator) return PeriodicAccumulatingHandler( logger=self._logger, diff --git a/src/beamlime/services/data_reduction.py b/src/beamlime/services/data_reduction.py index a67b717de..c4f964577 100644 --- a/src/beamlime/services/data_reduction.py +++ b/src/beamlime/services/data_reduction.py @@ -13,6 +13,7 @@ from beamlime.config.raw_detectors import get_config from beamlime.handlers.config_handler import ConfigHandler from beamlime.handlers.data_reduction_handler import ReductionHandlerFactory +from beamlime.handlers.workflow_manager import WorkflowManager from beamlime.kafka import consumer as kafka_consumer from beamlime.kafka.helpers import ( beam_monitor_topic, From 80bc85327a61d8009ad443cdccb9913ca7c21786 Mon Sep 17 00:00:00 2001 From: Simon Heybrock Date: Fri, 21 Mar 2025 08:36:57 +0100 Subject: [PATCH 05/20] Add mechanism to add workflows late --- src/beamlime/config/raw_detectors/bifrost.py | 17 +- .../handlers/data_reduction_handler.py | 4 +- src/beamlime/handlers/workflow_manager.py | 148 +++++++++++++----- 
src/beamlime/services/data_reduction.py | 15 +- 4 files changed, 141 insertions(+), 43 deletions(-) diff --git a/src/beamlime/config/raw_detectors/bifrost.py b/src/beamlime/config/raw_detectors/bifrost.py index 62d2a875f..510f227ce 100644 --- a/src/beamlime/config/raw_detectors/bifrost.py +++ b/src/beamlime/config/raw_detectors/bifrost.py @@ -18,6 +18,7 @@ from scippnexus import NXdetector from beamlime.handlers.detector_data_handler import get_nexus_geometry_filename +from beamlime.handlers.workflow_manager import DynamicWorkflow def _to_flat_detector_view(da: sc.DataArray) -> sc.DataArray: @@ -100,15 +101,13 @@ def _make_counts_per_angle( _reduction_workflow.insert(_make_spectrum_view) _reduction_workflow.insert(_make_counts_per_angle) -context_keys = (DetectorRotation,) - def _make_processor(): return StreamProcessor( _reduction_workflow, dynamic_keys=(NeXusData[NXdetector, SampleRun],), accumulators=(SpectrumView, CountsPerAngle), - context_keys=context_keys, + context_keys=(DetectorRotation,), target_keys=(SpectrumView, CountsPerAngle), ) @@ -117,6 +116,18 @@ def make_stream_processors(): return {'unified_detector': _make_processor()} +def dynamic_workflows() -> dict[str, DynamicWorkflow]: + return { + 'unified_detector': DynamicWorkflow( + workflow=_reduction_workflow, + dynamic_keys=(NeXusData[NXdetector, SampleRun],), + context_keys=(DetectorRotation,), + target_keys=(SpectrumView, CountsPerAngle), + accumulators=(SpectrumView, CountsPerAngle), + ) + } + + source_to_key = { 'unified_detector': NeXusData[NXdetector, SampleRun], 'detector_rotation': DetectorRotation, diff --git a/src/beamlime/handlers/data_reduction_handler.py b/src/beamlime/handlers/data_reduction_handler.py index 3d265eabb..cb3b714bb 100644 --- a/src/beamlime/handlers/data_reduction_handler.py +++ b/src/beamlime/handlers/data_reduction_handler.py @@ -30,12 +30,14 @@ def __init__( self, *, workflow_manager: WorkflowManager, + f144_attribute_registry: dict[str, dict[str, Any]], logger: logging.Logger | None = None, config: Config, ) -> None: self._logger = logger or logging.getLogger(__name__) self._config = config self._workflow_manager = workflow_manager + self._f144_attribute_registry = f144_attribute_registry def _is_nxlog(self, key: MessageKey) -> bool: return key.topic.split('_', maxsplit=1)[1] in ('motion',) @@ -53,7 +55,7 @@ def make_handler( return NullHandler(logger=self._logger, config=self._config) if self._is_nxlog(key): - attrs = self._workflow_manager.attrs_for_f144(key.source_name) + attrs = self._f144_attribute_registry[key.source_name] preprocessor = ToNXlog(attrs=attrs) else: preprocessor = ToNXevent_data() diff --git a/src/beamlime/handlers/workflow_manager.py b/src/beamlime/handlers/workflow_manager.py index 68e1cfc63..b0b613b7b 100644 --- a/src/beamlime/handlers/workflow_manager.py +++ b/src/beamlime/handlers/workflow_manager.py @@ -2,9 +2,10 @@ # Copyright (c) 2025 Scipp contributors (https://github.com/scipp) from __future__ import annotations -from types import ModuleType -from typing import Any +from collections.abc import Callable, Iterator, MutableMapping, Sequence +from dataclasses import dataclass +import sciline import scipp as sc from ess.reduce.streaming import StreamProcessor from sciline.typing import Key @@ -12,28 +13,101 @@ from ..core.handler import Accumulator +# ... so we can recreate the StreamProcessor if workflow parameters change. +@dataclass +class DynamicWorkflow: + workflow: sciline.Pipeline + dynamic_keys: tuple[Key, ...] + target_keys: tuple[Key, ...] 
+ accumulators: dict[Key, Accumulator | Callable[..., Accumulator]] | tuple[Key, ...] + context_keys: tuple[Key, ...] = () + + def make_stream_processor(self) -> StreamProcessor: + """Create a StreamProcessor from the workflow.""" + # TODO Need to copy accumulators if they are instances! + return StreamProcessor( + self.workflow, + dynamic_keys=self.dynamic_keys, + context_keys=self.context_keys, + target_keys=self.target_keys, + accumulators=self.accumulators, + ) + + +# Should have list of possible workflows, config which to use while running, for given +# bank? + +# 1. message -> create handler with proxy -> proxy without processor +# 2. set workflow parameters -> create new processor -> update proxy +# 2.b If we do this via Kafka, it can auto-recover after restart? + +# Logic problems if some keys are context in some but dynamic in others. +# Disallow that? +# make sets of dynamic keys and context keys, and check that they are disjoint. + + +class ProcessorRegistry(MutableMapping[str, StreamProcessor]): + def __init__(self) -> None: + self._processors: dict[str, StreamProcessor] = {} + + def __getitem__(self, key: str) -> StreamProcessor: + if key not in self._processors: + raise KeyError(f"Processor {key} not found") + return self._processors[key] + + def __setitem__(self, key: str, value: StreamProcessor) -> None: + self._processors[key] = value + + def __delitem__(self, key: str) -> None: + if key not in self._processors: + raise KeyError(f"Processor {key} not found") + del self._processors[key] + + def __iter__(self) -> Iterator[str]: + return iter(self._processors) + + def __len__(self) -> int: + return len(self._processors) + + class WorkflowManager: - def __init__(self, *, instrument_config: ModuleType) -> None: - self._instrument_config = instrument_config - self._source_to_key = instrument_config.source_to_key - self._processors = instrument_config.make_stream_processors() - self._attrs_registry = instrument_config.f144_attribute_registry - self._context_keys = instrument_config.context_keys - - def attrs_for_f144(self, source_name: str) -> dict[str, Any] | None: + def __init__( + self, + *, + workflow_names: Sequence[str], + source_to_key: dict[str, Key], + ) -> None: + # Why this init? We want the service to keep running, but be able to configure + # workflows after startup. + self._source_to_key = source_to_key + self._workflows: dict[str, DynamicWorkflow | None] = { + name: None for name in workflow_names + } + self._processors = ProcessorRegistry() + self._proxies: dict[str, StreamProcessorProxy] = {} + for name in workflow_names: + self.set_worklow(name, None) + + def set_worklow(self, name: str, workflow: DynamicWorkflow | None) -> None: """ - Get the attributes for a given source name. + Add a workflow to the manager. Parameters ---------- - source_name: - The source name to get the attributes for. - Returns - ------- - : - The attributes for the given source name, or None if not found. + name: + The name to identify the workflow. + workflow: + The workflow to add. 
""" - return self._attrs_registry.get(source_name) + if name not in self._workflows: + raise ValueError(f"Workflow {name} was not defined in the manager.") + self._workflows[name] = workflow + if workflow is None: + self._processors.pop(name, None) + else: + self._processors[name] = workflow.make_stream_processor() + if (proxy := self._proxies.get(name)) is not None: + proxy.set_processor(self._processors.get(name)) def get_accumulator( self, source_name: str @@ -41,9 +115,11 @@ def get_accumulator( wf_key = self._source_to_key.get(source_name) if wf_key is None: return None - is_context = wf_key in self._context_keys - if (processor := self._processors.get(source_name)) is not None: - return StreamProcessorProxy(processor, key=wf_key) + if source_name in self._workflows: + # Note that the processor may be 'None' at this point. + proxy = StreamProcessorProxy(self._processors.get(source_name), key=wf_key) + self._proxies[source_name] = proxy + return proxy else: # Note the inefficiency here, of processing these sources in multiple # workflows. This is typically once per detector. If monitors are large this @@ -59,28 +135,22 @@ def get_accumulator( # an alternative would be to move some cost into the preprocessor, which # could, e.g., histogram large monitors to reduce the duplicate cost in the # stream processors. - return MultiplexingProxy( - list(self._processors.values()), key=wf_key, is_context=is_context - ) + return MultiplexingProxy(self._processors, key=wf_key) class MultiplexingProxy(Accumulator[sc.DataArray, sc.DataGroup[sc.DataArray]]): - def __init__( - self, stream_processors: list[StreamProcessor], key: Key, is_context: bool - ) -> None: + def __init__(self, stream_processors: ProcessorRegistry, key: Key) -> None: self._stream_processors = stream_processors self._key = key - self._is_context = is_context def __repr__(self) -> str: return f"{self.__class__.__name__}({self._key})" def add(self, timestamp: int, data: sc.DataArray) -> None: - if self._is_context: - for stream_processor in self._stream_processors: + for stream_processor in self._stream_processors.values(): + if self._key in stream_processor._context_keys: stream_processor.set_context({self._key: data}) - else: - for stream_processor in self._stream_processors: + else: stream_processor.accumulate({self._key: data}) def get(self) -> sc.DataGroup[sc.DataArray]: @@ -93,20 +163,28 @@ def clear(self) -> None: class StreamProcessorProxy(Accumulator[sc.DataArray, sc.DataGroup[sc.DataArray]]): - def __init__(self, processor: StreamProcessor, *, key: type) -> None: + def __init__(self, processor: StreamProcessor | None = None, *, key: type) -> None: self._processor = processor self._key = key def __repr__(self) -> str: return f"{self.__class__.__name__}({self._key})" + def set_processor(self, processor: StreamProcessor | None) -> None: + """Set the processor to use for this proxy.""" + self._processor = processor + def add(self, timestamp: int, data: sc.DataArray) -> None: - self._processor.accumulate({self._key: data}) + if self._processor is not None: + self._processor.accumulate({self._key: data}) def get(self) -> sc.DataGroup[sc.DataArray]: + if self._processor is None: + return sc.DataGroup() return sc.DataGroup( {str(key): val for key, val in self._processor.finalize().items()} ) def clear(self) -> None: - self._processor.clear() + if self._processor is not None: + self._processor.clear() diff --git a/src/beamlime/services/data_reduction.py b/src/beamlime/services/data_reduction.py index c4f964577..afe270812 100644 --- 
a/src/beamlime/services/data_reduction.py +++ b/src/beamlime/services/data_reduction.py @@ -68,10 +68,17 @@ def make_reduction_service_builder( beamlime_command_topic(instrument): BeamlimeCommandsAdapter(), } ) - config = {} - config_handler = ConfigHandler(config=config) - handler_factory = ReductionHandlerFactory( - instrument_config=get_config(instrument), config=config + instrument_config = get_config(instrument) + workflow_manager = WorkflowManager( + source_to_key=instrument_config.source_to_key, + workflow_names=list(instrument_config.make_stream_processors()), + ) + for name, workflow in instrument_config.dynamic_workflows().items(): + workflow_manager.set_worklow(name, workflow) + handler_factory_cls = partial( + ReductionHandlerFactory, + workflow_manager=workflow_manager, + f144_attribute_registry=instrument_config.f144_attribute_registry, ) builder = DataServiceBuilder( instrument=instrument, From 58d9593ad323e783abe38aac1c61e24d9aaffa1c Mon Sep 17 00:00:00 2001 From: Simon Heybrock Date: Fri, 21 Mar 2025 10:44:12 +0100 Subject: [PATCH 06/20] Cleanup --- src/beamlime/handlers/workflow_manager.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/src/beamlime/handlers/workflow_manager.py b/src/beamlime/handlers/workflow_manager.py index b0b613b7b..3b6104924 100644 --- a/src/beamlime/handlers/workflow_manager.py +++ b/src/beamlime/handlers/workflow_manager.py @@ -72,10 +72,7 @@ def __len__(self) -> int: class WorkflowManager: def __init__( - self, - *, - workflow_names: Sequence[str], - source_to_key: dict[str, Key], + self, *, workflow_names: Sequence[str], source_to_key: dict[str, Key] ) -> None: # Why this init? We want the service to keep running, but be able to configure # workflows after startup. From ac72318d82a45341aa2677b9d3c9cc3998d29882 Mon Sep 17 00:00:00 2001 From: Simon Heybrock Date: Fri, 21 Mar 2025 11:54:15 +0100 Subject: [PATCH 07/20] Very ugly mechanism for changing workflow --- .../config/defaults/data_reduction.yaml | 3 +- src/beamlime/config/models.py | 11 ++++ .../handlers/data_reduction_handler.py | 5 ++ src/beamlime/handlers/workflow_manager.py | 51 ++++++++++++++++- src/beamlime/kafka/message_adapter.py | 1 + src/beamlime/services/dashboard.py | 55 +++++++++++++++++++ src/beamlime/services/data_reduction.py | 6 +- 7 files changed, 127 insertions(+), 5 deletions(-) diff --git a/src/beamlime/config/defaults/data_reduction.yaml b/src/beamlime/config/defaults/data_reduction.yaml index 3dc4b91a9..0ee721c0a 100644 --- a/src/beamlime/config/defaults/data_reduction.yaml +++ b/src/beamlime/config/defaults/data_reduction.yaml @@ -1,4 +1,5 @@ topics: - beam_monitor - detector - - motion \ No newline at end of file + - motion + - beamlime_commands \ No newline at end of file diff --git a/src/beamlime/config/models.py b/src/beamlime/config/models.py index dfd9db9de..b39e413e4 100644 --- a/src/beamlime/config/models.py +++ b/src/beamlime/config/models.py @@ -112,3 +112,14 @@ def validate_range(self) -> ROIAxisRange: class ROIRectangle(BaseModel): x: ROIAxisRange = Field(default_factory=ROIAxisRange) y: ROIAxisRange = Field(default_factory=ROIAxisRange) + + +class WorkflowControl(BaseModel): + source_name: str = Field( + default='', + description="Name of the source to control.", + ) + workflow_name: str = Field( + default='', + description="Name of the workflow to control.", + ) diff --git a/src/beamlime/handlers/data_reduction_handler.py b/src/beamlime/handlers/data_reduction_handler.py index cb3b714bb..70b837321 100644 --- 
a/src/beamlime/handlers/data_reduction_handler.py +++ b/src/beamlime/handlers/data_reduction_handler.py @@ -46,6 +46,11 @@ def make_handler( self, key: MessageKey ) -> Handler[DetectorEvents, sc.DataGroup[sc.DataArray]]: self._logger.info("Creating handler for %s", key) + if key.topic.endswith('beamlime_commands'): + if key.source_name == 'workflow_control': + return self._workflow_manager.make_control_handler( + logger=self._logger, config=self._config + ) accumulator = self._workflow_manager.get_accumulator(key.source_name) if accumulator is None: self._logger.warning( diff --git a/src/beamlime/handlers/workflow_manager.py b/src/beamlime/handlers/workflow_manager.py index 3b6104924..3e7b99203 100644 --- a/src/beamlime/handlers/workflow_manager.py +++ b/src/beamlime/handlers/workflow_manager.py @@ -2,6 +2,7 @@ # Copyright (c) 2025 Scipp contributors (https://github.com/scipp) from __future__ import annotations +import logging from collections.abc import Callable, Iterator, MutableMapping, Sequence from dataclasses import dataclass @@ -10,7 +11,9 @@ from ess.reduce.streaming import StreamProcessor from sciline.typing import Key -from ..core.handler import Accumulator +from ..config.models import WorkflowControl +from ..core.handler import Accumulator, Config, Handler +from ..core.message import Message # ... so we can recreate the StreamProcessor if workflow parameters change. @@ -72,11 +75,16 @@ def __len__(self) -> int: class WorkflowManager: def __init__( - self, *, workflow_names: Sequence[str], source_to_key: dict[str, Key] + self, + *, + workflow_names: Sequence[str], + source_to_key: dict[str, Key], + dynamic_workflows: dict[str, DynamicWorkflow], ) -> None: # Why this init? We want the service to keep running, but be able to configure # workflows after startup. self._source_to_key = source_to_key + self._dynamic_workflows = dynamic_workflows self._workflows: dict[str, DynamicWorkflow | None] = { name: None for name in workflow_names } @@ -134,6 +142,45 @@ def get_accumulator( # stream processors. return MultiplexingProxy(self._processors, key=wf_key) + def make_control_handler( + self, *, logger: logging.Logger | None = None, config: Config + ) -> WorkflowControlHandler: + """ + Create a handler for workflow control messages. + + This handler is used to set the workflow manager's workflow parameters. + """ + return WorkflowControlHandler( + logger=logger, config=config, workflow_manager=self + ) + + +class WorkflowControlHandler(Handler[WorkflowControl, None]): + """ + Handler for workflow manager. + + This handler is used to set the workflow manager's workflow parameters. 
+ """ + + def __init__( + self, + *, + logger: logging.Logger | None = None, + config: Config, + workflow_manager: WorkflowManager, + ) -> None: + super().__init__(logger=logger, config=config) + self._workflow_manager = workflow_manager + + def handle(self, messages: list[Message[WorkflowControl]]) -> list[Message[None]]: + for message in messages: + decoded = WorkflowControl.model_validate(message.value) + self._workflow_manager.set_worklow( + decoded.source_name, + self._workflow_manager._dynamic_workflows.get(decoded.workflow_name), + ) + return [] + class MultiplexingProxy(Accumulator[sc.DataArray, sc.DataGroup[sc.DataArray]]): def __init__(self, stream_processors: ProcessorRegistry, key: Key) -> None: diff --git a/src/beamlime/kafka/message_adapter.py b/src/beamlime/kafka/message_adapter.py index 6aa1e77d5..894948229 100644 --- a/src/beamlime/kafka/message_adapter.py +++ b/src/beamlime/kafka/message_adapter.py @@ -1,5 +1,6 @@ # SPDX-License-Identifier: BSD-3-Clause # Copyright (c) 2024 Scipp contributors (https://github.com/scipp) +import json from dataclasses import replace from typing import Any, Generic, Protocol, TypeVar diff --git a/src/beamlime/services/dashboard.py b/src/beamlime/services/dashboard.py index 7dc1a40da..676d08d8a 100644 --- a/src/beamlime/services/dashboard.py +++ b/src/beamlime/services/dashboard.py @@ -231,6 +231,29 @@ def _setup_layout(self) -> None: style={'margin': '10px 0'}, ), html.Button('Clear', id='clear-button', n_clicks=0), + # Add workflow control section + html.Hr(style={'margin': '20px 0'}), + html.H3('Workflow Control', style={'marginTop': '10px'}), + html.Label('Source Name'), + dcc.Input( + id='workflow-source-name', + type='text', + placeholder='Enter source name', + style={'width': '100%', 'marginBottom': '10px'}, + ), + html.Label('Workflow Name'), + dcc.Input( + id='workflow-name', + type='text', + placeholder='Enter workflow name', + style={'width': '100%', 'marginBottom': '10px'}, + ), + html.Button( + 'Send Workflow Control', + id='workflow-control-button', + n_clicks=0, + style={'width': '100%', 'marginTop': '10px'}, + ), ] self._app.layout = html.Div( [ @@ -326,6 +349,14 @@ def _setup_callbacks(self) -> None: Input('use-weights-checkbox', 'value'), )(self.update_use_weights) + # Add callback for workflow control button + self._app.callback( + Output('workflow-control-button', 'n_clicks'), + Input('workflow-control-button', 'n_clicks'), + Input('workflow-source-name', 'value'), + Input('workflow-name', 'value'), + )(self.send_workflow_control) + def update_roi(self, x_center, x_delta, y_center, y_delta): x_min = max(0, x_center - x_delta) x_max = min(100, x_center + x_delta) @@ -510,6 +541,30 @@ def clear_data(self, n_clicks: int | None) -> int: self._config_service.update_config('start_time', model.model_dump()) return 0 + def send_workflow_control( + self, n_clicks: int | None, source_name: str | None, workflow_name: str | None + ) -> int: + """Send a workflow control message.""" + if n_clicks is None or n_clicks == 0 or not source_name or not workflow_name: + raise PreventUpdate + + self._logger.info( + "Sending workflow control message: source=%s, workflow=%s", + source_name, + workflow_name, + ) + + workflow_control = models.WorkflowControl( + source_name=source_name, + workflow_name=workflow_name, + ) + + self._config_service.update_config( + 'workflow_control', workflow_control.model_dump() + ) + + return 0 + def _start_impl(self) -> None: self._config_service_thread.start() diff --git a/src/beamlime/services/data_reduction.py 
b/src/beamlime/services/data_reduction.py index afe270812..ec838fb29 100644 --- a/src/beamlime/services/data_reduction.py +++ b/src/beamlime/services/data_reduction.py @@ -51,6 +51,9 @@ def setup_arg_parser() -> argparse.ArgumentParser: def make_reduction_service_builder( *, instrument: str, log_level: int = logging.INFO ) -> DataServiceBuilder: + f144_adapter = ChainedAdapter( + first=KafkaToF144Adapter(), second=F144ToLogDataAdapter() + ) adapter = RouteByTopicAdapter( routes={ beam_monitor_topic(instrument): ChainedAdapter( @@ -72,9 +75,8 @@ def make_reduction_service_builder( workflow_manager = WorkflowManager( source_to_key=instrument_config.source_to_key, workflow_names=list(instrument_config.make_stream_processors()), + dynamic_workflows=instrument_config.dynamic_workflows(), ) - for name, workflow in instrument_config.dynamic_workflows().items(): - workflow_manager.set_worklow(name, workflow) handler_factory_cls = partial( ReductionHandlerFactory, workflow_manager=workflow_manager, From 7cc8f86d7fd4e684bca29e03d0129637565b811e Mon Sep 17 00:00:00 2001 From: Simon Heybrock Date: Mon, 24 Mar 2025 10:56:57 +0100 Subject: [PATCH 08/20] Make it work again and cleanup --- .../config/defaults/data_reduction.yaml | 3 +- src/beamlime/config/models.py | 4 +- src/beamlime/config/raw_detectors/bifrost.py | 21 +--- .../handlers/data_reduction_handler.py | 5 - src/beamlime/handlers/workflow_manager.py | 106 ++++++------------ src/beamlime/services/data_reduction.py | 15 ++- 6 files changed, 50 insertions(+), 104 deletions(-) diff --git a/src/beamlime/config/defaults/data_reduction.yaml b/src/beamlime/config/defaults/data_reduction.yaml index 0ee721c0a..3dc4b91a9 100644 --- a/src/beamlime/config/defaults/data_reduction.yaml +++ b/src/beamlime/config/defaults/data_reduction.yaml @@ -1,5 +1,4 @@ topics: - beam_monitor - detector - - motion - - beamlime_commands \ No newline at end of file + - motion \ No newline at end of file diff --git a/src/beamlime/config/models.py b/src/beamlime/config/models.py index b39e413e4..df4a9e9b8 100644 --- a/src/beamlime/config/models.py +++ b/src/beamlime/config/models.py @@ -116,10 +116,8 @@ class ROIRectangle(BaseModel): class WorkflowControl(BaseModel): source_name: str = Field( - default='', description="Name of the source to control.", ) - workflow_name: str = Field( - default='', + workflow_name: str | None = Field( description="Name of the workflow to control.", ) diff --git a/src/beamlime/config/raw_detectors/bifrost.py b/src/beamlime/config/raw_detectors/bifrost.py index 510f227ce..0ab4acb26 100644 --- a/src/beamlime/config/raw_detectors/bifrost.py +++ b/src/beamlime/config/raw_detectors/bifrost.py @@ -14,7 +14,6 @@ NeXusName, SampleRun, ) -from ess.reduce.streaming import StreamProcessor from scippnexus import NXdetector from beamlime.handlers.detector_data_handler import get_nexus_geometry_filename @@ -86,7 +85,8 @@ def _make_counts_per_angle( sc.zeros(dims=['angle'], shape=[45], unit='counts'), coords={'angle': edges} ) counts = sc.values(data.sum().data) - da['angle', rotation.data[-1]] += counts + if rotation is not None: + da['angle', rotation.data[-1]] += counts return da @@ -102,23 +102,9 @@ def _make_counts_per_angle( _reduction_workflow.insert(_make_counts_per_angle) -def _make_processor(): - return StreamProcessor( - _reduction_workflow, - dynamic_keys=(NeXusData[NXdetector, SampleRun],), - accumulators=(SpectrumView, CountsPerAngle), - context_keys=(DetectorRotation,), - target_keys=(SpectrumView, CountsPerAngle), - ) - - -def 
make_stream_processors(): - return {'unified_detector': _make_processor()} - - def dynamic_workflows() -> dict[str, DynamicWorkflow]: return { - 'unified_detector': DynamicWorkflow( + 'testing': DynamicWorkflow( workflow=_reduction_workflow, dynamic_keys=(NeXusData[NXdetector, SampleRun],), context_keys=(DetectorRotation,), @@ -128,6 +114,7 @@ def dynamic_workflows() -> dict[str, DynamicWorkflow]: } +source_names = ('unified_detector',) source_to_key = { 'unified_detector': NeXusData[NXdetector, SampleRun], 'detector_rotation': DetectorRotation, diff --git a/src/beamlime/handlers/data_reduction_handler.py b/src/beamlime/handlers/data_reduction_handler.py index 70b837321..cb3b714bb 100644 --- a/src/beamlime/handlers/data_reduction_handler.py +++ b/src/beamlime/handlers/data_reduction_handler.py @@ -46,11 +46,6 @@ def make_handler( self, key: MessageKey ) -> Handler[DetectorEvents, sc.DataGroup[sc.DataArray]]: self._logger.info("Creating handler for %s", key) - if key.topic.endswith('beamlime_commands'): - if key.source_name == 'workflow_control': - return self._workflow_manager.make_control_handler( - logger=self._logger, config=self._config - ) accumulator = self._workflow_manager.get_accumulator(key.source_name) if accumulator is None: self._logger.warning( diff --git a/src/beamlime/handlers/workflow_manager.py b/src/beamlime/handlers/workflow_manager.py index 3e7b99203..5c8c4a2e0 100644 --- a/src/beamlime/handlers/workflow_manager.py +++ b/src/beamlime/handlers/workflow_manager.py @@ -2,9 +2,9 @@ # Copyright (c) 2025 Scipp contributors (https://github.com/scipp) from __future__ import annotations -import logging from collections.abc import Callable, Iterator, MutableMapping, Sequence from dataclasses import dataclass +from typing import Any import sciline import scipp as sc @@ -12,11 +12,9 @@ from sciline.typing import Key from ..config.models import WorkflowControl -from ..core.handler import Accumulator, Config, Handler -from ..core.message import Message +from ..core.handler import Accumulator -# ... so we can recreate the StreamProcessor if workflow parameters change. @dataclass class DynamicWorkflow: workflow: sciline.Pipeline @@ -37,18 +35,6 @@ def make_stream_processor(self) -> StreamProcessor: ) -# Should have list of possible workflows, config which to use while running, for given -# bank? - -# 1. message -> create handler with proxy -> proxy without processor -# 2. set workflow parameters -> create new processor -> update proxy -# 2.b If we do this via Kafka, it can auto-recover after restart? - -# Logic problems if some keys are context in some but dynamic in others. -# Disallow that? -# make sets of dynamic keys and context keys, and check that they are disjoint. - - class ProcessorRegistry(MutableMapping[str, StreamProcessor]): def __init__(self) -> None: self._processors: dict[str, StreamProcessor] = {} @@ -77,42 +63,59 @@ class WorkflowManager: def __init__( self, *, - workflow_names: Sequence[str], + source_names: Sequence[str], source_to_key: dict[str, Key], dynamic_workflows: dict[str, DynamicWorkflow], ) -> None: - # Why this init? We want the service to keep running, but be able to configure - # workflows after startup. + """ + Parameters + ---------- + source_names: + List of source names to attach workflows to. These need to be passed + explicitly, so we can distinguish source names that should not be handled + (to proxy and handler will be created) from source names that may later be + configured to use a workflow. 
+ source_to_key: + Dictionary mapping source names to workflow input keys. + dynamic_workflows: + Dictionary mapping source names to dynamic workflows. + """ self._source_to_key = source_to_key self._dynamic_workflows = dynamic_workflows self._workflows: dict[str, DynamicWorkflow | None] = { - name: None for name in workflow_names + name: None for name in source_names } self._processors = ProcessorRegistry() self._proxies: dict[str, StreamProcessorProxy] = {} - for name in workflow_names: + for name in source_names: self.set_worklow(name, None) - def set_worklow(self, name: str, workflow: DynamicWorkflow | None) -> None: + def set_worklow(self, source_name: str, workflow: DynamicWorkflow | None) -> None: """ Add a workflow to the manager. Parameters ---------- - name: - The name to identify the workflow. + source_name: + Source name to attach the workflow to. workflow: - The workflow to add. + The workflow to attach to the source name. If None, the workflow is removed. """ - if name not in self._workflows: - raise ValueError(f"Workflow {name} was not defined in the manager.") - self._workflows[name] = workflow + if source_name not in self._workflows: + raise ValueError(f"Workflow {source_name} was not defined in the manager.") + self._workflows[source_name] = workflow if workflow is None: - self._processors.pop(name, None) + self._processors.pop(source_name, None) else: - self._processors[name] = workflow.make_stream_processor() - if (proxy := self._proxies.get(name)) is not None: - proxy.set_processor(self._processors.get(name)) + self._processors[source_name] = workflow.make_stream_processor() + if (proxy := self._proxies.get(source_name)) is not None: + proxy.set_processor(self._processors.get(source_name)) + + def set_workflow_from_command(self, command: Any) -> None: + decoded = WorkflowControl.model_validate(command) + self.set_worklow( + decoded.source_name, self._dynamic_workflows[decoded.workflow_name] + ) def get_accumulator( self, source_name: str @@ -142,45 +145,6 @@ def get_accumulator( # stream processors. return MultiplexingProxy(self._processors, key=wf_key) - def make_control_handler( - self, *, logger: logging.Logger | None = None, config: Config - ) -> WorkflowControlHandler: - """ - Create a handler for workflow control messages. - - This handler is used to set the workflow manager's workflow parameters. - """ - return WorkflowControlHandler( - logger=logger, config=config, workflow_manager=self - ) - - -class WorkflowControlHandler(Handler[WorkflowControl, None]): - """ - Handler for workflow manager. - - This handler is used to set the workflow manager's workflow parameters. 
- """ - - def __init__( - self, - *, - logger: logging.Logger | None = None, - config: Config, - workflow_manager: WorkflowManager, - ) -> None: - super().__init__(logger=logger, config=config) - self._workflow_manager = workflow_manager - - def handle(self, messages: list[Message[WorkflowControl]]) -> list[Message[None]]: - for message in messages: - decoded = WorkflowControl.model_validate(message.value) - self._workflow_manager.set_worklow( - decoded.source_name, - self._workflow_manager._dynamic_workflows.get(decoded.workflow_name), - ) - return [] - class MultiplexingProxy(Accumulator[sc.DataArray, sc.DataGroup[sc.DataArray]]): def __init__(self, stream_processors: ProcessorRegistry, key: Key) -> None: diff --git a/src/beamlime/services/data_reduction.py b/src/beamlime/services/data_reduction.py index ec838fb29..04178dfc8 100644 --- a/src/beamlime/services/data_reduction.py +++ b/src/beamlime/services/data_reduction.py @@ -51,9 +51,6 @@ def setup_arg_parser() -> argparse.ArgumentParser: def make_reduction_service_builder( *, instrument: str, log_level: int = logging.INFO ) -> DataServiceBuilder: - f144_adapter = ChainedAdapter( - first=KafkaToF144Adapter(), second=F144ToLogDataAdapter() - ) adapter = RouteByTopicAdapter( routes={ beam_monitor_topic(instrument): ChainedAdapter( @@ -73,12 +70,13 @@ def make_reduction_service_builder( ) instrument_config = get_config(instrument) workflow_manager = WorkflowManager( + source_names=instrument_config.source_names, source_to_key=instrument_config.source_to_key, - workflow_names=list(instrument_config.make_stream_processors()), dynamic_workflows=instrument_config.dynamic_workflows(), ) - handler_factory_cls = partial( - ReductionHandlerFactory, + config = {} + handler_factory = ReductionHandlerFactory( + config=config, workflow_manager=workflow_manager, f144_attribute_registry=instrument_config.f144_attribute_registry, ) @@ -89,6 +87,11 @@ def make_reduction_service_builder( adapter=adapter, handler_factory=handler_factory, ) + config_handler = ConfigHandler(config=config) + config_handler.register_action( + key='workflow_control', + callback=workflow_manager.set_workflow_from_command, + ) builder.add_handler(ConfigHandler.message_key(instrument), config_handler) return builder From b7a232f3458f0b21738f1c96148276e638ab0c84 Mon Sep 17 00:00:00 2001 From: Simon Heybrock Date: Mon, 24 Mar 2025 10:58:27 +0100 Subject: [PATCH 09/20] Copy workflow to be on the safe side --- src/beamlime/config/raw_detectors/bifrost.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/beamlime/config/raw_detectors/bifrost.py b/src/beamlime/config/raw_detectors/bifrost.py index 0ab4acb26..d205acfd3 100644 --- a/src/beamlime/config/raw_detectors/bifrost.py +++ b/src/beamlime/config/raw_detectors/bifrost.py @@ -105,7 +105,7 @@ def _make_counts_per_angle( def dynamic_workflows() -> dict[str, DynamicWorkflow]: return { 'testing': DynamicWorkflow( - workflow=_reduction_workflow, + workflow=_reduction_workflow.copy(), dynamic_keys=(NeXusData[NXdetector, SampleRun],), context_keys=(DetectorRotation,), target_keys=(SpectrumView, CountsPerAngle), From a302afbafc37d5bd967959c1311b205e9f3e4373 Mon Sep 17 00:00:00 2001 From: Simon Heybrock Date: Mon, 24 Mar 2025 11:31:11 +0100 Subject: [PATCH 10/20] Cleanup --- src/beamlime/config/raw_detectors/bifrost.py | 22 +++--- src/beamlime/handlers/workflow_manager.py | 80 ++++++++++++-------- src/beamlime/services/data_reduction.py | 1 - 3 files changed, 59 insertions(+), 44 deletions(-) diff --git 
a/src/beamlime/config/raw_detectors/bifrost.py b/src/beamlime/config/raw_detectors/bifrost.py index d205acfd3..aed43dd60 100644 --- a/src/beamlime/config/raw_detectors/bifrost.py +++ b/src/beamlime/config/raw_detectors/bifrost.py @@ -14,10 +14,11 @@ NeXusName, SampleRun, ) +from ess.reduce.streaming import StreamProcessor from scippnexus import NXdetector from beamlime.handlers.detector_data_handler import get_nexus_geometry_filename -from beamlime.handlers.workflow_manager import DynamicWorkflow +from beamlime.handlers.workflow_manager import processor_factory def _to_flat_detector_view(da: sc.DataArray) -> sc.DataArray: @@ -102,16 +103,15 @@ def _make_counts_per_angle( _reduction_workflow.insert(_make_counts_per_angle) -def dynamic_workflows() -> dict[str, DynamicWorkflow]: - return { - 'testing': DynamicWorkflow( - workflow=_reduction_workflow.copy(), - dynamic_keys=(NeXusData[NXdetector, SampleRun],), - context_keys=(DetectorRotation,), - target_keys=(SpectrumView, CountsPerAngle), - accumulators=(SpectrumView, CountsPerAngle), - ) - } +@processor_factory.register(name='testing') +def _testing_processor() -> StreamProcessor: + return StreamProcessor( + _reduction_workflow.copy(), + dynamic_keys=(NeXusData[NXdetector, SampleRun],), + context_keys=(DetectorRotation,), + target_keys=(SpectrumView, CountsPerAngle), + accumulators=(SpectrumView, CountsPerAngle), + ) source_names = ('unified_detector',) diff --git a/src/beamlime/handlers/workflow_manager.py b/src/beamlime/handlers/workflow_manager.py index 5c8c4a2e0..8b18d7f8b 100644 --- a/src/beamlime/handlers/workflow_manager.py +++ b/src/beamlime/handlers/workflow_manager.py @@ -3,10 +3,9 @@ from __future__ import annotations from collections.abc import Callable, Iterator, MutableMapping, Sequence -from dataclasses import dataclass +from functools import wraps from typing import Any -import sciline import scipp as sc from ess.reduce.streaming import StreamProcessor from sciline.typing import Key @@ -15,24 +14,46 @@ from ..core.handler import Accumulator -@dataclass -class DynamicWorkflow: - workflow: sciline.Pipeline - dynamic_keys: tuple[Key, ...] - target_keys: tuple[Key, ...] - accumulators: dict[Key, Accumulator | Callable[..., Accumulator]] | tuple[Key, ...] - context_keys: tuple[Key, ...] = () - - def make_stream_processor(self) -> StreamProcessor: - """Create a StreamProcessor from the workflow.""" - # TODO Need to copy accumulators if they are instances! - return StreamProcessor( - self.workflow, - dynamic_keys=self.dynamic_keys, - context_keys=self.context_keys, - target_keys=self.target_keys, - accumulators=self.accumulators, - ) +class StreamProcessorFactory: + def __init__(self) -> None: + self._factories: dict[str, Callable[[], StreamProcessor]] = {} + + def register( + self, name: str + ) -> Callable[[Callable[[], StreamProcessor]], Callable[[], StreamProcessor]]: + """ + Decorator to register a factory function for creating StreamProcessors. + + Parameters + ---------- + name: + Name to register the factory under. + + Returns + ------- + Decorator function that registers the factory and returns it unchanged. 
+ """ + + def decorator( + factory: Callable[[], StreamProcessor], + ) -> Callable[[], StreamProcessor]: + @wraps(factory) + def wrapper() -> StreamProcessor: + return factory() + + if name in self._factories: + raise ValueError(f"Factory for {name} already registered") + self._factories[name] = factory + return wrapper + + return decorator + + def create(self, name: str) -> StreamProcessor: + """Create a StreamProcessor using the registered factory.""" + return self._factories[name]() + + +processor_factory = StreamProcessorFactory() class ProcessorRegistry(MutableMapping[str, StreamProcessor]): @@ -65,7 +86,6 @@ def __init__( *, source_names: Sequence[str], source_to_key: dict[str, Key], - dynamic_workflows: dict[str, DynamicWorkflow], ) -> None: """ Parameters @@ -80,17 +100,14 @@ def __init__( dynamic_workflows: Dictionary mapping source names to dynamic workflows. """ + self._source_names = source_names self._source_to_key = source_to_key - self._dynamic_workflows = dynamic_workflows - self._workflows: dict[str, DynamicWorkflow | None] = { - name: None for name in source_names - } self._processors = ProcessorRegistry() self._proxies: dict[str, StreamProcessorProxy] = {} for name in source_names: self.set_worklow(name, None) - def set_worklow(self, source_name: str, workflow: DynamicWorkflow | None) -> None: + def set_worklow(self, source_name: str, processor: StreamProcessor | None) -> None: """ Add a workflow to the manager. @@ -101,20 +118,19 @@ def set_worklow(self, source_name: str, workflow: DynamicWorkflow | None) -> Non workflow: The workflow to attach to the source name. If None, the workflow is removed. """ - if source_name not in self._workflows: + if source_name not in self._source_names: raise ValueError(f"Workflow {source_name} was not defined in the manager.") - self._workflows[source_name] = workflow - if workflow is None: + if processor is None: self._processors.pop(source_name, None) else: - self._processors[source_name] = workflow.make_stream_processor() + self._processors[source_name] = processor if (proxy := self._proxies.get(source_name)) is not None: proxy.set_processor(self._processors.get(source_name)) def set_workflow_from_command(self, command: Any) -> None: decoded = WorkflowControl.model_validate(command) self.set_worklow( - decoded.source_name, self._dynamic_workflows[decoded.workflow_name] + decoded.source_name, processor_factory.create(decoded.workflow_name) ) def get_accumulator( @@ -123,7 +139,7 @@ def get_accumulator( wf_key = self._source_to_key.get(source_name) if wf_key is None: return None - if source_name in self._workflows: + if source_name in self._source_names: # Note that the processor may be 'None' at this point. 
proxy = StreamProcessorProxy(self._processors.get(source_name), key=wf_key) self._proxies[source_name] = proxy diff --git a/src/beamlime/services/data_reduction.py b/src/beamlime/services/data_reduction.py index 04178dfc8..d26a948a1 100644 --- a/src/beamlime/services/data_reduction.py +++ b/src/beamlime/services/data_reduction.py @@ -72,7 +72,6 @@ def make_reduction_service_builder( workflow_manager = WorkflowManager( source_names=instrument_config.source_names, source_to_key=instrument_config.source_to_key, - dynamic_workflows=instrument_config.dynamic_workflows(), ) config = {} handler_factory = ReductionHandlerFactory( From 5bb56a124c25d6318b5a575e7a88c8eb9423a8d2 Mon Sep 17 00:00:00 2001 From: Simon Heybrock Date: Mon, 24 Mar 2025 11:57:38 +0100 Subject: [PATCH 11/20] Update ConfigHandler with action mechanism --- src/beamlime/handlers/config_handler.py | 31 +++++++++++++++++++++++++ 1 file changed, 31 insertions(+) diff --git a/src/beamlime/handlers/config_handler.py b/src/beamlime/handlers/config_handler.py index be43ca180..d917691dd 100644 --- a/src/beamlime/handlers/config_handler.py +++ b/src/beamlime/handlers/config_handler.py @@ -2,6 +2,7 @@ # Copyright (c) 2025 Scipp contributors (https://github.com/scipp) import json import logging +from typing import Any from ..core.handler import Config, Handler from ..core.message import Message, MessageKey @@ -31,10 +32,28 @@ def message_key(instrument: str) -> MessageKey: def __init__(self, *, logger: logging.Logger | None = None, config: Config): super().__init__(logger=logger, config=config) self._store = config + self._actions: dict[str, list[callable]] = {} def get(self, key: str, default=None): return self._store.get(key, default) + def register_action( + self, + key: str, + callback: callable, + ): + """ + Register an action to be called when a specific key is updated. + + Parameters + ---------- + key: + Key to watch for changes + callback: + Callback function to call when the key is updated + """ + self._actions.setdefault(key, []).append(callback) + def handle(self, messages: list[Message[bytes]]) -> list[Message[None]]: """ Process configuration messages and update the configuration. @@ -49,6 +68,7 @@ def handle(self, messages: list[Message[bytes]]) -> list[Message[None]]: : Empty list as this handler doesn't produce output messages """ + updated: dict[str, Any] = {} for message in messages: try: key = message.value['key'] @@ -57,6 +77,17 @@ def handle(self, messages: list[Message[bytes]]) -> list[Message[None]]: 'Updating config: %s = %s at %s', key, value, message.timestamp ) self._store[key] = value + updated[key] = value except Exception as e: # noqa: PERF203 self._logger.error('Error processing config message: %s', e) + # Delay action calls until all messages are processed to reduce triggering + # multiple calls for the same key in case of multiple messages with same key. 
+ for key, value in updated.items(): + for action in self._actions.get(key, []): + try: + action(value) + except KeyError as e: # noqa: PERF203 + self._logger.error( + 'Error processing config action for %s: %s', key, e + ) return [] From c8829bd10752525160cbf14d89ba8f3c90d1b2d4 Mon Sep 17 00:00:00 2001 From: Simon Heybrock Date: Mon, 24 Mar 2025 12:35:38 +0100 Subject: [PATCH 12/20] Improve dashboard UX for workflow selection --- src/beamlime/handlers/workflow_manager.py | 12 +++- src/beamlime/services/dashboard.py | 77 ++++++++++++++++++----- 2 files changed, 70 insertions(+), 19 deletions(-) diff --git a/src/beamlime/handlers/workflow_manager.py b/src/beamlime/handlers/workflow_manager.py index 8b18d7f8b..b10e9bfdd 100644 --- a/src/beamlime/handlers/workflow_manager.py +++ b/src/beamlime/handlers/workflow_manager.py @@ -18,6 +18,10 @@ class StreamProcessorFactory: def __init__(self) -> None: self._factories: dict[str, Callable[[], StreamProcessor]] = {} + def get_available(self) -> tuple[str, ...]: + """Return a tuple of available factory names.""" + return tuple(self._factories.keys()) + def register( self, name: str ) -> Callable[[Callable[[], StreamProcessor]], Callable[[], StreamProcessor]]: @@ -129,9 +133,11 @@ def set_worklow(self, source_name: str, processor: StreamProcessor | None) -> No def set_workflow_from_command(self, command: Any) -> None: decoded = WorkflowControl.model_validate(command) - self.set_worklow( - decoded.source_name, processor_factory.create(decoded.workflow_name) - ) + if decoded.workflow_name is None: + processor = None + else: + processor = processor_factory.create(decoded.workflow_name) + self.set_worklow(decoded.source_name, processor) def get_accumulator( self, source_name: str diff --git a/src/beamlime/services/dashboard.py b/src/beamlime/services/dashboard.py index 676d08d8a..457802ec9 100644 --- a/src/beamlime/services/dashboard.py +++ b/src/beamlime/services/dashboard.py @@ -14,8 +14,10 @@ from beamlime import Service, ServiceBase from beamlime.config import config_names, models from beamlime.config.config_loader import load_config +from beamlime.config.raw_detectors import get_config from beamlime.core.config_service import ConfigService from beamlime.core.message import compact_messages +from beamlime.handlers.workflow_manager import processor_factory from beamlime.kafka import consumer as kafka_consumer from beamlime.kafka.helpers import topic_for_instrument from beamlime.kafka.message_adapter import ( @@ -34,6 +36,7 @@ def __init__( instrument: str = 'dummy', debug: bool = False, log_level: int = logging.INFO, + auto_remove_plots_after_seconds: float = 10.0, ) -> None: name = f'{instrument}_dashboard' super().__init__(name=name, log_level=log_level) @@ -43,10 +46,15 @@ def __init__( # Initialize state self._plots: dict[str, go.Figure] = {} + self._auto_remove_plots_after_seconds = auto_remove_plots_after_seconds + self._last_plot_update: dict[str, float] = {} self._exit_stack = ExitStack() self._exit_stack.__enter__() + # Load instrument configuration for source names + self._instrument_config = get_config(instrument) + # Setup services self._setup_config_service() self._source = self._setup_kafka_consumer() @@ -231,25 +239,40 @@ def _setup_layout(self) -> None: style={'margin': '10px 0'}, ), html.Button('Clear', id='clear-button', n_clicks=0), - # Add workflow control section + # Add workflow control section with dropdowns html.Hr(style={'margin': '20px 0'}), html.H3('Workflow Control', style={'marginTop': '10px'}), html.Label('Source Name'), - 
dcc.Input( + dcc.Dropdown( id='workflow-source-name', - type='text', - placeholder='Enter source name', + options=[ + {'label': name, 'value': name} + for name in self._instrument_config.source_names + ], style={'width': '100%', 'marginBottom': '10px'}, ), - html.Label('Workflow Name'), - dcc.Input( - id='workflow-name', - type='text', - placeholder='Enter workflow name', - style={'width': '100%', 'marginBottom': '10px'}, + html.Div( + [ + dcc.Checklist( + id='workflow-enable', + options=[{'label': 'Enable workflow', 'value': 'enabled'}], + value=['enabled'], + style={'margin': '10px 0'}, + ), + html.Label('Workflow Name'), + dcc.Dropdown( + id='workflow-name', + options=[ + {'label': name, 'value': name} + for name in processor_factory.get_available() + ], + style={'width': '100%', 'marginBottom': '10px'}, + ), + ], + id='workflow-selector-container', ), html.Button( - 'Send Workflow Control', + 'Go!', id='workflow-control-button', n_clicks=0, style={'width': '100%', 'marginTop': '10px'}, @@ -349,12 +372,19 @@ def _setup_callbacks(self) -> None: Input('use-weights-checkbox', 'value'), )(self.update_use_weights) - # Add callback for workflow control button + # Add callback to enable/disable workflow dropdown + self._app.callback( + Output('workflow-name', 'disabled'), + Input('workflow-enable', 'value'), + )(lambda value: len(value) == 0) + + # Update workflow control button callback self._app.callback( Output('workflow-control-button', 'n_clicks'), Input('workflow-control-button', 'n_clicks'), Input('workflow-source-name', 'value'), Input('workflow-name', 'value'), + Input('workflow-enable', 'value'), )(self.send_workflow_control) def update_roi(self, x_center, x_delta, y_center, y_delta): @@ -492,6 +522,7 @@ def update_plots(self, n: int | None): if n is None: raise PreventUpdate + now = time.time() try: messages = self._source.get_messages() num = len(messages) @@ -517,11 +548,19 @@ def update_plots(self, n: int | None): fig.data[0].x = data.coords[x_dim].values fig.data[0].y = data.coords[y_dim].values fig.data[0].z = data.values + self._last_plot_update[key] = now except Exception as e: self._logger.exception("Error in update_plots: %s", e) raise PreventUpdate from None + # Remove plots if no recent update. This happens, e.g., when the reduction + # workflow is removed or changed. 
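(Editorial sketch, not part of the diff: time-based pruning like the loop that follows, with placeholder names. This variant also drops the timestamp entry once a plot is removed, and snapshots the items before popping so the dict is not mutated while iterating.)

import time

plots = {'det_a': 'figure-a', 'det_b': 'figure-b'}
last_update = {'det_a': time.time(), 'det_b': time.time() - 30.0}
timeout = 10.0

now = time.time()
for key, ts in list(last_update.items()):
    if now - ts > timeout:
        plots.pop(key, None)
        last_update.pop(key, None)  # also forget the timestamp

assert 'det_b' not in plots and 'det_a' in plots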
+ for key, last_update in self._last_plot_update.items(): + if now - last_update > self._auto_remove_plots_after_seconds: + self._plots.pop(key, None) + self._logger.info("Removed plot for %s", key) + graphs = [dcc.Graph(figure=fig) for fig in self._plots.values()] return [html.Div(graphs, style={'display': 'flex', 'flexWrap': 'wrap'})] @@ -542,21 +581,27 @@ def clear_data(self, n_clicks: int | None) -> int: return 0 def send_workflow_control( - self, n_clicks: int | None, source_name: str | None, workflow_name: str | None + self, + n_clicks: int | None, + source_name: str | None, + workflow_name: str, + enable_workflow: list[str], ) -> int: """Send a workflow control message.""" - if n_clicks is None or n_clicks == 0 or not source_name or not workflow_name: + if n_clicks is None or n_clicks == 0 or not source_name: raise PreventUpdate + actual_workflow_name = workflow_name if enable_workflow else None + self._logger.info( "Sending workflow control message: source=%s, workflow=%s", source_name, - workflow_name, + actual_workflow_name, ) workflow_control = models.WorkflowControl( source_name=source_name, - workflow_name=workflow_name, + workflow_name=actual_workflow_name, ) self._config_service.update_config( From 70b3961dc2815cc8149692b420cbc848a8f582dd Mon Sep 17 00:00:00 2001 From: Simon Heybrock Date: Mon, 24 Mar 2025 12:50:24 +0100 Subject: [PATCH 13/20] Configure multiple bifrost workflows --- src/beamlime/config/raw_detectors/bifrost.py | 29 +++++++++++++++++--- src/beamlime/handlers/workflow_manager.py | 5 +++- src/beamlime/services/dashboard.py | 2 ++ 3 files changed, 31 insertions(+), 5 deletions(-) diff --git a/src/beamlime/config/raw_detectors/bifrost.py b/src/beamlime/config/raw_detectors/bifrost.py index aed43dd60..3da270869 100644 --- a/src/beamlime/config/raw_detectors/bifrost.py +++ b/src/beamlime/config/raw_detectors/bifrost.py @@ -103,14 +103,35 @@ def _make_counts_per_angle( _reduction_workflow.insert(_make_counts_per_angle) -@processor_factory.register(name='testing') -def _testing_processor() -> StreamProcessor: +@processor_factory.register(name='spectrum-view') +def _spectrum_view() -> StreamProcessor: + return StreamProcessor( + _reduction_workflow.copy(), + dynamic_keys=(NeXusData[NXdetector, SampleRun],), + target_keys=(SpectrumView,), + accumulators=(SpectrumView,), + ) + + +@processor_factory.register(name='counts-per-angle') +def _counts_per_angle() -> StreamProcessor: + return StreamProcessor( + _reduction_workflow.copy(), + dynamic_keys=(NeXusData[NXdetector, SampleRun],), + context_keys=(DetectorRotation,), + target_keys=(CountsPerAngle,), + accumulators=(CountsPerAngle,), + ) + + +@processor_factory.register(name='all') +def _all() -> StreamProcessor: return StreamProcessor( _reduction_workflow.copy(), dynamic_keys=(NeXusData[NXdetector, SampleRun],), context_keys=(DetectorRotation,), - target_keys=(SpectrumView, CountsPerAngle), - accumulators=(SpectrumView, CountsPerAngle), + target_keys=(CountsPerAngle, SpectrumView), + accumulators=(CountsPerAngle, SpectrumView), ) diff --git a/src/beamlime/handlers/workflow_manager.py b/src/beamlime/handlers/workflow_manager.py index b10e9bfdd..7f36c1483 100644 --- a/src/beamlime/handlers/workflow_manager.py +++ b/src/beamlime/handlers/workflow_manager.py @@ -180,8 +180,11 @@ def add(self, timestamp: int, data: sc.DataArray) -> None: for stream_processor in self._stream_processors.values(): if self._key in stream_processor._context_keys: stream_processor.set_context({self._key: data}) - else: + elif self._key in 
stream_processor._dynamic_keys: stream_processor.accumulate({self._key: data}) + else: + # Might be unused by this particular workflow + pass def get(self) -> sc.DataGroup[sc.DataArray]: return sc.DataGroup() diff --git a/src/beamlime/services/dashboard.py b/src/beamlime/services/dashboard.py index 457802ec9..0feda22e8 100644 --- a/src/beamlime/services/dashboard.py +++ b/src/beamlime/services/dashboard.py @@ -249,6 +249,7 @@ def _setup_layout(self) -> None: {'label': name, 'value': name} for name in self._instrument_config.source_names ], + value=self._instrument_config.source_names[0], style={'width': '100%', 'marginBottom': '10px'}, ), html.Div( @@ -266,6 +267,7 @@ def _setup_layout(self) -> None: {'label': name, 'value': name} for name in processor_factory.get_available() ], + value=processor_factory.get_available()[0], style={'width': '100%', 'marginBottom': '10px'}, ), ], From 42b19a58d611c7fe3d2d8663bc89cf4556d78089 Mon Sep 17 00:00:00 2001 From: Simon Heybrock Date: Mon, 24 Mar 2025 13:09:28 +0100 Subject: [PATCH 14/20] Add info in GUI --- src/beamlime/services/dashboard.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/beamlime/services/dashboard.py b/src/beamlime/services/dashboard.py index 0feda22e8..4ac681e4b 100644 --- a/src/beamlime/services/dashboard.py +++ b/src/beamlime/services/dashboard.py @@ -239,7 +239,6 @@ def _setup_layout(self) -> None: style={'margin': '10px 0'}, ), html.Button('Clear', id='clear-button', n_clicks=0), - # Add workflow control section with dropdowns html.Hr(style={'margin': '20px 0'}), html.H3('Workflow Control', style={'marginTop': '10px'}), html.Label('Source Name'), @@ -279,6 +278,7 @@ def _setup_layout(self) -> None: n_clicks=0, style={'width': '100%', 'marginTop': '10px'}, ), + html.Label('Note that workflow changes may take a few seconds to apply.'), ] self._app.layout = html.Div( [ From 9f27bfd143d8f1d849220077926ff626e75f612d Mon Sep 17 00:00:00 2001 From: Simon Heybrock Date: Wed, 26 Mar 2025 09:00:44 +0100 Subject: [PATCH 15/20] Handle monitors in data reduction service --- scripts/setup-kafka-topics.sh | 10 +++++ src/beamlime/config/raw_detectors/loki.py | 45 ++++++++++++++++++++++- src/beamlime/handlers/config_handler.py | 10 ++--- src/beamlime/services/data_reduction.py | 17 +++++++-- 4 files changed, 71 insertions(+), 11 deletions(-) diff --git a/scripts/setup-kafka-topics.sh b/scripts/setup-kafka-topics.sh index f25feea0e..df664a975 100644 --- a/scripts/setup-kafka-topics.sh +++ b/scripts/setup-kafka-topics.sh @@ -31,6 +31,16 @@ kafka-topics --create --bootstrap-server kafka:29092 \ --config segment.bytes=104857600 \ --config segment.ms=60000 +kafka-topics --create --bootstrap-server kafka:29092 \ + --topic ${BEAMLIME_INSTRUMENT}_motion \ + --config cleanup.policy=delete \ + --config delete.retention.ms=60000 \ + --config max.message.bytes=104857600 \ + --config retention.bytes=10737418240 \ + --config retention.ms=30000 \ + --config segment.bytes=104857600 \ + --config segment.ms=60000 + kafka-topics --create --bootstrap-server kafka:29092 \ --topic ${BEAMLIME_INSTRUMENT}_beamlime_commands \ --config cleanup.policy=compact \ diff --git a/src/beamlime/config/raw_detectors/loki.py b/src/beamlime/config/raw_detectors/loki.py index 23bd63aaf..edb9dd191 100644 --- a/src/beamlime/config/raw_detectors/loki.py +++ b/src/beamlime/config/raw_detectors/loki.py @@ -1,7 +1,13 @@ # SPDX-License-Identifier: BSD-3-Clause -# Copyright (c) 2024 Scipp contributors (https://github.com/scipp) +# Copyright (c) 2025 Scipp 
contributors (https://github.com/scipp) +import sciline +from ess.reduce.nexus.types import NeXusData, SampleRun +from ess.reduce.streaming import StreamProcessor +from scippnexus import NXdetector -_res_scale = 8 +from beamlime.handlers.workflow_manager import processor_factory + +_res_scale = 12 detectors_config = { 'detectors': { @@ -63,3 +69,38 @@ }, }, } + + +@processor_factory.register(name='I(Q)') +def _i_of_q() -> StreamProcessor: + return StreamProcessor( + sciline.Pipeline, + dynamic_keys=(NeXusData[NXdetector, SampleRun],), + target_keys=(), + accumulators=(), + ) + + +source_names = ( + 'loki_detector_0', + 'loki_detector_1', + 'loki_detector_2', + 'loki_detector_3', + 'loki_detector_4', + 'loki_detector_5', + 'loki_detector_6', + 'loki_detector_7', + 'loki_detector_8', +) +source_to_key = { + 'loki_detector_0': NeXusData[NXdetector, SampleRun], + 'loki_detector_1': NeXusData[NXdetector, SampleRun], + 'loki_detector_2': NeXusData[NXdetector, SampleRun], + 'loki_detector_3': NeXusData[NXdetector, SampleRun], + 'loki_detector_4': NeXusData[NXdetector, SampleRun], + 'loki_detector_5': NeXusData[NXdetector, SampleRun], + 'loki_detector_6': NeXusData[NXdetector, SampleRun], + 'loki_detector_7': NeXusData[NXdetector, SampleRun], + 'loki_detector_8': NeXusData[NXdetector, SampleRun], +} +f144_attribute_registry = {} diff --git a/src/beamlime/handlers/config_handler.py b/src/beamlime/handlers/config_handler.py index d917691dd..bc66dc926 100644 --- a/src/beamlime/handlers/config_handler.py +++ b/src/beamlime/handlers/config_handler.py @@ -78,16 +78,16 @@ def handle(self, messages: list[Message[bytes]]) -> list[Message[None]]: ) self._store[key] = value updated[key] = value - except Exception as e: # noqa: PERF203 - self._logger.error('Error processing config message: %s', e) + except Exception: # noqa: PERF203 + self._logger.exception('Error processing config message:') # Delay action calls until all messages are processed to reduce triggering # multiple calls for the same key in case of multiple messages with same key. 
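(Editorial note, not part of the diff: the loki source_to_key table above repeats the same workflow key for all nine banks, so an equivalent mapping can be built with a comprehension. Plain strings stand in for the sciline keys here so the snippet runs without ess installed; a later patch in this series adds monitor1/monitor2 entries keyed to the Incident and Transmission monitors.)

source_names = tuple(f'loki_detector_{i}' for i in range(9))
source_to_key = {name: 'NeXusData[NXdetector, SampleRun]' for name in source_names}
assert len(source_to_key) == 9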
for key, value in updated.items(): for action in self._actions.get(key, []): try: action(value) - except KeyError as e: # noqa: PERF203 - self._logger.error( - 'Error processing config action for %s: %s', key, e + except KeyError: # noqa: PERF203 + self._logger.exception( + 'Error processing config action for %s:', key ) return [] diff --git a/src/beamlime/services/data_reduction.py b/src/beamlime/services/data_reduction.py index d26a948a1..101b9cc0f 100644 --- a/src/beamlime/services/data_reduction.py +++ b/src/beamlime/services/data_reduction.py @@ -24,12 +24,15 @@ from beamlime.kafka.message_adapter import ( BeamlimeCommandsAdapter, ChainedAdapter, + Da00ToScippAdapter, Ev44ToDetectorEventsAdapter, - Ev44ToMonitorEventsAdapter, F144ToLogDataAdapter, + KafkaToDa00Adapter, KafkaToEv44Adapter, KafkaToF144Adapter, + KafkaToMonitorEventsAdapter, RouteByTopicAdapter, + RoutingAdapter, ) from beamlime.kafka.sink import KafkaSink, UnrollingSinkAdapter from beamlime.kafka.source import MultiConsumer @@ -51,11 +54,17 @@ def setup_arg_parser() -> argparse.ArgumentParser: def make_reduction_service_builder( *, instrument: str, log_level: int = logging.INFO ) -> DataServiceBuilder: - adapter = RouteByTopicAdapter( + monitors = RoutingAdapter( routes={ - beam_monitor_topic(instrument): ChainedAdapter( - first=KafkaToEv44Adapter(), second=Ev44ToMonitorEventsAdapter() + 'ev44': KafkaToMonitorEventsAdapter(), + 'da00': ChainedAdapter( + first=KafkaToDa00Adapter(), second=Da00ToScippAdapter() ), + } + ) + adapter = RouteByTopicAdapter( + routes={ + beam_monitor_topic(instrument): monitors, detector_topic(instrument): ChainedAdapter( first=KafkaToEv44Adapter(), second=Ev44ToDetectorEventsAdapter( From 588a4590e80f8664808e747c76cda2be9379e8d5 Mon Sep 17 00:00:00 2001 From: Simon Heybrock Date: Wed, 26 Mar 2025 09:36:38 +0100 Subject: [PATCH 16/20] Fix monitor handling in reduction, working loki workflow --- src/beamlime/config/raw_detectors/loki.py | 26 +++++++++++++++---- .../handlers/data_reduction_handler.py | 6 +++++ src/beamlime/kafka/scipp_da00_compat.py | 8 +++++- 3 files changed, 34 insertions(+), 6 deletions(-) diff --git a/src/beamlime/config/raw_detectors/loki.py b/src/beamlime/config/raw_detectors/loki.py index edb9dd191..997bea01a 100644 --- a/src/beamlime/config/raw_detectors/loki.py +++ b/src/beamlime/config/raw_detectors/loki.py @@ -1,8 +1,16 @@ # SPDX-License-Identifier: BSD-3-Clause # Copyright (c) 2025 Scipp contributors (https://github.com/scipp) -import sciline +from ess.loki.live import _configured_Larmor_AgBeh_workflow from ess.reduce.nexus.types import NeXusData, SampleRun from ess.reduce.streaming import StreamProcessor +from ess.sans.types import ( + Denominator, + Incident, + IofQ, + Numerator, + ReducedQ, + Transmission, +) from scippnexus import NXdetector from beamlime.handlers.workflow_manager import processor_factory @@ -70,14 +78,20 @@ }, } +_workflow = _configured_Larmor_AgBeh_workflow() + @processor_factory.register(name='I(Q)') def _i_of_q() -> StreamProcessor: return StreamProcessor( - sciline.Pipeline, - dynamic_keys=(NeXusData[NXdetector, SampleRun],), - target_keys=(), - accumulators=(), + _workflow.copy(), + dynamic_keys=( + NeXusData[NXdetector, SampleRun], + NeXusData[Incident, SampleRun], + NeXusData[Transmission, SampleRun], + ), + target_keys=(IofQ[SampleRun],), + accumulators=(ReducedQ[SampleRun, Numerator], ReducedQ[SampleRun, Denominator]), ) @@ -102,5 +116,7 @@ def _i_of_q() -> StreamProcessor: 'loki_detector_6': NeXusData[NXdetector, SampleRun], 
'loki_detector_7': NeXusData[NXdetector, SampleRun], 'loki_detector_8': NeXusData[NXdetector, SampleRun], + 'monitor1': NeXusData[Incident, SampleRun], + 'monitor2': NeXusData[Transmission, SampleRun], } f144_attribute_registry = {} diff --git a/src/beamlime/handlers/data_reduction_handler.py b/src/beamlime/handlers/data_reduction_handler.py index cb3b714bb..aba59ec27 100644 --- a/src/beamlime/handlers/data_reduction_handler.py +++ b/src/beamlime/handlers/data_reduction_handler.py @@ -10,6 +10,7 @@ from ..core.handler import Config, Handler, HandlerFactory, PeriodicAccumulatingHandler from ..core.message import Message, MessageKey from .accumulators import DetectorEvents, ToNXevent_data +from .monitor_data_handler import MonitorDataPreprocessor from .to_nx_log import ToNXlog from .workflow_manager import WorkflowManager @@ -42,6 +43,9 @@ def __init__( def _is_nxlog(self, key: MessageKey) -> bool: return key.topic.split('_', maxsplit=1)[1] in ('motion',) + def _is_monitor(self, key: MessageKey) -> bool: + return key.topic.split('_', maxsplit=1)[1] in ('beam_monitor',) + def make_handler( self, key: MessageKey ) -> Handler[DetectorEvents, sc.DataGroup[sc.DataArray]]: @@ -57,6 +61,8 @@ def make_handler( if self._is_nxlog(key): attrs = self._f144_attribute_registry[key.source_name] preprocessor = ToNXlog(attrs=attrs) + elif self._is_monitor(key): + preprocessor = MonitorDataPreprocessor(config=self._config) else: preprocessor = ToNXevent_data() self._logger.info( diff --git a/src/beamlime/kafka/scipp_da00_compat.py b/src/beamlime/kafka/scipp_da00_compat.py index 454d59df5..eb7434855 100644 --- a/src/beamlime/kafka/scipp_da00_compat.py +++ b/src/beamlime/kafka/scipp_da00_compat.py @@ -14,7 +14,13 @@ def scipp_to_da00( _to_da00_variable(signal_name, sc.values(da.data)), _to_da00_variable('errors', sc.stddevs(da.data)), ] - variables.extend([_to_da00_variable(name, var) for name, var in da.coords.items()]) + variables.extend( + [ + _to_da00_variable(name, var) + for name, var in da.coords.items() + if var.shape == var.values.shape # vector3 etc. 
not supported currently + ] + ) return variables From a6b29730e6254918137b22b55dd2c2c7e462ba3c Mon Sep 17 00:00:00 2001 From: Simon Heybrock Date: Wed, 26 Mar 2025 10:23:59 +0100 Subject: [PATCH 17/20] Various fixes and improvements --- src/beamlime/config/raw_detectors/loki.py | 12 +++++++++--- src/beamlime/core/service.py | 4 ++++ src/beamlime/handlers/workflow_manager.py | 14 +++++++++++--- 3 files changed, 24 insertions(+), 6 deletions(-) diff --git a/src/beamlime/config/raw_detectors/loki.py b/src/beamlime/config/raw_detectors/loki.py index 997bea01a..ad04ef459 100644 --- a/src/beamlime/config/raw_detectors/loki.py +++ b/src/beamlime/config/raw_detectors/loki.py @@ -1,10 +1,11 @@ # SPDX-License-Identifier: BSD-3-Clause # Copyright (c) 2025 Scipp contributors (https://github.com/scipp) from ess.loki.live import _configured_Larmor_AgBeh_workflow -from ess.reduce.nexus.types import NeXusData, SampleRun +from ess.reduce.nexus.types import NeXusData, NeXusDetectorName, SampleRun from ess.reduce.streaming import StreamProcessor from ess.sans.types import ( Denominator, + Filename, Incident, IofQ, Numerator, @@ -13,6 +14,7 @@ ) from scippnexus import NXdetector +from beamlime.handlers.detector_data_handler import get_nexus_geometry_filename from beamlime.handlers.workflow_manager import processor_factory _res_scale = 12 @@ -82,9 +84,13 @@ @processor_factory.register(name='I(Q)') -def _i_of_q() -> StreamProcessor: +def _i_of_q(source_name: str) -> StreamProcessor: + wf = _workflow.copy() + wf[Filename[SampleRun]] = get_nexus_geometry_filename('loki') + wf[Filename[SampleRun]] = '/home/simon/instruments/loki/977695_00057856.hdf' + wf[NeXusDetectorName] = source_name return StreamProcessor( - _workflow.copy(), + wf, dynamic_keys=( NeXusData[NXdetector, SampleRun], NeXusData[Incident, SampleRun], diff --git a/src/beamlime/core/service.py b/src/beamlime/core/service.py index 53bfff687..3fc1d2bb6 100644 --- a/src/beamlime/core/service.py +++ b/src/beamlime/core/service.py @@ -33,6 +33,10 @@ def configure_logging(log_level: int) -> None: format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', stream=sys.stdout, ) + # scipp.transform_coords logs info messages that are not useful and would show + # with every workflow call + scipp_logger = logging.getLogger('scipp') + scipp_logger.setLevel(logging.WARNING) def _setup_logging(self, log_level: int) -> None: """Configure logging for this service instance if not already configured""" diff --git a/src/beamlime/handlers/workflow_manager.py b/src/beamlime/handlers/workflow_manager.py index 7f36c1483..064a8a100 100644 --- a/src/beamlime/handlers/workflow_manager.py +++ b/src/beamlime/handlers/workflow_manager.py @@ -2,6 +2,7 @@ # Copyright (c) 2025 Scipp contributors (https://github.com/scipp) from __future__ import annotations +import inspect from collections.abc import Callable, Iterator, MutableMapping, Sequence from functools import wraps from typing import Any @@ -52,9 +53,14 @@ def wrapper() -> StreamProcessor: return decorator - def create(self, name: str) -> StreamProcessor: + def create(self, *, workflow_name: str, source_name: str) -> StreamProcessor: """Create a StreamProcessor using the registered factory.""" - return self._factories[name]() + factory = self._factories[workflow_name] + sig = inspect.signature(factory) + if 'source_name' in sig.parameters: + return factory(source_name=source_name) + else: + return factory() processor_factory = StreamProcessorFactory() @@ -136,7 +142,9 @@ def set_workflow_from_command(self, command: Any) -> 
None: if decoded.workflow_name is None: processor = None else: - processor = processor_factory.create(decoded.workflow_name) + processor = processor_factory.create( + workflow_name=decoded.workflow_name, source_name=decoded.source_name + ) self.set_worklow(decoded.source_name, processor) def get_accumulator( From e3f185574a930612e9048d3f544f3e313eb1415d Mon Sep 17 00:00:00 2001 From: Simon Heybrock Date: Wed, 26 Mar 2025 12:13:13 +0100 Subject: [PATCH 18/20] Update config mechanism to allow for multiple reductions, one per det. --- src/beamlime/config/raw_detectors/loki.py | 1 - src/beamlime/handlers/detector_data_handler.py | 1 + src/beamlime/services/dashboard.py | 2 +- src/beamlime/services/data_reduction.py | 9 +++++---- 4 files changed, 7 insertions(+), 6 deletions(-) diff --git a/src/beamlime/config/raw_detectors/loki.py b/src/beamlime/config/raw_detectors/loki.py index ad04ef459..c2ad9643b 100644 --- a/src/beamlime/config/raw_detectors/loki.py +++ b/src/beamlime/config/raw_detectors/loki.py @@ -87,7 +87,6 @@ def _i_of_q(source_name: str) -> StreamProcessor: wf = _workflow.copy() wf[Filename[SampleRun]] = get_nexus_geometry_filename('loki') - wf[Filename[SampleRun]] = '/home/simon/instruments/loki/977695_00057856.hdf' wf[NeXusDetectorName] = source_name return StreamProcessor( wf, diff --git a/src/beamlime/handlers/detector_data_handler.py b/src/beamlime/handlers/detector_data_handler.py index b034217a6..841e751d0 100644 --- a/src/beamlime/handlers/detector_data_handler.py +++ b/src/beamlime/handlers/detector_data_handler.py @@ -211,6 +211,7 @@ def clear(self) -> None: _registry = { 'geometry-dream-2025-01-01.nxs': 'md5:91aceb884943c76c0c21400ee74ad9b6', 'geometry-loki-2025-01-01.nxs': 'md5:8d0e103276934a20ba26bb525e53924a', + 'geometry-loki-2025-03-26.nxs': 'md5:279dc8cf7dae1fac030d724bc45a2572', 'geometry-bifrost-2025-01-01.nxs': 'md5:ae3caa99dd56de9495b9321eea4e4fef', } diff --git a/src/beamlime/services/dashboard.py b/src/beamlime/services/dashboard.py index 4ac681e4b..7136b1dfc 100644 --- a/src/beamlime/services/dashboard.py +++ b/src/beamlime/services/dashboard.py @@ -607,7 +607,7 @@ def send_workflow_control( ) self._config_service.update_config( - 'workflow_control', workflow_control.model_dump() + f'{source_name}:workflow_control', workflow_control.model_dump() ) return 0 diff --git a/src/beamlime/services/data_reduction.py b/src/beamlime/services/data_reduction.py index 101b9cc0f..236126550 100644 --- a/src/beamlime/services/data_reduction.py +++ b/src/beamlime/services/data_reduction.py @@ -96,10 +96,11 @@ def make_reduction_service_builder( handler_factory=handler_factory, ) config_handler = ConfigHandler(config=config) - config_handler.register_action( - key='workflow_control', - callback=workflow_manager.set_workflow_from_command, - ) + for source_name in instrument_config.source_names: + config_handler.register_action( + key=f'{source_name}:workflow_control', + callback=workflow_manager.set_workflow_from_command, + ) builder.add_handler(ConfigHandler.message_key(instrument), config_handler) return builder From 73235a9c8e12b27b8d7267726fab280c7b0010e4 Mon Sep 17 00:00:00 2001 From: Simon Heybrock Date: Wed, 26 Mar 2025 12:23:15 +0100 Subject: [PATCH 19/20] Add esssans as test dependency --- pyproject.toml | 4 ++ requirements/base.txt | 6 +- requirements/basetest.in | 1 + requirements/basetest.txt | 117 +++++++++++++++++++++++++++++++++++--- requirements/ci.txt | 4 +- requirements/dev.txt | 6 +- requirements/docs.txt | 82 +------------------------- 
requirements/nightly.in | 1 + requirements/nightly.txt | 117 +++++++++++++++++++++++++++++++++++--- requirements/static.txt | 4 +- 10 files changed, 232 insertions(+), 110 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index ef08d3bfc..a17c71123 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -45,6 +45,7 @@ dynamic = ["version"] [project.optional-dependencies] test = [ + "esssans", "essspectroscopy", "pytest", "pytest-benchmark", @@ -52,6 +53,9 @@ test = [ bifrost = [ "essspectroscopy", ] +loki = [ + "esssans", +] dashboard = [ "dash", "gunicorn", diff --git a/requirements/base.txt b/requirements/base.txt index 8d4754a67..12d66d806 100644 --- a/requirements/base.txt +++ b/requirements/base.txt @@ -74,7 +74,7 @@ packaging==24.2 # pooch pillow==11.1.0 # via matplotlib -platformdirs==4.3.6 +platformdirs==4.3.7 # via pooch plopp==25.3.0 # via scippneutron @@ -86,7 +86,7 @@ pydantic==2.10.6 # scippneutron pydantic-core==2.27.2 # via pydantic -pyparsing==3.2.1 +pyparsing==3.2.3 # via matplotlib python-dateutil==2.9.0.post0 # via @@ -117,7 +117,7 @@ scipy==1.15.2 # scippnexus six==1.17.0 # via python-dateutil -typing-extensions==4.12.2 +typing-extensions==4.13.0 # via # pydantic # pydantic-core diff --git a/requirements/basetest.in b/requirements/basetest.in index ab108a5a9..a48d4ba10 100644 --- a/requirements/basetest.in +++ b/requirements/basetest.in @@ -9,6 +9,7 @@ pytest-xdist # will not be touched by ``make_base.py`` # --- END OF CUSTOM SECTION --- # The following was generated by 'tox -e deps', DO NOT EDIT MANUALLY! +esssans essspectroscopy pytest pytest-benchmark diff --git a/requirements/basetest.txt b/requirements/basetest.txt index 05395334d..77d8a8f20 100644 --- a/requirements/basetest.txt +++ b/requirements/basetest.txt @@ -1,4 +1,4 @@ -# SHA1:82658a80ffa9a59e7f724c250258e1f524276168 +# SHA1:06cf9b1a88938b3e0e42463590b5b897dc2ca243 # # This file is autogenerated by pip-compile-multi # To update, run: @@ -7,50 +7,92 @@ # annotated-types==0.7.0 # via pydantic +asttokens==3.0.0 + # via stack-data choppera==0.1.6 # via essspectroscopy +click==8.1.8 + # via dask +cloudpickle==3.1.1 + # via dask +comm==0.2.2 + # via ipywidgets contourpy==1.3.1 # via matplotlib -coverage[toml]==7.7.0 +coverage[toml]==7.7.1 # via pytest-cov cyclebane==24.10.0 # via sciline cycler==0.12.1 # via matplotlib +dask==2025.3.0 + # via esssans +decorator==5.2.1 + # via ipython dnspython==2.7.0 # via email-validator email-validator==2.2.0 # via scippneutron essreduce==25.3.1 - # via essspectroscopy + # via + # esssans + # essspectroscopy +esssans==25.2.0 + # via -r basetest.in essspectroscopy==0.25.2.0 # via -r basetest.in exceptiongroup==1.2.2 - # via pytest + # via + # ipython + # pytest execnet==2.1.1 # via pytest-xdist +executing==2.2.0 + # via stack-data fonttools==4.56.0 # via matplotlib +fsspec==2025.3.0 + # via dask +graphviz==0.20.3 + # via esssans h5py==3.13.0 # via # scippneutron # scippnexus idna==3.10 # via email-validator -iniconfig==2.0.0 +importlib-metadata==8.6.1 + # via dask +iniconfig==2.1.0 # via pytest +ipydatawidgets==4.3.5 + # via pythreejs +ipython==8.34.0 + # via ipywidgets +ipywidgets==8.1.5 + # via + # ipydatawidgets + # pythreejs +jedi==0.19.2 + # via ipython +jupyterlab-widgets==3.0.13 + # via ipywidgets kiwisolver==1.4.8 # via matplotlib lazy-loader==0.4 # via # plopp # scippneutron +locket==1.0.0 + # via partd loguru==0.7.3 # via essspectroscopy matplotlib==3.10.1 # via # mpltoolbox # plopp +matplotlib-inline==0.1.7 + # via ipython mpltoolbox==24.5.1 # via scippneutron 
networkx==3.4.2 @@ -59,33 +101,56 @@ numpy==2.2.4 # via # choppera # contourpy + # esssans # h5py + # ipydatawidgets # matplotlib # mpltoolbox + # pandas # polystar + # pythreejs # scipp # scippneutron # scipy packaging==24.2 # via + # dask # lazy-loader # matplotlib # pytest +pandas==2.2.3 + # via esssans +parso==0.8.4 + # via jedi +partd==1.4.2 + # via dask +pexpect==4.9.0 + # via ipython pillow==11.1.0 # via matplotlib plopp==25.3.0 - # via scippneutron + # via + # esssans + # scippneutron pluggy==1.5.0 # via pytest polystar==0.4.5 # via choppera +prompt-toolkit==3.0.50 + # via ipython +ptyprocess==0.7.0 + # via pexpect +pure-eval==0.2.3 + # via stack-data py-cpuinfo==9.0.0 # via pytest-benchmark pydantic==2.10.6 # via scippneutron pydantic-core==2.27.2 # via pydantic -pyparsing==3.2.1 +pygments==2.19.1 + # via ipython +pyparsing==3.2.3 # via matplotlib pytest==8.3.5 # via @@ -102,28 +167,39 @@ pytest-xdist==3.6.1 python-dateutil==2.9.0.post0 # via # matplotlib + # pandas # scippneutron # scippnexus # strictyaml +pythreejs==2.4.2 + # via esssans +pytz==2025.2 + # via pandas +pyyaml==6.0.2 + # via dask sciline==24.10.0 # via # essreduce + # esssans # essspectroscopy scipp==25.3.0 # via # choppera # essreduce + # esssans # essspectroscopy # scippneutron # scippnexus scippneutron==25.2.1 # via # essreduce + # esssans # essspectroscopy scippnexus==24.11.1 # via # choppera # essreduce + # esssans # essspectroscopy # scippneutron scipy==1.15.2 @@ -133,15 +209,40 @@ scipy==1.15.2 # scippnexus six==1.17.0 # via python-dateutil +stack-data==0.6.3 + # via ipython strictyaml==1.7.3 # via choppera tomli==2.2.1 # via # coverage # pytest +toolz==1.0.0 + # via + # dask + # partd tqdm==4.67.1 # via essspectroscopy -typing-extensions==4.12.2 +traitlets==5.14.3 + # via + # comm + # ipython + # ipywidgets + # matplotlib-inline + # pythreejs + # traittypes +traittypes==0.2.1 + # via ipydatawidgets +typing-extensions==4.13.0 # via + # ipython # pydantic # pydantic-core +tzdata==2025.2 + # via pandas +wcwidth==0.2.13 + # via prompt-toolkit +widgetsnbextension==4.0.13 + # via ipywidgets +zipp==3.21.0 + # via importlib-metadata diff --git a/requirements/ci.txt b/requirements/ci.txt index 75b5ef5e0..87052409c 100644 --- a/requirements/ci.txt +++ b/requirements/ci.txt @@ -32,7 +32,7 @@ packaging==24.2 # -r ci.in # pyproject-api # tox -platformdirs==4.3.6 +platformdirs==4.3.7 # via # tox # virtualenv @@ -50,7 +50,7 @@ tomli==2.2.1 # tox tox==4.24.2 # via -r ci.in -typing-extensions==4.12.2 +typing-extensions==4.13.0 # via tox urllib3==2.3.0 # via requests diff --git a/requirements/dev.txt b/requirements/dev.txt index 709df96fa..dd77f2b5d 100644 --- a/requirements/dev.txt +++ b/requirements/dev.txt @@ -26,13 +26,9 @@ async-lru==2.0.5 # via jupyterlab cffi==1.17.1 # via argon2-cffi-bindings -click==8.1.8 - # via - # pip-compile-multi - # pip-tools copier==9.6.0 # via -r dev.in -dunamai==1.23.0 +dunamai==1.23.1 # via copier fqdn==1.5.1 # via jsonschema diff --git a/requirements/docs.txt b/requirements/docs.txt index bf1a48664..2de5fcb9d 100644 --- a/requirements/docs.txt +++ b/requirements/docs.txt @@ -11,8 +11,6 @@ accessible-pygments==0.0.5 # via pydata-sphinx-theme alabaster==1.0.0 # via sphinx -asttokens==3.0.0 - # via stack-data attrs==25.3.0 # via # jsonschema @@ -27,14 +25,8 @@ beautifulsoup4==4.13.3 # pydata-sphinx-theme bleach[css]==6.2.0 # via nbconvert -comm==0.2.2 - # via - # ipykernel - # ipywidgets debugpy==1.8.13 # via ipykernel -decorator==5.2.1 - # via ipython defusedxml==0.7.1 # via nbconvert 
docutils==0.21.2 @@ -43,31 +35,14 @@ docutils==0.21.2 # nbsphinx # pydata-sphinx-theme # sphinx -executing==2.2.0 - # via stack-data fastjsonschema==2.21.1 # via nbformat imagesize==1.4.1 # via sphinx -ipydatawidgets==4.3.5 - # via pythreejs ipykernel==6.29.5 # via -r docs.in ipympl==0.9.7 # via -r docs.in -ipython==8.34.0 - # via - # -r docs.in - # ipykernel - # ipympl - # ipywidgets -ipywidgets==8.1.5 - # via - # ipydatawidgets - # ipympl - # pythreejs -jedi==0.19.2 - # via ipython jsonschema==4.23.0 # via nbformat jsonschema-specifications==2024.10.1 @@ -85,21 +60,15 @@ jupyter-core==5.7.2 # nbformat jupyterlab-pygments==0.3.0 # via nbconvert -jupyterlab-widgets==3.0.13 - # via ipywidgets markdown-it-py==3.0.0 # via # mdit-py-plugins # myst-parser -matplotlib-inline==0.1.7 - # via - # ipykernel - # ipython mdit-py-plugins==0.4.2 # via myst-parser mdurl==0.1.2 # via markdown-it-py -mistune==3.1.2 +mistune==3.1.3 # via nbconvert myst-parser==4.0.1 # via -r docs.in @@ -116,39 +85,16 @@ nbsphinx==0.9.7 # via -r docs.in nest-asyncio==1.6.0 # via ipykernel -pandas==2.2.3 - # via -r docs.in pandocfilters==1.5.1 # via nbconvert -parso==0.8.4 - # via jedi -pexpect==4.9.0 - # via ipython -prompt-toolkit==3.0.50 - # via ipython psutil==7.0.0 # via # -r docs.in # ipykernel -ptyprocess==0.7.0 - # via pexpect -pure-eval==0.2.3 - # via stack-data pyarrow==19.0.1 # via -r docs.in pydata-sphinx-theme==0.16.1 # via -r docs.in -pygments==2.19.1 - # via - # accessible-pygments - # ipython - # nbconvert - # pydata-sphinx-theme - # sphinx -pythreejs==2.4.2 - # via -r docs.in -pytz==2025.1 - # via pandas pyzmq==26.3.0 # via # ipykernel @@ -192,39 +138,13 @@ sphinxcontrib-qthelp==2.0.0 # via sphinx sphinxcontrib-serializinghtml==2.0.0 # via sphinx -stack-data==0.6.3 - # via ipython tinycss2==1.4.0 # via bleach tornado==6.4.2 # via # ipykernel # jupyter-client -traitlets==5.14.3 - # via - # comm - # ipykernel - # ipympl - # ipython - # ipywidgets - # jupyter-client - # jupyter-core - # matplotlib-inline - # nbclient - # nbconvert - # nbformat - # nbsphinx - # pythreejs - # traittypes -traittypes==0.2.1 - # via ipydatawidgets -tzdata==2025.1 - # via pandas -wcwidth==0.2.13 - # via prompt-toolkit webencodings==0.5.1 # via # bleach # tinycss2 -widgetsnbextension==4.0.13 - # via ipywidgets diff --git a/requirements/nightly.in b/requirements/nightly.in index 66f1513c3..598b5fc34 100644 --- a/requirements/nightly.in +++ b/requirements/nightly.in @@ -7,6 +7,7 @@ ess-streaming-data-types jinja2 pydantic>=2 pooch +esssans essspectroscopy pytest pytest-benchmark diff --git a/requirements/nightly.txt b/requirements/nightly.txt index 852cac488..0b652ace2 100644 --- a/requirements/nightly.txt +++ b/requirements/nightly.txt @@ -1,4 +1,4 @@ -# SHA1:cd9178b35731b463833d75aae36658f14d320af0 +# SHA1:d8119ed2adf68d0fe7bb0a0f93fd8e7ab05ebecb # # This file is autogenerated by pip-compile-multi # To update, run: @@ -10,12 +10,20 @@ annotated-types==0.7.0 # via pydantic +asttokens==3.0.0 + # via stack-data certifi==2025.1.31 # via requests charset-normalizer==3.4.1 # via requests choppera==0.1.6 # via essspectroscopy +click==8.1.8 + # via dask +cloudpickle==3.1.1 + # via dask +comm==0.2.2 + # via ipywidgets confluent-kafka==2.8.2 # via -r nightly.in contourpy==1.3.1 @@ -24,6 +32,10 @@ cyclebane==24.10.0 # via sciline cycler==0.12.1 # via matplotlib +dask==2025.3.0 + # via esssans +decorator==5.2.1 + # via ipython dnspython==2.7.0 # via email-validator email-validator==2.2.0 @@ -33,15 +45,26 @@ ess-streaming-data-types==0.27.0 
essreduce @ git+https://github.com/scipp/essreduce@main # via # -r nightly.in + # esssans # essspectroscopy +esssans==25.2.0 + # via -r nightly.in essspectroscopy==0.25.2.0 # via -r nightly.in exceptiongroup==1.2.2 - # via pytest + # via + # ipython + # pytest +executing==2.2.0 + # via stack-data flatbuffers==25.2.10 # via ess-streaming-data-types fonttools==4.56.0 # via matplotlib +fsspec==2025.3.0 + # via dask +graphviz==0.20.3 + # via esssans h5py==3.13.0 # via # scippneutron @@ -50,16 +73,32 @@ idna==3.10 # via # email-validator # requests -iniconfig==2.0.0 +importlib-metadata==8.6.1 + # via dask +iniconfig==2.1.0 # via pytest +ipydatawidgets==4.3.5 + # via pythreejs +ipython==8.34.0 + # via ipywidgets +ipywidgets==8.1.5 + # via + # ipydatawidgets + # pythreejs +jedi==0.19.2 + # via ipython jinja2==3.1.6 # via -r nightly.in +jupyterlab-widgets==3.0.13 + # via ipywidgets kiwisolver==1.4.8 # via matplotlib lazy-loader==0.4 # via # plopp # scippneutron +locket==1.0.0 + # via partd loguru==0.7.3 # via essspectroscopy markupsafe==3.0.2 @@ -68,6 +107,8 @@ matplotlib==3.10.1 # via # mpltoolbox # plopp +matplotlib-inline==0.1.7 + # via ipython mpltoolbox==24.5.1 # via scippneutron networkx==3.4.2 @@ -77,26 +118,40 @@ numpy==2.2.4 # choppera # contourpy # ess-streaming-data-types + # esssans # h5py + # ipydatawidgets # matplotlib # mpltoolbox + # pandas # polystar + # pythreejs # scipp # scippneutron # scipy packaging==24.2 # via + # dask # lazy-loader # matplotlib # pooch # pytest +pandas==2.2.3 + # via esssans +parso==0.8.4 + # via jedi +partd==1.4.2 + # via dask +pexpect==4.9.0 + # via ipython pillow==11.1.0 # via matplotlib -platformdirs==4.3.6 +platformdirs==4.3.7 # via pooch plopp @ git+https://github.com/scipp/plopp@main # via # -r nightly.in + # esssans # scippneutron pluggy==1.5.0 # via pytest @@ -104,15 +159,23 @@ polystar==0.4.5 # via choppera pooch==1.8.2 # via -r nightly.in +prompt-toolkit==3.0.50 + # via ipython +ptyprocess==0.7.0 + # via pexpect +pure-eval==0.2.3 + # via stack-data py-cpuinfo==9.0.0 # via pytest-benchmark -pydantic==2.11.0b1 +pydantic==2.11.0b2 # via # -r nightly.in # scippneutron -pydantic-core==2.31.1 +pydantic-core==2.32.0 # via pydantic -pyparsing==3.2.1 +pygments==2.19.1 + # via ipython +pyparsing==3.2.3 # via matplotlib pytest==8.3.5 # via @@ -123,34 +186,45 @@ pytest-benchmark==5.1.0 python-dateutil==2.9.0.post0 # via # matplotlib + # pandas # scippneutron # scippnexus # strictyaml +pythreejs==2.4.2 + # via esssans +pytz==2025.2 + # via pandas pyyaml==6.0.2 - # via -r nightly.in + # via + # -r nightly.in + # dask requests==2.32.3 # via pooch sciline @ git+https://github.com/scipp/sciline@main # via # -r nightly.in # essreduce + # esssans # essspectroscopy scipp==100.0.0.dev0 # via # -r nightly.in # choppera # essreduce + # esssans # essspectroscopy # scippneutron # scippnexus scippneutron==25.2.1 # via # essreduce + # esssans # essspectroscopy scippnexus==24.11.1 # via # choppera # essreduce + # esssans # essspectroscopy # scippneutron scipy==1.15.2 @@ -160,18 +234,43 @@ scipy==1.15.2 # scippnexus six==1.17.0 # via python-dateutil +stack-data==0.6.3 + # via ipython strictyaml==1.7.3 # via choppera tomli==2.2.1 # via pytest +toolz==1.0.0 + # via + # dask + # partd tqdm==4.67.1 # via essspectroscopy -typing-extensions==4.12.2 +traitlets==5.14.3 + # via + # comm + # ipython + # ipywidgets + # matplotlib-inline + # pythreejs + # traittypes +traittypes==0.2.1 + # via ipydatawidgets +typing-extensions==4.13.0 # via + # ipython # pydantic # pydantic-core # 
typing-inspection typing-inspection==0.4.0 # via pydantic +tzdata==2025.2 + # via pandas urllib3==2.3.0 # via requests +wcwidth==0.2.13 + # via prompt-toolkit +widgetsnbextension==4.0.13 + # via ipywidgets +zipp==3.21.0 + # via importlib-metadata diff --git a/requirements/static.txt b/requirements/static.txt index 9d1ea209a..bf84eda07 100644 --- a/requirements/static.txt +++ b/requirements/static.txt @@ -15,9 +15,9 @@ identify==2.6.9 # via pre-commit nodeenv==1.9.1 # via pre-commit -platformdirs==4.3.6 +platformdirs==4.3.7 # via virtualenv -pre-commit==4.1.0 +pre-commit==4.2.0 # via -r static.in pyyaml==6.0.2 # via pre-commit From 9ca263169af1def2cbb8c8702ab94212c3fa0669 Mon Sep 17 00:00:00 2001 From: Simon Heybrock Date: Wed, 26 Mar 2025 12:24:28 +0100 Subject: [PATCH 20/20] unused import --- src/beamlime/kafka/message_adapter.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/beamlime/kafka/message_adapter.py b/src/beamlime/kafka/message_adapter.py index 894948229..6ff4ade4c 100644 --- a/src/beamlime/kafka/message_adapter.py +++ b/src/beamlime/kafka/message_adapter.py @@ -1,6 +1,5 @@ # SPDX-License-Identifier: BSD-3-Clause -# Copyright (c) 2024 Scipp contributors (https://github.com/scipp) -import json +# Copyright (c) 2025 Scipp contributors (https://github.com/scipp) from dataclasses import replace from typing import Any, Generic, Protocol, TypeVar
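(Editorial sketch, not part of the patch series: the control flow the series assembles end to end — the dashboard publishes a WorkflowControl command under a per-source key, the reduction service's ConfigHandler dispatches it as a registered action, and the workflow manager creates or removes a stream processor via the factory. Kafka, Dash and the ess StreamProcessor are replaced by stand-ins; only the key convention and the call chain mirror the real code.)

from dataclasses import dataclass


@dataclass
class WorkflowControl:  # stand-in for beamlime.config.models.WorkflowControl
    source_name: str
    workflow_name: str | None


# Hypothetical registry: workflow name -> callable taking the source name,
# mirroring processor_factory.create(workflow_name=..., source_name=...).
factories = {'I(Q)': lambda source_name: f'StreamProcessor<{source_name}>'}
processors: dict[str, object] = {}


def set_workflow_from_command(command: WorkflowControl) -> None:
    if command.workflow_name is None:
        processors.pop(command.source_name, None)  # disabling removes the processor
    else:
        processors[command.source_name] = factories[command.workflow_name](
            source_name=command.source_name
        )


# One action per source name, keyed the same way the dashboard publishes:
# '<source_name>:workflow_control'.
actions = {
    f'{name}:workflow_control': set_workflow_from_command
    for name in ('loki_detector_0', 'loki_detector_1')
}

command = WorkflowControl(source_name='loki_detector_0', workflow_name='I(Q)')
actions[f'{command.source_name}:workflow_control'](command)
assert 'loki_detector_0' in processors

(Keying the commands per source name is what allows a different reduction workflow per detector bank, and sending workflow_name=None — the dashboard's "Enable workflow" checkbox unticked — removes the processor instead of replacing it.)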