# Streamed processing with Loki workflow

In [None]:
import scipp as sc
from ess import sans
from ess import loki
import ess.loki.data  # noqa: F401
from ess.sans.types import *

## Create and configure the workflow

We begin by creating the Loki workflow object (this is a [sciline.Pipeline](https://scipp.github.io/sciline/generated/classes/sciline.Pipeline.html) which can be consulted for advanced usage).
The files we use here come from a Loki detector test at Larmor, so we use the corresponding workflow:

In [None]:
# Create the workflow object for the Loki detector test at Larmor.
workflow = loki.LokiAtLarmorWorkflow()

We configure the workflow by defining the series of mask filenames and the bank names to reduce.
In this case there is just a single bank:

In [None]:
# Attach the tutorial pixel-mask files to the workflow; the helper returns a
# new workflow object, so the name is rebound.
mask_filenames = loki.data.loki_tutorial_mask_filenames()
workflow = sans.with_pixel_mask_filenames(workflow, masks=mask_filenames)

# The single detector bank reduced in this notebook.
workflow[NeXusDetectorName] = 'larmor_detector'

The workflow can be visualized as a graph.
For readability we show only the sub-workflow for computing `IofQ[SampleRun]`.
The workflow can actually compute the full `BackgroundSubtractedIofQ`, which applies an equivalent workflow to the background run, before a subtraction step:

In [None]:
# Visualize a copy of the workflow so that `workflow` itself is left untouched.
# The transmission fraction is set to a placeholder value on the copy,
# presumably to keep its upstream sub-graph out of the picture (TODO confirm).
wf = workflow.copy()
wf[TransmissionFraction[SampleRun]] = None
# This must stay the final expression: a notebook cell only renders the value
# of its last expression, so any earlier `visualize` call would be discarded.
wf.visualize(IofQ[SampleRun], compact=True, graph_attr={'rankdir': 'LR'})

Note the red boxes which indicate missing input parameters.
We can set these missing parameters, as well as parameters where we do not want to use the defaults:

In [None]:
# Wavelength range and resolution used for the reduction.
wavelength_min = sc.scalar(1.0, unit='angstrom')
wavelength_max = sc.scalar(13.0, unit='angstrom')
n_wavelength_bins = 50
n_wavelength_bands = 50  # defined here but not referenced in this cell

# N bins are delimited by N + 1 bin edges.
n_wavelength_edges = n_wavelength_bins + 1
workflow[WavelengthBins] = sc.linspace(
    dim='wavelength',
    start=wavelength_min,
    stop=wavelength_max,
    num=n_wavelength_edges,
)
workflow[WavelengthBands] = None

# Q binning: 101 edges between 0.01 and 0.3 1/angstrom.
workflow[QBins] = sc.linspace('Q', 0.01, 0.3, 101, unit='1/angstrom')
workflow[DirectBeam] = None

# Remaining switches that override the workflow defaults.
workflow[CorrectForGravity] = True
workflow[UncertaintyBroadcastMode] = UncertaintyBroadcastMode.upper_bound
workflow[ReturnEvents] = False

## Configuring data to load

We have not configured which files we want to load.
In this tutorial, we use helpers to fetch the tutorial data which return the filenames of the cached files.
In a real use case, you would set these parameters manually:

In [None]:
# Sample run. The key holds a single filename, so exactly one run is bound
# here; run 60250 is the one used throughout this notebook.
workflow[Filename[SampleRun]] = loki.data.loki_tutorial_sample_run_60250()
workflow[Filename[BackgroundRun]] = loki.data.loki_tutorial_background_run_60393()
workflow[Filename[TransmissionRun[SampleRun]]] = (
    loki.data.loki_tutorial_sample_transmission_run()
)
# Run 60392 serves both as the background transmission run and as the
# empty-beam run.
workflow[Filename[TransmissionRun[BackgroundRun]]] = loki.data.loki_tutorial_run_60392()
workflow[Filename[EmptyBeamRun]] = loki.data.loki_tutorial_run_60392()

## Finding the beam center

Looking carefully at the workflow above,
one will notice that there is a missing parameter from the workflow: the red box that contains the `BeamCenter` type.
Before we can proceed with the reduction,
we therefore have to first determine the center of the beam.

There are more details on how this is done in the [beam center finder notebook](../common/beam-center-finder.ipynb),
but for now we simply reuse the workflow (by making a copy),
and insert the provider that will compute the beam center.

For now, we compute the beam center only for the rear detector (named 'larmor_detector') but apply it to all banks (currently there is only one bank).
The beam center may need to be computed or applied differently to each bank, see [scipp/esssans#28](https://github.com/scipp/esssans/issues/28).
We use a center-of-mass approach to find the beam center:

In [None]:
# Determine the beam center from the configured workflow using the
# center-of-mass helper.
center = sans.beam_center_from_center_of_mass(workflow)
# Final expression of the cell: shows the computed center as cell output.
center

and set that value in our workflow:

In [None]:
# Provide the previously computed beam center as the `BeamCenter` parameter.
workflow[BeamCenter] = center

In [None]:
from ess.reduce.nexus.generic_types import NeXusDetectorEventData, NeXusMonitorEventData

# Compute the NeXus event data of the sample run once, for the detector and
# for the incident monitor. Later cells slice these into chunks.
det_events = workflow.compute(NeXusDetectorEventData[SampleRun])
mon_events = workflow.compute(NeXusMonitorEventData[SampleRun, Incident])
# Final expression of the cell: shows the detector events as cell output.
det_events

In [None]:
# Show the incident-monitor events computed in the previous cell.
mon_events

In [None]:
%matplotlib widget
dummy = sc.DataArray(
    sc.zeros(dims=('Q',), shape=(100,), with_variances=True),
    coords={'Q': sc.linspace('Q', 0.0, 1.0, 101, unit='1/angstrom')},
)
fig = dummy.plot(norm='log', scale={'Q': 'log'}, vmin=0.1, vmax=5)
fig.canvas.ylabel = '$I(Q)$ ' + fig.canvas.ylabel
artist = next(iter(fig.artists))
display(fig)

In [None]:
from functools import partial
from ess.reduce import streaming

# Keys of the data that is fed in chunk by chunk, and of the result plotted
# after each chunk.
detector_key = NeXusDetectorEventData[SampleRun]
monitor_key = NeXusMonitorEventData[SampleRun, Incident]
iofq_key = IofQ[SampleRun]

# Accumulator factory with a window of 5.
# Alternative that was tried here: accumulator=streaming.EternalAccumulator
rolling_accumulator = partial(streaming.RollingAccumulator, window=5)

streaming_wf = streaming.StreamProcessor(
    base_workflow=workflow,
    dynamic_keys=(monitor_key, detector_key),
    accumulation_keys=(
        ReducedQ[SampleRun, Numerator],
        ReducedQ[SampleRun, Denominator],
    ),
    target_keys=(iofq_key,),
    accumulator=rolling_accumulator,
)


# Number of leading-dimension entries taken per chunk from each event array.
det_stride = 60
mon_stride = 1
for i in range(100):
    det_window = slice(det_stride * i, det_stride * (i + 1))
    mon_window = slice(mon_stride * i, mon_stride * (i + 1))
    # Copy the chunks so the in-place scaling below cannot touch the source data.
    det_chunk = det_events[det_window].copy()
    mon_chunk = mon_events[mon_window].copy()
    if 20 < i < 30:
        # Zero the detector chunk for iterations 21..29 (monitor chunk unchanged).
        det_chunk *= 0.0
    results = streaming_wf.add_chunk({detector_key: det_chunk, monitor_key: mon_chunk})
    # Replace the plotted curve with the latest result and force a redraw.
    fig.update({artist: results[iofq_key]})
    canvas = fig.fig.canvas
    canvas.draw()
    canvas.flush_events()

### Debugging files

In [None]:
# Select the sample run to inspect. The key holds a single filename, so only
# the run that is actually used (60250) is assigned.
workflow[Filename[SampleRun]] = loki.data.loki_tutorial_sample_run_60250()
# Histogram the incident-monitor data of that run and plot it.
mon = workflow.compute(MonitorData[SampleRun, Incident]).hist()
mon.plot()

In [None]:
%matplotlib widget
from ess.reduce.nexus.generic_types import NeXusDetectorEventData

det = workflow.compute(NeXusDetectorEventData[SampleRun]).hist()
det.plot()
det.bin(event_time_zero=mon.coords['event_time_zero'][55:]).hist().plot()

In [None]:
# Extract the `event_time_zero` coordinate of the incident-monitor data and of
# the detector event data of the sample run, so the two can be compared.
t_mon = workflow.compute(MonitorData[SampleRun, Incident]).coords['event_time_zero']
t_det = workflow.compute(NeXusDetectorEventData[SampleRun]).coords['event_time_zero']

In [None]:
# Plot the monitor's `event_time_zero` coordinate.
t_mon.plot()

In [None]:
# Plot the detector's `event_time_zero` coordinate.
t_det.plot()