diff --git a/pyproject.toml b/pyproject.toml index 15486a27..b1fda2b9 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -42,10 +42,14 @@ dependencies = [ "pandas", "gemmi", "defusedxml", + "bitshuffle", ] dynamic = ["version"] +[project.scripts] +essnmx_reduce_mcstas = "ess.nmx.mcstas.executables:main" + [project.optional-dependencies] test = [ "pytest", diff --git a/src/ess/nmx/mcstas/__init__.py b/src/ess/nmx/mcstas/__init__.py index 9680faf6..3026dc20 100644 --- a/src/ess/nmx/mcstas/__init__.py +++ b/src/ess/nmx/mcstas/__init__.py @@ -1,5 +1,8 @@ # SPDX-License-Identifier: BSD-3-Clause # Copyright (c) 2024 Scipp contributors (https://github.com/scipp) +from ..types import MaximumCounts + +default_parameters = {MaximumCounts: 10000} def McStasWorkflow(): @@ -27,5 +30,6 @@ def McStasWorkflow(): reduce_raw_event_probability, raw_event_probability_to_counts, format_nmx_reduced_data, - ) + ), + params=default_parameters, ) diff --git a/src/ess/nmx/mcstas/executables.py b/src/ess/nmx/mcstas/executables.py new file mode 100644 index 00000000..4e68d44e --- /dev/null +++ b/src/ess/nmx/mcstas/executables.py @@ -0,0 +1,276 @@ +# SPDX-License-Identifier: BSD-3-Clause +# Copyright (c) 2025 Scipp contributors (https://github.com/scipp) +import argparse +import logging +import pathlib +import sys +from collections.abc import Callable +from functools import partial + +import sciline as sl +import scipp as sc + +from ess.reduce.streaming import ( + EternalAccumulator, + MaxAccumulator, + MinAccumulator, + StreamProcessor, +) + +from ..nexus import ( + _export_detector_metadata_as_nxlauetof, + _export_reduced_data_as_nxlauetof, + _export_static_metadata_as_nxlauetof, +) +from ..streaming import calculate_number_of_chunks +from ..types import ( + DetectorIndex, + DetectorName, + FilePath, + MaximumCounts, + MaximumProbability, + MaximumTimeOfArrival, + McStasWeight2CountScaleFactor, + MinimumTimeOfArrival, + NMXDetectorMetadata, + NMXExperimentMetadata, + NMXRawDataMetadata, + NMXReducedCounts, + NMXReducedDataGroup, + PixelIds, + RawEventProbability, + TimeBinSteps, +) +from . 
import McStasWorkflow
+from .load import (
+    mcstas_weight_to_probability_scalefactor,
+    raw_event_data_chunk_generator,
+)
+from .xml import McStasInstrument
+
+
+def _build_metadata_streaming_processor_helper() -> (
+    Callable[[sl.Pipeline], StreamProcessor]
+):
+    return partial(
+        StreamProcessor,
+        dynamic_keys=(RawEventProbability,),
+        target_keys=(NMXRawDataMetadata,),
+        accumulators={
+            MaximumProbability: MaxAccumulator,
+            MaximumTimeOfArrival: MaxAccumulator,
+            MinimumTimeOfArrival: MinAccumulator,
+        },
+    )
+
+
+def _build_final_streaming_processor_helper() -> (
+    Callable[[sl.Pipeline], StreamProcessor]
+):
+    return partial(
+        StreamProcessor,
+        dynamic_keys=(RawEventProbability,),
+        target_keys=(NMXReducedDataGroup,),
+        accumulators={NMXReducedCounts: EternalAccumulator},
+    )
+
+
+def calculate_raw_data_metadata(
+    *detector_ids: DetectorIndex | DetectorName,
+    wf: sl.Pipeline,
+    chunk_size: int = 10_000_000,
+    logger: logging.Logger | None = None,
+) -> NMXRawDataMetadata:
+    # Stream processor building helper
+    scalefactor_stream_processor = _build_metadata_streaming_processor_helper()
+    metadata_wf = wf.copy()
+    # Loop over the detectors
+    file_path = metadata_wf.compute(FilePath)
+    raw_data_metadatas = {}
+
+    for detector_i in detector_ids:
+        temp_wf = metadata_wf.copy()
+        if isinstance(detector_i, str):
+            temp_wf[DetectorName] = detector_i
+        else:
+            temp_wf[DetectorIndex] = detector_i
+
+        detector_name = temp_wf.compute(DetectorName)
+        max_chunk_id = calculate_number_of_chunks(
+            temp_wf.compute(FilePath),
+            detector_name=detector_name,
+            chunk_size=chunk_size,
+        )
+        # Build the stream processor
+        processor = scalefactor_stream_processor(temp_wf)
+        for i_da, da in enumerate(
+            raw_event_data_chunk_generator(
+                file_path=file_path, detector_name=detector_name, chunk_size=chunk_size
+            )
+        ):
+            if any(size == 0 for size in da.sizes.values()):
+                continue
+            else:
+                results = processor.add_chunk({RawEventProbability: da})
+            if logger is not None:
+                logger.info(
+                    "[%s/%s] Processed chunk for %s",
+                    i_da + 1,
+                    max_chunk_id,
+                    detector_name,
+                )
+
+        raw_data_metadatas[detector_i] = results[NMXRawDataMetadata]
+
+    # We take the overall min/max time-of-arrival and the maximum probability manually
+    # because it is not possible to update parameters in the workflow
+    # that the stream processor uses.
+    min_toa = min(dg.min_toa for dg in raw_data_metadatas.values())
+    max_toa = max(dg.max_toa for dg in raw_data_metadatas.values())
+    max_probability = max(dg.max_probability for dg in raw_data_metadatas.values())
+
+    return NMXRawDataMetadata(
+        min_toa=min_toa, max_toa=max_toa, max_probability=max_probability
+    )
+
+
+def reduction(
+    *,
+    input_file: pathlib.Path,
+    output_file: pathlib.Path,
+    chunk_size: int = 10_000_000,
+    detector_ids: list[int | str],
+    wf: sl.Pipeline | None = None,
+    logger: logging.Logger | None = None,
+) -> None:
+    wf = wf.copy() if wf is not None else McStasWorkflow()
+    wf[FilePath] = input_file
+    # Set static info
+    wf[McStasInstrument] = wf.compute(McStasInstrument)
+
+    # Calculate parameters for data reduction
+    data_metadata = calculate_raw_data_metadata(
+        *detector_ids, wf=wf, logger=logger, chunk_size=chunk_size
+    )
+    if logger is not None:
+        logger.info("Metadata retrieved: %s", data_metadata)
+
+    toa_bin_edges = sc.linspace(
+        dim='t', start=data_metadata.min_toa, stop=data_metadata.max_toa, num=51
+    )
+    scale_factor = mcstas_weight_to_probability_scalefactor(
+        max_counts=wf.compute(MaximumCounts),
+        max_probability=data_metadata.max_probability,
+    )
+    # Compute metadata and make the skeleton output file
+    experiment_metadata = wf.compute(NMXExperimentMetadata)
+    detector_metas = []
+    for detector_i in range(3):
+        temp_wf = wf.copy()
+        temp_wf[DetectorIndex] = detector_i
+        detector_metas.append(temp_wf.compute(NMXDetectorMetadata))
+
+    if logger is not None:
+        logger.info("Exporting metadata into the output file %s", output_file)
+
+    _export_static_metadata_as_nxlauetof(
+        experiment_metadata=experiment_metadata,
+        output_file=output_file,
+        # Arbitrary metadata falls into ``entry`` group as a variable.
+        mcstas_weight2count_scale_factor=scale_factor,
+    )
+    _export_detector_metadata_as_nxlauetof(*detector_metas, output_file=output_file)
+    # Compute histogram
+    final_wf = wf.copy()
+    # Set the scale factor and time bin edges
+    final_wf[McStasWeight2CountScaleFactor] = scale_factor
+    final_wf[TimeBinSteps] = toa_bin_edges
+
+    file_path = final_wf.compute(FilePath)
+    final_stream_processor = _build_final_streaming_processor_helper()
+    # Loop over the detectors
+    for detector_i in detector_ids:
+        temp_wf = final_wf.copy()
+        if isinstance(detector_i, str):
+            temp_wf[DetectorName] = detector_i
+        else:
+            temp_wf[DetectorIndex] = detector_i
+        # Set static information as parameters
+        detector_name = temp_wf.compute(DetectorName)
+        temp_wf[PixelIds] = temp_wf.compute(PixelIds)
+        max_chunk_id = calculate_number_of_chunks(
+            file_path, detector_name=detector_name, chunk_size=chunk_size
+        )
+
+        # Build the stream processor
+        processor = final_stream_processor(temp_wf)
+        for i_da, da in enumerate(
+            raw_event_data_chunk_generator(
+                file_path=file_path, detector_name=detector_name, chunk_size=chunk_size
+            )
+        ):
+            if any(size == 0 for size in da.sizes.values()):
+                continue
+            else:
+                results = processor.add_chunk({RawEventProbability: da})
+            if logger is not None:
+                logger.info(
+                    "[%s/%s] Processed chunk for %s",
+                    i_da + 1,
+                    max_chunk_id,
+                    detector_name,
+                )
+
+        result = results[NMXReducedDataGroup]
+        if logger is not None:
+            logger.info("Appending reduced data into the output file %s", output_file)
+        _export_reduced_data_as_nxlauetof(result, output_file=output_file)
+
+
+def main() -> None:
+    parser = argparse.ArgumentParser(description="McStas Data Reduction.")
+    parser.add_argument(
+        "--input_file", type=str, help="Path to the input file", required=True
+    )
+    parser.add_argument(
+        "--output_file",
+        type=str,
+        default="scipp_output.h5",
+        help="Path to the output file",
+    )
+    parser.add_argument(
+        "--verbose", action="store_true", help="Increase output verbosity"
+    )
+    parser.add_argument(
+        "--chunk_size",
+        type=int,
+        default=10_000_000,
+        help="Chunk size for processing",
+    )
+    parser.add_argument(
+        "--detector_ids",
+        type=int,
+        nargs="+",
+        default=[0, 1, 2],
+        help="Detector indices to process",
+    )
+
+    args = parser.parse_args()
+
+    input_file = pathlib.Path(args.input_file).resolve()
+    output_file = pathlib.Path(args.output_file).resolve()
+
+    logger = logging.getLogger(__name__)
+    if args.verbose:
+        logger.setLevel(logging.INFO)
+        logger.addHandler(logging.StreamHandler(sys.stdout))
+
+    wf = McStasWorkflow()
+    reduction(
+        input_file=input_file,
+        output_file=output_file,
+        chunk_size=args.chunk_size,
+        detector_ids=args.detector_ids,
+        logger=logger,
+        wf=wf,
+    )
diff --git a/src/ess/nmx/mcstas/load.py b/src/ess/nmx/mcstas/load.py
index b989d092..186e671b 100644
--- a/src/ess/nmx/mcstas/load.py
+++ b/src/ess/nmx/mcstas/load.py
@@ -49,13 +49,10 @@ def load_event_data_bank_name(
 
 def _exclude_zero_events(data: sc.Variable) -> sc.Variable:
     """Exclude events with zero counts from the data.
 
-    McStas can add an extra event line containing 0,0,0,0,0,0
-    This line should not be included so we skip it.
+    McStas can add extra event lines containing 0,0,0,0,0,0.
+    These lines should not be included, so we skip them.
     """
-    if (data.values[0] == 0).all():
-        data = data["event", 1:]
-    else:
-        data = data
+    data = data[(data != sc.scalar(0.0, unit=data.unit)).any(dim="dim_1")]
     return data
 
diff --git a/src/ess/nmx/mtz_io.py b/src/ess/nmx/mtz_io.py
index fbfb50e8..87ee79ec 100644
--- a/src/ess/nmx/mtz_io.py
+++ b/src/ess/nmx/mtz_io.py
@@ -17,7 +17,7 @@
 """Path to the mtz file"""
 SpaceGroupDesc = NewType("SpaceGroupDesc", str)
 """The space group description. e.g. 'P 21 21 21'"""
-DEFAULT_SPACE_GROUP_DESC = SpaceGroupDesc("P 21 21 21")
+DEFAULT_SPACE_GROUP_DESC = SpaceGroupDesc("P 1")
 """The default space group description to use if not found in the mtz files."""
 
 # Custom column names
diff --git a/src/ess/nmx/nexus.py b/src/ess/nmx/nexus.py
index f07bbec2..0e65eeb3 100644
--- a/src/ess/nmx/nexus.py
+++ b/src/ess/nmx/nexus.py
@@ -7,6 +7,7 @@
 from functools import partial
 from typing import Any, TypeVar
 
+import bitshuffle.h5
 import h5py
 import numpy as np
 import sciline as sl
@@ -34,6 +35,7 @@ def _create_dataset_from_var(
     long_name: str | None = None,
     compression: str | None = None,
     compression_opts: int | None = None,
+    chunks: tuple[int, ...] | int | bool | None = None,
     dtype: Any = None,
 ) -> h5py.Dataset:
     compression_options = {}
@@ -45,6 +47,7 @@
     dataset = root_entry.create_dataset(
         name,
         data=var.values if dtype is None else var.values.astype(dtype, copy=False),
+        chunks=chunks,
         **compression_options,
     )
     if var.unit is not None:
@@ -56,9 +59,20 @@
 
 _create_compressed_dataset = partial(
     _create_dataset_from_var,
-    compression="gzip",
-    compression_opts=4,
+    compression=bitshuffle.h5.H5FILTER,
+    compression_opts=(0, bitshuffle.h5.H5_COMPRESS_LZ4),
 )
+"""Create a dataset with bitshuffle/LZ4 compression.
+
+[``Bitshuffle/LZ4``](https://github.com/kiyo-masui/bitshuffle) is used for convenience.
+Since ``Dectris`` uses it for their NeXus file compression, it is also compatible with DIALS.
+``Bitshuffle/LZ4`` tends to give compression ratios similar to
+GZIP and other compression algorithms, with better performance.
+A naive implementation of bitshuffle/LZ4 compression, +shown in [issue #124](https://github.com/scipp/essnmx/issues/124), +led to 80% file reduction (365 MB vs 1.8 GB). + +""" def _create_root_data_entry(file_obj: h5py.File) -> h5py.Group: @@ -243,6 +257,12 @@ def _add_lauetof_detector_group(dg: sc.DataGroup, nx_instrument: h5py.Group) -> def _add_lauetof_sample_group(dg: NMXExperimentMetadata, nx_entry: h5py.Group) -> None: nx_sample = nx_entry.create_group("sample") nx_sample.attrs["NX_class"] = "NXsample" + _create_dataset_from_var( + root_entry=nx_sample, + var=dg['crystal_rotation'], + name='crystal_rotation', + long_name='crystal rotation in Phi (XYZ)', + ) _create_dataset_from_string( root_entry=nx_sample, name='name', @@ -387,7 +407,9 @@ def _export_detector_metadata_as_nxlauetof( def _export_reduced_data_as_nxlauetof( dg: NMXReducedDataGroup, output_file: str | pathlib.Path | io.BytesIO, + *, append_mode: bool = True, + compress_counts: bool = True, ) -> None: """Export the reduced data to a NeXus file with the LAUE_TOF application definition. @@ -407,6 +429,9 @@ def _export_reduced_data_as_nxlauetof( If ``False``, the file is opened in None-append mode. > None-append mode is not supported for now. > Only append mode is supported for now. + compress_counts: + If ``True``, the detector counts are compressed using bitshuffle. + It is because only the detector counts are expected to be large. """ if not append_mode: @@ -418,12 +443,25 @@ def _export_reduced_data_as_nxlauetof( # The actual application definition defines it as integer, # but we keep the original data type for now num_x, num_y = dg["detector_shape"].value # Probably better way to do this - data_dset = _create_dataset_from_var( - name="data", - root_entry=nx_detector, - var=sc.fold(dg['counts'].data, dim='id', sizes={'x': num_x, 'y': num_y}), - dtype=np.uint, - ) + if compress_counts: + data_dset = _create_compressed_dataset( + name="data", + root_entry=nx_detector, + var=sc.fold( + dg['counts'].data, dim='id', sizes={'x': num_x, 'y': num_y} + ), + chunks=(num_x, num_y, 1), + dtype=np.uint, + ) + else: + data_dset = _create_dataset_from_var( + name="data", + root_entry=nx_detector, + var=sc.fold( + dg['counts'].data, dim='id', sizes={'x': num_x, 'y': num_y} + ), + dtype=np.uint, + ) data_dset.attrs["signal"] = 1 _create_dataset_from_var( name='time_of_flight', @@ -456,12 +494,14 @@ def __init__( chunk_generator: Callable[[FilePath, DetectorName], Generator[T, None, None]], chunk_insert_key: type[T], extra_meta: dict[str, sc.Variable] | None = None, + compress_counts: bool = True, overwrite: bool = False, ) -> None: from ess.reduce.streaming import EternalAccumulator, StreamProcessor from .types import FilePath, NMXReducedCounts + self.compress_counts = compress_counts self._chunk_generator = chunk_generator self._chunk_insert_key = chunk_insert_key self._workflow = workflow @@ -510,6 +550,8 @@ def add_panel(self, *, detector_id: DetectorIndex | DetectorName) -> None: results = processor.add_chunk({self._chunk_insert_key: da}) _export_reduced_data_as_nxlauetof( - results[NMXReducedDataGroup], self._output_filename + results[NMXReducedDataGroup], + self._output_filename, + compress_counts=self.compress_counts, ) return results[NMXReducedDataGroup] diff --git a/tests/mtz_io_test.py b/tests/mtz_io_test.py index 33981359..e585cbb6 100644 --- a/tests/mtz_io_test.py +++ b/tests/mtz_io_test.py @@ -9,7 +9,7 @@ from ess.nmx import mtz_io from ess.nmx.data import get_small_mtz_samples from ess.nmx.mtz_io import ( - 
DEFAULT_SPACE_GROUP_DESC, # P 21 21 21 + DEFAULT_SPACE_GROUP_DESC, # P 1 MtzDataFrame, MTZFileIndex, MTZFilePath, @@ -75,7 +75,7 @@ def mtz_list() -> list[gemmi.Mtz]: def test_get_space_group_with_spacegroup_desc() -> None: assert ( mtz_io.get_space_group_from_description(DEFAULT_SPACE_GROUP_DESC).short_name() - == "P212121" + == "P1" ) @@ -96,7 +96,7 @@ def conflicting_mtz_series( def test_get_unique_space_group_raises_on_conflict( conflicting_mtz_series: list[gemmi.Mtz], ) -> None: - reg = r"Multiple space groups found:.+P 21 21 21.+C 1 2 1" + reg = r"Multiple space groups found:.+P 1.+C 1 2 1" space_groups = [ mtz_io.get_space_group_from_mtz(mtz) for mtz in conflicting_mtz_series ]
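
For reference, a minimal sketch of driving the new reduction entry point from Python instead of the `essnmx_reduce_mcstas` console script. The input and output paths are hypothetical placeholders; the remaining values mirror the argparse defaults in `main()`.

```python
import logging
import pathlib
import sys

from ess.nmx.mcstas import McStasWorkflow
from ess.nmx.mcstas.executables import reduction

# Equivalent console-script call (paths are hypothetical):
#   essnmx_reduce_mcstas --input_file mcstas_events.h5 \
#       --output_file scipp_output.h5 --detector_ids 0 1 2 --verbose

logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
logger.addHandler(logging.StreamHandler(sys.stdout))

reduction(
    input_file=pathlib.Path("mcstas_events.h5"),  # hypothetical input file
    output_file=pathlib.Path("scipp_output.h5"),  # default output name in main()
    chunk_size=10_000_000,  # default chunk size
    detector_ids=[0, 1, 2],  # default detector indices
    wf=McStasWorkflow(),
    logger=logger,
)
```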
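And a small sketch of the bitshuffle/LZ4 filter usage that `_create_compressed_dataset` now relies on, written directly against h5py with the same filter constants as in `nexus.py`. The file name, array shape, and chunk shape below are hypothetical.

```python
import bitshuffle.h5  # importing makes the bitshuffle HDF5 filter available
import h5py
import numpy as np

counts = np.random.poisson(5, size=(128, 128, 10)).astype(np.uint64)

with h5py.File("compression_demo.h5", "w") as f:  # hypothetical file name
    f.create_dataset(
        "data",
        data=counts,
        # One time-of-flight slice per chunk, analogous to the (num_x, num_y, 1)
        # chunking used for the detector counts in _export_reduced_data_as_nxlauetof.
        chunks=(128, 128, 1),
        compression=bitshuffle.h5.H5FILTER,
        # Block size 0 lets bitshuffle pick a block size; LZ4 runs after the bit shuffle.
        compression_opts=(0, bitshuffle.h5.H5_COMPRESS_LZ4),
    )
```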