diff --git a/src/ess/reduce/live/raw.py b/src/ess/reduce/live/raw.py index ef3c0624..f2f15fbd 100644 --- a/src/ess/reduce/live/raw.py +++ b/src/ess/reduce/live/raw.py @@ -21,7 +21,7 @@ from __future__ import annotations from collections.abc import Callable, Sequence -from math import ceil +from math import ceil, prod from typing import Literal, NewType import numpy as np @@ -138,6 +138,139 @@ def apply_full(self, var: sc.Variable) -> sc.DataArray: return self._hist(replicated, coords=self._coords) / self.replicas +class LogicalView: + """ + Logical view for detector data. + + Implements a view by applying a user-defined transform (e.g., fold or slice + operations) optionally followed by reduction (summing) over specified dimensions. + Transformation and reduction must be specified separately for `LogicalView` to + construct a mapping from output indices to input indices. So `transform` must not + perform any reductions. + + This class provides both data transformation (__call__) and index mapping + (input_indices) using the same transform, ensuring consistency for ROI filtering. + """ + + def __init__( + self, + transform: Callable[[sc.DataArray], sc.DataArray], + reduction_dim: str | list[str] | None = None, + input_sizes: dict[str, int] | None = None, + ): + """ + Create a logical view. + + Parameters + ---------- + transform: + Callable that transforms input data by reshaping or selecting pixels. + Examples: + - Fold: lambda da: da.fold('x', {'x': 512, 'x_bin': 2}) + - Slice: lambda da: da['z', 0] (select front layer of volume) + - Combined: lambda da: da.fold('x', {'x': 4, 'z': 8})['z', 0] + reduction_dim: + Dimension(s) to sum over after applying transform. If None or empty, + no reduction is performed (pure transform). + Example: 'x_bin' or ['x_bin', 'y_bin'] + input_sizes: + Dictionary defining the input dimension sizes. + Required for input_indices(). + If not provided, input_indices() will raise an error. + When used with RollingDetectorView, this is automatically + inferred from detector_number. + """ + self._transform = transform + if reduction_dim is None: + self._reduction_dim = [] + elif isinstance(reduction_dim, str): + self._reduction_dim = [reduction_dim] + else: + self._reduction_dim = reduction_dim + self._input_sizes = input_sizes + + @property + def replicas(self) -> int: + """Number of replicas. Always 1 for LogicalView.""" + return 1 + + def __call__(self, da: sc.DataArray) -> sc.DataArray: + """ + Apply transform and optionally sum over reduction dimensions. + + Parameters + ---------- + da: + Data to downsample. + + Returns + ------- + : + Transformed (and optionally reduced) data array. + """ + transformed = self._transform(da) + if self._reduction_dim: + return transformed.sum(self._reduction_dim) + return transformed + + def input_indices(self) -> sc.DataArray: + """ + Create index mapping for ROI filtering. + + Returns a DataArray mapping output pixels to input indices (as indices into + the flattened input array). If reduction dimensions are specified, returns + binned data where each output pixel contains a list of contributing input + indices. If no reduction, returns dense indices (1:1 mapping). + + Returns + ------- + : + DataArray mapping output pixels to input indices. + + Raises + ------ + ValueError: + If input_sizes was not provided during initialization. + """ + if self._input_sizes is None: + raise ValueError( + "input_sizes is required for input_indices(). " + "Provide it during LogicalView initialization." + ) + + # Create sequential indices (0, 1, 2, ...) and fold to input shape + total_size = prod(self._input_sizes.values()) + indices = sc.arange('_temp', total_size, dtype='int64', unit=None) + indices = indices.fold(dim='_temp', sizes=self._input_sizes) + + # Apply transform to get the grouping structure + transformed = self._transform(sc.DataArray(data=indices)) + + if not self._reduction_dim: + # No reduction: 1:1 mapping, return dense indices + return sc.DataArray(data=transformed.data) + + # Flatten reduction dimensions to a single dimension. + # First transpose to make reduction dims contiguous at the end. + output_dims = [d for d in transformed.dims if d not in self._reduction_dim] + transformed = transformed.transpose(output_dims + self._reduction_dim) + transformed = transformed.flatten(dims=self._reduction_dim, to='_reduction') + + # Convert dense array to binned structure where each output pixel + # contains a bin with the indices of contributing input pixels. + bin_size = transformed.sizes['_reduction'] + output_shape = [transformed.sizes[d] for d in output_dims] + data_flat = transformed.data.flatten(to='_flat') + begin = sc.array( + dims=output_dims, + values=np.arange(0, data_flat.size, bin_size, dtype=np.int64).reshape( + output_shape + ), + unit=None, + ) + return sc.DataArray(sc.bins(begin=begin, dim='_flat', data=data_flat)) + + class Detector: def __init__(self, detector_number: sc.Variable): self._data = sc.DataArray( @@ -220,6 +353,48 @@ def __init__( self._cumulative: sc.DataArray self.clear_counts() + @staticmethod + def with_logical_view( + *, + detector_number: sc.Variable, + window: int, + transform: Callable[[sc.DataArray], sc.DataArray], + reduction_dim: str | list[str] | None = None, + ) -> RollingDetectorView: + """ + Create a RollingDetectorView with a LogicalView projection. + + This factory method creates a LogicalView with input_sizes + automatically inferred from detector_number.sizes. + + Parameters + ---------- + detector_number: + Detector number for each pixel. + window: + Size of the rolling window. + transform: + Transform function for the LogicalView. + reduction_dim: + Reduction dimension(s) for the LogicalView. If None or empty, + no reduction is performed (pure transform). + + Returns + ------- + : + RollingDetectorView with LogicalView projection. + """ + view = LogicalView( + transform=transform, + reduction_dim=reduction_dim, + input_sizes=dict(detector_number.sizes), + ) + return RollingDetectorView( + detector_number=detector_number, + window=window, + projection=view, + ) + @property def max_window(self) -> int: return self._window @@ -249,9 +424,11 @@ def clear_counts(self) -> None: def make_roi_filter(self) -> roi.ROIFilter: """Return a ROI filter operating via the projection plane of the view.""" norm = 1.0 - if isinstance(self._projection, Histogrammer): + # Use duck typing: check if projection has input_indices method + if hasattr(self._projection, 'input_indices'): indices = self._projection.input_indices() - norm = self._projection.replicas + # Get replicas property if it exists (Histogrammer has it, default to 1.0) + norm = getattr(self._projection, 'replicas', 1.0) else: indices = sc.ones(sizes=self.data.sizes, dtype='int32', unit=None) indices = sc.cumsum(indices, mode='exclusive') @@ -292,10 +469,11 @@ def transform_weights( if not sc.identical(det_num, self.detector_number): raise sc.CoordError("Mismatching detector numbers in weights.") weights = weights.data - if isinstance(self._projection, Histogrammer): + # Use duck typing: check for apply_full method (Histogrammer) + if hasattr(self._projection, 'apply_full'): xs = self._projection.apply_full(weights) # Use all replicas elif self._projection is not None: - xs = self._projection(weights) + xs = self._projection(weights) # LogicalDownsampler or callable else: xs = weights.copy() nonempty = xs.values[xs.values > 0] diff --git a/tests/live/raw_test.py b/tests/live/raw_test.py index 90773c1c..e82f072f 100644 --- a/tests/live/raw_test.py +++ b/tests/live/raw_test.py @@ -547,3 +547,377 @@ def test_transform_weights_raises_given_DataArray_with_bad_det_num() -> None: ) with pytest.raises(sc.CoordError): view.transform_weights(weights) + + +class TestLogicalView: + """Tests for LogicalView class.""" + + def test_single_dim_downsampling(self) -> None: + """Test basic 1D downsampling with transform + reduction.""" + + # Transform: fold into 4 groups of 2 + def transform(da: sc.DataArray) -> sc.DataArray: + return da.fold( + dim='x_pixel_offset', sizes={'x_pixel_offset': 4, 'x_bin': 2} + ) + + view = raw.LogicalView( + transform=transform, + reduction_dim='x_bin', + ) + + # Create test data: each pixel has value equal to its index + data = sc.DataArray( + data=sc.arange('x_pixel_offset', 8, dtype='float64', unit='counts') + ) + + # Apply downsampling + result = view(data) + + # Should sum pairs: [0+1, 2+3, 4+5, 6+7] = [1, 5, 9, 13] + expected = sc.array( + dims=['x_pixel_offset'], + values=[1.0, 5.0, 9.0, 13.0], + unit='counts', + ) + assert sc.allclose(result.data, expected) + assert result.sizes == {'x_pixel_offset': 4} + + def test_multi_dim_downsampling(self) -> None: + """Test 2D downsampling similar to _resize_image example.""" + + # Transform: fold both dimensions for 8x8 -> 4x4 (2x2 binning in each dimension) + def transform(da: sc.DataArray) -> sc.DataArray: + da = da.fold(dim='x_pixel_offset', sizes={'x_pixel_offset': 4, 'x_bin': 2}) + da = da.fold(dim='y_pixel_offset', sizes={'y_pixel_offset': 4, 'y_bin': 2}) + return da + + view = raw.LogicalView( + transform=transform, + reduction_dim=['x_bin', 'y_bin'], + ) + + # Create test data: constant value of 1 everywhere + data = sc.DataArray( + data=sc.ones( + sizes={'x_pixel_offset': 8, 'y_pixel_offset': 8}, unit='counts' + ) + ) + + # Apply downsampling + result = view(data) + + # Each output pixel should be sum of 2x2=4 input pixels + expected = sc.full( + dims=['x_pixel_offset', 'y_pixel_offset'], + shape=[4, 4], + value=4.0, + unit='counts', + ) + assert sc.allclose(result.data, expected) + assert result.sizes == {'x_pixel_offset': 4, 'y_pixel_offset': 4} + + def test_input_indices_single_dim(self) -> None: + """Test that input_indices creates correct binned mapping for 1D.""" + + def transform(da: sc.DataArray) -> sc.DataArray: + return da.fold( + dim='x_pixel_offset', sizes={'x_pixel_offset': 4, 'x_bin': 2} + ) + + view = raw.LogicalView( + transform=transform, + reduction_dim='x_bin', + input_sizes={'x_pixel_offset': 8}, + ) + + # Get index mapping + indices = view.input_indices() + + # Should be binned data with 4 bins, each containing 2 indices + assert indices.sizes == {'x_pixel_offset': 4} + assert indices.bins is not None + + # Check each bin contains the correct indices + # Bin 0: [0, 1], Bin 1: [2, 3], Bin 2: [4, 5], Bin 3: [6, 7] + # Extract all bin contents using the bins accessor + bin_sizes = indices.bins.size() + assert all(bin_sizes.values == 2) # Each bin should have 2 indices + + # Check total count + assert indices.bins.size().sum().value == 8 + + def test_input_indices_multi_dim(self) -> None: + """Test that input_indices creates correct binned mapping for 2D.""" + + def transform(da: sc.DataArray) -> sc.DataArray: + da = da.fold(dim='x_pixel_offset', sizes={'x_pixel_offset': 2, 'x_bin': 2}) + da = da.fold(dim='y_pixel_offset', sizes={'y_pixel_offset': 2, 'y_bin': 2}) + return da + + view = raw.LogicalView( + transform=transform, + reduction_dim=['x_bin', 'y_bin'], + input_sizes={'x_pixel_offset': 4, 'y_pixel_offset': 4}, + ) + + # Get index mapping + indices = view.input_indices() + + # Should be binned data with 2x2 output bins + assert indices.sizes == {'x_pixel_offset': 2, 'y_pixel_offset': 2} + assert indices.bins is not None + + # Each bin should contain 2x2=4 indices from the flattened input + bin_sizes = indices.bins.size() + assert all(bin_sizes.values.ravel() == 4) # Each bin should have 4 indices + + # Check total count: 4x4 input pixels -> 2x2 output bins + assert indices.bins.size().sum().value == 16 + + def test_with_varying_input_values(self) -> None: + """Test that downsampling correctly sums varying input values.""" + + def transform(da: sc.DataArray) -> sc.DataArray: + return da.fold( + dim='x_pixel_offset', sizes={'x_pixel_offset': 3, 'x_bin': 2} + ) + + view = raw.LogicalView( + transform=transform, + reduction_dim='x_bin', + ) + + # Create test data with specific values: [10, 20, 30, 40, 50, 60] + data = sc.DataArray( + data=sc.array( + dims=['x_pixel_offset'], + values=[10.0, 20.0, 30.0, 40.0, 50.0, 60.0], + unit='counts', + ) + ) + + result = view(data) + + # Should sum pairs: [10+20, 30+40, 50+60] = [30, 70, 110] + expected = sc.array( + dims=['x_pixel_offset'], + values=[30.0, 70.0, 110.0], + unit='counts', + ) + assert sc.allclose(result.data, expected) + + def test_transform_without_reduction_slicing(self) -> None: + """Test transform without reduction (slicing to select front layer).""" + + # Transform: fold to 3D volume, then slice front layer + def transform(da: sc.DataArray) -> sc.DataArray: + return da.fold(dim='voxel', sizes={'x': 2, 'y': 2, 'z': 3})['z', 0] + + view = raw.LogicalView(transform=transform) + + # Create test data: each voxel has value equal to its index + data = sc.DataArray(data=sc.arange('voxel', 12, dtype='float64', unit='counts')) + + result = view(data) + + # fold orders: z is innermost, so z=0 gives every 3rd element starting at 0 + # indices: 0, 3, 6, 9 + assert result.sizes == {'x': 2, 'y': 2} + expected = sc.array( + dims=['x', 'y'], + values=[[0.0, 3.0], [6.0, 9.0]], + unit='counts', + ) + assert sc.allclose(result.data, expected) + + def test_transform_without_reduction_reshape(self) -> None: + """Test transform without reduction (pure reshape).""" + + # Transform: just fold without any reduction + def transform(da: sc.DataArray) -> sc.DataArray: + return da.fold(dim='pixel', sizes={'x': 3, 'y': 4}) + + view = raw.LogicalView(transform=transform) + + data = sc.DataArray(data=sc.arange('pixel', 12, dtype='float64', unit='counts')) + + result = view(data) + + # Should just reshape, no reduction + assert result.sizes == {'x': 3, 'y': 4} + assert result.data.sum().value == 66.0 # Sum of 0..11 + + def test_input_indices_without_reduction(self) -> None: + """Test that input_indices returns dense indices when no reduction.""" + + def transform(da: sc.DataArray) -> sc.DataArray: + return da.fold(dim='voxel', sizes={'x': 2, 'y': 2, 'z': 3})['z', 0] + + view = raw.LogicalView( + transform=transform, + input_sizes={'voxel': 12}, + ) + + indices = view.input_indices() + + # Should be dense (not binned) - 1:1 mapping + assert indices.bins is None + assert indices.sizes == {'x': 2, 'y': 2} + + # Indices should correspond to front layer of folded volume + # fold orders: z is innermost, so z=0 gives every 3rd index starting at 0 + expected = sc.array(dims=['x', 'y'], values=[[0, 3], [6, 9]], unit=None) + assert sc.identical(indices.data, expected) + + def test_input_indices_without_reduction_preserves_total_count(self) -> None: + """Test that non-reducing input_indices has correct number of indices.""" + + def transform(da: sc.DataArray) -> sc.DataArray: + return da.fold(dim='pixel', sizes={'x': 4, 'y': 5}) + + view = raw.LogicalView( + transform=transform, + input_sizes={'pixel': 20}, + ) + + indices = view.input_indices() + + # Dense indices should have same total size as output shape + assert indices.sizes == {'x': 4, 'y': 5} + assert indices.data.size == 20 + + +class TestRollingDetectorViewWithLogicalView: + """Tests for RollingDetectorView integration with LogicalView.""" + + def test_as_projection(self) -> None: + """Test that RollingDetectorView works with LogicalView as projection.""" + # Create a 1D detector with 8 pixels + detector_number = sc.arange('x_pixel_offset', 1, 9, unit=None) + + # Define downsampling transform: 8 -> 4 pixels (2x binning) + def transform(da: sc.DataArray) -> sc.DataArray: + return da.fold( + dim='x_pixel_offset', sizes={'x_pixel_offset': 4, 'x_bin': 2} + ) + + # Create RollingDetectorView with LogicalView using factory method + view = raw.RollingDetectorView.with_logical_view( + detector_number=detector_number, + window=2, + transform=transform, + reduction_dim='x_bin', + ) + + # Add some counts: pixels 1, 2, 3, 4 -> downsampled bins [0, 1] + view.add_counts([1, 2, 3, 4]) + result = view.get() + + # After downsampling: [1+2, 3+4] = [2, 2] for first two bins + assert result.sizes == {'x_pixel_offset': 4} + assert result['x_pixel_offset', 0].value == 2 + assert result['x_pixel_offset', 1].value == 2 + assert result['x_pixel_offset', 2].value == 0 + assert result['x_pixel_offset', 3].value == 0 + + def test_make_roi_filter(self) -> None: + """Test that make_roi_filter() works with LogicalView.""" + detector_number = sc.arange('x_pixel_offset', 1, 9, unit=None) + + def transform(da: sc.DataArray) -> sc.DataArray: + return da.fold( + dim='x_pixel_offset', sizes={'x_pixel_offset': 4, 'x_bin': 2} + ) + + view = raw.RollingDetectorView.with_logical_view( + detector_number=detector_number, + window=1, + transform=transform, + reduction_dim='x_bin', + ) + + # Should not raise - LogicalView has input_indices() + roi_filter = view.make_roi_filter() + + # The indices should be binned data (check via private attribute for now) + assert roi_filter._indices.bins is not None + assert roi_filter._indices.sizes == {'x_pixel_offset': 4} + # Each bin should contain 2 indices + assert all(roi_filter._indices.bins.size().values == 2) + + def test_transform_weights(self) -> None: + """Test that transform_weights() works with LogicalView.""" + detector_number = sc.arange('x_pixel_offset', 1, 9, unit=None) + + def transform(da: sc.DataArray) -> sc.DataArray: + return da.fold( + dim='x_pixel_offset', sizes={'x_pixel_offset': 4, 'x_bin': 2} + ) + + view = raw.RollingDetectorView.with_logical_view( + detector_number=detector_number, + window=1, + transform=transform, + reduction_dim='x_bin', + ) + + # Create weights: all pixels have weight 1.0 + weights = sc.ones(sizes={'x_pixel_offset': 8}, dtype='float32', unit='') + + # Transform weights through the downsampler + transformed = view.transform_weights(weights) + + # After downsampling: each output bin sums 2 input weights = 2.0 + assert transformed.sizes == {'x_pixel_offset': 4} + expected = sc.full( + dims=['x_pixel_offset'], shape=[4], value=2.0, dtype='float32', unit='' + ) + assert sc.allclose(transformed.data, expected) + + def test_with_non_reducing_view(self) -> None: + """Test RollingDetectorView with LogicalView without reduction (slicing).""" + # 12 voxels that will be folded into 2x2x3 and sliced to front layer + detector_number = sc.arange('voxel', 1, 13, unit=None) + + def transform(da: sc.DataArray) -> sc.DataArray: + return da.fold(dim='voxel', sizes={'x': 2, 'y': 2, 'z': 3})['z', 0] + + view = raw.RollingDetectorView.with_logical_view( + detector_number=detector_number, + window=2, + transform=transform, + # No reduction_dim - pure transform + ) + + # Add counts for detector_numbers that map to front layer (z=0) + # z is innermost, so front layer indices are 0, 3, 6, 9 (every 3rd) + # detector_numbers are 1-indexed, so front layer det_nums are 1, 4, 7, 10 + view.add_counts([1, 4, 7, 10]) + result = view.get() + + assert result.sizes == {'x': 2, 'y': 2} + # Each front-layer pixel gets one count + expected = sc.array( + dims=['x', 'y'], values=[[1, 1], [1, 1]], dtype='int32', unit='counts' + ) + assert sc.identical(result.data, expected) + + def test_make_roi_filter_with_non_reducing_view(self) -> None: + """Test make_roi_filter with non-reducing LogicalView returns dense indices.""" + detector_number = sc.arange('voxel', 1, 13, unit=None) + + def transform(da: sc.DataArray) -> sc.DataArray: + return da.fold(dim='voxel', sizes={'x': 2, 'y': 2, 'z': 3})['z', 0] + + view = raw.RollingDetectorView.with_logical_view( + detector_number=detector_number, + window=1, + transform=transform, + ) + + roi_filter = view.make_roi_filter() + + # Indices should be dense (not binned) for non-reducing view + assert roi_filter._indices.bins is None + assert roi_filter._indices.sizes == {'x': 2, 'y': 2}