Add chopperless wavelength lookup-table workflow #902
| @specs.wavelength_lut_handle.attach_factory() | ||
| def _wavelength_lut_workflow_factory(params: WavelengthLutParams): | ||
| """Factory for LOKI's chopperless wavelength lookup-table workflow.""" | ||
| return create_chopperless_wavelength_lut_workflow(params=params) |
Here it is hard-coded to be chopperless. Is the plan to then update this once we have support for reading chopper info and build a table on the fly?
| return create_chopperless_wavelength_lut_workflow(params=params) | ||
|
|
||
| @specs.monitor_handle.attach_factory() | ||
| def _monitor_workflow_factory(source_name: str, params: MonitorDataParams): |
Why was it only added to dummy and Loki instruments?
Is it just for testing, and we will roll out to other instruments later?
Just for testing, yes.
| #: such stream today; its value is a boolean "at setpoint" indicator and | ||
| #: carries no unit (distinct from ``'dimensionless'``, which a number with | ||
| #: cancelled units would have). | ||
| _SYNTHETIC_LOG_ATTRS: dict[str, dict[str, Any]] = { |
So the idea would be that this single-tick stream would be later replaced by listening to the chopper info streams?
Yes, basically I want to synthesize an "everything is in phase" signal (or maybe "there is a new setpoint and we went out of phase for the previous one", so it gets computed as soon as possible).
| params.distance_range.get_start(), | ||
| params.distance_range.get_stop(), | ||
| ) | ||
| wf[NumberOfSimulatedNeutrons] = int(params.simulation.num_simulated_neutrons) |
Did I understand correctly that if you want to play with these params, you have to stop the job/workflow, start a new job and change the number of neutrons?
I don't know how often users would play with the params, probably not very often at all, and just use the defaults?
Yes, parameter changes require restarts. This means files have to be read again, but hopefully just reading the choppers is quick.
> I don't know how often users would play with the params, probably not very often at all, and just use the defaults?
Yes, there are defaults. The user can thus just click "Start" without changing them, if they don't want to.
| """Range of total flight paths covered by the lookup table.""" | ||
|
|
||
| start: float = pydantic.Field(default=5.0, description="Shortest L_total.") | ||
| stop: float = pydantic.Field(default=30.0, description="Longest L_total.") |
For loki, we go up to 35m in esssans. Maybe have the default cover the same range?
This is a general parameter for all instruments, so a single default is not well suited to all of them.
Can we have a default per instrument?
We would need to create instrument-specific params, yes. But actually my idea was to either (1) hard-code it per instrument or (2) read min/max distance from the NeXus geometry file (the one which will also be used to read chopper info).
Hard coding per instrument sounds fine to me.
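The hard-coded option could look roughly like this. It is a sketch only: it uses a plain dataclass rather than the pydantic model in the diff, and `DEFAULT_DISTANCE_RANGE` and `default_distance_range` are hypothetical names. The 35 m stop for loki is the esssans figure mentioned above; the loki start value is an assumption.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class DistanceRange:
    """Range of total flight paths (L_total) in metres."""

    start: float = 5.0
    stop: float = 30.0


# Hypothetical per-instrument table. Only the 35 m stop for loki comes from
# the discussion; the start value is assumed to match the generic default.
DEFAULT_DISTANCE_RANGE: dict[str, DistanceRange] = {
    "loki": DistanceRange(start=5.0, stop=35.0),
}


def default_distance_range(instrument: str) -> DistanceRange:
    """Return the instrument-specific range, else the generic default."""
    return DEFAULT_DISTANCE_RANGE.get(instrument, DistanceRange())
```

Instruments without an entry fall back to the current 5 to 30 m default, so nothing changes for them until a value is added.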
| source_names=[CHOPPER_CASCADE_SOURCE], | ||
| params=params, | ||
| outputs=WavelengthLutOutputs, | ||
| reset_on_run_transition=False, |
Can you briefly explain what reset_on_run_transition does?
It clears the accumulators in the workflow when there is a run-start or run-stop message from NICOS. This workflow currently has no accumulators so this is not actually relevant I think.
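To illustrate the flag, here is a toy sketch. `Accumulator`, `Job`, and `on_run_transition` are hypothetical stand-ins and not the actual livedata classes; only the flag name comes from the diff.

```python
class Accumulator:
    """Hypothetical running sum, standing in for a workflow accumulator."""

    def __init__(self) -> None:
        self.total = 0.0

    def add(self, value: float) -> None:
        self.total += value

    def clear(self) -> None:
        self.total = 0.0


class Job:
    """Hypothetical job holding one accumulator."""

    def __init__(self, reset_on_run_transition: bool = True) -> None:
        self.reset_on_run_transition = reset_on_run_transition
        self.accumulator = Accumulator()

    def on_run_transition(self) -> None:
        """Called when a run-start or run-stop message arrives."""
        if self.reset_on_run_transition:
            self.accumulator.clear()
```

With `reset_on_run_transition=False` the accumulated state survives the transition, which makes no difference for a workflow that holds no accumulators.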
New `tof_table` service that computes a wavelength lookup table from chopper setpoints. v0 supports only chopperless instruments: a synthesizer emits a single vacuous "setpoints reached" tick at startup, which triggers the workflow once. Plateau detection, per-chopper f144 plumbing, and NeXus chopper geometry are deferred to v1. Also fixes an architectural gap that surfaced under chopperless operation: `OrchestratingProcessor` previously coupled "data is flowing" with "scheduled jobs can activate" by early-returning on empty batches. Without continuous data the chopperless service's job stayed scheduled forever even though the synthesizer's tick was cached. The empty-batch path now also seeds context for scheduled jobs and runs `process_jobs`, so activation works without ongoing data flow. `tof_table` uses `NaiveMessageBatcher` (matching `timeseries`) so the synthesizer's lone tick is preprocessed immediately rather than buffered indefinitely by `SimpleMessageBatcher`. `DataServiceBuilder` gains an optional `outer_source_wrapper` to inject the synthesizer between the message adapter and the orchestrator. Workflow registered for both `loki` and `dummy`.
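A rough sketch of the synthesizer idea described in this commit, assuming a simplified `Message` type and a `get_messages()` method that returns a list. The real message and source types are richer, `EmptySource` is hypothetical, and prepending the tick rather than appending it is an arbitrary choice here.

```python
from dataclasses import dataclass
from typing import Protocol


@dataclass(frozen=True)
class Message:
    """Stand-in for a stream message (assumption: stream name plus value)."""

    stream: str
    value: int


class MessageSource(Protocol):
    def get_messages(self) -> list[Message]: ...


class ChopperSynthesizer:
    """Wrap a source and add one synthetic tick on the first poll."""

    def __init__(self, inner: MessageSource) -> None:
        self._inner = inner
        self._emitted = False

    def get_messages(self) -> list[Message]:
        messages = list(self._inner.get_messages())
        if not self._emitted:
            # Vacuous "setpoints reached" signal for a chopperless instrument.
            messages.insert(0, Message(stream="chopper_cascade", value=1))
            self._emitted = True
        return messages


class EmptySource:
    """Hypothetical upstream source that never has data."""

    def get_messages(self) -> list[Message]:
        return []
```

Wrapping `EmptySource()` yields exactly one `chopper_cascade` message on the first call and nothing afterwards, which is the situation that exposed the empty-batch gap in the orchestrator.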
The upstream `ess.reduce.unwrap.lut` API uses no "TOF" naming; the output is conceptually a wavelength lookup table indexed by (distance, event_time_offset). Align our service, namespace, workflow name, output key, modules, and classes to match.

Renames:
- service: tof_table -> wavelength_lut
- namespace + workflow name: tof_table/lookup_table -> wavelength_lut/wavelength_lut
- output key: lookup_table -> wavelength_lut
- modules: lookup_table_* -> wavelength_lut_*
- classes: LookupTable* -> WavelengthLut*

No behavior change.
Improves UX: operators enter source frequency in Hz (default 14.0) instead of a high-precision period in milliseconds (1000.0/14 ≈ 71.4286). Pulse stride moves to the same model since it's an operational property of the source, not a simulation knob. Hz unit is hardcoded — there's no realistic case for kHz/MHz pulse frequency, so the dropdown is omitted. Provenance coord names on the output (pulse_period, pulse_stride) are unchanged: consumers receive the period the upstream pipeline actually used.
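The conversion behind this change is just period = 1000 / frequency in milliseconds. A minimal sketch, using a dataclass instead of the actual pydantic model; the `period_ms` property is a hypothetical helper for illustration.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Pulse:
    """Source pulse settings as entered by the operator."""

    frequency: float = 14.0  # Hz
    stride: int = 1

    @property
    def period_ms(self) -> float:
        """Pulse period in milliseconds derived from the frequency in Hz."""
        return 1000.0 / self.frequency


pulse = Pulse()
print(round(pulse.period_ms, 4))  # 71.4286
```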
…length_lut_workflow_specs These models are specific to the wavelength LUT workflow and have no other callers. parameter_models.py is for generic, reusable building blocks shared across multiple specs. Also avoids the naming confusion with the identically-named scipp workflow type keys in ess.reduce.unwrap.lut.
The ``wavelength_lut`` workflow is conceptually just an f144-driven workflow: if the producer published ``chopper_cascade_reached`` directly we'd have implemented it as a normal sibling under ``timeseries`` from day one. The ``ChopperSynthesizer`` is scaffolding for that upstream-side gap, not a reason to maintain a separate service.

- Register the spec under ``namespace='timeseries'``.
- Special-case the synthetic ``chopper_cascade`` source in ``LogdataHandlerFactory`` via a small ``_SYNTHETIC_LOG_ATTRS`` table; do not pollute ``f144_attribute_registry`` (which would auto-register an unwanted timeseries workflow for it). Synthetic attrs use no unit (not ``'dimensionless'``) since the value is a boolean "at setpoint" tick.
- Wire ``ChopperSynthesizer`` into the timeseries service via the existing ``outer_source_wrapper`` hook. When the producer publishes the trigger directly, the wrapper drops out and the workflow becomes a plain f144 consumer with no other code change.
- Drop the now-redundant ``services/wavelength_lut.py`` and ``handlers/wavelength_lut_handler.py`` (and the latter's unit test).

Net -106 lines.
The bespoke Workflow subclass collapsed to a factory: the chopper_cascade trigger is now a sciline dynamic key consumed by an _empty_choppers provider, and provenance coords are attached by another provider. v1's chopper-equipped variant just edits the provider's signature to take chopper-PV context inputs — no livedata-side subclassing. Drops the result cache: it was never hit (orchestrator only re-finalizes on retry, when _result is None anyway) and would block recomputation in v1 when chopper setpoints change.
Earlier rewrite of the orchestrator's workflow_data construction lost the original explanation that context seeding covers both auxiliary streams (detector workflows) and primary streams (timeseries jobs that activate after their data passed through). Merge that back with the chopperless wavelength_lut tick as a third example of why the empty-batch branch needs to run jobs from cached context.
Both branches were doing the same conceptual work — build workflow_data, enrich with cached context, decide if there is anything for jobs to do — just with the steps duplicated and reordered. Collapse them: build workflow_data from the batch when present (else an empty placeholder), then run a single context-enrichment block, then a single idle check. Behaviour preserved: the idle exit still fires only when there is no message_batch AND no cached context to activate any scheduled job, and report_batch still passes 0.0 processing time on idle.
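The collapsed control flow can be sketched as below. This is an illustration under simplifying assumptions: plain dicts stand in for the real batch and context types, the function name `build_workflow_data` is hypothetical, and the rule that cached context only fills streams missing from the batch is an assumption.

```python
from typing import Any, Optional


def build_workflow_data(
    message_batch: Optional[dict[str, Any]],
    cached_context: dict[str, Any],
) -> Optional[dict[str, Any]]:
    """Return the data handed to jobs, or None when the processor is idle."""
    # 1. Build from the batch when present, else start from an empty placeholder.
    workflow_data = dict(message_batch) if message_batch is not None else {}
    # 2. Single context-enrichment block: cached values fill in missing streams.
    for stream, value in cached_context.items():
        workflow_data.setdefault(stream, value)
    # 3. Single idle check: no batch and no cached context to activate a job.
    if message_batch is None and not cached_context:
        return None
    return workflow_data
```

With no batch but a cached `chopper_cascade` tick the function still returns data, so a scheduled chopperless job can activate without ongoing data flow.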
Post-rebase fixups: WorkflowSpec.namespace and Instrument.register_spec's namespace= parameter were removed on main. Switch the spec registration to group=TIMESERIES and the test lookup to spec.group.name.
aef4f67 to
1d1184c
Compare
Summary
MVP for the dynamic TOF lookup-table workflow (issue #894), chopperless instruments only. Added to the `timeseries` service as it is conceptually similar. Framework pieces (synthetic primary stream, ChopperSynthesizer wrapper, context-replay-on-job-start, sciline-driven LUT computation, da00 publish with provenance coords) are exercised against a real upstream pipeline. Chopper-equipped support (NeXus geometry loading, plateau detection on phase NXlogs, RawChoppers→DiskChoppers translation) is deliberately out of scope for this PR.
Examples
What's in
- `wavelength_lut` service: subscribes to commands/run-control, runs `ess.reduce.unwrap.lut.LookupTableWorkflow()` with `DiskChoppers={}`, publishes a single `wavelength_lut` da00 message per fire.
- `ChopperSynthesizer`: `MessageSource` decorator; v0 emits one synthetic `chopper_cascade` `LogData(value=1)` tick on the first `get_messages` call. Conceptually the same shape as an f144 boolean "at-setpoint" signal: `ToNXlog` accumulates it as context, `MessagePreprocessor.get_context` replays the cached value when the operator schedules a job.
- `Pulse(frequency: float = 14.0, stride: int = 1)` replaces a period-in-ms input, so no more typing `71.4286` for 14 Hz.
- `outer_source_wrapper` hook on `DataServiceBuilder` so the synthesizer can sit between the Kafka adapter and the orchestrator.
- Registered for `dummy` (used in CI tests) and `loki` (chopperless against the upstream `loki_tof_lookup_table_no_choppers` reference).

Out of scope (deferred to follow-up)
- `phase_setpoint` synthesis
- `NXdisk_chopper` copy in `make_geometry_nexus.py` and per-instrument geometry artifact regen
- `RawChoppers + cached aux setpoints → DiskChoppers` translation provider
- `SourcePosition` from NeXus (currently a placeholder vector, only used inside the upstream simulator's per-chopper loop, which is empty for chopperless)