# File Chunk Workflow
In this example, we will process McStas events chunk by chunk, panel by panel.

This approach makes it possible to process files that are too large to handle in one go.

See the ``workflow`` example for a detailed description of the workflow.

## Build Base Workflow

In [None]:
from ess.nmx.mcstas import McStasWorkflow
from ess.nmx.data import small_mcstas_3_sample
from ess.nmx.reduction import NMXReducedData
from ess.nmx.types import *

# Create the base workflow and set its input parameters.
wf = McStasWorkflow()
# Replace with the path to your own file
wf[FilePath] = small_mcstas_3_sample()
wf[MaximumCounts] = 10_000
# Given as an integer here; the export loop at the end of this example
# replaces it with the ``t`` coordinate of the first reduced panel.
wf[TimeBinSteps] = 50

In [None]:
# Draw the task graph that computes ``NMXReducedData``.
# ``rankdir: TD`` is forwarded as a graph attribute
# (presumably Graphviz top-down layout -- confirm against the sciline docs).
wf.visualize(NMXReducedData, graph_attr={"rankdir": "TD"}, compact=True)

## Streaming Processor and Accumulator

``StreamProcessor`` allows processing data chunk by chunk.

``Accumulator`` defines how to keep track of the latest result of the workflow.

In [None]:
from ess.reduce.streaming import EternalAccumulator, Accumulator
from ess.reduce.streaming import StreamProcessor


class MaxAccumulator(Accumulator):
    """Accumulator that remembers the largest value pushed to it so far."""

    def __init__(self, **kwargs: Any) -> None:
        """Forward ``kwargs`` to ``Accumulator`` and start without a maximum."""
        super().__init__(**kwargs)
        # ``None`` until the first value has been pushed
        self._cur_max: sc.Variable | None = None

    @property
    def value(self) -> sc.Variable | None:
        """Largest value seen so far, or ``None`` if nothing was pushed yet."""
        return self._cur_max

    def _do_push(self, value: sc.Variable) -> None:
        """Update the stored maximum with the maximum of ``value``."""
        candidate = value.max()
        if self._cur_max is not None:
            # Stack the stored and the new maximum along a helper dimension
            # and reduce over it to keep the larger of the two.
            stacked = sc.concat([self._cur_max, candidate], dim='max')
            candidate = stacked.max('max')
        self._cur_max = candidate


In [None]:
from functools import partial
from ess.nmx.types import RawEventData, ProtonCharge, NMXHistogram, MaximumProbability

# Accumulator class used for each workflow key that is tracked across chunks
_accumulators = {
    ProtonCharge: EternalAccumulator,
    NMXHistogram: EternalAccumulator,
    MaximumProbability: MaxAccumulator,
}

# Pre-configured ``StreamProcessor`` factory.
# Calling ``stream_processor(workflow)`` passes ``workflow`` as the first
# positional argument of ``StreamProcessor`` together with the keys below.
stream_processor = partial(
    StreamProcessor,
    accumulators=_accumulators,
    target_keys=(ProtonCharge, NMXHistogram),
    dynamic_keys=(RawEventData,),
)

## Compute Chunk by Chunk and Export Panel by Panel

To keep memory usage low, the reduced data of each panel is exported inside the processing loop rather than in a separate step.

The example below processes the data chunk by chunk, with ``CHUNK_SIZE`` event rows per chunk,

and exports the data into ``FINE_NAME`` panel by panel.

In [None]:
import sciline as sl
from contextlib import contextmanager
from collections.abc import Generator


@contextmanager
def temp_parameter(
    wf: sl.Pipeline, parameter_type: type, value: Any
) -> Generator[sl.Pipeline, None, None]:
    """Yield a copy of ``wf`` in which ``parameter_type`` is set to ``value``.

    The pipeline passed in is never modified: the parameter is set on a copy
    obtained from ``wf.copy()``, so the caller's pipeline keeps its original
    parameters both inside and after the ``with`` block.

    Parameters
    ----------
    wf:
        Pipeline to copy.
    parameter_type:
        Key of the parameter to set on the copy.
    value:
        Value to assign to ``parameter_type`` on the copy.

    Yields
    ------
    :
        The modified copy of ``wf``.
    """
    modified = wf.copy()
    modified[parameter_type] = value
    # No cleanup is needed on exit: the copy is simply released once the
    # caller drops its last reference to it.
    yield modified

