Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions src/beamlime/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@

from .core import (
CommonHandlerFactory,
ConfigSubscriber,
Handler,
Message,
MessageKey,
Expand All @@ -29,7 +28,6 @@

__all__ = [
"CommonHandlerFactory",
"ConfigSubscriber",
"Handler",
"LiveWorkflow",
"Message",
Expand Down
2 changes: 0 additions & 2 deletions src/beamlime/core/__init__.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,12 @@
# SPDX-License-Identifier: BSD-3-Clause
# Copyright (c) 2024 Scipp contributors (https://github.com/scipp)
from .config_subscriber import ConfigSubscriber
from .handler import CommonHandlerFactory, Handler
from .message import Message, MessageKey, MessageSink, MessageSource, compact_messages
from .processor import Processor, StreamProcessor
from .service import Service, ServiceBase

__all__ = [
'CommonHandlerFactory',
'ConfigSubscriber',
'Handler',
'Message',
'MessageKey',
Expand Down
71 changes: 0 additions & 71 deletions src/beamlime/core/config_subscriber.py

This file was deleted.

3 changes: 3 additions & 0 deletions src/beamlime/core/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,9 @@ def __init__(self, *, factory: HandlerFactory[Tin, Tout]):
def __len__(self) -> int:
return sum(1 for handler in self._handlers.values() if handler is not None)

def register_handler(self, key: MessageKey, handler: Handler[Tin, Tout]) -> None:
self._handlers[key] = handler

def get(self, key: MessageKey) -> Handler[Tin, Tout] | None:
if key not in self._handlers:
self._handlers[key] = self._factory.make_handler(key)
Expand Down
8 changes: 4 additions & 4 deletions src/beamlime/core/processor.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
# SPDX-License-Identifier: BSD-3-Clause
# Copyright (c) 2024 Scipp contributors (https://github.com/scipp)
# Copyright (c) 2025 Scipp contributors (https://github.com/scipp)
from __future__ import annotations

import logging
from collections import defaultdict
from typing import Generic, Protocol

from .handler import HandlerFactory, HandlerRegistry
from .handler import HandlerRegistry
from .message import MessageSink, MessageSource, Tin, Tout


Expand All @@ -32,12 +32,12 @@ def __init__(
logger: logging.Logger | None = None,
source: MessageSource[Tin],
sink: MessageSink[Tout],
handler_factory: HandlerFactory[Tin, Tout],
handler_registry: HandlerRegistry[Tin, Tout],
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We now have to pass the registry, since we want to be able to insert a custom handler for config messages, i.e., the processor cannot create the registry itself.

) -> None:
self._logger = logger or logging.getLogger(__name__)
self._source = source
self._sink = sink
self._handler_registry = HandlerRegistry(factory=handler_factory)
self._handler_registry = handler_registry

def process(self) -> None:
messages = self._source.get_messages()
Expand Down
6 changes: 0 additions & 6 deletions src/beamlime/core/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,22 +100,18 @@ class Service(ServiceBase):
def __init__(
self,
*,
children: list[StartStoppable] | None = None,
processor: Processor,
name: str | None = None,
log_level: int = logging.INFO,
poll_interval: float = 0.01,
):
super().__init__(name=name, log_level=log_level)
self._poll_interval = poll_interval
self._children = children or []
self._processor = processor
self._thread: threading.Thread | None = None

def _start_impl(self) -> None:
"""Start the service and block until stopped"""
for child in self._children:
child.start()
self._thread = threading.Thread(target=self._run_loop)
self._thread.start()

Expand Down Expand Up @@ -147,8 +143,6 @@ def _stop_impl(self) -> None:
"""Stop the service gracefully"""
if self._thread:
self._thread.join()
for child in self._children:
child.stop()

@staticmethod
def setup_arg_parser(description: str) -> argparse.ArgumentParser:
Expand Down
62 changes: 62 additions & 0 deletions src/beamlime/handlers/config_handler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
# SPDX-License-Identifier: BSD-3-Clause
# Copyright (c) 2025 Scipp contributors (https://github.com/scipp)
import json
import logging

from ..core.handler import Config, Handler
from ..core.message import Message, MessageKey
from ..kafka.helpers import beamlime_command_topic


class ConfigHandler(Handler[bytes, None]):
"""
Handler for configuration messages.

This handler processes configuration messages and updates the configuration
dictionary accordingly. It is used by StreamProcessor to handle configuration
updates received from a Kafka topic.

Parameters
----------
logger
Logger to use
config
Configuration object to update
"""

@staticmethod
def message_key(instrument: str) -> MessageKey:
return MessageKey(topic=beamlime_command_topic(instrument), source_name='')

def __init__(self, *, logger: logging.Logger | None = None, config: Config):
super().__init__(logger=logger, config=config)
self._store = config

def get(self, key: str, default=None):
return self._store.get(key, default)

def handle(self, messages: list[Message[bytes]]) -> list[Message[None]]:
"""
Process configuration messages and update the configuration.

Parameters
----------
messages:
List of messages containing configuration updates

Returns
-------
:
Empty list as this handler doesn't produce output messages
"""
for message in messages:
try:
key = message.value['key']
value = json.loads(message.value['value'].decode('utf-8'))
self._logger.info(
'Updating config: %s = %s at %s', key, value, message.timestamp
)
self._store[key] = value
except Exception as e: # noqa: PERF203
self._logger.error('Error processing config message: %s', e)
return []
7 changes: 7 additions & 0 deletions src/beamlime/kafka/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,13 @@ def motion_topic(instrument: str) -> str:
return topic_for_instrument(topic='motion', instrument=instrument)


def beamlime_command_topic(instrument: str) -> str:
"""
Return the topic name for the beamlime command data of an instrument.
"""
return topic_for_instrument(topic='beamlime_commands', instrument=instrument)


def source_name(device: str, signal: str) -> str:
"""
Return the source name for a given device and signal.
Expand Down
24 changes: 24 additions & 0 deletions src/beamlime/kafka/message_adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,15 @@ def adapt(self, message: T) -> U:
pass


class IdentityAdapter(MessageAdapter[T, T]):
"""
An adapter that does not change the message.
"""

def adapt(self, message: T) -> T:
return message


class KafkaToEv44Adapter(
MessageAdapter[KafkaMessage, Message[eventdata_ev44.EventData]]
):
Expand Down Expand Up @@ -185,6 +194,21 @@ def adapt(
return replace(message, value=da00_to_scipp(message.value))


class BeamlimeCommandsAdapter(MessageAdapter[KafkaMessage, Message[Any]]):
"""
Adapts a Kafka message to a Beamlime command message.
"""

def adapt(self, message: KafkaMessage) -> Message[Any]:
key = MessageKey(topic=message.topic(), source_name='')
timestamp = message.timestamp()[1]
legacy_key = message.key().decode('utf-8') # See 286
value = message.value()
return Message(
key=key, timestamp=timestamp, value={'key': legacy_key, 'value': value}
)


class ChainedAdapter(MessageAdapter[T, V]):
"""
Chains two adapters together.
Expand Down
12 changes: 12 additions & 0 deletions src/beamlime/kafka/scipp_da00_compat.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,15 @@ def da00_to_scipp(


def _to_da00_variable(name: str, var: sc.Variable) -> dataarray_da00.Variable:
if var.dtype == sc.DType.datetime64:
timedelta = var - sc.epoch(unit=var.unit)
return dataarray_da00.Variable(
name=name,
data=timedelta.values,
axes=list(var.dims),
shape=var.shape,
unit=f'datetime64[{var.unit}]',
)
Comment on lines +32 to +40
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changes in this file are unrelated. The timeseries helper service previously only worked with --sink=png, now it also works with --sink=kafka.

return dataarray_da00.Variable(
name=name,
data=var.values,
Expand All @@ -39,4 +48,7 @@ def _to_da00_variable(name: str, var: sc.Variable) -> dataarray_da00.Variable:


def _to_scipp_variable(var: dataarray_da00.Variable) -> sc.Variable:
if var.unit.startswith('datetime64'):
unit = var.unit.split('[')[1].rstrip(']')
return sc.epoch(unit=unit) + sc.array(dims=var.axes, values=var.data, unit=unit)
return sc.array(dims=var.axes, values=var.data, unit=var.unit)
23 changes: 23 additions & 0 deletions src/beamlime/kafka/source.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,29 @@ def close(self) -> None:
pass


class MultiConsumer:
"""
Message source for multiple Kafka consumers.

This class allows for consuming messages from multiple Kafka consumers with
different configuration. In particular, we need to use different topic offsets for
data topics vs. config/command topics.
"""

def __init__(self, consumers):
self._consumers = consumers

def consume(self, num_messages: int, timeout: float) -> list[KafkaMessage]:
messages = []
for consumer in self._consumers:
messages.extend(consumer.consume(num_messages, timeout))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think they shouldn't have same timeout for all different topics, unless it's very small ~0.
If num_message and timeout are not optimized, some topics that have many messages might fall behind.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Summary from discussion: May want different config for control messages, but unclear what is correct approach right now. Will leave as-is for now and address when we have information on how this works in practice.

return messages

def close(self) -> None:
for consumer in self._consumers:
consumer.close()


class KafkaMessageSource(MessageSource[KafkaMessage]):
"""
Message source for messages from Kafka.
Expand Down
Loading