From fe5ad845bf6a48aca906b569eaf38567c29f4f86 Mon Sep 17 00:00:00 2001 From: Felix Fischer Date: Fri, 6 Mar 2026 15:18:26 +0100 Subject: [PATCH 01/16] Clean up params for zarr writing --- src/annbatch/io.py | 188 ++++++++++++++++++++++++--------------- tests/conftest.py | 12 +-- tests/test_preshuffle.py | 105 ++++++++++++---------- 3 files changed, 178 insertions(+), 127 deletions(-) diff --git a/src/annbatch/io.py b/src/annbatch/io.py index f8501f32..1ceed512 100644 --- a/src/annbatch/io.py +++ b/src/annbatch/io.py @@ -59,14 +59,55 @@ def _round_down(num: int, divisor: int): return num - (num % divisor) +def _parse_size_to_bytes(size: str) -> int: + """Parse a human-readable size string (e.g., '1GB', '512MB') to bytes.""" + SIZE_UNITS = {"GB": 1024**3, "MB": 1024**2, "KB": 1024, "B": 1} + + size = size.strip().upper() + for unit, multiplier in SIZE_UNITS.items(): + if size.endswith(unit): + return int(float(size[: -len(unit)]) * multiplier) + raise ValueError(f"Cannot parse size string: {size!r}. Expected units: {', '.join(SIZE_UNITS)}") + + +def _resolve_shard_obs(shard_size: int | str, elem, iospec: ad.experimental.IOSpec) -> int: + """Convert *shard_size* to an observation count for a single array element. + + If *shard_size* is already an int it is returned as-is. When it is a + size string the target byte budget is divided by the element's + uncompressed bytes-per-observation-row. + """ + if isinstance(shard_size, int): + return shard_size + target_bytes = _parse_size_to_bytes(shard_size) + if iospec.encoding_type in {"array"}: + bytes_per_row = math.prod(elem.shape[1:], start=elem.dtype.itemsize) + elif iospec.encoding_type in {"csr_matrix", "csc_matrix"}: + n_obs = elem.shape[0] + if n_obs == 0: + return 1 + bytes_per_row = (elem.data.nbytes + elem.indices.nbytes + elem.indptr.nbytes) / n_obs + elif iospec.encoding_type == "coo_matrix": + n_obs = elem.shape[0] + if n_obs == 0: + return 1 + bytes_per_row = (elem.data.nbytes + elem.row.nbytes + elem.col.nbytes) / n_obs + elif iospec.encoding_type == "dataframe": + n_rows = len(elem) + if n_rows == 0: + return 1 + bytes_per_row = sum(elem[col].nbytes for col in elem.columns) / n_rows + else: + return 1 + return max(1, int(target_bytes / bytes_per_row)) if bytes_per_row > 0 else 1 + + def write_sharded( group: zarr.Group, adata: ad.AnnData, *, - sparse_chunk_size: int = 32768, - sparse_shard_size: int = 134_217_728, - dense_chunk_size: int = 1024, - dense_shard_size: int = 4194304, + chunk_size: int = 64, + shard_size: int | str = 2_097_152, compressors: Iterable[BytesBytesCodec] = (BloscCodec(cname="lz4", clevel=3, shuffle=BloscShuffle.shuffle),), key: str | None = None, ): @@ -78,14 +119,14 @@ def write_sharded( The destination group, must be zarr v3 adata The source anndata object - sparse_chunk_size - Chunk size of `indices` and `data` inside a shard. - sparse_shard_size - Shard size i.e., number of elements in a single sparse `data` or `indices` file. - dense_chunk_size - Number of obs elements per dense chunk along the first axis - dense_shard_size - Number of obs elements per dense shard along the first axis + chunk_size + Number of observations per chunk. For dense arrays this directly sets the first-axis chunk size. + For sparse arrays it is converted to element counts using the average non-zero elements per row of the matrix being written. + shard_size + Number of observations per shard, or a size string (e.g. ``'1GB'``, ``'512MB'``). 
+ If a size string is provided, the observation count is derived independently for each array element from its uncompressed bytes-per-row so that every shard stays close to the target size. + For dense arrays the resolved count directly sets the first-axis shard size. + For sparse arrays it is converted to element counts using the average non-zero elements per row of the matrix being written. compressors The compressors to pass to `zarr`. key @@ -102,30 +143,38 @@ def callback( *, iospec: ad.experimental.IOSpec, ): - # Ensure we're not overriding anything here + # Ensure we're not overriding anything here. dataset_kwargs = dataset_kwargs.copy() + elem_shard_size = _resolve_shard_obs(shard_size, elem, iospec) if iospec.encoding_type in {"array"} and ( any(n in store.name for n in {"obsm", "layers", "obsp"}) or "X" == elem_name ): # Get either the desired size or the next multiple down to ensure divisibility of chunks and shards - shard_size = min(dense_shard_size, _round_down(elem.shape[0], dense_chunk_size)) - chunk_size = min(dense_chunk_size, _round_down(elem.shape[0], dense_chunk_size)) - # If the shape is less than the computed size (impossible given rounds?) or the rounding caused created a 0-size chunk, then error - if elem.shape[0] < chunk_size or chunk_size == 0: + dense_chunk = min(chunk_size, _round_down(elem.shape[0], chunk_size)) + if elem.shape[0] < dense_chunk or dense_chunk == 0: raise ValueError( - f"Choose a dense shard obs {dense_shard_size} and chunk obs {dense_chunk_size} with non-zero size less than the number of observations {elem.shape[0]}" + f"Choose a shard obs {shard_size} and chunk obs {chunk_size} with non-zero size less than the number of observations {elem.shape[0]}" ) + dense_shard = min(elem_shard_size, _round_down(elem.shape[0], chunk_size)) + dense_shard = max(dense_chunk, _round_down(dense_shard, dense_chunk)) dataset_kwargs = { **dataset_kwargs, - "shards": (shard_size,) + elem.shape[1:], # only shard over 1st dim - "chunks": (chunk_size,) + elem.shape[1:], # only chunk over 1st dim + "shards": (dense_shard,) + elem.shape[1:], # only shard over 1st dim + "chunks": (dense_chunk,) + elem.shape[1:], # only chunk over 1st dim "compressors": compressors, } elif iospec.encoding_type in {"csr_matrix", "csc_matrix"}: + nnz = elem.nnz + avg_nnz = nnz / elem.shape[0] if elem.shape[0] > 0 else 1.0 + sparse_chunk = max(1, int(chunk_size * avg_nnz)) + sparse_shard = max(1, int(elem_shard_size * avg_nnz)) + sparse_shard = min(sparse_shard, nnz) if nnz > 0 else sparse_shard + sparse_chunk = min(sparse_chunk, sparse_shard) + sparse_shard = _round_down(sparse_shard, sparse_chunk) dataset_kwargs = { **dataset_kwargs, - "shards": (sparse_shard_size,), - "chunks": (sparse_chunk_size,), + "shards": (sparse_shard,), + "chunks": (sparse_chunk,), "compressors": compressors, } write_func(store, elem_name, elem, dataset_kwargs=dataset_kwargs) @@ -399,10 +448,8 @@ def add_adatas( *, load_adata: Callable[[zarr.Group | h5py.Group | PathLike[str] | str], ad.AnnData] = _default_load_adata, var_subset: Iterable[str] | None = None, - zarr_sparse_chunk_size: int = 32768, - zarr_sparse_shard_size: int = 134_217_728, - zarr_dense_chunk_size: int = 1024, - zarr_dense_shard_size: int = 4_194_304, + zarr_chunk_size: int = 64, + zarr_shard_size: int | str = "1GB", zarr_compressor: Iterable[BytesBytesCodec] = (BloscCodec(cname="lz4", clevel=3, shuffle=BloscShuffle.shuffle),), h5ad_compressor: Literal["gzip", "lzf"] | None = "gzip", n_obs_per_dataset: int = 2_097_152, @@ -433,14 +480,13 @@ def 
add_adatas( var_subset Subset of gene names to include in the store. If None, all genes are included. Genes are subset based on the `var_names` attribute of the concatenated AnnData object. - zarr_sparse_chunk_size - Size of the chunks to use for the `indices` and `data` of a sparse matrix in the zarr store. - zarr_sparse_shard_size - Size of the shards to use for the `indices` and `data` of a sparse matrix in the zarr store. - zarr_dense_chunk_size - Number of observations per dense zarr chunk i.e., sharding is only done along the first axis of the array. - zarr_dense_shard_size - Number of observations per dense zarr shard i.e., chunking is only done along the first axis of the array. + zarr_chunk_size + Number of observations per zarr chunk. For dense arrays this is used directly as the first-axis chunk size. + For sparse arrays it is converted to element counts using the average number of non-zero elements per row of the matrix being written. + zarr_shard_size + Number of observations per zarr shard, or a size string (e.g. ``'1GB'``). + If a size string is provided, the number of observations per zarr shard is estimated automatically. + For sparse arrays the number of observations is converted to element counts using the average number of non-zero elements per row of the matrix being written. zarr_compressor Compressors to use to compress the data in the zarr store. h5ad_compressor @@ -448,7 +494,7 @@ def add_adatas( n_obs_per_dataset Number of observations to load into memory at once for shuffling / pre-processing. The higher this number, the more memory is used, but the better the shuffling. - This corresponds to the size of the shards created. + This corresponds to the size of the dataset level shards created. Only applicable when adding datasets for the first time, otherwise ignored. shuffle Whether to shuffle the data before writing it to the store. @@ -482,16 +529,16 @@ def add_adatas( ...) """ if shuffle_chunk_size > n_obs_per_dataset: - raise ValueError("Cannot have a large slice size than observations per dataset") + raise ValueError( + "Cannot have a larger slice size than observations per dataset. Reduce `shuffle_chunk_size` or increase `n_obs_per_dataset`." + ) if rng is None: rng = np.random.default_rng() shared_kwargs = { "adata_paths": adata_paths, "load_adata": load_adata, - "zarr_sparse_chunk_size": zarr_sparse_chunk_size, - "zarr_sparse_shard_size": zarr_sparse_shard_size, - "zarr_dense_chunk_size": zarr_dense_chunk_size, - "zarr_dense_shard_size": zarr_dense_shard_size, + "zarr_chunk_size": zarr_chunk_size, + "zarr_shard_size": zarr_shard_size, "zarr_compressor": zarr_compressor, "h5ad_compressor": h5ad_compressor, "shuffle_chunk_size": shuffle_chunk_size, @@ -510,10 +557,8 @@ def _create_collection( adata_paths: Iterable[PathLike[str]] | Iterable[str], load_adata: Callable[[PathLike[str] | str], ad.AnnData] = _default_load_adata, var_subset: Iterable[str] | None = None, - zarr_sparse_chunk_size: int = 32768, - zarr_sparse_shard_size: int = 134_217_728, - zarr_dense_chunk_size: int = 1024, - zarr_dense_shard_size: int = 4_194_304, + zarr_chunk_size: int = 64, + zarr_shard_size: int | str = "1GB", zarr_compressor: Iterable[BytesBytesCodec] = (BloscCodec(cname="lz4", clevel=3, shuffle=BloscShuffle.shuffle),), h5ad_compressor: Literal["gzip", "lzf"] | None = "gzip", n_obs_per_dataset: int = 2_097_152, @@ -543,14 +588,13 @@ def _create_collection( Subset of gene names to include in the store. 
If None, all genes are included. Genes are subset based on the `var_names` attribute of the concatenated AnnData object. Only applicable when adding datasets for the first time, otherwise ignored and the incoming data's var space is subsetted to that of the existing collection. - zarr_sparse_chunk_size - Size of the chunks to use for the `indices` and `data` of a sparse matrix in the zarr store. - zarr_sparse_shard_size - Size of the shards to use for the `indices` and `data` of a sparse matrix in the zarr store. - zarr_dense_chunk_size - Number of observations per dense zarr chunk i.e., sharding is only done along the first axis of the array. - zarr_dense_shard_size - Number of observations per dense zarr shard i.e., chunking is only done along the first axis of the array. + zarr_chunk_size + Number of observations per zarr chunk. For dense arrays this is used directly as the first-axis chunk size. + For sparse arrays it is converted to element counts using the average number of non-zero elements per row of the matrix being written. + zarr_shard_size + Number of observations per zarr shard, or a size string (e.g. ``'1GB'``). + If a size string is provided, the number of observations per zarr shard is estimated automatically. + For sparse arrays the number of observations is converted to element counts using the average number of non-zero elements per row of the matrix being written. zarr_compressor Compressors to use to compress the data in the zarr store. h5ad_compressor @@ -570,6 +614,11 @@ def _create_collection( """ if not self.is_empty: raise RuntimeError("Cannot create a collection at a location that already has a shuffled collection") + if shuffle_chunk_size > n_obs_per_dataset: + raise ValueError( + "Cannot have a larger slice size than observations per dataset. Reduce `shuffle_chunk_size` or increase `n_obs_per_dataset`." 
+ ) + _check_for_mismatched_keys(adata_paths, load_adata=load_adata) adata_concat = _lazy_load_anndatas(adata_paths, load_adata=load_adata) adata_concat.obs_names_make_unique() @@ -584,6 +633,7 @@ def _create_collection( if var_subset is None: var_subset = adata_concat.var_names + for i, chunk in enumerate(tqdm(chunks, desc="processing chunks")): var_mask = adata_concat.var_names.isin(var_subset) # np.sort: It's more efficient to access elements sequentially from dask arrays @@ -598,10 +648,8 @@ def _create_collection( write_sharded( self._group, adata_chunk, - sparse_chunk_size=zarr_sparse_chunk_size, - sparse_shard_size=zarr_sparse_shard_size, - dense_chunk_size=min(adata_chunk.shape[0], zarr_dense_chunk_size), - dense_shard_size=min(adata_chunk.shape[0], zarr_dense_shard_size), + chunk_size=zarr_chunk_size, + shard_size=zarr_shard_size, compressors=zarr_compressor, key=f"{DATASET_PREFIX}_{i}", ) @@ -619,10 +667,8 @@ def _add_to_collection( *, adata_paths: Iterable[PathLike[str]] | Iterable[str], load_adata: Callable[[PathLike[str] | str], ad.AnnData] = ad.read_h5ad, - zarr_sparse_chunk_size: int = 32768, - zarr_sparse_shard_size: int = 134_217_728, - zarr_dense_chunk_size: int = 1024, - zarr_dense_shard_size: int = 4_194_304, + zarr_chunk_size: int = 64, + zarr_shard_size: int | str = "1GB", zarr_compressor: Iterable[BytesBytesCodec] = (BloscCodec(cname="lz4", clevel=3, shuffle=BloscShuffle.shuffle),), h5ad_compressor: Literal["gzip", "lzf"] | None = "gzip", shuffle_chunk_size: int = 1000, @@ -644,14 +690,13 @@ def _add_to_collection( If you only need a subset of the input anndata files' elems (e.g., only `X` and `obs`), you can provide a custom function here to speed up loading and harmonize your data. The input to the function is a path to an anndata file, and the output is an anndata object. If the input data is too large to fit into memory, you should use :func:`annndata.experimental.read_lazy` instead. - zarr_sparse_chunk_size - Size of the chunks to use for the `indices` and `data` of a sparse matrix in the zarr store. - zarr_sparse_shard_size - Size of the shards to use for the `indices` and `data` of a sparse matrix in the zarr store. - zarr_dense_chunk_size - Number of observations per dense zarr chunk i.e., sharding is only done along the first axis of the array. - zarr_dense_shard_size - Number of observations per dense zarr shard i.e., chunking is only done along the first axis of the array. + zarr_chunk_size + Number of observations per zarr chunk. For dense arrays this is used directly as the first-axis chunk size. + For sparse arrays it is converted to element counts using the average number of non-zero elements per row of the matrix being written. + zarr_shard_size + Number of observations per zarr shard, or a size string (e.g. ``'1GB'``). + If a size string is provided, the number of observations per zarr shard is estimated automatically. + For sparse arrays the number of observations is converted to element counts using the average number of non-zero elements per row of the matrix being written. zarr_compressor Compressors to use to compress the data in the zarr store. should_sparsify_output_in_memory @@ -666,7 +711,6 @@ def _add_to_collection( raise ValueError("Store is empty. Please run `DatasetCollection.add` first.") # Check for mismatched keys among the inputs. 
_check_for_mismatched_keys(adata_paths, load_adata=load_adata) - adata_concat = _lazy_load_anndatas(adata_paths, load_adata=load_adata) if math.ceil(adata_concat.shape[0] / shuffle_chunk_size) < len(self._dataset_keys): raise ValueError( @@ -701,10 +745,8 @@ def _add_to_collection( write_sharded( self._group, adata, - sparse_chunk_size=zarr_sparse_chunk_size, - sparse_shard_size=zarr_sparse_shard_size, - dense_chunk_size=min(adata.shape[0], zarr_dense_chunk_size), - dense_shard_size=min(adata.shape[0], zarr_dense_shard_size), + chunk_size=zarr_chunk_size, + shard_size=zarr_shard_size, compressors=zarr_compressor, key=dataset, ) diff --git a/tests/conftest.py b/tests/conftest.py index 45be4996..2da2c0df 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -51,10 +51,8 @@ def adata_with_zarr_path_same_var_space(tmpdir_factory, n_shards: int = 3) -> Ge write_sharded( f, adata, - sparse_chunk_size=10, - sparse_shard_size=20, - dense_chunk_size=10, - dense_shard_size=20, + chunk_size=10, + shard_size=20, ) yield ( # need to match directory iteration order for correctness so can't just concatenate @@ -116,10 +114,8 @@ def simple_collection( output_path = Path(tmpdir_factory.mktemp("zarr_folder")) / "simple_fixture.zarr" collection = DatasetCollection(output_path).add_adatas( zarr_stores, - zarr_sparse_chunk_size=10, - zarr_sparse_shard_size=20, - zarr_dense_chunk_size=10, - zarr_dense_shard_size=20, + zarr_chunk_size=10, + zarr_shard_size=20, n_obs_per_dataset=60, shuffle_chunk_size=10, ) diff --git a/tests/test_preshuffle.py b/tests/test_preshuffle.py index da06723d..d52a0da5 100644 --- a/tests/test_preshuffle.py +++ b/tests/test_preshuffle.py @@ -24,8 +24,8 @@ def test_write_sharded_bad_chunk_size(tmp_path: Path): adata = ad.AnnData(np.random.randn(10, 20)) z = zarr.open(tmp_path / "foo.zarr") - with pytest.raises(ValueError, match=r"Choose a dense"): - write_sharded(z, adata, dense_chunk_size=20) + with pytest.raises(ValueError, match=r"Choose a shard obs"): + write_sharded(z, adata, chunk_size=20) @pytest.mark.parametrize( @@ -35,7 +35,7 @@ def test_write_sharded_bad_chunk_size(tmp_path: Path): def test_write_sharded_shard_size_too_big(tmp_path: Path, chunk_size: int, expected_shard_size: int): adata = ad.AnnData(np.random.randn(10, 20)) z = zarr.open(tmp_path / "foo.zarr") - write_sharded(z, adata, dense_chunk_size=chunk_size, dense_shard_size=20) + write_sharded(z, adata, chunk_size=chunk_size, shard_size=20) assert z["X"].shards == (expected_shard_size, 20) # i.e., the closest multiple to `dense_chunk_size` @@ -53,10 +53,8 @@ def test_store_creation_warnings_with_different_keys(elem_name: Literal["obsm", with pytest.warns(UserWarning, match=rf"Found {elem_name} keys.* not present in all anndatas"): DatasetCollection(tmp_path / "collection.zarr").add_adatas( [path_1, path_2], - zarr_sparse_chunk_size=10, - zarr_sparse_shard_size=20, - zarr_dense_chunk_size=5, - zarr_dense_shard_size=10, + zarr_chunk_size=5, + zarr_shard_size=10, n_obs_per_dataset=10, shuffle_chunk_size=10, ) @@ -71,10 +69,8 @@ def test_store_creation_no_warnings_with_custom_load(tmp_path: Path): adata_2.write_h5ad(path_2) collection = DatasetCollection(tmp_path / "collection.zarr").add_adatas( [path_1, path_2], - zarr_sparse_chunk_size=10, - zarr_sparse_shard_size=20, - zarr_dense_chunk_size=5, - zarr_dense_shard_size=10, + zarr_chunk_size=5, + zarr_shard_size=10, n_obs_per_dataset=10, shuffle_chunk_size=5, load_adata=lambda x: ad.AnnData(X=ad.io.read_elem(h5py.File(x)["X"])), @@ -93,10 +89,8 @@ def 
test_store_creation_path_added_to_obs(tmp_path: Path): output_dir = tmp_path / "path_src_collection.zarr" collection = DatasetCollection(output_dir).add_adatas( paths, - zarr_sparse_chunk_size=10, - zarr_sparse_shard_size=20, - zarr_dense_chunk_size=5, - zarr_dense_shard_size=10, + zarr_chunk_size=5, + zarr_shard_size=10, n_obs_per_dataset=10, shuffle_chunk_size=5, shuffle=False, @@ -122,10 +116,8 @@ def test_store_addition_different_keys( collection = DatasetCollection(output_path) collection.add_adatas( [orig_path], - zarr_sparse_chunk_size=10, - zarr_sparse_shard_size=20, - zarr_dense_chunk_size=10, - zarr_dense_shard_size=20, + zarr_chunk_size=10, + zarr_shard_size=20, n_obs_per_dataset=50, shuffle_chunk_size=10, ) @@ -139,10 +131,8 @@ def test_store_addition_different_keys( collection.add_adatas( [additional_path], load_adata=load_adata, - zarr_sparse_chunk_size=10, - zarr_sparse_shard_size=20, - zarr_dense_chunk_size=5, - zarr_dense_shard_size=10, + zarr_chunk_size=5, + zarr_shard_size=10, shuffle_chunk_size=2, ) @@ -204,10 +194,8 @@ def test_store_creation( collection = DatasetCollection(output_path).add_adatas( [adata_with_h5_path_different_var_space[1] / f for f in h5_files if str(f).endswith(".h5ad")], var_subset=var_subset, - zarr_sparse_chunk_size=10, - zarr_sparse_shard_size=20, - zarr_dense_chunk_size=5, - zarr_dense_shard_size=10, + zarr_chunk_size=5, + zarr_shard_size=10, n_obs_per_dataset=50, shuffle_chunk_size=10, shuffle=shuffle, @@ -258,9 +246,9 @@ def test_store_creation( pd.testing.assert_frame_equal(adata.obs, adata_orig.obs) z = zarr.open(output_path / "dataset_0") - # assert chunk behavior + # assert chunk behavior (unified zarr_chunk_size=5 for both sparse and dense) assert z["obsm"]["arr"].chunks[0] == 5, z["obsm"]["arr"] - assert z["X"]["indices"].chunks[0] == 10 + # sparse indices use obs-based chunk; exact element count depends on per-dataset avg_nnz # ensure proper downcasting assert z["X"]["indices"].dtype == (np.uint16 if adata.X.shape[1] >= 256 else np.uint8) @@ -294,10 +282,8 @@ def test_mismatched_raw_concat( h5_paths = [adata_with_h5_path_different_var_space[1] / f for f in h5_files if str(f).endswith(".h5ad")] collection = DatasetCollection(output_path).add_adatas( h5_paths, - zarr_sparse_chunk_size=10, - zarr_sparse_shard_size=20, - zarr_dense_chunk_size=10, - zarr_dense_shard_size=20, + zarr_chunk_size=10, + zarr_shard_size=20, n_obs_per_dataset=30, shuffle_chunk_size=10, shuffle=False, # don't shuffle -> want to check if the right attributes get taken @@ -339,10 +325,8 @@ def test_store_extension( collection = DatasetCollection(store_path) collection.add_adatas( original, - zarr_sparse_chunk_size=10, - zarr_sparse_shard_size=20, - zarr_dense_chunk_size=10, - zarr_dense_shard_size=20, + zarr_chunk_size=10, + zarr_shard_size=20, n_obs_per_dataset=60, shuffle_chunk_size=10, shuffle=True, @@ -351,10 +335,8 @@ def test_store_extension( collection.add_adatas( additional, load_adata=load_adata, - zarr_sparse_chunk_size=10, - zarr_sparse_shard_size=20, - zarr_dense_chunk_size=5, - zarr_dense_shard_size=10, + zarr_chunk_size=5, + zarr_shard_size=10, n_obs_per_dataset=50, shuffle_chunk_size=10, ) @@ -370,7 +352,8 @@ def test_store_extension( assert "arr" in adata.obsm z = zarr.open(store_path / "dataset_0") assert z["obsm"]["arr"].chunks == (5, z["obsm"]["arr"].shape[1]) - assert z["X"]["indices"].chunks[0] == 10 + # Can't directly check sparse chunk size as it depends on the number of non-zero elements per row + assert z["X"]["indices"].chunks[0] == 
z["X"]["data"].chunks[0] def test_empty(tmp_path: Path): @@ -388,10 +371,8 @@ def test_collection_rng_reproducibility(adata_with_zarr_path_same_var_space: tup zarr_stores = sorted(adata_with_zarr_path_same_var_space[1].glob("*.zarr")) seed = 42 kwargs = { - "zarr_sparse_chunk_size": 10, - "zarr_sparse_shard_size": 20, - "zarr_dense_chunk_size": 10, - "zarr_dense_shard_size": 20, + "zarr_chunk_size": 10, + "zarr_shard_size": 20, "n_obs_per_dataset": 200, "shuffle_chunk_size": 10, "shuffle": True, @@ -405,3 +386,35 @@ def _make_collection(name: str) -> DatasetCollection: for g1, g2 in zip(_make_collection("a.zarr"), _make_collection("b.zarr"), strict=True): pd.testing.assert_frame_equal(ad.io.read_elem(g1).obs, ad.io.read_elem(g2).obs) + + +def test_string_size_params_end_to_end(tmp_path: Path): + """String-based size parameters work end-to-end with sparse data.""" + n_obs, n_vars = 50, 20 + X = sp.random(n_obs, n_vars, density=0.3, format="csr", dtype=np.float32, random_state=42) + obsm = {"embedding": np.random.default_rng(42).standard_normal((n_obs, 10), dtype=np.float32)} + path = tmp_path / "sparse.h5ad" + ad.AnnData(X=X, obsm=obsm).write_h5ad(path, compression=None) + + target_shard_size = "1KB" + output = tmp_path / "collection.zarr" + collection = DatasetCollection(output).add_adatas( + [path], + zarr_chunk_size=10, + zarr_shard_size=target_shard_size, + zarr_compressor=(), + shuffle_chunk_size=10, + shuffle=False, + ) + + assert not collection.is_empty + assert len(list(collection)) == 1 + adata_result = ad.io.read_elem(next(iter(collection))) + assert adata_result.shape == (n_obs, n_vars) + + dataset_grp = next(iter(collection)) + dataset_dir = output / dataset_grp.name.lstrip("/") + shard_files = [p for c_dir in dataset_dir.rglob("c") if c_dir.is_dir() for p in c_dir.rglob("*") if p.is_file()] + assert len(shard_files) > 0 + for sf in shard_files: + assert sf.stat().st_size <= 1024, f"{sf.relative_to(dataset_dir)} is {sf.stat().st_size}B, expected <= 1KB" From 99bc83ffe3946dc6d9faaf8f32c658972e22b2b9 Mon Sep 17 00:00:00 2001 From: Felix Fischer <47145207+felix0097@users.noreply.github.com> Date: Fri, 6 Mar 2026 16:00:25 +0100 Subject: [PATCH 02/16] Update src/annbatch/io.py Co-authored-by: Ilan Gold --- src/annbatch/io.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/annbatch/io.py b/src/annbatch/io.py index 1ceed512..b587074b 100644 --- a/src/annbatch/io.py +++ b/src/annbatch/io.py @@ -119,7 +119,7 @@ def write_sharded( The destination group, must be zarr v3 adata The source anndata object - chunk_size + obs_per_chunk Number of observations per chunk. For dense arrays this directly sets the first-axis chunk size. For sparse arrays it is converted to element counts using the average non-zero elements per row of the matrix being written. 
shard_size From d979d5d84e0618d3eb3854be4e106454ac6c5af9 Mon Sep 17 00:00:00 2001 From: Felix Fischer Date: Fri, 6 Mar 2026 16:16:56 +0100 Subject: [PATCH 03/16] Update size calulation + size parsing --- pyproject.toml | 1 + src/annbatch/io.py | 58 ++++++++++++++++++++-------------------------- 2 files changed, 26 insertions(+), 33 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index b9679925..bb976920 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -25,6 +25,7 @@ classifiers = [ dependencies = [ "anndata[lazy]>=0.12.9", "dask>=2025.9", + "humanfriendly>=10", "pandas>=2.2.2", "scipy>1.15", # for debug logging (referenced from the issue template) diff --git a/src/annbatch/io.py b/src/annbatch/io.py index 1ceed512..61f58456 100644 --- a/src/annbatch/io.py +++ b/src/annbatch/io.py @@ -15,8 +15,10 @@ import pandas as pd import scipy.sparse as sp import zarr +from anndata._core.sparse_dataset import BaseCompressedSparseDataset from anndata.experimental.backed import Dataset2D from dask.array.core import Array as DaskArray +from humanfriendly import parse_size from tqdm.auto import tqdm from zarr.codecs import BloscCodec, BloscShuffle @@ -59,18 +61,7 @@ def _round_down(num: int, divisor: int): return num - (num % divisor) -def _parse_size_to_bytes(size: str) -> int: - """Parse a human-readable size string (e.g., '1GB', '512MB') to bytes.""" - SIZE_UNITS = {"GB": 1024**3, "MB": 1024**2, "KB": 1024, "B": 1} - - size = size.strip().upper() - for unit, multiplier in SIZE_UNITS.items(): - if size.endswith(unit): - return int(float(size[: -len(unit)]) * multiplier) - raise ValueError(f"Cannot parse size string: {size!r}. Expected units: {', '.join(SIZE_UNITS)}") - - -def _resolve_shard_obs(shard_size: int | str, elem, iospec: ad.experimental.IOSpec) -> int: +def _resolve_shard_obs(shard_size: int | str, elem) -> int: """Convert *shard_size* to an observation count for a single array element. If *shard_size* is already an int it is returned as-is. 
When it is a @@ -79,26 +70,25 @@ def _resolve_shard_obs(shard_size: int | str, elem, iospec: ad.experimental.IOSp """ if isinstance(shard_size, int): return shard_size - target_bytes = _parse_size_to_bytes(shard_size) - if iospec.encoding_type in {"array"}: - bytes_per_row = math.prod(elem.shape[1:], start=elem.dtype.itemsize) - elif iospec.encoding_type in {"csr_matrix", "csc_matrix"}: - n_obs = elem.shape[0] - if n_obs == 0: - return 1 - bytes_per_row = (elem.data.nbytes + elem.indices.nbytes + elem.indptr.nbytes) / n_obs - elif iospec.encoding_type == "coo_matrix": - n_obs = elem.shape[0] - if n_obs == 0: - return 1 - bytes_per_row = (elem.data.nbytes + elem.row.nbytes + elem.col.nbytes) / n_obs - elif iospec.encoding_type == "dataframe": - n_rows = len(elem) - if n_rows == 0: - return 1 - bytes_per_row = sum(elem[col].nbytes for col in elem.columns) / n_rows - else: + target_bytes = parse_size(shard_size, binary=True) + + def _cs_bytes(x) -> int: + return int(x.data.nbytes + x.indptr.nbytes + x.indices.nbytes) + + n_obs = elem.shape[0] if hasattr(elem, "shape") else len(elem) + if n_obs == 0: return 1 + + if isinstance(elem, h5py.Dataset): + total_bytes = int(np.array(elem.shape).prod() * elem.dtype.itemsize) + elif isinstance(elem, BaseCompressedSparseDataset): + total_bytes = _cs_bytes(elem._to_backed()) + elif sp.issparse(elem): + total_bytes = _cs_bytes(elem) + else: + total_bytes = elem.__sizeof__() + + bytes_per_row = total_bytes / n_obs return max(1, int(target_bytes / bytes_per_row)) if bytes_per_row > 0 else 1 @@ -145,7 +135,7 @@ def callback( ): # Ensure we're not overriding anything here. dataset_kwargs = dataset_kwargs.copy() - elem_shard_size = _resolve_shard_obs(shard_size, elem, iospec) + elem_shard_size = _resolve_shard_obs(shard_size, elem) if iospec.encoding_type in {"array"} and ( any(n in store.name for n in {"obsm", "layers", "obsp"}) or "X" == elem_name ): @@ -165,7 +155,9 @@ def callback( } elif iospec.encoding_type in {"csr_matrix", "csc_matrix"}: nnz = elem.nnz - avg_nnz = nnz / elem.shape[0] if elem.shape[0] > 0 else 1.0 + if elem.shape[0] == 0: + raise ValueError(f"Cannot write sharded sparse matrix {elem_name!r} with 0 observations.") + avg_nnz = nnz / elem.shape[0] sparse_chunk = max(1, int(chunk_size * avg_nnz)) sparse_shard = max(1, int(elem_shard_size * avg_nnz)) sparse_shard = min(sparse_shard, nnz) if nnz > 0 else sparse_shard From 62869f0413cf67931f6e6ecf0e93027613473d54 Mon Sep 17 00:00:00 2001 From: Felix Fischer Date: Fri, 6 Mar 2026 16:17:40 +0100 Subject: [PATCH 04/16] Add zarr param changes --- CHANGELOG.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index e58f69bb..3ce4ef20 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,8 @@ and this project adheres to [Semantic Versioning][]. ## [0.0.9] - {class}`annbatch.DatasetCollection` now accepts a `rng` argument to the {meth}`annbatch.DatasetCollection.add_adatas` method. +- The ``sparse_chunk_size``, ``sparse_shard_size``, ``dense_chunk_size``, and ``dense_shard_size`` parameters of {func}`annbatch.io.write_sharded` have been replaced by ``chunk_size`` (number of observations per chunk, automatically converted to element counts for sparse arrays) and ``shard_size`` (number of observations per shard or a size string). The corresponding parameters in {meth}`annbatch.DatasetCollection.add_adatas` are ``zarr_chunk_size`` and ``zarr_shard_size``. 
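For illustration only (this note and sketch are editorial, not part of the diff): a minimal before/after call of the writer under the unified parameters described in the changelog entry above. It assumes `write_sharded` is importable from the package root, as the later changelog reference suggests, and uses hypothetical file paths; the parameter names shown here are the ones at this point in the series and are renamed again in later commits.

    import anndata as ad
    import zarr
    from annbatch import write_sharded

    adata = ad.read_h5ad("example.h5ad")               # hypothetical input file
    group = zarr.open_group("example.zarr", mode="w")  # destination zarr v3 group

    # Before this series (four separate dense/sparse sizes):
    # write_sharded(group, adata, sparse_chunk_size=32768, sparse_shard_size=134_217_728,
    #               dense_chunk_size=1024, dense_shard_size=4_194_304)

    # After: one obs-based chunk size plus a shard size given as an obs count or a size string.
    write_sharded(group, adata, chunk_size=64, shard_size="1GB")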
+- `zarr_shard_size` in {meth}`annbatch.DatasetCollection.add_adatas` and `shard_size` in {func}`annbatch.io.write_sharded` now accept a human-readable size string (e.g. ``'1GB'``, ``'512MB'``) in addition to an integer observation count. When a string is provided, the observation count is derived independently for each array element from its uncompressed bytes-per-row so that every shard stays close to the target size. ## [0.0.8] From cfc99d1636e609951d3f4dd59d4959585916e380 Mon Sep 17 00:00:00 2001 From: Felix Fischer Date: Fri, 6 Mar 2026 16:22:32 +0100 Subject: [PATCH 05/16] fix readthedocs errors --- CHANGELOG.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 3ce4ef20..0f7634d1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,8 +11,8 @@ and this project adheres to [Semantic Versioning][]. ## [0.0.9] - {class}`annbatch.DatasetCollection` now accepts a `rng` argument to the {meth}`annbatch.DatasetCollection.add_adatas` method. -- The ``sparse_chunk_size``, ``sparse_shard_size``, ``dense_chunk_size``, and ``dense_shard_size`` parameters of {func}`annbatch.io.write_sharded` have been replaced by ``chunk_size`` (number of observations per chunk, automatically converted to element counts for sparse arrays) and ``shard_size`` (number of observations per shard or a size string). The corresponding parameters in {meth}`annbatch.DatasetCollection.add_adatas` are ``zarr_chunk_size`` and ``zarr_shard_size``. -- `zarr_shard_size` in {meth}`annbatch.DatasetCollection.add_adatas` and `shard_size` in {func}`annbatch.io.write_sharded` now accept a human-readable size string (e.g. ``'1GB'``, ``'512MB'``) in addition to an integer observation count. When a string is provided, the observation count is derived independently for each array element from its uncompressed bytes-per-row so that every shard stays close to the target size. +- The ``sparse_chunk_size``, ``sparse_shard_size``, ``dense_chunk_size``, and ``dense_shard_size`` parameters of {func}`annbatch.write_sharded` have been replaced by ``chunk_size`` (number of observations per chunk, automatically converted to element counts for sparse arrays) and ``shard_size`` (number of observations per shard or a size string). The corresponding parameters in {meth}`annbatch.DatasetCollection.add_adatas` are ``zarr_chunk_size`` and ``zarr_shard_size``. +- `zarr_shard_size` in {meth}`annbatch.DatasetCollection.add_adatas` and `shard_size` in {func}`annbatch.write_sharded` now accept a human-readable size string (e.g. ``'1GB'``, ``'512MB'``) in addition to an integer observation count. When a string is provided, the observation count is derived independently for each array element from its uncompressed bytes-per-row so that every shard stays close to the target size. ## [0.0.8] From 7b1893ca2b7fd645923bef92f38aa3022d606d8e Mon Sep 17 00:00:00 2001 From: Felix Fischer Date: Fri, 6 Mar 2026 16:28:39 +0100 Subject: [PATCH 06/16] Fix errors --- src/annbatch/io.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/annbatch/io.py b/src/annbatch/io.py index ff7b00b2..39c7aa99 100644 --- a/src/annbatch/io.py +++ b/src/annbatch/io.py @@ -135,10 +135,10 @@ def callback( ): # Ensure we're not overriding anything here. 
dataset_kwargs = dataset_kwargs.copy() - elem_shard_size = _resolve_shard_obs(shard_size, elem) if iospec.encoding_type in {"array"} and ( any(n in store.name for n in {"obsm", "layers", "obsp"}) or "X" == elem_name ): + elem_shard_size = _resolve_shard_obs(shard_size, elem) # Get either the desired size or the next multiple down to ensure divisibility of chunks and shards dense_chunk = min(chunk_size, _round_down(elem.shape[0], chunk_size)) if elem.shape[0] < dense_chunk or dense_chunk == 0: @@ -154,6 +154,7 @@ def callback( "compressors": compressors, } elif iospec.encoding_type in {"csr_matrix", "csc_matrix"}: + elem_shard_size = _resolve_shard_obs(shard_size, elem) nnz = elem.nnz if elem.shape[0] == 0: raise ValueError(f"Cannot write sharded sparse matrix {elem_name!r} with 0 observations.") From fab4eea615878c9e84f5f202c56c5e3990e5cce3 Mon Sep 17 00:00:00 2001 From: Felix Fischer <47145207+felix0097@users.noreply.github.com> Date: Mon, 9 Mar 2026 14:13:28 +0100 Subject: [PATCH 07/16] Update src/annbatch/io.py Co-authored-by: Ilan Gold --- src/annbatch/io.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/annbatch/io.py b/src/annbatch/io.py index 39c7aa99..533b36d1 100644 --- a/src/annbatch/io.py +++ b/src/annbatch/io.py @@ -61,7 +61,7 @@ def _round_down(num: int, divisor: int): return num - (num % divisor) -def _resolve_shard_obs(shard_size: int | str, elem) -> int: +def _shard_size_param_to_n_obs(shard_size: int | str, elem) -> int: """Convert *shard_size* to an observation count for a single array element. If *shard_size* is already an int it is returned as-is. When it is a From c8088c6259299d7d7ae90fc83a8436b5d9e77c64 Mon Sep 17 00:00:00 2001 From: Felix Fischer <47145207+felix0097@users.noreply.github.com> Date: Mon, 9 Mar 2026 14:13:41 +0100 Subject: [PATCH 08/16] Update src/annbatch/io.py Co-authored-by: Ilan Gold --- src/annbatch/io.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/annbatch/io.py b/src/annbatch/io.py index 533b36d1..a557ba58 100644 --- a/src/annbatch/io.py +++ b/src/annbatch/io.py @@ -62,7 +62,7 @@ def _round_down(num: int, divisor: int): def _shard_size_param_to_n_obs(shard_size: int | str, elem) -> int: - """Convert *shard_size* to an observation count for a single array element. + """Convert `shard_size` to a number of observations given the size of an element from the anndata object. If *shard_size* is already an int it is returned as-is. When it is a size string the target byte budget is divided by the element's From 5dcfaf369bfd8606d319ea59924b61adb5385fa3 Mon Sep 17 00:00:00 2001 From: Felix Fischer <47145207+felix0097@users.noreply.github.com> Date: Mon, 9 Mar 2026 14:14:14 +0100 Subject: [PATCH 09/16] Update src/annbatch/io.py Co-authored-by: Ilan Gold --- src/annbatch/io.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/annbatch/io.py b/src/annbatch/io.py index a557ba58..acc7995a 100644 --- a/src/annbatch/io.py +++ b/src/annbatch/io.py @@ -64,7 +64,7 @@ def _round_down(num: int, divisor: int): def _shard_size_param_to_n_obs(shard_size: int | str, elem) -> int: """Convert `shard_size` to a number of observations given the size of an element from the anndata object. - If *shard_size* is already an int it is returned as-is. When it is a + If *shard_size* is already an int, it is interpreted as `n_obs`. When it is a size string the target byte budget is divided by the element's uncompressed bytes-per-observation-row. 
""" From 807533935c911da517187e9d690a72048773d037 Mon Sep 17 00:00:00 2001 From: Felix Fischer Date: Mon, 9 Mar 2026 14:34:58 +0100 Subject: [PATCH 10/16] chore: update variable names + changelog --- CHANGELOG.md | 2 +- src/annbatch/io.py | 18 +++++++++--------- tests/conftest.py | 2 +- tests/test_preshuffle.py | 4 ++-- 4 files changed, 13 insertions(+), 13 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 0f7634d1..d9f899b8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,7 +12,7 @@ and this project adheres to [Semantic Versioning][]. - {class}`annbatch.DatasetCollection` now accepts a `rng` argument to the {meth}`annbatch.DatasetCollection.add_adatas` method. - The ``sparse_chunk_size``, ``sparse_shard_size``, ``dense_chunk_size``, and ``dense_shard_size`` parameters of {func}`annbatch.write_sharded` have been replaced by ``chunk_size`` (number of observations per chunk, automatically converted to element counts for sparse arrays) and ``shard_size`` (number of observations per shard or a size string). The corresponding parameters in {meth}`annbatch.DatasetCollection.add_adatas` are ``zarr_chunk_size`` and ``zarr_shard_size``. -- `zarr_shard_size` in {meth}`annbatch.DatasetCollection.add_adatas` and `shard_size` in {func}`annbatch.write_sharded` now accept a human-readable size string (e.g. ``'1GB'``, ``'512MB'``) in addition to an integer observation count. When a string is provided, the observation count is derived independently for each array element from its uncompressed bytes-per-row so that every shard stays close to the target size. +- `zarr_shard_size` in {meth}`annbatch.DatasetCollection.add_adatas` and `shard_size` in {func}`annbatch.write_sharded` now accept a human-readable size string (e.g. ``'1GB'``, ``'512MB'``) in addition to an integer number of observations. When a string is provided, the observation count is derived independently for each array element from its uncompressed bytes-per-row so that every shard stays close to the target size. 
## [0.0.8] diff --git a/src/annbatch/io.py b/src/annbatch/io.py index acc7995a..0170077f 100644 --- a/src/annbatch/io.py +++ b/src/annbatch/io.py @@ -96,7 +96,7 @@ def write_sharded( group: zarr.Group, adata: ad.AnnData, *, - chunk_size: int = 64, + obs_per_chunk: int = 64, shard_size: int | str = 2_097_152, compressors: Iterable[BytesBytesCodec] = (BloscCodec(cname="lz4", clevel=3, shuffle=BloscShuffle.shuffle),), key: str | None = None, @@ -138,14 +138,14 @@ def callback( if iospec.encoding_type in {"array"} and ( any(n in store.name for n in {"obsm", "layers", "obsp"}) or "X" == elem_name ): - elem_shard_size = _resolve_shard_obs(shard_size, elem) + elem_shard_size = _shard_size_param_to_n_obs(shard_size, elem) # Get either the desired size or the next multiple down to ensure divisibility of chunks and shards - dense_chunk = min(chunk_size, _round_down(elem.shape[0], chunk_size)) + dense_chunk = min(obs_per_chunk, _round_down(elem.shape[0], obs_per_chunk)) if elem.shape[0] < dense_chunk or dense_chunk == 0: raise ValueError( - f"Choose a shard obs {shard_size} and chunk obs {chunk_size} with non-zero size less than the number of observations {elem.shape[0]}" + f"Choose a shard obs {shard_size} and chunk obs {obs_per_chunk} with non-zero size less than the number of observations {elem.shape[0]}" ) - dense_shard = min(elem_shard_size, _round_down(elem.shape[0], chunk_size)) + dense_shard = min(elem_shard_size, _round_down(elem.shape[0], obs_per_chunk)) dense_shard = max(dense_chunk, _round_down(dense_shard, dense_chunk)) dataset_kwargs = { **dataset_kwargs, @@ -154,12 +154,12 @@ def callback( "compressors": compressors, } elif iospec.encoding_type in {"csr_matrix", "csc_matrix"}: - elem_shard_size = _resolve_shard_obs(shard_size, elem) + elem_shard_size = _shard_size_param_to_n_obs(shard_size, elem) nnz = elem.nnz if elem.shape[0] == 0: raise ValueError(f"Cannot write sharded sparse matrix {elem_name!r} with 0 observations.") avg_nnz = nnz / elem.shape[0] - sparse_chunk = max(1, int(chunk_size * avg_nnz)) + sparse_chunk = max(1, int(obs_per_chunk * avg_nnz)) sparse_shard = max(1, int(elem_shard_size * avg_nnz)) sparse_shard = min(sparse_shard, nnz) if nnz > 0 else sparse_shard sparse_chunk = min(sparse_chunk, sparse_shard) @@ -641,7 +641,7 @@ def _create_collection( write_sharded( self._group, adata_chunk, - chunk_size=zarr_chunk_size, + obs_per_chunk=zarr_chunk_size, shard_size=zarr_shard_size, compressors=zarr_compressor, key=f"{DATASET_PREFIX}_{i}", @@ -738,7 +738,7 @@ def _add_to_collection( write_sharded( self._group, adata, - chunk_size=zarr_chunk_size, + obs_per_chunk=zarr_chunk_size, shard_size=zarr_shard_size, compressors=zarr_compressor, key=dataset, diff --git a/tests/conftest.py b/tests/conftest.py index 2da2c0df..464b476b 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -51,7 +51,7 @@ def adata_with_zarr_path_same_var_space(tmpdir_factory, n_shards: int = 3) -> Ge write_sharded( f, adata, - chunk_size=10, + obs_per_chunk=10, shard_size=20, ) yield ( diff --git a/tests/test_preshuffle.py b/tests/test_preshuffle.py index d52a0da5..0ac3d648 100644 --- a/tests/test_preshuffle.py +++ b/tests/test_preshuffle.py @@ -25,7 +25,7 @@ def test_write_sharded_bad_chunk_size(tmp_path: Path): adata = ad.AnnData(np.random.randn(10, 20)) z = zarr.open(tmp_path / "foo.zarr") with pytest.raises(ValueError, match=r"Choose a shard obs"): - write_sharded(z, adata, chunk_size=20) + write_sharded(z, adata, obs_per_chunk=20) @pytest.mark.parametrize( @@ -35,7 +35,7 @@ def 
test_write_sharded_bad_chunk_size(tmp_path: Path): def test_write_sharded_shard_size_too_big(tmp_path: Path, chunk_size: int, expected_shard_size: int): adata = ad.AnnData(np.random.randn(10, 20)) z = zarr.open(tmp_path / "foo.zarr") - write_sharded(z, adata, chunk_size=chunk_size, shard_size=20) + write_sharded(z, adata, obs_per_chunk=chunk_size, shard_size=20) assert z["X"].shards == (expected_shard_size, 20) # i.e., the closest multiple to `dense_chunk_size` From be69b39aed78fe3a00a40615839ec7a24a504555 Mon Sep 17 00:00:00 2001 From: Felix Fischer <47145207+felix0097@users.noreply.github.com> Date: Wed, 11 Mar 2026 10:53:56 +0100 Subject: [PATCH 11/16] Update src/annbatch/io.py Co-authored-by: Ilan Gold --- src/annbatch/io.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/annbatch/io.py b/src/annbatch/io.py index 0170077f..e69351cd 100644 --- a/src/annbatch/io.py +++ b/src/annbatch/io.py @@ -162,8 +162,9 @@ def callback( sparse_chunk = max(1, int(obs_per_chunk * avg_nnz)) sparse_shard = max(1, int(elem_shard_size * avg_nnz)) sparse_shard = min(sparse_shard, nnz) if nnz > 0 else sparse_shard - sparse_chunk = min(sparse_chunk, sparse_shard) sparse_shard = _round_down(sparse_shard, sparse_chunk) + if sparse_shard < sparse_chunk: + raise RuntimeError(f"Calculated invalid sparse chunk size {sparse_chunk} for shard size {sparse_shard}") dataset_kwargs = { **dataset_kwargs, "shards": (sparse_shard,), From c7940676d4d1d8d8e05deb504fd3fcf9820d74fd Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Wed, 11 Mar 2026 09:56:52 +0000 Subject: [PATCH 12/16] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- src/annbatch/io.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/annbatch/io.py b/src/annbatch/io.py index e69351cd..92ba073f 100644 --- a/src/annbatch/io.py +++ b/src/annbatch/io.py @@ -164,7 +164,9 @@ def callback( sparse_shard = min(sparse_shard, nnz) if nnz > 0 else sparse_shard sparse_shard = _round_down(sparse_shard, sparse_chunk) if sparse_shard < sparse_chunk: - raise RuntimeError(f"Calculated invalid sparse chunk size {sparse_chunk} for shard size {sparse_shard}") + raise RuntimeError( + f"Calculated invalid sparse chunk size {sparse_chunk} for shard size {sparse_shard}" + ) dataset_kwargs = { **dataset_kwargs, "shards": (sparse_shard,), From 124f2b833a48df8a4ccd60c5ee4ef73246a67ee6 Mon Sep 17 00:00:00 2001 From: Felix Fischer Date: Wed, 11 Mar 2026 13:41:30 +0100 Subject: [PATCH 13/16] Update method params --- CHANGELOG.md | 2 +- src/annbatch/io.py | 30 +++++++++++++++--------------- tests/conftest.py | 2 +- tests/test_preshuffle.py | 24 ++++++++++++------------ 4 files changed, 29 insertions(+), 29 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index d9f899b8..098f8540 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,7 +11,7 @@ and this project adheres to [Semantic Versioning][]. ## [0.0.9] - {class}`annbatch.DatasetCollection` now accepts a `rng` argument to the {meth}`annbatch.DatasetCollection.add_adatas` method. -- The ``sparse_chunk_size``, ``sparse_shard_size``, ``dense_chunk_size``, and ``dense_shard_size`` parameters of {func}`annbatch.write_sharded` have been replaced by ``chunk_size`` (number of observations per chunk, automatically converted to element counts for sparse arrays) and ``shard_size`` (number of observations per shard or a size string). 
The corresponding parameters in {meth}`annbatch.DatasetCollection.add_adatas` are ``zarr_chunk_size`` and ``zarr_shard_size``. +- The ``sparse_chunk_size``, ``sparse_shard_size``, ``dense_chunk_size``, and ``dense_shard_size`` parameters of {func}`annbatch.write_sharded` have been replaced by ``chunk_size`` (number of observations per chunk, automatically converted to element counts for sparse arrays) and ``shard_size`` (number of observations per shard or a size string). The corresponding parameters in {meth}`annbatch.DatasetCollection.add_adatas` are ``n_obs_per_chunk`` and ``zarr_shard_size``. - `zarr_shard_size` in {meth}`annbatch.DatasetCollection.add_adatas` and `shard_size` in {func}`annbatch.write_sharded` now accept a human-readable size string (e.g. ``'1GB'``, ``'512MB'``) in addition to an integer number of observations. When a string is provided, the observation count is derived independently for each array element from its uncompressed bytes-per-row so that every shard stays close to the target size. ## [0.0.8] diff --git a/src/annbatch/io.py b/src/annbatch/io.py index 0170077f..353a5864 100644 --- a/src/annbatch/io.py +++ b/src/annbatch/io.py @@ -138,14 +138,14 @@ def callback( if iospec.encoding_type in {"array"} and ( any(n in store.name for n in {"obsm", "layers", "obsp"}) or "X" == elem_name ): - elem_shard_size = _shard_size_param_to_n_obs(shard_size, elem) + obs_per_shard = _shard_size_param_to_n_obs(shard_size, elem) # Get either the desired size or the next multiple down to ensure divisibility of chunks and shards dense_chunk = min(obs_per_chunk, _round_down(elem.shape[0], obs_per_chunk)) if elem.shape[0] < dense_chunk or dense_chunk == 0: raise ValueError( f"Choose a shard obs {shard_size} and chunk obs {obs_per_chunk} with non-zero size less than the number of observations {elem.shape[0]}" ) - dense_shard = min(elem_shard_size, _round_down(elem.shape[0], obs_per_chunk)) + dense_shard = min(obs_per_shard, _round_down(elem.shape[0], obs_per_chunk)) dense_shard = max(dense_chunk, _round_down(dense_shard, dense_chunk)) dataset_kwargs = { **dataset_kwargs, @@ -154,13 +154,13 @@ def callback( "compressors": compressors, } elif iospec.encoding_type in {"csr_matrix", "csc_matrix"}: - elem_shard_size = _shard_size_param_to_n_obs(shard_size, elem) + obs_per_shard = _shard_size_param_to_n_obs(shard_size, elem) nnz = elem.nnz if elem.shape[0] == 0: raise ValueError(f"Cannot write sharded sparse matrix {elem_name!r} with 0 observations.") - avg_nnz = nnz / elem.shape[0] - sparse_chunk = max(1, int(obs_per_chunk * avg_nnz)) - sparse_shard = max(1, int(elem_shard_size * avg_nnz)) + avg_nnz_per_obs = nnz / elem.shape[0] + sparse_chunk = max(1, int(obs_per_chunk * avg_nnz_per_obs)) + sparse_shard = max(1, int(obs_per_shard * avg_nnz_per_obs)) sparse_shard = min(sparse_shard, nnz) if nnz > 0 else sparse_shard sparse_chunk = min(sparse_chunk, sparse_shard) sparse_shard = _round_down(sparse_shard, sparse_chunk) @@ -441,7 +441,7 @@ def add_adatas( *, load_adata: Callable[[zarr.Group | h5py.Group | PathLike[str] | str], ad.AnnData] = _default_load_adata, var_subset: Iterable[str] | None = None, - zarr_chunk_size: int = 64, + n_obs_per_chunk: int = 64, zarr_shard_size: int | str = "1GB", zarr_compressor: Iterable[BytesBytesCodec] = (BloscCodec(cname="lz4", clevel=3, shuffle=BloscShuffle.shuffle),), h5ad_compressor: Literal["gzip", "lzf"] | None = "gzip", @@ -473,7 +473,7 @@ def add_adatas( var_subset Subset of gene names to include in the store. If None, all genes are included. 
Genes are subset based on the `var_names` attribute of the concatenated AnnData object. - zarr_chunk_size + n_obs_per_chunk Number of observations per zarr chunk. For dense arrays this is used directly as the first-axis chunk size. For sparse arrays it is converted to element counts using the average number of non-zero elements per row of the matrix being written. zarr_shard_size @@ -530,7 +530,7 @@ def add_adatas( shared_kwargs = { "adata_paths": adata_paths, "load_adata": load_adata, - "zarr_chunk_size": zarr_chunk_size, + "n_obs_per_chunk": n_obs_per_chunk, "zarr_shard_size": zarr_shard_size, "zarr_compressor": zarr_compressor, "h5ad_compressor": h5ad_compressor, @@ -550,7 +550,7 @@ def _create_collection( adata_paths: Iterable[PathLike[str]] | Iterable[str], load_adata: Callable[[PathLike[str] | str], ad.AnnData] = _default_load_adata, var_subset: Iterable[str] | None = None, - zarr_chunk_size: int = 64, + n_obs_per_chunk: int = 64, zarr_shard_size: int | str = "1GB", zarr_compressor: Iterable[BytesBytesCodec] = (BloscCodec(cname="lz4", clevel=3, shuffle=BloscShuffle.shuffle),), h5ad_compressor: Literal["gzip", "lzf"] | None = "gzip", @@ -581,7 +581,7 @@ def _create_collection( Subset of gene names to include in the store. If None, all genes are included. Genes are subset based on the `var_names` attribute of the concatenated AnnData object. Only applicable when adding datasets for the first time, otherwise ignored and the incoming data's var space is subsetted to that of the existing collection. - zarr_chunk_size + n_obs_per_chunk Number of observations per zarr chunk. For dense arrays this is used directly as the first-axis chunk size. For sparse arrays it is converted to element counts using the average number of non-zero elements per row of the matrix being written. zarr_shard_size @@ -641,7 +641,7 @@ def _create_collection( write_sharded( self._group, adata_chunk, - obs_per_chunk=zarr_chunk_size, + obs_per_chunk=n_obs_per_chunk, shard_size=zarr_shard_size, compressors=zarr_compressor, key=f"{DATASET_PREFIX}_{i}", @@ -660,7 +660,7 @@ def _add_to_collection( *, adata_paths: Iterable[PathLike[str]] | Iterable[str], load_adata: Callable[[PathLike[str] | str], ad.AnnData] = ad.read_h5ad, - zarr_chunk_size: int = 64, + n_obs_per_chunk: int = 64, zarr_shard_size: int | str = "1GB", zarr_compressor: Iterable[BytesBytesCodec] = (BloscCodec(cname="lz4", clevel=3, shuffle=BloscShuffle.shuffle),), h5ad_compressor: Literal["gzip", "lzf"] | None = "gzip", @@ -683,7 +683,7 @@ def _add_to_collection( If you only need a subset of the input anndata files' elems (e.g., only `X` and `obs`), you can provide a custom function here to speed up loading and harmonize your data. The input to the function is a path to an anndata file, and the output is an anndata object. If the input data is too large to fit into memory, you should use :func:`annndata.experimental.read_lazy` instead. - zarr_chunk_size + n_obs_per_chunk Number of observations per zarr chunk. For dense arrays this is used directly as the first-axis chunk size. For sparse arrays it is converted to element counts using the average number of non-zero elements per row of the matrix being written. 
zarr_shard_size @@ -738,7 +738,7 @@ def _add_to_collection( write_sharded( self._group, adata, - obs_per_chunk=zarr_chunk_size, + obs_per_chunk=n_obs_per_chunk, shard_size=zarr_shard_size, compressors=zarr_compressor, key=dataset, diff --git a/tests/conftest.py b/tests/conftest.py index 464b476b..9b4835cb 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -114,7 +114,7 @@ def simple_collection( output_path = Path(tmpdir_factory.mktemp("zarr_folder")) / "simple_fixture.zarr" collection = DatasetCollection(output_path).add_adatas( zarr_stores, - zarr_chunk_size=10, + n_obs_per_chunk=10, zarr_shard_size=20, n_obs_per_dataset=60, shuffle_chunk_size=10, diff --git a/tests/test_preshuffle.py b/tests/test_preshuffle.py index 0ac3d648..509b93f5 100644 --- a/tests/test_preshuffle.py +++ b/tests/test_preshuffle.py @@ -53,7 +53,7 @@ def test_store_creation_warnings_with_different_keys(elem_name: Literal["obsm", with pytest.warns(UserWarning, match=rf"Found {elem_name} keys.* not present in all anndatas"): DatasetCollection(tmp_path / "collection.zarr").add_adatas( [path_1, path_2], - zarr_chunk_size=5, + n_obs_per_chunk=5, zarr_shard_size=10, n_obs_per_dataset=10, shuffle_chunk_size=10, @@ -69,7 +69,7 @@ def test_store_creation_no_warnings_with_custom_load(tmp_path: Path): adata_2.write_h5ad(path_2) collection = DatasetCollection(tmp_path / "collection.zarr").add_adatas( [path_1, path_2], - zarr_chunk_size=5, + n_obs_per_chunk=5, zarr_shard_size=10, n_obs_per_dataset=10, shuffle_chunk_size=5, @@ -89,7 +89,7 @@ def test_store_creation_path_added_to_obs(tmp_path: Path): output_dir = tmp_path / "path_src_collection.zarr" collection = DatasetCollection(output_dir).add_adatas( paths, - zarr_chunk_size=5, + n_obs_per_chunk=5, zarr_shard_size=10, n_obs_per_dataset=10, shuffle_chunk_size=5, @@ -116,7 +116,7 @@ def test_store_addition_different_keys( collection = DatasetCollection(output_path) collection.add_adatas( [orig_path], - zarr_chunk_size=10, + n_obs_per_chunk=10, zarr_shard_size=20, n_obs_per_dataset=50, shuffle_chunk_size=10, @@ -131,7 +131,7 @@ def test_store_addition_different_keys( collection.add_adatas( [additional_path], load_adata=load_adata, - zarr_chunk_size=5, + n_obs_per_chunk=5, zarr_shard_size=10, shuffle_chunk_size=2, ) @@ -194,7 +194,7 @@ def test_store_creation( collection = DatasetCollection(output_path).add_adatas( [adata_with_h5_path_different_var_space[1] / f for f in h5_files if str(f).endswith(".h5ad")], var_subset=var_subset, - zarr_chunk_size=5, + n_obs_per_chunk=5, zarr_shard_size=10, n_obs_per_dataset=50, shuffle_chunk_size=10, @@ -246,7 +246,7 @@ def test_store_creation( pd.testing.assert_frame_equal(adata.obs, adata_orig.obs) z = zarr.open(output_path / "dataset_0") - # assert chunk behavior (unified zarr_chunk_size=5 for both sparse and dense) + # assert chunk behavior (unified n_obs_per_chunk=5 for both sparse and dense) assert z["obsm"]["arr"].chunks[0] == 5, z["obsm"]["arr"] # sparse indices use obs-based chunk; exact element count depends on per-dataset avg_nnz # ensure proper downcasting @@ -282,7 +282,7 @@ def test_mismatched_raw_concat( h5_paths = [adata_with_h5_path_different_var_space[1] / f for f in h5_files if str(f).endswith(".h5ad")] collection = DatasetCollection(output_path).add_adatas( h5_paths, - zarr_chunk_size=10, + n_obs_per_chunk=10, zarr_shard_size=20, n_obs_per_dataset=30, shuffle_chunk_size=10, @@ -325,7 +325,7 @@ def test_store_extension( collection = DatasetCollection(store_path) collection.add_adatas( original, - zarr_chunk_size=10, + 
n_obs_per_chunk=10, zarr_shard_size=20, n_obs_per_dataset=60, shuffle_chunk_size=10, @@ -335,7 +335,7 @@ def test_store_extension( collection.add_adatas( additional, load_adata=load_adata, - zarr_chunk_size=5, + n_obs_per_chunk=5, zarr_shard_size=10, n_obs_per_dataset=50, shuffle_chunk_size=10, @@ -371,7 +371,7 @@ def test_collection_rng_reproducibility(adata_with_zarr_path_same_var_space: tup zarr_stores = sorted(adata_with_zarr_path_same_var_space[1].glob("*.zarr")) seed = 42 kwargs = { - "zarr_chunk_size": 10, + "n_obs_per_chunk": 10, "zarr_shard_size": 20, "n_obs_per_dataset": 200, "shuffle_chunk_size": 10, @@ -400,7 +400,7 @@ def test_string_size_params_end_to_end(tmp_path: Path): output = tmp_path / "collection.zarr" collection = DatasetCollection(output).add_adatas( [path], - zarr_chunk_size=10, + n_obs_per_chunk=10, zarr_shard_size=target_shard_size, zarr_compressor=(), shuffle_chunk_size=10, From 147e4a31bff46257e1e6aa732a32244d27b0e2eb Mon Sep 17 00:00:00 2001 From: Felix Fischer Date: Wed, 11 Mar 2026 14:11:52 +0100 Subject: [PATCH 14/16] Fix tests --- src/annbatch/io.py | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/src/annbatch/io.py b/src/annbatch/io.py index e9393a94..4e22fe83 100644 --- a/src/annbatch/io.py +++ b/src/annbatch/io.py @@ -160,13 +160,10 @@ def callback( raise ValueError(f"Cannot write sharded sparse matrix {elem_name!r} with 0 observations.") avg_nnz_per_obs = nnz / elem.shape[0] sparse_chunk = max(1, int(obs_per_chunk * avg_nnz_per_obs)) + sparse_chunk = min(sparse_chunk, nnz) if nnz > 0 else sparse_chunk sparse_shard = max(1, int(obs_per_shard * avg_nnz_per_obs)) sparse_shard = min(sparse_shard, nnz) if nnz > 0 else sparse_shard - sparse_shard = _round_down(sparse_shard, sparse_chunk) - if sparse_shard < sparse_chunk: - raise RuntimeError( - f"Calculated invalid sparse chunk size {sparse_chunk} for shard size {sparse_shard}" - ) + sparse_shard = max(sparse_chunk, _round_down(sparse_shard, sparse_chunk)) dataset_kwargs = { **dataset_kwargs, "shards": (sparse_shard,), From 275aff7811145522ba887794e411fc7581e22f60 Mon Sep 17 00:00:00 2001 From: Felix Fischer Date: Wed, 11 Mar 2026 14:25:57 +0100 Subject: [PATCH 15/16] Rename method params --- src/annbatch/io.py | 16 ++++++++-------- tests/conftest.py | 2 +- tests/test_preshuffle.py | 4 ++-- 3 files changed, 11 insertions(+), 11 deletions(-) diff --git a/src/annbatch/io.py b/src/annbatch/io.py index 4e22fe83..fd0abf78 100644 --- a/src/annbatch/io.py +++ b/src/annbatch/io.py @@ -96,7 +96,7 @@ def write_sharded( group: zarr.Group, adata: ad.AnnData, *, - obs_per_chunk: int = 64, + n_obs_per_chunk: int = 64, shard_size: int | str = 2_097_152, compressors: Iterable[BytesBytesCodec] = (BloscCodec(cname="lz4", clevel=3, shuffle=BloscShuffle.shuffle),), key: str | None = None, @@ -109,7 +109,7 @@ def write_sharded( The destination group, must be zarr v3 adata The source anndata object - obs_per_chunk + n_obs_per_chunk Number of observations per chunk. For dense arrays this directly sets the first-axis chunk size. For sparse arrays it is converted to element counts using the average non-zero elements per row of the matrix being written. 
shard_size @@ -140,12 +140,12 @@ def callback( ): obs_per_shard = _shard_size_param_to_n_obs(shard_size, elem) # Get either the desired size or the next multiple down to ensure divisibility of chunks and shards - dense_chunk = min(obs_per_chunk, _round_down(elem.shape[0], obs_per_chunk)) + dense_chunk = min(n_obs_per_chunk, _round_down(elem.shape[0], n_obs_per_chunk)) if elem.shape[0] < dense_chunk or dense_chunk == 0: raise ValueError( - f"Choose a shard obs {shard_size} and chunk obs {obs_per_chunk} with non-zero size less than the number of observations {elem.shape[0]}" + f"Choose a shard obs {shard_size} and chunk obs {n_obs_per_chunk} with non-zero size less than the number of observations {elem.shape[0]}" ) - dense_shard = min(obs_per_shard, _round_down(elem.shape[0], obs_per_chunk)) + dense_shard = min(obs_per_shard, _round_down(elem.shape[0], n_obs_per_chunk)) dense_shard = max(dense_chunk, _round_down(dense_shard, dense_chunk)) dataset_kwargs = { **dataset_kwargs, @@ -159,7 +159,7 @@ def callback( if elem.shape[0] == 0: raise ValueError(f"Cannot write sharded sparse matrix {elem_name!r} with 0 observations.") avg_nnz_per_obs = nnz / elem.shape[0] - sparse_chunk = max(1, int(obs_per_chunk * avg_nnz_per_obs)) + sparse_chunk = max(1, int(n_obs_per_chunk * avg_nnz_per_obs)) sparse_chunk = min(sparse_chunk, nnz) if nnz > 0 else sparse_chunk sparse_shard = max(1, int(obs_per_shard * avg_nnz_per_obs)) sparse_shard = min(sparse_shard, nnz) if nnz > 0 else sparse_shard @@ -642,7 +642,7 @@ def _create_collection( write_sharded( self._group, adata_chunk, - obs_per_chunk=n_obs_per_chunk, + n_obs_per_chunk=n_obs_per_chunk, shard_size=zarr_shard_size, compressors=zarr_compressor, key=f"{DATASET_PREFIX}_{i}", @@ -739,7 +739,7 @@ def _add_to_collection( write_sharded( self._group, adata, - obs_per_chunk=n_obs_per_chunk, + n_obs_per_chunk=n_obs_per_chunk, shard_size=zarr_shard_size, compressors=zarr_compressor, key=dataset, diff --git a/tests/conftest.py b/tests/conftest.py index 9b4835cb..7a73808f 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -51,7 +51,7 @@ def adata_with_zarr_path_same_var_space(tmpdir_factory, n_shards: int = 3) -> Ge write_sharded( f, adata, - obs_per_chunk=10, + n_obs_per_chunk=10, shard_size=20, ) yield ( diff --git a/tests/test_preshuffle.py b/tests/test_preshuffle.py index 5393cfc4..61aa2596 100644 --- a/tests/test_preshuffle.py +++ b/tests/test_preshuffle.py @@ -25,7 +25,7 @@ def test_write_sharded_bad_chunk_size(tmp_path: Path): adata = ad.AnnData(np.random.randn(10, 20)) z = zarr.open(tmp_path / "foo.zarr") with pytest.raises(ValueError, match=r"Choose a shard obs"): - write_sharded(z, adata, obs_per_chunk=20) + write_sharded(z, adata, n_obs_per_chunk=20) @pytest.mark.parametrize( @@ -35,7 +35,7 @@ def test_write_sharded_bad_chunk_size(tmp_path: Path): def test_write_sharded_shard_size_too_big(tmp_path: Path, chunk_size: int, expected_shard_size: int): adata = ad.AnnData(np.random.randn(10, 20)) z = zarr.open(tmp_path / "foo.zarr") - write_sharded(z, adata, obs_per_chunk=chunk_size, shard_size=20) + write_sharded(z, adata, n_obs_per_chunk=chunk_size, shard_size=20) assert z["X"].shards == (expected_shard_size, 20) # i.e., the closest multiple to `dense_chunk_size` From 33c797e8901f3c343c269b407b46adc1827432a4 Mon Sep 17 00:00:00 2001 From: Felix Fischer <47145207+felix0097@users.noreply.github.com> Date: Wed, 11 Mar 2026 17:19:45 +0100 Subject: [PATCH 16/16] Update CHANGELOG.md Co-authored-by: Ilan Gold --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 
1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 2f9d6b94..17c080dc 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -17,7 +17,7 @@ and this project adheres to [Semantic Versioning][]. ### Fixed - Formatted progress bar descriptions to be more readable. - {class}`annbatch.DatasetCollection` now accepts a `rng` argument to the {meth}`annbatch.DatasetCollection.add_adatas` method. -- The ``sparse_chunk_size``, ``sparse_shard_size``, ``dense_chunk_size``, and ``dense_shard_size`` parameters of {func}`annbatch.write_sharded` have been replaced by ``chunk_size`` (number of observations per chunk, automatically converted to element counts for sparse arrays) and ``shard_size`` (number of observations per shard or a size string). The corresponding parameters in {meth}`annbatch.DatasetCollection.add_adatas` are ``n_obs_per_chunk`` and ``zarr_shard_size``. +- The ``sparse_chunk_size``, ``sparse_shard_size``, ``dense_chunk_size``, and ``dense_shard_size`` parameters of {func}`annbatch.write_sharded` have been replaced by ``n_obs_per_chunk`` (number of observations per chunk, automatically converted to element counts for sparse arrays) and ``shard_size`` (number of observations per shard or a size string). The corresponding parameters in {meth}`annbatch.DatasetCollection.add_adatas` are ``n_obs_per_chunk`` and ``zarr_shard_size``. - `zarr_shard_size` in {meth}`annbatch.DatasetCollection.add_adatas` and `shard_size` in {func}`annbatch.write_sharded` now accept a human-readable size string (e.g. ``'1GB'``, ``'512MB'``) in addition to an integer number of observations. When a string is provided, the observation count is derived independently for each array element from its uncompressed bytes-per-row so that every shard stays close to the target size.
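
Illustrative usage sketch (not part of the patches above; the toy data and file paths are assumptions for demonstration only): after this series, `annbatch.write_sharded` takes `n_obs_per_chunk` and `shard_size`, and `DatasetCollection.add_adatas` exposes the matching `n_obs_per_chunk` and `zarr_shard_size` parameters.

    import anndata as ad
    import numpy as np
    import zarr

    from annbatch import DatasetCollection, write_sharded

    # Toy data for demonstration only.
    adata = ad.AnnData(np.random.randn(1_000, 50).astype(np.float32))

    # Low-level writer: n_obs_per_chunk sets the first-axis chunk size directly for
    # dense arrays and is converted to non-zero element counts for sparse arrays;
    # shard_size may be an observation count or a size string such as "512MB".
    group = zarr.open("example.zarr")
    write_sharded(group, adata, n_obs_per_chunk=64, shard_size="512MB")

    # Collection-level API: the corresponding parameters are n_obs_per_chunk and
    # zarr_shard_size ("dataset_0.h5ad" is a hypothetical input path; all other
    # parameters keep their defaults).
    DatasetCollection("collection.zarr").add_adatas(
        ["dataset_0.h5ad"],
        n_obs_per_chunk=64,
        zarr_shard_size="1GB",
    )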