In [None]:
import pathlib
from ess.nmx.types import DetectorBankPrefix, DetectorName, CrystalRotation
from ess.nmx.mcstas.load import raw_event_data_chunks, McStasInstrument
from ess.nmx.reduction import format_nmx_reduced_data
from ess.nmx.nexus import (
    export_panel_dependent_data_as_nxlauetof,
    export_panel_independent_data_as_nxlauetof,
)

import warnings

FINE_NAME = "test.h5"
CHUNK_SIZE = 100  # Number of event rows to process at once
# Increase this number to speed up the processing
NUM_DETECTORS = 3  # Number of detector panels to loop over
# NOTE(review): an earlier comment here said the small sample only has 1 panel
# -- confirm that all ``NUM_DETECTORS`` panels exist in your file.

# Stream the events from the same file the workflow is configured with,
# so that replacing ``wf[FilePath]`` is enough to process another file.
file_path = wf.compute(FilePath)
# Panel-independent data only has to be written once per output file
panel_independent_exported = False

# Loop over the detectors
for i in range(NUM_DETECTORS):
    # Set the detector index on a copy of the workflow, compute the static
    # results first and build the stream processor from the same copy.
    with temp_parameter(wf, DetectorIndex, i) as temp_wf:
        static_results = temp_wf.compute(
            [DetectorName, DetectorBankPrefix, CrystalRotation, McStasInstrument]
        )
        processor = stream_processor(temp_wf)
    mcstas_geo: McStasInstrument = static_results[McStasInstrument]
    prefix: DetectorBankPrefix = static_results[DetectorBankPrefix]
    detector_name: DetectorName = static_results[DetectorName]
    crystal_rotation: CrystalRotation = static_results[CrystalRotation]
    pixel_ids = mcstas_geo.to_coords(detector_name).pop("pixel_id")

    # Feed the events of this panel to the processor chunk by chunk.
    # ``results`` is reset per panel so that a panel without any events
    # can never pick up the results of the previous panel.
    results = None
    for da in raw_event_data_chunks(
        file_path=file_path,
        bank_prefix=prefix,
        pixel_ids=pixel_ids,
        chunk_size=CHUNK_SIZE,
    ):
        # A chunk with an empty dimension holds no events
        if any(size == 0 for size in da.sizes.values()):
            continue
        results = processor.add_chunk({RawEventData: da})

    if results is None:
        warnings.warn(
            f"No events found for detector {i} ({detector_name}); "
            "skipping export of this panel.",
            stacklevel=2,
        )
        continue

    proton_charge, histogram = results[ProtonCharge], results[NMXHistogram]
    reduced = format_nmx_reduced_data(
        counts=histogram,
        proton_charge=proton_charge,
        detector_name=detector_name,
        instrument=mcstas_geo,
        crystal_rotation=crystal_rotation,
    )
    # Replace the time bin steps with the time bins of this reduced panel,
    # so that the workflow copies made for the following panels use them.
    # This might exclude some time bins if the first set is not complete.
    # It would be better to set them manually as a parameter, but the user
    # cannot be forced to pass an array when using the streaming processor.
    wf[TimeBinSteps] = reduced['counts'].coords['t']
    # Export each panel to the same file.
    # The panel-independent part is written with the first exported panel
    # (or whenever the output file does not exist).
    if not panel_independent_exported or not pathlib.Path(FINE_NAME).exists():
        export_panel_independent_data_as_nxlauetof(reduced, output_file=FINE_NAME)
        panel_independent_exported = True
    export_panel_dependent_data_as_nxlauetof(reduced, output_file=FINE_NAME)
    # Release the reduced data of this panel before processing the next one
    del reduced
