188 changes: 183 additions & 5 deletions src/ess/reduce/live/raw.py
Member:
Did you consider adapting https://github.com/scipp/essdiffraction/blob/35310a285ba0c325def3aa3ae5795915cc10db19/src/ess/dream/diagnostics.py#L19 ? This would be able to show all voxels / pixels without user-defined projections.

Member Author:
That is too large for streaming and imho not useful, since finding failing components in a view with >1 Mpixel is not trivial. Also I am not sure how it relates to ROIs?

Member:
> since finding failing components in a view with >1 Mpixel is not trivial.

True. But when you combine pixels, failures can affect many display pixels and are combined with noise from working components. So seeing a failing pixel can be tricky, too.

> Also I am not sure how it relates to ROIs?

I don't think it does unless the pixels are ordered in a meaningful way.

What kinds of ROIs is this meant for? Is this about data reduction or about diagnostics? It seems like the former, because the projections you made for DREAM are about the physical location of voxels.

Member Author:
Not data reduction. This PR is unrelated to the DREAM PR. This is mainly for instruments such as ODIN, TBL, or NMX, which display a 2D image detector (with or without downsampling).

Member Author:
> since finding failing components in a view with >1 Mpixel is not trivial.
>
> True. But when you combine pixels, failures can affect many display pixels and are combined with noise from working components. So seeing a failing pixel can be tricky, too.

ROI support is for displaying TOF spectra for the selected ROI. This is not for seeing failing pixels.

Member:
Do you want to select an actual region or a single (output) pixel? The latter could be done with a flat voxel view as well.

Member Author:
A region. And we are not doing flat voxel views in live reduction.

@@ -21,7 +21,7 @@
from __future__ import annotations

from collections.abc import Callable, Sequence
from math import ceil
from math import ceil, prod
from typing import Literal, NewType

import numpy as np
@@ -138,6 +138,139 @@ def apply_full(self, var: sc.Variable) -> sc.DataArray:
return self._hist(replicated, coords=self._coords) / self.replicas


class LogicalView:
"""
Logical view for detector data.

Implements a view by applying a user-defined transform (e.g., fold or slice
operations) optionally followed by reduction (summing) over specified dimensions.
Transformation and reduction must be specified separately for `LogicalView` to
construct a mapping from output indices to input indices. So `transform` must not
perform any reductions.

This class provides both data transformation (__call__) and index mapping
(input_indices) using the same transform, ensuring consistency for ROI filtering.
"""

def __init__(
self,
transform: Callable[[sc.DataArray], sc.DataArray],
reduction_dim: str | list[str] | None = None,
input_sizes: dict[str, int] | None = None,
):
"""
Create a logical view.

Parameters
----------
transform:
Callable that transforms input data by reshaping or selecting pixels.
Examples:
- Fold: lambda da: da.fold('x', {'x': 512, 'x_bin': 2})
- Slice: lambda da: da['z', 0] (select front layer of volume)
- Combined: lambda da: da.fold('x', {'x': 4, 'z': 8})['z', 0]
reduction_dim:
Dimension(s) to sum over after applying transform. If None or empty,
no reduction is performed (pure transform).
Example: 'x_bin' or ['x_bin', 'y_bin']
input_sizes:
Dictionary defining the input dimension sizes.
Required for input_indices().
If not provided, input_indices() will raise an error.
When used with RollingDetectorView, this is automatically
inferred from detector_number.
"""
self._transform = transform
if reduction_dim is None:
self._reduction_dim = []
elif isinstance(reduction_dim, str):
self._reduction_dim = [reduction_dim]
else:
self._reduction_dim = reduction_dim
self._input_sizes = input_sizes

@property
def replicas(self) -> int:
"""Number of replicas. Always 1 for LogicalView."""
return 1

def __call__(self, da: sc.DataArray) -> sc.DataArray:
"""
Apply transform and optionally sum over reduction dimensions.

Parameters
----------
da:
Data to transform (and optionally downsample).

Returns
-------
:
Transformed (and optionally reduced) data array.
"""
transformed = self._transform(da)
if self._reduction_dim:
return transformed.sum(self._reduction_dim)
return transformed

def input_indices(self) -> sc.DataArray:
"""
Create index mapping for ROI filtering.

Returns a DataArray mapping output pixels to input indices (as indices into
the flattened input array). If reduction dimensions are specified, returns
binned data where each output pixel contains a list of contributing input
indices. If no reduction, returns dense indices (1:1 mapping).

Returns
-------
:
DataArray mapping output pixels to input indices.

Raises
------
ValueError:
If input_sizes was not provided during initialization.
"""
if self._input_sizes is None:
raise ValueError(
"input_sizes is required for input_indices(). "
"Provide it during LogicalView initialization."
)

# Create sequential indices (0, 1, 2, ...) and fold to input shape
total_size = prod(self._input_sizes.values())
indices = sc.arange('_temp', total_size, dtype='int64', unit=None)
indices = indices.fold(dim='_temp', sizes=self._input_sizes)

# Apply transform to get the grouping structure
transformed = self._transform(sc.DataArray(data=indices))

if not self._reduction_dim:
# No reduction: 1:1 mapping, return dense indices
return sc.DataArray(data=transformed.data)

# Flatten reduction dimensions to a single dimension.
# First transpose to make reduction dims contiguous at the end.
output_dims = [d for d in transformed.dims if d not in self._reduction_dim]
transformed = transformed.transpose(output_dims + self._reduction_dim)
transformed = transformed.flatten(dims=self._reduction_dim, to='_reduction')

# Convert dense array to binned structure where each output pixel
# contains a bin with the indices of contributing input pixels.
bin_size = transformed.sizes['_reduction']
output_shape = [transformed.sizes[d] for d in output_dims]
data_flat = transformed.data.flatten(to='_flat')
begin = sc.array(
dims=output_dims,
values=np.arange(0, data_flat.size, bin_size, dtype=np.int64).reshape(
output_shape
),
unit=None,
)
return sc.DataArray(sc.bins(begin=begin, dim='_flat', data=data_flat))
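A minimal usage sketch of LogicalView (an editor's illustration, not part of this diff): it assumes a hypothetical 8x8 detector image downsampled 2x2; all dimension names and sizes below are illustrative.

import scipp as sc

from ess.reduce.live.raw import LogicalView

# 2x2 downsampling of a hypothetical 8x8 detector image.
view = LogicalView(
    transform=lambda da: da.fold('x', sizes={'x': 4, 'x_bin': 2}).fold(
        'y', sizes={'y': 4, 'y_bin': 2}
    ),
    reduction_dim=['x_bin', 'y_bin'],
    input_sizes={'x': 8, 'y': 8},
)
counts = sc.DataArray(sc.ones(sizes={'x': 8, 'y': 8}, unit='counts'))
downsampled = view(counts)      # dims (x: 4, y: 4); each output pixel sums a 2x2 block
mapping = view.input_indices()  # binned: 4 contributing input indices per output pixel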


class Detector:
def __init__(self, detector_number: sc.Variable):
self._data = sc.DataArray(
@@ -220,6 +353,48 @@ def __init__(
self._cumulative: sc.DataArray
self.clear_counts()

@staticmethod
def with_logical_view(
*,
detector_number: sc.Variable,
window: int,
transform: Callable[[sc.DataArray], sc.DataArray],
reduction_dim: str | list[str] | None = None,
) -> RollingDetectorView:
"""
Create a RollingDetectorView with a LogicalView projection.

This factory method creates a LogicalView with input_sizes
automatically inferred from detector_number.sizes.

Parameters
----------
detector_number:
Detector number for each pixel.
window:
Size of the rolling window.
transform:
Transform function for the LogicalView.
reduction_dim:
Reduction dimension(s) for the LogicalView. If None or empty,
no reduction is performed (pure transform).

Returns
-------
:
RollingDetectorView with LogicalView projection.
"""
view = LogicalView(
transform=transform,
reduction_dim=reduction_dim,
input_sizes=dict(detector_number.sizes),
)
return RollingDetectorView(
detector_number=detector_number,
window=window,
projection=view,
)
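A sketch of how the factory might be called (editor's illustration; the detector shape, fold sizes, and window length are assumptions, not taken from this PR):

import scipp as sc

from ess.reduce.live.raw import RollingDetectorView

# Hypothetical 128x128 detector, downsampled 4x4 for display.
detector_number = sc.arange('yx', 1, 128 * 128 + 1, unit=None).fold(
    'yx', sizes={'y': 128, 'x': 128}
)
view = RollingDetectorView.with_logical_view(
    detector_number=detector_number,
    window=10,
    transform=lambda da: da.fold('y', sizes={'y': 32, 'y_bin': 4}).fold(
        'x', sizes={'x': 32, 'x_bin': 4}
    ),
    reduction_dim=['y_bin', 'x_bin'],
)
roi_filter = view.make_roi_filter()  # ROIs drawn on the 32x32 display map back to input pixels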

@property
def max_window(self) -> int:
return self._window
@@ -249,9 +424,11 @@ def clear_counts(self) -> None:
def make_roi_filter(self) -> roi.ROIFilter:
"""Return a ROI filter operating via the projection plane of the view."""
norm = 1.0
if isinstance(self._projection, Histogrammer):
# Use duck typing: check if projection has input_indices method
if hasattr(self._projection, 'input_indices'):
indices = self._projection.input_indices()
norm = self._projection.replicas
# Histogrammer and LogicalView both define replicas; fall back to 1.0 otherwise
norm = getattr(self._projection, 'replicas', 1.0)
else:
indices = sc.ones(sizes=self.data.sizes, dtype='int32', unit=None)
indices = sc.cumsum(indices, mode='exclusive')
@@ -292,10 +469,11 @@ def transform_weights(
if not sc.identical(det_num, self.detector_number):
raise sc.CoordError("Mismatching detector numbers in weights.")
weights = weights.data
if isinstance(self._projection, Histogrammer):
# Use duck typing: check for apply_full method (Histogrammer)
if hasattr(self._projection, 'apply_full'):
xs = self._projection.apply_full(weights) # Use all replicas
elif self._projection is not None:
xs = self._projection(weights)
xs = self._projection(weights)  # LogicalView or other callable projection
else:
xs = weights.copy()
nonempty = xs.values[xs.values > 0]
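The duck-typing checks above (hasattr for input_indices / apply_full, getattr for replicas) mean any object exposing the right members can serve as a projection. A minimal sketch of such a custom projection for a hypothetical 2-layer 16x16 detector; this is an editor's illustration of the implied contract, not an interface defined by this PR (LogicalView with a slice transform already covers this case):

import scipp as sc


class FrontLayerProjection:
    """Select the front layer of a hypothetical (layer, y, x) detector."""

    def __call__(self, da: sc.DataArray) -> sc.DataArray:
        # Positional slice: keep only layer 0.
        return da['layer', 0]

    def input_indices(self) -> sc.DataArray:
        # 1:1 mapping from each (y, x) output pixel to its flat input index.
        indices = sc.arange('flat', 2 * 16 * 16, unit=None).fold(
            'flat', sizes={'layer': 2, 'y': 16, 'x': 16}
        )
        return sc.DataArray(data=indices['layer', 0])

    @property
    def replicas(self) -> int:
        # Each input pixel contributes to at most one output pixel.
        return 1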