-
Notifications
You must be signed in to change notification settings - Fork 1
Allow runtime "configuration" of reduction handlers #353
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
| ) | ||
| self._logger.info("%s using accumulator %s", key.source_name, accumulator) | ||
|
|
||
| if (processor := self._processors.get(key.source_name)) is not None: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Most of here and below moved to workflow_manager.py with little or no change.
1dc7b38 to
73235a9
Compare
| [ | ||
| _to_da00_variable(name, var) | ||
| for name, var in da.coords.items() | ||
| if var.shape == var.values.shape # vector3 etc. not supported currently |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we somehow log if some coordinates are dropped...? Or do we not care?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We don't care for now, but we have to agree with ECDC on more details on the da00 results, i.e., what should be included.
| def register_action( | ||
| self, | ||
| key: str, | ||
| callback: callable, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
callback sounds like something to be done after the actual action though.
How about just action...?
And I think callable is a function, not a type. (at least my linter complains)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, and I should specify the signature. Ok if I address this in the follow-up #354, where the signature changes?
| updated[key] = value | ||
| except Exception: # noqa: PERF203 | ||
| self._logger.exception('Error processing config message:') | ||
| # Delay action calls until all messages are processed to reduce triggering | ||
| # multiple calls for the same key in case of multiple messages with same key. | ||
| for key, value in updated.items(): | ||
| for action in self._actions.get(key, []): | ||
| try: | ||
| action(value) | ||
| except KeyError: # noqa: PERF203 | ||
| self._logger.exception( | ||
| 'Error processing config action for %s:', key | ||
| ) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not so important, but do we want to call actions only when it changes...? We could quickly check if the values are actually changed and then add it to updated dictionary.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure. Currently it is actually useful, e.g., to recreate workflow (of the same name).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's fine if it's expected to be handled even if it has same value.
This adds a mechanism for (re)defining the stream processors (wrapping workflows) used to reduce detector data at runtime. In this first iteration this only supports predefined workflows with no parameters, but in principle the mechanism can be extended for this.
Note: Will move to a (hopefully) better command/config mechanism in a follow-up. Details of
ConfigHandlerand the action-registration mechanism will thus change.