diff --git a/pyproject.toml b/pyproject.toml index ef08d3bfc..a17c71123 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -45,6 +45,7 @@ dynamic = ["version"] [project.optional-dependencies] test = [ + "esssans", "essspectroscopy", "pytest", "pytest-benchmark", @@ -52,6 +53,9 @@ test = [ bifrost = [ "essspectroscopy", ] +loki = [ + "esssans", +] dashboard = [ "dash", "gunicorn", diff --git a/requirements/base.txt b/requirements/base.txt index 8d4754a67..12d66d806 100644 --- a/requirements/base.txt +++ b/requirements/base.txt @@ -74,7 +74,7 @@ packaging==24.2 # pooch pillow==11.1.0 # via matplotlib -platformdirs==4.3.6 +platformdirs==4.3.7 # via pooch plopp==25.3.0 # via scippneutron @@ -86,7 +86,7 @@ pydantic==2.10.6 # scippneutron pydantic-core==2.27.2 # via pydantic -pyparsing==3.2.1 +pyparsing==3.2.3 # via matplotlib python-dateutil==2.9.0.post0 # via @@ -117,7 +117,7 @@ scipy==1.15.2 # scippnexus six==1.17.0 # via python-dateutil -typing-extensions==4.12.2 +typing-extensions==4.13.0 # via # pydantic # pydantic-core diff --git a/requirements/basetest.in b/requirements/basetest.in index ab108a5a9..a48d4ba10 100644 --- a/requirements/basetest.in +++ b/requirements/basetest.in @@ -9,6 +9,7 @@ pytest-xdist # will not be touched by ``make_base.py`` # --- END OF CUSTOM SECTION --- # The following was generated by 'tox -e deps', DO NOT EDIT MANUALLY! +esssans essspectroscopy pytest pytest-benchmark diff --git a/requirements/basetest.txt b/requirements/basetest.txt index 05395334d..77d8a8f20 100644 --- a/requirements/basetest.txt +++ b/requirements/basetest.txt @@ -1,4 +1,4 @@ -# SHA1:82658a80ffa9a59e7f724c250258e1f524276168 +# SHA1:06cf9b1a88938b3e0e42463590b5b897dc2ca243 # # This file is autogenerated by pip-compile-multi # To update, run: @@ -7,50 +7,92 @@ # annotated-types==0.7.0 # via pydantic +asttokens==3.0.0 + # via stack-data choppera==0.1.6 # via essspectroscopy +click==8.1.8 + # via dask +cloudpickle==3.1.1 + # via dask +comm==0.2.2 + # via ipywidgets contourpy==1.3.1 # via matplotlib -coverage[toml]==7.7.0 +coverage[toml]==7.7.1 # via pytest-cov cyclebane==24.10.0 # via sciline cycler==0.12.1 # via matplotlib +dask==2025.3.0 + # via esssans +decorator==5.2.1 + # via ipython dnspython==2.7.0 # via email-validator email-validator==2.2.0 # via scippneutron essreduce==25.3.1 - # via essspectroscopy + # via + # esssans + # essspectroscopy +esssans==25.2.0 + # via -r basetest.in essspectroscopy==0.25.2.0 # via -r basetest.in exceptiongroup==1.2.2 - # via pytest + # via + # ipython + # pytest execnet==2.1.1 # via pytest-xdist +executing==2.2.0 + # via stack-data fonttools==4.56.0 # via matplotlib +fsspec==2025.3.0 + # via dask +graphviz==0.20.3 + # via esssans h5py==3.13.0 # via # scippneutron # scippnexus idna==3.10 # via email-validator -iniconfig==2.0.0 +importlib-metadata==8.6.1 + # via dask +iniconfig==2.1.0 # via pytest +ipydatawidgets==4.3.5 + # via pythreejs +ipython==8.34.0 + # via ipywidgets +ipywidgets==8.1.5 + # via + # ipydatawidgets + # pythreejs +jedi==0.19.2 + # via ipython +jupyterlab-widgets==3.0.13 + # via ipywidgets kiwisolver==1.4.8 # via matplotlib lazy-loader==0.4 # via # plopp # scippneutron +locket==1.0.0 + # via partd loguru==0.7.3 # via essspectroscopy matplotlib==3.10.1 # via # mpltoolbox # plopp +matplotlib-inline==0.1.7 + # via ipython mpltoolbox==24.5.1 # via scippneutron networkx==3.4.2 @@ -59,33 +101,56 @@ numpy==2.2.4 # via # choppera # contourpy + # esssans # h5py + # ipydatawidgets # matplotlib # mpltoolbox + # pandas # polystar + # pythreejs # scipp # 
scippneutron # scipy packaging==24.2 # via + # dask # lazy-loader # matplotlib # pytest +pandas==2.2.3 + # via esssans +parso==0.8.4 + # via jedi +partd==1.4.2 + # via dask +pexpect==4.9.0 + # via ipython pillow==11.1.0 # via matplotlib plopp==25.3.0 - # via scippneutron + # via + # esssans + # scippneutron pluggy==1.5.0 # via pytest polystar==0.4.5 # via choppera +prompt-toolkit==3.0.50 + # via ipython +ptyprocess==0.7.0 + # via pexpect +pure-eval==0.2.3 + # via stack-data py-cpuinfo==9.0.0 # via pytest-benchmark pydantic==2.10.6 # via scippneutron pydantic-core==2.27.2 # via pydantic -pyparsing==3.2.1 +pygments==2.19.1 + # via ipython +pyparsing==3.2.3 # via matplotlib pytest==8.3.5 # via @@ -102,28 +167,39 @@ pytest-xdist==3.6.1 python-dateutil==2.9.0.post0 # via # matplotlib + # pandas # scippneutron # scippnexus # strictyaml +pythreejs==2.4.2 + # via esssans +pytz==2025.2 + # via pandas +pyyaml==6.0.2 + # via dask sciline==24.10.0 # via # essreduce + # esssans # essspectroscopy scipp==25.3.0 # via # choppera # essreduce + # esssans # essspectroscopy # scippneutron # scippnexus scippneutron==25.2.1 # via # essreduce + # esssans # essspectroscopy scippnexus==24.11.1 # via # choppera # essreduce + # esssans # essspectroscopy # scippneutron scipy==1.15.2 @@ -133,15 +209,40 @@ scipy==1.15.2 # scippnexus six==1.17.0 # via python-dateutil +stack-data==0.6.3 + # via ipython strictyaml==1.7.3 # via choppera tomli==2.2.1 # via # coverage # pytest +toolz==1.0.0 + # via + # dask + # partd tqdm==4.67.1 # via essspectroscopy -typing-extensions==4.12.2 +traitlets==5.14.3 + # via + # comm + # ipython + # ipywidgets + # matplotlib-inline + # pythreejs + # traittypes +traittypes==0.2.1 + # via ipydatawidgets +typing-extensions==4.13.0 # via + # ipython # pydantic # pydantic-core +tzdata==2025.2 + # via pandas +wcwidth==0.2.13 + # via prompt-toolkit +widgetsnbextension==4.0.13 + # via ipywidgets +zipp==3.21.0 + # via importlib-metadata diff --git a/requirements/ci.txt b/requirements/ci.txt index 75b5ef5e0..87052409c 100644 --- a/requirements/ci.txt +++ b/requirements/ci.txt @@ -32,7 +32,7 @@ packaging==24.2 # -r ci.in # pyproject-api # tox -platformdirs==4.3.6 +platformdirs==4.3.7 # via # tox # virtualenv @@ -50,7 +50,7 @@ tomli==2.2.1 # tox tox==4.24.2 # via -r ci.in -typing-extensions==4.12.2 +typing-extensions==4.13.0 # via tox urllib3==2.3.0 # via requests diff --git a/requirements/dev.txt b/requirements/dev.txt index 709df96fa..dd77f2b5d 100644 --- a/requirements/dev.txt +++ b/requirements/dev.txt @@ -26,13 +26,9 @@ async-lru==2.0.5 # via jupyterlab cffi==1.17.1 # via argon2-cffi-bindings -click==8.1.8 - # via - # pip-compile-multi - # pip-tools copier==9.6.0 # via -r dev.in -dunamai==1.23.0 +dunamai==1.23.1 # via copier fqdn==1.5.1 # via jsonschema diff --git a/requirements/docs.txt b/requirements/docs.txt index bf1a48664..2de5fcb9d 100644 --- a/requirements/docs.txt +++ b/requirements/docs.txt @@ -11,8 +11,6 @@ accessible-pygments==0.0.5 # via pydata-sphinx-theme alabaster==1.0.0 # via sphinx -asttokens==3.0.0 - # via stack-data attrs==25.3.0 # via # jsonschema @@ -27,14 +25,8 @@ beautifulsoup4==4.13.3 # pydata-sphinx-theme bleach[css]==6.2.0 # via nbconvert -comm==0.2.2 - # via - # ipykernel - # ipywidgets debugpy==1.8.13 # via ipykernel -decorator==5.2.1 - # via ipython defusedxml==0.7.1 # via nbconvert docutils==0.21.2 @@ -43,31 +35,14 @@ docutils==0.21.2 # nbsphinx # pydata-sphinx-theme # sphinx -executing==2.2.0 - # via stack-data fastjsonschema==2.21.1 # via nbformat imagesize==1.4.1 # via 
sphinx -ipydatawidgets==4.3.5 - # via pythreejs ipykernel==6.29.5 # via -r docs.in ipympl==0.9.7 # via -r docs.in -ipython==8.34.0 - # via - # -r docs.in - # ipykernel - # ipympl - # ipywidgets -ipywidgets==8.1.5 - # via - # ipydatawidgets - # ipympl - # pythreejs -jedi==0.19.2 - # via ipython jsonschema==4.23.0 # via nbformat jsonschema-specifications==2024.10.1 @@ -85,21 +60,15 @@ jupyter-core==5.7.2 # nbformat jupyterlab-pygments==0.3.0 # via nbconvert -jupyterlab-widgets==3.0.13 - # via ipywidgets markdown-it-py==3.0.0 # via # mdit-py-plugins # myst-parser -matplotlib-inline==0.1.7 - # via - # ipykernel - # ipython mdit-py-plugins==0.4.2 # via myst-parser mdurl==0.1.2 # via markdown-it-py -mistune==3.1.2 +mistune==3.1.3 # via nbconvert myst-parser==4.0.1 # via -r docs.in @@ -116,39 +85,16 @@ nbsphinx==0.9.7 # via -r docs.in nest-asyncio==1.6.0 # via ipykernel -pandas==2.2.3 - # via -r docs.in pandocfilters==1.5.1 # via nbconvert -parso==0.8.4 - # via jedi -pexpect==4.9.0 - # via ipython -prompt-toolkit==3.0.50 - # via ipython psutil==7.0.0 # via # -r docs.in # ipykernel -ptyprocess==0.7.0 - # via pexpect -pure-eval==0.2.3 - # via stack-data pyarrow==19.0.1 # via -r docs.in pydata-sphinx-theme==0.16.1 # via -r docs.in -pygments==2.19.1 - # via - # accessible-pygments - # ipython - # nbconvert - # pydata-sphinx-theme - # sphinx -pythreejs==2.4.2 - # via -r docs.in -pytz==2025.1 - # via pandas pyzmq==26.3.0 # via # ipykernel @@ -192,39 +138,13 @@ sphinxcontrib-qthelp==2.0.0 # via sphinx sphinxcontrib-serializinghtml==2.0.0 # via sphinx -stack-data==0.6.3 - # via ipython tinycss2==1.4.0 # via bleach tornado==6.4.2 # via # ipykernel # jupyter-client -traitlets==5.14.3 - # via - # comm - # ipykernel - # ipympl - # ipython - # ipywidgets - # jupyter-client - # jupyter-core - # matplotlib-inline - # nbclient - # nbconvert - # nbformat - # nbsphinx - # pythreejs - # traittypes -traittypes==0.2.1 - # via ipydatawidgets -tzdata==2025.1 - # via pandas -wcwidth==0.2.13 - # via prompt-toolkit webencodings==0.5.1 # via # bleach # tinycss2 -widgetsnbextension==4.0.13 - # via ipywidgets diff --git a/requirements/nightly.in b/requirements/nightly.in index 66f1513c3..598b5fc34 100644 --- a/requirements/nightly.in +++ b/requirements/nightly.in @@ -7,6 +7,7 @@ ess-streaming-data-types jinja2 pydantic>=2 pooch +esssans essspectroscopy pytest pytest-benchmark diff --git a/requirements/nightly.txt b/requirements/nightly.txt index 852cac488..0b652ace2 100644 --- a/requirements/nightly.txt +++ b/requirements/nightly.txt @@ -1,4 +1,4 @@ -# SHA1:cd9178b35731b463833d75aae36658f14d320af0 +# SHA1:d8119ed2adf68d0fe7bb0a0f93fd8e7ab05ebecb # # This file is autogenerated by pip-compile-multi # To update, run: @@ -10,12 +10,20 @@ annotated-types==0.7.0 # via pydantic +asttokens==3.0.0 + # via stack-data certifi==2025.1.31 # via requests charset-normalizer==3.4.1 # via requests choppera==0.1.6 # via essspectroscopy +click==8.1.8 + # via dask +cloudpickle==3.1.1 + # via dask +comm==0.2.2 + # via ipywidgets confluent-kafka==2.8.2 # via -r nightly.in contourpy==1.3.1 @@ -24,6 +32,10 @@ cyclebane==24.10.0 # via sciline cycler==0.12.1 # via matplotlib +dask==2025.3.0 + # via esssans +decorator==5.2.1 + # via ipython dnspython==2.7.0 # via email-validator email-validator==2.2.0 @@ -33,15 +45,26 @@ ess-streaming-data-types==0.27.0 essreduce @ git+https://github.com/scipp/essreduce@main # via # -r nightly.in + # esssans # essspectroscopy +esssans==25.2.0 + # via -r nightly.in essspectroscopy==0.25.2.0 # via -r nightly.in 
exceptiongroup==1.2.2 - # via pytest + # via + # ipython + # pytest +executing==2.2.0 + # via stack-data flatbuffers==25.2.10 # via ess-streaming-data-types fonttools==4.56.0 # via matplotlib +fsspec==2025.3.0 + # via dask +graphviz==0.20.3 + # via esssans h5py==3.13.0 # via # scippneutron @@ -50,16 +73,32 @@ idna==3.10 # via # email-validator # requests -iniconfig==2.0.0 +importlib-metadata==8.6.1 + # via dask +iniconfig==2.1.0 # via pytest +ipydatawidgets==4.3.5 + # via pythreejs +ipython==8.34.0 + # via ipywidgets +ipywidgets==8.1.5 + # via + # ipydatawidgets + # pythreejs +jedi==0.19.2 + # via ipython jinja2==3.1.6 # via -r nightly.in +jupyterlab-widgets==3.0.13 + # via ipywidgets kiwisolver==1.4.8 # via matplotlib lazy-loader==0.4 # via # plopp # scippneutron +locket==1.0.0 + # via partd loguru==0.7.3 # via essspectroscopy markupsafe==3.0.2 @@ -68,6 +107,8 @@ matplotlib==3.10.1 # via # mpltoolbox # plopp +matplotlib-inline==0.1.7 + # via ipython mpltoolbox==24.5.1 # via scippneutron networkx==3.4.2 @@ -77,26 +118,40 @@ numpy==2.2.4 # choppera # contourpy # ess-streaming-data-types + # esssans # h5py + # ipydatawidgets # matplotlib # mpltoolbox + # pandas # polystar + # pythreejs # scipp # scippneutron # scipy packaging==24.2 # via + # dask # lazy-loader # matplotlib # pooch # pytest +pandas==2.2.3 + # via esssans +parso==0.8.4 + # via jedi +partd==1.4.2 + # via dask +pexpect==4.9.0 + # via ipython pillow==11.1.0 # via matplotlib -platformdirs==4.3.6 +platformdirs==4.3.7 # via pooch plopp @ git+https://github.com/scipp/plopp@main # via # -r nightly.in + # esssans # scippneutron pluggy==1.5.0 # via pytest @@ -104,15 +159,23 @@ polystar==0.4.5 # via choppera pooch==1.8.2 # via -r nightly.in +prompt-toolkit==3.0.50 + # via ipython +ptyprocess==0.7.0 + # via pexpect +pure-eval==0.2.3 + # via stack-data py-cpuinfo==9.0.0 # via pytest-benchmark -pydantic==2.11.0b1 +pydantic==2.11.0b2 # via # -r nightly.in # scippneutron -pydantic-core==2.31.1 +pydantic-core==2.32.0 # via pydantic -pyparsing==3.2.1 +pygments==2.19.1 + # via ipython +pyparsing==3.2.3 # via matplotlib pytest==8.3.5 # via @@ -123,34 +186,45 @@ pytest-benchmark==5.1.0 python-dateutil==2.9.0.post0 # via # matplotlib + # pandas # scippneutron # scippnexus # strictyaml +pythreejs==2.4.2 + # via esssans +pytz==2025.2 + # via pandas pyyaml==6.0.2 - # via -r nightly.in + # via + # -r nightly.in + # dask requests==2.32.3 # via pooch sciline @ git+https://github.com/scipp/sciline@main # via # -r nightly.in # essreduce + # esssans # essspectroscopy scipp==100.0.0.dev0 # via # -r nightly.in # choppera # essreduce + # esssans # essspectroscopy # scippneutron # scippnexus scippneutron==25.2.1 # via # essreduce + # esssans # essspectroscopy scippnexus==24.11.1 # via # choppera # essreduce + # esssans # essspectroscopy # scippneutron scipy==1.15.2 @@ -160,18 +234,43 @@ scipy==1.15.2 # scippnexus six==1.17.0 # via python-dateutil +stack-data==0.6.3 + # via ipython strictyaml==1.7.3 # via choppera tomli==2.2.1 # via pytest +toolz==1.0.0 + # via + # dask + # partd tqdm==4.67.1 # via essspectroscopy -typing-extensions==4.12.2 +traitlets==5.14.3 + # via + # comm + # ipython + # ipywidgets + # matplotlib-inline + # pythreejs + # traittypes +traittypes==0.2.1 + # via ipydatawidgets +typing-extensions==4.13.0 # via + # ipython # pydantic # pydantic-core # typing-inspection typing-inspection==0.4.0 # via pydantic +tzdata==2025.2 + # via pandas urllib3==2.3.0 # via requests +wcwidth==0.2.13 + # via prompt-toolkit +widgetsnbextension==4.0.13 + # via 
ipywidgets +zipp==3.21.0 + # via importlib-metadata diff --git a/requirements/static.txt b/requirements/static.txt index 9d1ea209a..bf84eda07 100644 --- a/requirements/static.txt +++ b/requirements/static.txt @@ -15,9 +15,9 @@ identify==2.6.9 # via pre-commit nodeenv==1.9.1 # via pre-commit -platformdirs==4.3.6 +platformdirs==4.3.7 # via virtualenv -pre-commit==4.1.0 +pre-commit==4.2.0 # via -r static.in pyyaml==6.0.2 # via pre-commit diff --git a/scripts/setup-kafka-topics.sh b/scripts/setup-kafka-topics.sh index f25feea0e..df664a975 100644 --- a/scripts/setup-kafka-topics.sh +++ b/scripts/setup-kafka-topics.sh @@ -31,6 +31,16 @@ kafka-topics --create --bootstrap-server kafka:29092 \ --config segment.bytes=104857600 \ --config segment.ms=60000 +kafka-topics --create --bootstrap-server kafka:29092 \ + --topic ${BEAMLIME_INSTRUMENT}_motion \ + --config cleanup.policy=delete \ + --config delete.retention.ms=60000 \ + --config max.message.bytes=104857600 \ + --config retention.bytes=10737418240 \ + --config retention.ms=30000 \ + --config segment.bytes=104857600 \ + --config segment.ms=60000 + kafka-topics --create --bootstrap-server kafka:29092 \ --topic ${BEAMLIME_INSTRUMENT}_beamlime_commands \ --config cleanup.policy=compact \ diff --git a/src/beamlime/config/models.py b/src/beamlime/config/models.py index dfd9db9de..df4a9e9b8 100644 --- a/src/beamlime/config/models.py +++ b/src/beamlime/config/models.py @@ -112,3 +112,12 @@ def validate_range(self) -> ROIAxisRange: class ROIRectangle(BaseModel): x: ROIAxisRange = Field(default_factory=ROIAxisRange) y: ROIAxisRange = Field(default_factory=ROIAxisRange) + + +class WorkflowControl(BaseModel): + source_name: str = Field( + description="Name of the source to control.", + ) + workflow_name: str | None = Field( + description="Name of the workflow to control.", + ) diff --git a/src/beamlime/config/raw_detectors/bifrost.py b/src/beamlime/config/raw_detectors/bifrost.py index 6b3a923f0..3da270869 100644 --- a/src/beamlime/config/raw_detectors/bifrost.py +++ b/src/beamlime/config/raw_detectors/bifrost.py @@ -18,6 +18,7 @@ from scippnexus import NXdetector from beamlime.handlers.detector_data_handler import get_nexus_geometry_filename +from beamlime.handlers.workflow_manager import processor_factory def _to_flat_detector_view(da: sc.DataArray) -> sc.DataArray: @@ -85,7 +86,8 @@ def _make_counts_per_angle( sc.zeros(dims=['angle'], shape=[45], unit='counts'), coords={'angle': edges} ) counts = sc.values(data.sum().data) - da['angle', rotation.data[-1]] += counts + if rotation is not None: + da['angle', rotation.data[-1]] += counts return da @@ -101,20 +103,39 @@ def _make_counts_per_angle( _reduction_workflow.insert(_make_counts_per_angle) -def _make_processor(): +@processor_factory.register(name='spectrum-view') +def _spectrum_view() -> StreamProcessor: return StreamProcessor( - _reduction_workflow, + _reduction_workflow.copy(), + dynamic_keys=(NeXusData[NXdetector, SampleRun],), + target_keys=(SpectrumView,), + accumulators=(SpectrumView,), + ) + + +@processor_factory.register(name='counts-per-angle') +def _counts_per_angle() -> StreamProcessor: + return StreamProcessor( + _reduction_workflow.copy(), dynamic_keys=(NeXusData[NXdetector, SampleRun],), - accumulators=(SpectrumView, CountsPerAngle), context_keys=(DetectorRotation,), - target_keys=(SpectrumView, CountsPerAngle), + target_keys=(CountsPerAngle,), + accumulators=(CountsPerAngle,), ) -def make_stream_processors(): - return {'unified_detector': _make_processor()} 
+@processor_factory.register(name='all') +def _all() -> StreamProcessor: + return StreamProcessor( + _reduction_workflow.copy(), + dynamic_keys=(NeXusData[NXdetector, SampleRun],), + context_keys=(DetectorRotation,), + target_keys=(CountsPerAngle, SpectrumView), + accumulators=(CountsPerAngle, SpectrumView), + ) +source_names = ('unified_detector',) source_to_key = { 'unified_detector': NeXusData[NXdetector, SampleRun], 'detector_rotation': DetectorRotation, diff --git a/src/beamlime/config/raw_detectors/loki.py b/src/beamlime/config/raw_detectors/loki.py index 23bd63aaf..c2ad9643b 100644 --- a/src/beamlime/config/raw_detectors/loki.py +++ b/src/beamlime/config/raw_detectors/loki.py @@ -1,7 +1,23 @@ # SPDX-License-Identifier: BSD-3-Clause -# Copyright (c) 2024 Scipp contributors (https://github.com/scipp) +# Copyright (c) 2025 Scipp contributors (https://github.com/scipp) +from ess.loki.live import _configured_Larmor_AgBeh_workflow +from ess.reduce.nexus.types import NeXusData, NeXusDetectorName, SampleRun +from ess.reduce.streaming import StreamProcessor +from ess.sans.types import ( + Denominator, + Filename, + Incident, + IofQ, + Numerator, + ReducedQ, + Transmission, +) +from scippnexus import NXdetector -_res_scale = 8 +from beamlime.handlers.detector_data_handler import get_nexus_geometry_filename +from beamlime.handlers.workflow_manager import processor_factory + +_res_scale = 12 detectors_config = { 'detectors': { @@ -63,3 +79,49 @@ }, }, } + +_workflow = _configured_Larmor_AgBeh_workflow() + + +@processor_factory.register(name='I(Q)') +def _i_of_q(source_name: str) -> StreamProcessor: + wf = _workflow.copy() + wf[Filename[SampleRun]] = get_nexus_geometry_filename('loki') + wf[NeXusDetectorName] = source_name + return StreamProcessor( + wf, + dynamic_keys=( + NeXusData[NXdetector, SampleRun], + NeXusData[Incident, SampleRun], + NeXusData[Transmission, SampleRun], + ), + target_keys=(IofQ[SampleRun],), + accumulators=(ReducedQ[SampleRun, Numerator], ReducedQ[SampleRun, Denominator]), + ) + + +source_names = ( + 'loki_detector_0', + 'loki_detector_1', + 'loki_detector_2', + 'loki_detector_3', + 'loki_detector_4', + 'loki_detector_5', + 'loki_detector_6', + 'loki_detector_7', + 'loki_detector_8', +) +source_to_key = { + 'loki_detector_0': NeXusData[NXdetector, SampleRun], + 'loki_detector_1': NeXusData[NXdetector, SampleRun], + 'loki_detector_2': NeXusData[NXdetector, SampleRun], + 'loki_detector_3': NeXusData[NXdetector, SampleRun], + 'loki_detector_4': NeXusData[NXdetector, SampleRun], + 'loki_detector_5': NeXusData[NXdetector, SampleRun], + 'loki_detector_6': NeXusData[NXdetector, SampleRun], + 'loki_detector_7': NeXusData[NXdetector, SampleRun], + 'loki_detector_8': NeXusData[NXdetector, SampleRun], + 'monitor1': NeXusData[Incident, SampleRun], + 'monitor2': NeXusData[Transmission, SampleRun], +} +f144_attribute_registry = {} diff --git a/src/beamlime/core/service.py b/src/beamlime/core/service.py index 53bfff687..3fc1d2bb6 100644 --- a/src/beamlime/core/service.py +++ b/src/beamlime/core/service.py @@ -33,6 +33,10 @@ def configure_logging(log_level: int) -> None: format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', stream=sys.stdout, ) + # scipp.transform_coords logs info messages that are not useful and would show + # with every workflow call + scipp_logger = logging.getLogger('scipp') + scipp_logger.setLevel(logging.WARNING) def _setup_logging(self, log_level: int) -> None: """Configure logging for this service instance if not already configured""" diff --git 
a/src/beamlime/handlers/config_handler.py b/src/beamlime/handlers/config_handler.py index be43ca180..bc66dc926 100644 --- a/src/beamlime/handlers/config_handler.py +++ b/src/beamlime/handlers/config_handler.py @@ -2,6 +2,7 @@ # Copyright (c) 2025 Scipp contributors (https://github.com/scipp) import json import logging +from typing import Any from ..core.handler import Config, Handler from ..core.message import Message, MessageKey @@ -31,10 +32,28 @@ def message_key(instrument: str) -> MessageKey: def __init__(self, *, logger: logging.Logger | None = None, config: Config): super().__init__(logger=logger, config=config) self._store = config + self._actions: dict[str, list[callable]] = {} def get(self, key: str, default=None): return self._store.get(key, default) + def register_action( + self, + key: str, + callback: callable, + ): + """ + Register an action to be called when a specific key is updated. + + Parameters + ---------- + key: + Key to watch for changes + callback: + Callback function to call when the key is updated + """ + self._actions.setdefault(key, []).append(callback) + def handle(self, messages: list[Message[bytes]]) -> list[Message[None]]: """ Process configuration messages and update the configuration. @@ -49,6 +68,7 @@ def handle(self, messages: list[Message[bytes]]) -> list[Message[None]]: : Empty list as this handler doesn't produce output messages """ + updated: dict[str, Any] = {} for message in messages: try: key = message.value['key'] @@ -57,6 +77,17 @@ def handle(self, messages: list[Message[bytes]]) -> list[Message[None]]: 'Updating config: %s = %s at %s', key, value, message.timestamp ) self._store[key] = value - except Exception as e: # noqa: PERF203 - self._logger.error('Error processing config message: %s', e) + updated[key] = value + except Exception: # noqa: PERF203 + self._logger.exception('Error processing config message:') + # Delay action calls until all messages are processed to reduce triggering + # multiple calls for the same key in case of multiple messages with same key. 
+ for key, value in updated.items(): + for action in self._actions.get(key, []): + try: + action(value) + except KeyError: # noqa: PERF203 + self._logger.exception( + 'Error processing config action for %s:', key + ) return [] diff --git a/src/beamlime/handlers/data_reduction_handler.py b/src/beamlime/handlers/data_reduction_handler.py index 6157cc064..aba59ec27 100644 --- a/src/beamlime/handlers/data_reduction_handler.py +++ b/src/beamlime/handlers/data_reduction_handler.py @@ -3,23 +3,16 @@ from __future__ import annotations import logging -from types import ModuleType from typing import Any import scipp as sc -from ess.reduce.streaming import StreamProcessor -from sciline.typing import Key -from ..core.handler import ( - Accumulator, - Config, - Handler, - HandlerFactory, - PeriodicAccumulatingHandler, -) +from ..core.handler import Config, Handler, HandlerFactory, PeriodicAccumulatingHandler from ..core.message import Message, MessageKey from .accumulators import DetectorEvents, ToNXevent_data +from .monitor_data_handler import MonitorDataPreprocessor from .to_nx_log import ToNXlog +from .workflow_manager import WorkflowManager class NullHandler(Handler[Any, None]): @@ -37,118 +30,49 @@ class ReductionHandlerFactory( def __init__( self, *, - instrument_config: ModuleType, + workflow_manager: WorkflowManager, + f144_attribute_registry: dict[str, dict[str, Any]], logger: logging.Logger | None = None, config: Config, ) -> None: self._logger = logger or logging.getLogger(__name__) self._config = config - self._instrument_config = instrument_config - self._processors = self._instrument_config.make_stream_processors() - self._source_to_key = self._instrument_config.source_to_key - self._attrs_registry = self._instrument_config.f144_attribute_registry + self._workflow_manager = workflow_manager + self._f144_attribute_registry = f144_attribute_registry def _is_nxlog(self, key: MessageKey) -> bool: return key.topic.split('_', maxsplit=1)[1] in ('motion',) + def _is_monitor(self, key: MessageKey) -> bool: + return key.topic.split('_', maxsplit=1)[1] in ('beam_monitor',) + def make_handler( self, key: MessageKey ) -> Handler[DetectorEvents, sc.DataGroup[sc.DataArray]]: self._logger.info("Creating handler for %s", key) - wf_key = self._source_to_key.get(key.source_name) - if wf_key is None: + accumulator = self._workflow_manager.get_accumulator(key.source_name) + if accumulator is None: self._logger.warning( "No workflow key found for source name %s, using null handler", key.source_name, ) return NullHandler(logger=self._logger, config=self._config) - if is_context := self._is_nxlog(key): - preprocessor = ToNXlog(attrs=self._attrs_registry[key.source_name]) + if self._is_nxlog(key): + attrs = self._f144_attribute_registry[key.source_name] + preprocessor = ToNXlog(attrs=attrs) + elif self._is_monitor(key): + preprocessor = MonitorDataPreprocessor(config=self._config) else: preprocessor = ToNXevent_data() self._logger.info( - "Preprocessor %s is used for source name %s", - preprocessor.__class__.__name__, - key.source_name, + "%s using preprocessor %s", key.source_name, preprocessor.__class__.__name__ ) + self._logger.info("%s using accumulator %s", key.source_name, accumulator) - if (processor := self._processors.get(key.source_name)) is not None: - accumulator = StreamProcessorProxy(processor, key=wf_key) - self._logger.info( - "Source name %s is mapped to input %s of stream processor %s", - key.source_name, - wf_key, - key.source_name, - ) - else: - # Note the inefficiency here, of processing 
these sources in multiple - # workflows. This is typically once per detector. If monitors are large this - # can turn into a problem. At the same time, we want to keep flexible to - # allow for - # - # 1. Different workflows for different detector banks, e.g., for diffraction - # and SANS detectors. - # 2. Simple scaling, by processing different detectors on different nodes. - # - # Both could probably also be achieved with a non-duplicate processing of - # monitors, but we keep it simple until proven to be necessary. Note that - # an alternative would be to move some cost into the preprocessor, which - # could, e.g., histogram large monitors to reduce the duplicate cost in the - # stream processors. - accumulator = MultiplexingProxy( - list(self._processors.values()), key=wf_key, is_context=is_context - ) - self._logger.info( - "Source name %s is mapped to input %s in all stream processors", - key.source_name, - wf_key, - ) return PeriodicAccumulatingHandler( logger=self._logger, config=self._config, preprocessor=preprocessor, accumulators={f'reduced/{key.source_name}': accumulator}, ) - - -class MultiplexingProxy(Accumulator[sc.DataArray, sc.DataGroup[sc.DataArray]]): - def __init__( - self, stream_processors: list[StreamProcessor], key: Key, is_context: bool - ) -> None: - self._stream_processors = stream_processors - self._key = key - self._is_context = is_context - - def add(self, timestamp: int, data: sc.DataArray) -> None: - if self._is_context: - for stream_processor in self._stream_processors: - stream_processor.set_context({self._key: data}) - else: - for stream_processor in self._stream_processors: - stream_processor.accumulate({self._key: data}) - - def get(self) -> sc.DataGroup[sc.DataArray]: - return sc.DataGroup() - - def clear(self) -> None: - # Clearing would be ok, but should be redundant since the stream processors are - # cleared for each detector in the non-multiplexing proxies. 
- pass - - -class StreamProcessorProxy(Accumulator[sc.DataArray, sc.DataGroup[sc.DataArray]]): - def __init__(self, processor: StreamProcessor, *, key: type) -> None: - self._processor = processor - self._key = key - - def add(self, timestamp: int, data: sc.DataArray) -> None: - self._processor.accumulate({self._key: data}) - - def get(self) -> sc.DataGroup[sc.DataArray]: - return sc.DataGroup( - {str(key): val for key, val in self._processor.finalize().items()} - ) - - def clear(self) -> None: - self._processor.clear() diff --git a/src/beamlime/handlers/detector_data_handler.py b/src/beamlime/handlers/detector_data_handler.py index b034217a6..841e751d0 100644 --- a/src/beamlime/handlers/detector_data_handler.py +++ b/src/beamlime/handlers/detector_data_handler.py @@ -211,6 +211,7 @@ def clear(self) -> None: _registry = { 'geometry-dream-2025-01-01.nxs': 'md5:91aceb884943c76c0c21400ee74ad9b6', 'geometry-loki-2025-01-01.nxs': 'md5:8d0e103276934a20ba26bb525e53924a', + 'geometry-loki-2025-03-26.nxs': 'md5:279dc8cf7dae1fac030d724bc45a2572', 'geometry-bifrost-2025-01-01.nxs': 'md5:ae3caa99dd56de9495b9321eea4e4fef', } diff --git a/src/beamlime/handlers/workflow_manager.py b/src/beamlime/handlers/workflow_manager.py new file mode 100644 index 000000000..064a8a100 --- /dev/null +++ b/src/beamlime/handlers/workflow_manager.py @@ -0,0 +1,231 @@ +# SPDX-License-Identifier: BSD-3-Clause +# Copyright (c) 2025 Scipp contributors (https://github.com/scipp) +from __future__ import annotations + +import inspect +from collections.abc import Callable, Iterator, MutableMapping, Sequence +from functools import wraps +from typing import Any + +import scipp as sc +from ess.reduce.streaming import StreamProcessor +from sciline.typing import Key + +from ..config.models import WorkflowControl +from ..core.handler import Accumulator + + +class StreamProcessorFactory: + def __init__(self) -> None: + self._factories: dict[str, Callable[[], StreamProcessor]] = {} + + def get_available(self) -> tuple[str, ...]: + """Return a tuple of available factory names.""" + return tuple(self._factories.keys()) + + def register( + self, name: str + ) -> Callable[[Callable[[], StreamProcessor]], Callable[[], StreamProcessor]]: + """ + Decorator to register a factory function for creating StreamProcessors. + + Parameters + ---------- + name: + Name to register the factory under. + + Returns + ------- + Decorator function that registers the factory and returns it unchanged. 
+ """ + + def decorator( + factory: Callable[[], StreamProcessor], + ) -> Callable[[], StreamProcessor]: + @wraps(factory) + def wrapper() -> StreamProcessor: + return factory() + + if name in self._factories: + raise ValueError(f"Factory for {name} already registered") + self._factories[name] = factory + return wrapper + + return decorator + + def create(self, *, workflow_name: str, source_name: str) -> StreamProcessor: + """Create a StreamProcessor using the registered factory.""" + factory = self._factories[workflow_name] + sig = inspect.signature(factory) + if 'source_name' in sig.parameters: + return factory(source_name=source_name) + else: + return factory() + + +processor_factory = StreamProcessorFactory() + + +class ProcessorRegistry(MutableMapping[str, StreamProcessor]): + def __init__(self) -> None: + self._processors: dict[str, StreamProcessor] = {} + + def __getitem__(self, key: str) -> StreamProcessor: + if key not in self._processors: + raise KeyError(f"Processor {key} not found") + return self._processors[key] + + def __setitem__(self, key: str, value: StreamProcessor) -> None: + self._processors[key] = value + + def __delitem__(self, key: str) -> None: + if key not in self._processors: + raise KeyError(f"Processor {key} not found") + del self._processors[key] + + def __iter__(self) -> Iterator[str]: + return iter(self._processors) + + def __len__(self) -> int: + return len(self._processors) + + +class WorkflowManager: + def __init__( + self, + *, + source_names: Sequence[str], + source_to_key: dict[str, Key], + ) -> None: + """ + Parameters + ---------- + source_names: + List of source names to attach workflows to. These need to be passed + explicitly, so we can distinguish source names that should not be handled + (to proxy and handler will be created) from source names that may later be + configured to use a workflow. + source_to_key: + Dictionary mapping source names to workflow input keys. + dynamic_workflows: + Dictionary mapping source names to dynamic workflows. + """ + self._source_names = source_names + self._source_to_key = source_to_key + self._processors = ProcessorRegistry() + self._proxies: dict[str, StreamProcessorProxy] = {} + for name in source_names: + self.set_worklow(name, None) + + def set_worklow(self, source_name: str, processor: StreamProcessor | None) -> None: + """ + Add a workflow to the manager. + + Parameters + ---------- + source_name: + Source name to attach the workflow to. + workflow: + The workflow to attach to the source name. If None, the workflow is removed. + """ + if source_name not in self._source_names: + raise ValueError(f"Workflow {source_name} was not defined in the manager.") + if processor is None: + self._processors.pop(source_name, None) + else: + self._processors[source_name] = processor + if (proxy := self._proxies.get(source_name)) is not None: + proxy.set_processor(self._processors.get(source_name)) + + def set_workflow_from_command(self, command: Any) -> None: + decoded = WorkflowControl.model_validate(command) + if decoded.workflow_name is None: + processor = None + else: + processor = processor_factory.create( + workflow_name=decoded.workflow_name, source_name=decoded.source_name + ) + self.set_worklow(decoded.source_name, processor) + + def get_accumulator( + self, source_name: str + ) -> MultiplexingProxy | StreamProcessorProxy | None: + wf_key = self._source_to_key.get(source_name) + if wf_key is None: + return None + if source_name in self._source_names: + # Note that the processor may be 'None' at this point. 
+ proxy = StreamProcessorProxy(self._processors.get(source_name), key=wf_key) + self._proxies[source_name] = proxy + return proxy + else: + # Note the inefficiency here, of processing these sources in multiple + # workflows. This is typically once per detector. If monitors are large this + # can turn into a problem. At the same time, we want to keep flexible to + # allow for + # + # 1. Different workflows for different detector banks, e.g., for diffraction + # and SANS detectors. + # 2. Simple scaling, by processing different detectors on different nodes. + # + # Both could probably also be achieved with a non-duplicate processing of + # monitors, but we keep it simple until proven to be necessary. Note that + # an alternative would be to move some cost into the preprocessor, which + # could, e.g., histogram large monitors to reduce the duplicate cost in the + # stream processors. + return MultiplexingProxy(self._processors, key=wf_key) + + +class MultiplexingProxy(Accumulator[sc.DataArray, sc.DataGroup[sc.DataArray]]): + def __init__(self, stream_processors: ProcessorRegistry, key: Key) -> None: + self._stream_processors = stream_processors + self._key = key + + def __repr__(self) -> str: + return f"{self.__class__.__name__}({self._key})" + + def add(self, timestamp: int, data: sc.DataArray) -> None: + for stream_processor in self._stream_processors.values(): + if self._key in stream_processor._context_keys: + stream_processor.set_context({self._key: data}) + elif self._key in stream_processor._dynamic_keys: + stream_processor.accumulate({self._key: data}) + else: + # Might be unused by this particular workflow + pass + + def get(self) -> sc.DataGroup[sc.DataArray]: + return sc.DataGroup() + + def clear(self) -> None: + # Clearing would be ok, but should be redundant since the stream processors are + # cleared for each detector in the non-multiplexing proxies. 
+ pass + + +class StreamProcessorProxy(Accumulator[sc.DataArray, sc.DataGroup[sc.DataArray]]): + def __init__(self, processor: StreamProcessor | None = None, *, key: type) -> None: + self._processor = processor + self._key = key + + def __repr__(self) -> str: + return f"{self.__class__.__name__}({self._key})" + + def set_processor(self, processor: StreamProcessor | None) -> None: + """Set the processor to use for this proxy.""" + self._processor = processor + + def add(self, timestamp: int, data: sc.DataArray) -> None: + if self._processor is not None: + self._processor.accumulate({self._key: data}) + + def get(self) -> sc.DataGroup[sc.DataArray]: + if self._processor is None: + return sc.DataGroup() + return sc.DataGroup( + {str(key): val for key, val in self._processor.finalize().items()} + ) + + def clear(self) -> None: + if self._processor is not None: + self._processor.clear() diff --git a/src/beamlime/kafka/message_adapter.py b/src/beamlime/kafka/message_adapter.py index 6aa1e77d5..6ff4ade4c 100644 --- a/src/beamlime/kafka/message_adapter.py +++ b/src/beamlime/kafka/message_adapter.py @@ -1,5 +1,5 @@ # SPDX-License-Identifier: BSD-3-Clause -# Copyright (c) 2024 Scipp contributors (https://github.com/scipp) +# Copyright (c) 2025 Scipp contributors (https://github.com/scipp) from dataclasses import replace from typing import Any, Generic, Protocol, TypeVar diff --git a/src/beamlime/kafka/scipp_da00_compat.py b/src/beamlime/kafka/scipp_da00_compat.py index 454d59df5..eb7434855 100644 --- a/src/beamlime/kafka/scipp_da00_compat.py +++ b/src/beamlime/kafka/scipp_da00_compat.py @@ -14,7 +14,13 @@ def scipp_to_da00( _to_da00_variable(signal_name, sc.values(da.data)), _to_da00_variable('errors', sc.stddevs(da.data)), ] - variables.extend([_to_da00_variable(name, var) for name, var in da.coords.items()]) + variables.extend( + [ + _to_da00_variable(name, var) + for name, var in da.coords.items() + if var.shape == var.values.shape # vector3 etc. 
not supported currently + ] + ) return variables diff --git a/src/beamlime/services/dashboard.py b/src/beamlime/services/dashboard.py index 7dc1a40da..7136b1dfc 100644 --- a/src/beamlime/services/dashboard.py +++ b/src/beamlime/services/dashboard.py @@ -14,8 +14,10 @@ from beamlime import Service, ServiceBase from beamlime.config import config_names, models from beamlime.config.config_loader import load_config +from beamlime.config.raw_detectors import get_config from beamlime.core.config_service import ConfigService from beamlime.core.message import compact_messages +from beamlime.handlers.workflow_manager import processor_factory from beamlime.kafka import consumer as kafka_consumer from beamlime.kafka.helpers import topic_for_instrument from beamlime.kafka.message_adapter import ( @@ -34,6 +36,7 @@ def __init__( instrument: str = 'dummy', debug: bool = False, log_level: int = logging.INFO, + auto_remove_plots_after_seconds: float = 10.0, ) -> None: name = f'{instrument}_dashboard' super().__init__(name=name, log_level=log_level) @@ -43,10 +46,15 @@ def __init__( # Initialize state self._plots: dict[str, go.Figure] = {} + self._auto_remove_plots_after_seconds = auto_remove_plots_after_seconds + self._last_plot_update: dict[str, float] = {} self._exit_stack = ExitStack() self._exit_stack.__enter__() + # Load instrument configuration for source names + self._instrument_config = get_config(instrument) + # Setup services self._setup_config_service() self._source = self._setup_kafka_consumer() @@ -231,6 +239,46 @@ def _setup_layout(self) -> None: style={'margin': '10px 0'}, ), html.Button('Clear', id='clear-button', n_clicks=0), + html.Hr(style={'margin': '20px 0'}), + html.H3('Workflow Control', style={'marginTop': '10px'}), + html.Label('Source Name'), + dcc.Dropdown( + id='workflow-source-name', + options=[ + {'label': name, 'value': name} + for name in self._instrument_config.source_names + ], + value=self._instrument_config.source_names[0], + style={'width': '100%', 'marginBottom': '10px'}, + ), + html.Div( + [ + dcc.Checklist( + id='workflow-enable', + options=[{'label': 'Enable workflow', 'value': 'enabled'}], + value=['enabled'], + style={'margin': '10px 0'}, + ), + html.Label('Workflow Name'), + dcc.Dropdown( + id='workflow-name', + options=[ + {'label': name, 'value': name} + for name in processor_factory.get_available() + ], + value=processor_factory.get_available()[0], + style={'width': '100%', 'marginBottom': '10px'}, + ), + ], + id='workflow-selector-container', + ), + html.Button( + 'Go!', + id='workflow-control-button', + n_clicks=0, + style={'width': '100%', 'marginTop': '10px'}, + ), + html.Label('Note that workflow changes may take a few seconds to apply.'), ] self._app.layout = html.Div( [ @@ -326,6 +374,21 @@ def _setup_callbacks(self) -> None: Input('use-weights-checkbox', 'value'), )(self.update_use_weights) + # Add callback to enable/disable workflow dropdown + self._app.callback( + Output('workflow-name', 'disabled'), + Input('workflow-enable', 'value'), + )(lambda value: len(value) == 0) + + # Update workflow control button callback + self._app.callback( + Output('workflow-control-button', 'n_clicks'), + Input('workflow-control-button', 'n_clicks'), + Input('workflow-source-name', 'value'), + Input('workflow-name', 'value'), + Input('workflow-enable', 'value'), + )(self.send_workflow_control) + def update_roi(self, x_center, x_delta, y_center, y_delta): x_min = max(0, x_center - x_delta) x_max = min(100, x_center + x_delta) @@ -461,6 +524,7 @@ def update_plots(self, 
n: int | None): if n is None: raise PreventUpdate + now = time.time() try: messages = self._source.get_messages() num = len(messages) @@ -486,11 +550,19 @@ def update_plots(self, n: int | None): fig.data[0].x = data.coords[x_dim].values fig.data[0].y = data.coords[y_dim].values fig.data[0].z = data.values + self._last_plot_update[key] = now except Exception as e: self._logger.exception("Error in update_plots: %s", e) raise PreventUpdate from None + # Remove plots if no recent update. This happens, e.g., when the reduction + # workflow is removed or changed. + for key, last_update in self._last_plot_update.items(): + if now - last_update > self._auto_remove_plots_after_seconds: + self._plots.pop(key, None) + self._logger.info("Removed plot for %s", key) + graphs = [dcc.Graph(figure=fig) for fig in self._plots.values()] return [html.Div(graphs, style={'display': 'flex', 'flexWrap': 'wrap'})] @@ -510,6 +582,36 @@ def clear_data(self, n_clicks: int | None) -> int: self._config_service.update_config('start_time', model.model_dump()) return 0 + def send_workflow_control( + self, + n_clicks: int | None, + source_name: str | None, + workflow_name: str, + enable_workflow: list[str], + ) -> int: + """Send a workflow control message.""" + if n_clicks is None or n_clicks == 0 or not source_name: + raise PreventUpdate + + actual_workflow_name = workflow_name if enable_workflow else None + + self._logger.info( + "Sending workflow control message: source=%s, workflow=%s", + source_name, + actual_workflow_name, + ) + + workflow_control = models.WorkflowControl( + source_name=source_name, + workflow_name=actual_workflow_name, + ) + + self._config_service.update_config( + f'{source_name}:workflow_control', workflow_control.model_dump() + ) + + return 0 + def _start_impl(self) -> None: self._config_service_thread.start() diff --git a/src/beamlime/services/data_reduction.py b/src/beamlime/services/data_reduction.py index a67b717de..236126550 100644 --- a/src/beamlime/services/data_reduction.py +++ b/src/beamlime/services/data_reduction.py @@ -13,6 +13,7 @@ from beamlime.config.raw_detectors import get_config from beamlime.handlers.config_handler import ConfigHandler from beamlime.handlers.data_reduction_handler import ReductionHandlerFactory +from beamlime.handlers.workflow_manager import WorkflowManager from beamlime.kafka import consumer as kafka_consumer from beamlime.kafka.helpers import ( beam_monitor_topic, @@ -23,12 +24,15 @@ from beamlime.kafka.message_adapter import ( BeamlimeCommandsAdapter, ChainedAdapter, + Da00ToScippAdapter, Ev44ToDetectorEventsAdapter, - Ev44ToMonitorEventsAdapter, F144ToLogDataAdapter, + KafkaToDa00Adapter, KafkaToEv44Adapter, KafkaToF144Adapter, + KafkaToMonitorEventsAdapter, RouteByTopicAdapter, + RoutingAdapter, ) from beamlime.kafka.sink import KafkaSink, UnrollingSinkAdapter from beamlime.kafka.source import MultiConsumer @@ -50,11 +54,17 @@ def setup_arg_parser() -> argparse.ArgumentParser: def make_reduction_service_builder( *, instrument: str, log_level: int = logging.INFO ) -> DataServiceBuilder: - adapter = RouteByTopicAdapter( + monitors = RoutingAdapter( routes={ - beam_monitor_topic(instrument): ChainedAdapter( - first=KafkaToEv44Adapter(), second=Ev44ToMonitorEventsAdapter() + 'ev44': KafkaToMonitorEventsAdapter(), + 'da00': ChainedAdapter( + first=KafkaToDa00Adapter(), second=Da00ToScippAdapter() ), + } + ) + adapter = RouteByTopicAdapter( + routes={ + beam_monitor_topic(instrument): monitors, detector_topic(instrument): ChainedAdapter( 
first=KafkaToEv44Adapter(), second=Ev44ToDetectorEventsAdapter( @@ -67,10 +77,16 @@ def make_reduction_service_builder( beamlime_command_topic(instrument): BeamlimeCommandsAdapter(), } ) + instrument_config = get_config(instrument) + workflow_manager = WorkflowManager( + source_names=instrument_config.source_names, + source_to_key=instrument_config.source_to_key, + ) config = {} - config_handler = ConfigHandler(config=config) handler_factory = ReductionHandlerFactory( - instrument_config=get_config(instrument), config=config + config=config, + workflow_manager=workflow_manager, + f144_attribute_registry=instrument_config.f144_attribute_registry, ) builder = DataServiceBuilder( instrument=instrument, @@ -79,6 +95,12 @@ def make_reduction_service_builder( adapter=adapter, handler_factory=handler_factory, ) + config_handler = ConfigHandler(config=config) + for source_name in instrument_config.source_names: + config_handler.register_action( + key=f'{source_name}:workflow_control', + callback=workflow_manager.set_workflow_from_command, + ) builder.add_handler(ConfigHandler.message_key(instrument), config_handler) return builder
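
Illustrative sketch, not part of the patch above: how the new workflow-control pieces introduced here are meant to fit together, assuming the beamlime version from this PR is installed together with the new `loki` extra (esssans). The source name 'loki_detector_0' and the workflow name 'I(Q)' are taken from the loki config module in this diff; creating the 'I(Q)' processor may fetch the LoKI geometry file via pooch.

from beamlime.config.models import WorkflowControl
from beamlime.config.raw_detectors import get_config
from beamlime.handlers.workflow_manager import WorkflowManager, processor_factory

# Importing the instrument module registers its workflows with the shared factory
# (for LoKI this requires esssans, added as the 'loki' extra in this PR).
instrument_config = get_config('loki')
print(processor_factory.get_available())  # includes 'I(Q)' registered by loki.py

# The reduction service builds one manager per instrument from its config module.
manager = WorkflowManager(
    source_names=instrument_config.source_names,
    source_to_key=instrument_config.source_to_key,
)

# The dashboard publishes WorkflowControl.model_dump() under the config key
# '<source_name>:workflow_control'; ConfigHandler.register_action routes that value
# to set_workflow_from_command, which creates the named StreamProcessor (this step
# may download the geometry file) and swaps it in behind the existing proxy.
command = WorkflowControl(source_name='loki_detector_0', workflow_name='I(Q)')
manager.set_workflow_from_command(command.model_dump())

# Sending workflow_name=None detaches the workflow from that source again.
manager.set_workflow_from_command(
    WorkflowControl(source_name='loki_detector_0', workflow_name=None).model_dump()
)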