Skip to content

Commit

Permalink
Split the concat_zarrs step to avoid very large dask task counts
Browse files Browse the repository at this point in the history
  • Loading branch information
benjeffery authored and mergify[bot] committed Mar 2, 2023
1 parent 2db3a38 commit 3b5251d
Showing 1 changed file with 2 additions and 7 deletions.
9 changes: 2 additions & 7 deletions sgkit/io/vcfzarr_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -340,7 +340,6 @@ def concat_zarrs_optimized(
# NOTE: that this uses _to_zarr function defined here that is needed to avoid
# race conditions between writing the array contents and its metadata
# see https://github.com/pystatgen/sgkit/pull/486
delayed = [] # do all the rechunking operations in one computation
for var in vars_to_rechunk:
dtype = None
if fix_strings and var in {"variant_id", "variant_allele"}:
Expand Down Expand Up @@ -374,8 +373,7 @@ def concat_zarrs_optimized(
attrs=first_zarr_group[var].attrs.asdict(),
**_to_zarr_kwargs,
)
d = _fuse_delayed(d) # type: ignore[no-untyped-call]
delayed.append(d)
da.compute(_fuse_delayed(d)) # type: ignore[no-untyped-call]

# copy variables that are not rechunked (e.g. sample_id)
for var in vars_to_copy:
Expand Down Expand Up @@ -404,10 +402,7 @@ def concat_zarrs_optimized(
attrs=first_zarr_group[var].attrs.asdict(),
**_to_zarr_kwargs,
)
d = _fuse_delayed(d) # type: ignore[no-untyped-call]
delayed.append(d)

da.compute(*delayed)
da.compute(_fuse_delayed(d)) # type: ignore[no-untyped-call]

# copy unchanged variables and top-level metadata
with zarr.open_group(output) as output_zarr:
Expand Down

0 comments on commit 3b5251d

Please sign in to comment.