diff --git a/pyproject.toml b/pyproject.toml index fd9c800..b1fda2b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -42,6 +42,7 @@ dependencies = [ "pandas", "gemmi", "defusedxml", + "bitshuffle", ] dynamic = ["version"] diff --git a/src/ess/nmx/nexus.py b/src/ess/nmx/nexus.py index b241893..0e65eeb 100644 --- a/src/ess/nmx/nexus.py +++ b/src/ess/nmx/nexus.py @@ -7,6 +7,7 @@ from functools import partial from typing import Any, TypeVar +import bitshuffle.h5 import h5py import numpy as np import sciline as sl @@ -34,6 +35,7 @@ def _create_dataset_from_var( long_name: str | None = None, compression: str | None = None, compression_opts: int | None = None, + chunks: tuple[int, ...] | int | bool | None = None, dtype: Any = None, ) -> h5py.Dataset: compression_options = {} @@ -45,6 +47,7 @@ def _create_dataset_from_var( dataset = root_entry.create_dataset( name, data=var.values if dtype is None else var.values.astype(dtype, copy=False), + chunks=chunks, **compression_options, ) if var.unit is not None: @@ -56,9 +59,20 @@ def _create_dataset_from_var( _create_compressed_dataset = partial( _create_dataset_from_var, - compression="gzip", - compression_opts=4, + compression=bitshuffle.h5.H5FILTER, + compression_opts=(0, bitshuffle.h5.H5_COMPRESS_LZ4), ) +"""Create dataset with compression options. + +[``Bitshuffle/LZ4``](https://github.com/kiyo-masui/bitshuffle) is used for convenience. +Since ``Dectris`` uses it for their Nexus file compression, it is compatible with DIALS. +``Bitshuffle/LZ4`` tends to give similar results to +GZIP and other compression algorithms with better performance. +A naive implementation of bitshuffle/LZ4 compression, +shown in [issue #124](https://github.com/scipp/essnmx/issues/124), +led to 80% file reduction (365 MB vs 1.8 GB). + +""" def _create_root_data_entry(file_obj: h5py.File) -> h5py.Group: @@ -393,7 +407,9 @@ def _export_detector_metadata_as_nxlauetof( def _export_reduced_data_as_nxlauetof( dg: NMXReducedDataGroup, output_file: str | pathlib.Path | io.BytesIO, + *, append_mode: bool = True, + compress_counts: bool = True, ) -> None: """Export the reduced data to a NeXus file with the LAUE_TOF application definition. @@ -413,6 +429,9 @@ def _export_reduced_data_as_nxlauetof( If ``False``, the file is opened in None-append mode. > None-append mode is not supported for now. > Only append mode is supported for now. + compress_counts: + If ``True``, the detector counts are compressed using bitshuffle. + It is because only the detector counts are expected to be large. """ if not append_mode: @@ -424,12 +443,25 @@ def _export_reduced_data_as_nxlauetof( # The actual application definition defines it as integer, # but we keep the original data type for now num_x, num_y = dg["detector_shape"].value # Probably better way to do this - data_dset = _create_dataset_from_var( - name="data", - root_entry=nx_detector, - var=sc.fold(dg['counts'].data, dim='id', sizes={'x': num_x, 'y': num_y}), - dtype=np.uint, - ) + if compress_counts: + data_dset = _create_compressed_dataset( + name="data", + root_entry=nx_detector, + var=sc.fold( + dg['counts'].data, dim='id', sizes={'x': num_x, 'y': num_y} + ), + chunks=(num_x, num_y, 1), + dtype=np.uint, + ) + else: + data_dset = _create_dataset_from_var( + name="data", + root_entry=nx_detector, + var=sc.fold( + dg['counts'].data, dim='id', sizes={'x': num_x, 'y': num_y} + ), + dtype=np.uint, + ) data_dset.attrs["signal"] = 1 _create_dataset_from_var( name='time_of_flight', @@ -462,12 +494,14 @@ def __init__( chunk_generator: Callable[[FilePath, DetectorName], Generator[T, None, None]], chunk_insert_key: type[T], extra_meta: dict[str, sc.Variable] | None = None, + compress_counts: bool = True, overwrite: bool = False, ) -> None: from ess.reduce.streaming import EternalAccumulator, StreamProcessor from .types import FilePath, NMXReducedCounts + self.compress_counts = compress_counts self._chunk_generator = chunk_generator self._chunk_insert_key = chunk_insert_key self._workflow = workflow @@ -516,6 +550,8 @@ def add_panel(self, *, detector_id: DetectorIndex | DetectorName) -> None: results = processor.add_chunk({self._chunk_insert_key: da}) _export_reduced_data_as_nxlauetof( - results[NMXReducedDataGroup], self._output_filename + results[NMXReducedDataGroup], + self._output_filename, + compress_counts=self.compress_counts, ) return results[NMXReducedDataGroup]