Skip to content

Commit

Permalink
Merge pull request #22 from tomwhite/passthrough-fuse
Browse files: browse the repository at this point in the history
Fuse graph in case where read and write chunks are the same.
  • Loading branch information
rabernat committed Jul 17, 2020
2 parents 4bdb8ae + e25dfb5 commit fdddf0f
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 1 deletion.
7 changes: 6 additions & 1 deletion rechunker/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -330,9 +330,14 @@ def _rechunk_array(
target_store_delayed = dsa.store(
source_read, target_array, lock=False, compute=False
)

# fuse
target_dsk = dask.utils.ensure_dict(target_store_delayed.dask)
dsk_fused, deps = fuse(target_dsk)

return Rechunked(
target_store_delayed.key,
target_store_delayed.dask,
dsk_fused,
source=source_array,
intermediate=None,
target=target_array,
Expand Down
21 changes: 21 additions & 0 deletions tests/test_rechunk.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import zarr
import dask.array as dsa
import dask
import dask.core

from rechunker import api

Expand Down Expand Up @@ -169,3 +170,23 @@ def test_no_intermediate():
rechunked = api.Rechunked("a-b", {}, source=a, intermediate=None, target=b)
assert "Intermediate" not in repr(rechunked)
rechunked._repr_html_()


def test_no_intermediate_fused(tmp_path):
    """Check that the graph returned by ``api.rechunk`` has fewer than 20 tasks.

    An (8000, 8000) array of ones with dtype "f4" and chunks (200, 8000) is
    created in a zarr store under ``tmp_path``, then handed to ``api.rechunk``
    with target chunks (400, 8000), a ``max_mem`` of 25600000 and a target
    store path. The returned object is not executed here; only its ``dask``
    mapping is inspected.
    """
    src_path = str(tmp_path / "source.zarr")
    dst_path = str(tmp_path / "target.zarr")

    source_array = zarr.ones(
        (8000, 8000), chunks=(200, 8000), dtype="f4", store=src_path
    )

    plan = api.rechunk(source_array, (400, 8000), 25600000, dst_path)

    # Count only the graph values that dask itself recognises as tasks.
    task_count = sum(1 for node in plan.dask.values() if dask.core.istask(node))

    # NOTE(review): the bound of 20 is presumably lower than the task count
    # of the same graph without fusing -- confirm against an unfused run.
    assert task_count < 20

0 comments on commit fdddf0f

Please sign in to comment.