Conversation

@SimonHeybrock (Member):

While working on the mechanism to configure data reduction workflows at service runtime, it became clear that the current mechanism for listening to config (aka command) messages was insufficient and cumbersome. Previously, it worked as follows:

  • ConfigSubscriber consumed from a config/command topic, running in a separate thread.
  • All data messages were processed in the main thread, using a handler mechanism.
  • The config subscriber was passed to handlers on handler creation, allowing handlers to get the latest config values.

For live data reduction, we needed a mechanism to "take action" on certain config/command messages. The ConfigSubscriber could not do this. It therefore made more sense to redo the mechanism from scratch, handling config/command messages with the handler mechanism, just like data messages. Key changes are as follows:

  • ConfigSubscriber is removed and replaced by ConfigHandler.
  • Config messages are fed into the stream processor; routing adapters take care of the different message decoding.
  • The ConfigHandler is created ahead of time; data message handlers are created by the handler factory as before.
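
As an illustration of the new shape (all class and method names below are assumptions for the sketch, not the actual implementation), a config handler can now both store the latest values and trigger registered actions when a config/command message arrives, using the {'key': ..., 'value': ...} payload shape visible in the decoding snippet further down:

from typing import Any, Callable


class ConfigHandler:
    # Sketch: keeps the latest config values and runs registered actions,
    # which is the "take action" capability ConfigSubscriber lacked.
    def __init__(self) -> None:
        self._values: dict[str, Any] = {}
        self._actions: dict[str, list[Callable[[Any], None]]] = {}

    def register_action(self, key: str, action: Callable[[Any], None]) -> None:
        self._actions.setdefault(key, []).append(action)

    def handle(self, message: dict[str, Any]) -> None:
        key, value = message['key'], message['value']
        self._values[key] = value
        for action in self._actions.get(key, []):
            action(value)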

 source: MessageSource[Tin],
 sink: MessageSink[Tout],
-handler_factory: HandlerFactory[Tin, Tout],
+handler_registry: HandlerRegistry[Tin, Tout],
@SimonHeybrock (Member, Author):

We now have to pass the registry, since we want to be able to insert a custom handler for config messages, i.e., the processor cannot create the registry itself.
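
A sketch of the resulting wiring (the register method and factory argument are assumed names, not the actual API): the service pre-populates the registry with the ConfigHandler and only then hands it to the processor, while data handlers are still created lazily by the factory:

registry = HandlerRegistry(factory=handler_factory)
# Insert the custom handler for config messages up front; the processor
# could not do this if it created the registry itself.
registry.register(key=MessageKey(topic='config', source_name=''), handler=config_handler)
processor = StreamProcessor(source=source, sink=sink, handler_registry=registry)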

Comment on lines +32 to +40
if var.dtype == sc.DType.datetime64:
    timedelta = var - sc.epoch(unit=var.unit)
    return dataarray_da00.Variable(
        name=name,
        data=timedelta.values,
        axes=list(var.dims),
        shape=var.shape,
        unit=f'datetime64[{var.unit}]',
    )
@SimonHeybrock (Member, Author):

Changes in this file are unrelated. The timeseries helper service previously only worked with --sink=png, now it also works with --sink=kafka.
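
As a sanity check of the datetime encoding above (an illustrative sketch, not code from the PR): subtracting the epoch yields integer offsets that da00 can carry, and the 'datetime64[...]' unit string lets a consumer restore the original values:

import scipp as sc

# Round-trip: datetimes -> integer offsets since epoch -> datetimes.
times = sc.datetimes(dims=['time'], values=['2025-03-24T12:00:00'], unit='ns')
offset = times - sc.epoch(unit=times.unit)  # dtype int64, unit 'ns'
restored = sc.epoch(unit='ns') + offset
assert sc.identical(restored, times)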

-processor = StreamProcessor(
-    source=source,
-    sink=KafkaSink(kafka_config=kafka_config, serializer=serializer),
+builder = DataServiceBuilder(
@SimonHeybrock (Member, Author):

All the fake_* services were previously not using DataServiceBuilder. Now they can, by adding a new method to the builder. This is in a sense unrelated to the main change.
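
A hypothetical sketch of the new path (the builder method and parameter names below are assumptions, not the actual API): a fake_* service hands its source and sink to the builder instead of constructing a StreamProcessor itself:

# All names below are illustrative assumptions.
builder = DataServiceBuilder(
    instrument=instrument,
    name='fake_timeseries',
    handler_factory=handler_factory,
)
service = builder.build(
    source=source,
    sink=KafkaSink(kafka_config=kafka_config, serializer=serializer),
)
service.start()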

github-project-automation bot moved this to In progress in Development Board Mar 24, 2025
@SimonHeybrock moved this from In progress to Selected in Development Board Mar 24, 2025
@YooSunYoung self-assigned this Mar 25, 2025
@YooSunYoung (Member) left a comment:

I have a minor comment and some concerns about consuming messages, with potential blocking.
But we can revisit this if we ever run into the issue.

key = MessageKey(topic=message.topic(), source_name='')
legacy_key = message.key().decode('utf-8') # See 286
value = message.value()
return Message(key=key, timestamp=0, value={'key': legacy_key, 'value': value})
@YooSunYoung (Member):

Kafka messages have a timestamp (a timestamp type plus the actual timestamp), so we can probably just use it, unless 0 has a specific meaning in this context.

https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html#confluent_kafka.Message.timestamp

@SimonHeybrock (Member, Author):


Ah, is that why I got a tuple of two ints as a timestamp? I tried using it first but could not make sense of it.
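
For reference, per the confluent-kafka docs linked above, Message.timestamp() returns a (type, value) tuple, which explains the two ints. A sketch of using it instead of 0 (message is the KafkaMessage from the snippet above):

from confluent_kafka import TIMESTAMP_NOT_AVAILABLE

ts_type, ts_ms = message.timestamp()
# ts_ms is milliseconds since the Unix epoch; fall back to 0 only when
# neither producer nor broker set a timestamp.
timestamp = ts_ms if ts_type != TIMESTAMP_NOT_AVAILABLE else 0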

def consume(self, num_messages: int, timeout: float) -> list[KafkaMessage]:
    messages = []
    for consumer in self._consumers:
        messages.extend(consumer.consume(num_messages, timeout))
    return messages
@YooSunYoung (Member):


I think they shouldn't share the same timeout across all the different topics, unless it's very small (~0).
If num_messages and timeout are not tuned, topics that have many messages might fall behind.

@SimonHeybrock (Member, Author):


Summary from discussion: we may want a different config for control messages, but the correct approach is unclear right now. We will leave this as-is for now and address it once we have information on how this works in practice.
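
One possible shape for that future change (a hypothetical variant, not part of this PR): give each consumer its own budget, so a busy data topic cannot starve the config topic:

def consume(self) -> list[KafkaMessage]:
    # self._budgets holds per-consumer (num_messages, timeout) pairs, e.g.
    # a near-zero timeout for control topics; tuning deferred as discussed.
    messages = []
    for consumer, (num_messages, timeout) in zip(self._consumers, self._budgets):
        messages.extend(consumer.consume(num_messages, timeout))
    return messages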

@SimonHeybrock enabled auto-merge March 26, 2025 10:54
@SimonHeybrock merged commit 1a66951 into main Mar 26, 2025
4 checks passed
@SimonHeybrock deleted the config-handler branch March 26, 2025 10:57
github-project-automation bot moved this from Selected to Done in Development Board Mar 26, 2025