Merge pull request #38 from shoyer/better-data-model
Refactor the data model for executors
TomAugspurger committed Aug 4, 2020
2 parents 22e6ca0 + 278e270 commit 685a8b2
Showing 7 changed files with 128 additions and 87 deletions.
27 changes: 12 additions & 15 deletions rechunker/api.py
@@ -7,7 +7,7 @@
import dask

from rechunker.algorithm import rechunking_plan
from rechunker.types import CopySpec, StagedCopySpec, Executor
from rechunker.types import ArrayProxy, CopySpec, Executor


class Rechunked:
@@ -218,10 +218,10 @@ def rechunk(
"""
if isinstance(executor, str):
executor = _get_executor(executor)
copy_specs, intermediate, target = _setup_rechunk(
copy_spec, intermediate, target = _setup_rechunk(
source, target_chunks, max_mem, target_store, temp_store
)
plan = executor.prepare_plan(copy_specs)
plan = executor.prepare_plan(copy_spec)
return Rechunked(executor, plan, source, intermediate, target)


@@ -264,10 +264,8 @@ def _setup_rechunk(
target_store,
temp_store_or_group=temp_store,
)
intermediate = (
copy_spec.stages[0].target if len(copy_spec.stages) == 2 else None
)
target = copy_spec.stages[-1].target
intermediate = copy_spec.intermediate.array
target = copy_spec.write.array
return [copy_spec], intermediate, target

else:
@@ -281,7 +279,7 @@ def _setup_array_rechunk(
target_store_or_group,
temp_store_or_group=None,
name=None,
) -> StagedCopySpec:
) -> CopySpec:
shape = source_array.shape
source_chunks = (
source_array.chunksize
@@ -334,16 +332,15 @@ def _setup_array_rechunk(
pass

if read_chunks == write_chunks:
return StagedCopySpec([CopySpec(source_array, target_array, read_chunks)])
int_array = None
else:
# do intermediate store
assert temp_store_or_group is not None
int_array = _zarr_empty(
shape, temp_store_or_group, int_chunks, dtype, name=name
)
return StagedCopySpec(
[
CopySpec(source_array, int_array, read_chunks),
CopySpec(int_array, target_array, write_chunks),
]
)

read_proxy = ArrayProxy(source_array, read_chunks)
int_proxy = ArrayProxy(int_array, int_chunks)
write_proxy = ArrayProxy(target_array, write_chunks)
return CopySpec(read_proxy, int_proxy, write_proxy)
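
The `ArrayProxy` and `CopySpec` types imported above come from `rechunker/types.py`, one of the changed files not shown in this view. As a rough orientation, the sketch below reconstructs them from how this diff uses them (positional construction, `.array`/`.chunks` fields, `intermediate.array is None` marking a direct copy); treat it as an assumption, not the actual definitions.

```python
# Approximate reconstruction of the new data model; inferred from usage in this
# diff, not copied from rechunker/types.py.
from typing import Any, NamedTuple, Optional, Tuple


class ArrayProxy(NamedTuple):
    """An array (or None) paired with the chunk shape used when copying it."""

    array: Optional[Any]
    chunks: Tuple[int, ...]


class CopySpec(NamedTuple):
    """One rechunking copy: read -> (optional intermediate) -> write."""

    read: ArrayProxy
    intermediate: ArrayProxy  # array is None when no temporary store is needed
    write: ArrayProxy
```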
37 changes: 17 additions & 20 deletions rechunker/executors/beam.py
@@ -1,16 +1,14 @@
import uuid
from typing import Iterable, Optional, Mapping, Tuple
from typing import Iterable, Mapping, Tuple

import apache_beam as beam

from rechunker.executors.util import chunk_keys
from rechunker.types import (
CopySpec,
StagedCopySpec,
Executor,
ReadableArray,
WriteableArray,
from rechunker.executors.util import (
DirectCopySpec,
chunk_keys,
split_into_direct_copies,
)
from rechunker.types import CopySpec, Executor, ReadableArray, WriteableArray


class BeamExecutor(Executor[beam.PTransform]):
@@ -27,7 +25,7 @@ class BeamExecutor(Executor[beam.PTransform]):
# operations instead of explicitly writing intermediate arrays to disk.
# This would offer a cleaner API and would perhaps be faster, too.

def prepare_plan(self, specs: Iterable[StagedCopySpec]) -> beam.PTransform:
def prepare_plan(self, specs: Iterable[CopySpec]) -> beam.PTransform:
return "Rechunker" >> _Rechunker(specs)

def execute_plan(self, plan: beam.PTransform, **kwargs):
@@ -36,13 +34,13 @@ def execute_plan(self, plan: beam.PTransform, **kwargs):


class _Rechunker(beam.PTransform):
def __init__(self, specs: Iterable[StagedCopySpec]):
def __init__(self, specs: Iterable[CopySpec]):
super().__init__()
self.specs = tuple(specs)
self.direct_specs = tuple(map(split_into_direct_copies, specs))

def expand(self, pcoll):
max_depth = max(len(spec.stages) for spec in self.specs)
specs_map = {uuid.uuid1().hex: spec for spec in self.specs}
max_depth = max(len(copies) for copies in self.direct_specs)
specs_map = {uuid.uuid1().hex: copies for copies in self.direct_specs}

# we explicitly thread target_id through each stage to ensure that they
# are executed in order
@@ -52,15 +50,14 @@ def expand(self, pcoll):
pcoll = pcoll | "Create" >> beam.Create(specs_map.keys())
for stage in range(max_depth):
specs_by_target = {
k: v.stages[stage] if stage < len(v.stages) else None
for k, v in specs_map.items()
k: v[stage] if stage < len(v) else None for k, v in specs_map.items()
}
pcoll = pcoll | f"Stage{stage}" >> _CopyStage(specs_by_target)
return pcoll


class _CopyStage(beam.PTransform):
def __init__(self, specs_by_target: Mapping[str, CopySpec]):
def __init__(self, specs_by_target: Mapping[str, DirectCopySpec]):
super().__init__()
self.specs_by_target = specs_by_target

@@ -79,15 +76,15 @@ def expand(self, pcoll):


def _start_stage(
target_id: str, specs_by_target: Mapping[str, Optional[CopySpec]],
) -> Tuple[str, CopySpec]:
spec = specs_by_target[target_id]
target_id: str, specs_by_target: Mapping[str, DirectCopySpec],
) -> Tuple[str, DirectCopySpec]:
spec = specs_by_target.get(target_id)
if spec is not None:
yield target_id, spec


def _copy_tasks(
target_id: str, spec: CopySpec
target_id: str, spec: DirectCopySpec
) -> Tuple[str, Tuple[slice, ...], ReadableArray, WriteableArray]:
for key in chunk_keys(spec.source.shape, spec.chunks):
yield target_id, key, spec.source, spec.target
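
The staging logic in `_Rechunker.expand` is easier to see with concrete values. Below is a small illustrative sketch (not part of the commit) using numpy arrays in place of zarr and the `DirectCopySpec` helper added in `rechunker/executors/util.py` later in this diff; the variable names are made up for the example.

```python
# Toy illustration of the Beam staging: a CopySpec with an intermediate splits
# into two DirectCopySpecs, one without splits into a single copy; max_depth is
# the longest chain, and shorter chains simply sit out the later stages.
import numpy as np

from rechunker.executors.util import DirectCopySpec

src_a, tmp_a, dst_a = np.arange(8), np.empty(8), np.empty(8)
src_b, dst_b = np.arange(4), np.empty(4)

direct_specs = (
    (DirectCopySpec(src_a, tmp_a, (4,)), DirectCopySpec(tmp_a, dst_a, (2,))),
    (DirectCopySpec(src_b, dst_b, (2,)),),
)
max_depth = max(len(copies) for copies in direct_specs)  # == 2
# Stage 0 would copy src_a -> tmp_a and src_b -> dst_b; stage 1 copies tmp_a -> dst_a.
```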
52 changes: 27 additions & 25 deletions rechunker/executors/dask.py
@@ -1,12 +1,12 @@
import uuid
from typing import Iterable
from typing import Iterable, Tuple

import dask
import dask.array
from dask.delayed import Delayed
from dask.optimization import fuse

from rechunker.types import CopySpec, StagedCopySpec, Executor
from rechunker.types import Array, CopySpec, Executor


class DaskExecutor(Executor[Delayed]):
@@ -17,44 +17,48 @@ class DaskExecutor(Executor[Delayed]):
Execution plans for DaskExecutors are dask.delayed objects.
"""

def prepare_plan(self, specs: Iterable[StagedCopySpec]) -> Delayed:
return _staged_copy(specs)
def prepare_plan(self, specs: Iterable[CopySpec]) -> Delayed:
return _copy_all(specs)

def execute_plan(self, plan: Delayed, **kwargs):
return plan.compute(**kwargs)


def _direct_copy_array(copy_spec: CopySpec) -> Delayed:
"""Direct copy between zarr arrays."""
source_array, target_array, chunks = copy_spec
if isinstance(source_array, dask.array.Array):
source_read = source_array
def _direct_array_copy(
source: Array, target: Array, chunks: Tuple[int, ...]
) -> Delayed:
"""Direct copy between arrays."""
if isinstance(source, dask.array.Array):
source_read = source
else:
source_read = dask.array.from_zarr(source_array, chunks=chunks)
source_read = dask.array.from_zarr(source, chunks=chunks)
target_store_delayed = dask.array.store(
source_read, target_array, lock=False, compute=False
source_read, target, lock=False, compute=False
)
return target_store_delayed


def _staged_array_copy(staged_copy_spec: StagedCopySpec) -> Delayed:
"""Staged copy between zarr arrays."""
if len(staged_copy_spec.stages) == 1:
(copy_spec,) = staged_copy_spec.stages
target_store_delayed = _direct_copy_array(copy_spec)
def _chunked_array_copy(spec: CopySpec) -> Delayed:
"""Chunked copy between arrays."""
if spec.intermediate.array is None:
target_store_delayed = _direct_array_copy(
spec.read.array, spec.write.array, spec.read.chunks,
)

# fuse
target_dsk = dask.utils.ensure_dict(target_store_delayed.dask)
dsk_fused, _ = fuse(target_dsk)

return Delayed(target_store_delayed.key, dsk_fused)

elif len(staged_copy_spec.stages) == 2:
first_copy, second_copy = staged_copy_spec.stages

else:
# do intermediate store
int_store_delayed = _direct_copy_array(first_copy)
target_store_delayed = _direct_copy_array(second_copy)
int_store_delayed = _direct_array_copy(
spec.read.array, spec.intermediate.array, spec.read.chunks,
)
target_store_delayed = _direct_array_copy(
spec.intermediate.array, spec.write.array, spec.write.chunks,
)

# now do some hacking to chain these together into a single graph.
# get the two graphs as dicts
@@ -81,17 +85,15 @@ def _staged_array_copy(staged_copy_spec: StagedCopySpec) -> Delayed:
# fuse
dsk_fused, _ = fuse(target_dsk)
return Delayed(target_store_delayed.key, dsk_fused)
else:
raise NotImplementedError


def _barrier(*args):
return None


def _staged_copy(specs: Iterable[StagedCopySpec],) -> Delayed:
def _copy_all(specs: Iterable[CopySpec],) -> Delayed:

stores_delayed = [_staged_array_copy(spec) for spec in specs]
stores_delayed = [_chunked_array_copy(spec) for spec in specs]

if len(stores_delayed) == 1:
return stores_delayed[0]
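
To make the two-stage path concrete, here is a hedged usage sketch (not part of the commit) that builds one `CopySpec` with an in-memory zarr intermediate and runs it through `DaskExecutor`. The array sizes and chunk choices are illustrative only; `ArrayProxy` and `CopySpec` are constructed positionally, matching their use elsewhere in this diff.

```python
import numpy as np
import zarr

from rechunker.executors.dask import DaskExecutor
from rechunker.types import ArrayProxy, CopySpec

source = zarr.array(np.arange(100), chunks=(10,))
intermediate = zarr.empty((100,), chunks=(20,), dtype=source.dtype)
target = zarr.empty((100,), chunks=(25,), dtype=source.dtype)

spec = CopySpec(
    ArrayProxy(source, (20,)),        # read the source in 20-element blocks
    ArrayProxy(intermediate, (20,)),  # land them in the intermediate store
    ArrayProxy(target, (25,)),        # then rewrite in 25-element blocks
)

executor = DaskExecutor()
plan = executor.prepare_plan([spec])  # a single fused dask.delayed object
executor.execute_plan(plan)           # equivalent to plan.compute()
```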
16 changes: 8 additions & 8 deletions rechunker/executors/python.py
@@ -2,8 +2,8 @@

from typing import Callable, Iterable, Tuple

from rechunker.executors.util import chunk_keys
from rechunker.types import StagedCopySpec, Executor, ReadableArray, WriteableArray
from rechunker.executors.util import chunk_keys, split_into_direct_copies
from rechunker.types import CopySpec, Executor, ReadableArray, WriteableArray


# PythonExecutor represents delayed execution tasks as functions that require
@@ -20,21 +20,21 @@ class PythonExecutor(Executor[Task]):
Execution plans for PythonExecutor are functions that accept no arguments.
"""

def prepare_plan(self, specs: Iterable[StagedCopySpec]) -> Task:
def prepare_plan(self, specs: Iterable[CopySpec]) -> Task:
tasks = []
for staged_copy_spec in specs:
for copy_spec in staged_copy_spec.stages:
tasks.append(partial(_direct_copy_array, *copy_spec))
for spec in specs:
for direct_spec in split_into_direct_copies(spec):
tasks.append(partial(_direct_array_copy, *direct_spec))
return partial(_execute_all, tasks)

def execute_plan(self, plan: Task):
plan()


def _direct_copy_array(
def _direct_array_copy(
source: ReadableArray, target: WriteableArray, chunks: Tuple[int, ...]
) -> None:
"""Direct copy between zarr arrays."""
"""Direct copy between arrays."""
for key in chunk_keys(source.shape, chunks):
target[key] = source[key]

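
For reference, a minimal runnable sketch (not part of the commit) of the `PythonExecutor` path, using plain numpy arrays as stand-ins for zarr, since `_direct_array_copy` only needs `.shape` plus slice reads and writes. As above, the `ArrayProxy`/`CopySpec` constructors from `rechunker.types` are used positionally and the example values are made up.

```python
import numpy as np

from rechunker.executors.python import PythonExecutor
from rechunker.types import ArrayProxy, CopySpec

source = np.arange(16).reshape(4, 4)
target = np.zeros_like(source)

# A direct copy: the intermediate proxy carries array=None, so
# split_into_direct_copies yields a single DirectCopySpec.
spec = CopySpec(
    ArrayProxy(source, (2, 4)),
    ArrayProxy(None, (2, 4)),
    ArrayProxy(target, (2, 4)),
)

executor = PythonExecutor()
plan = executor.prepare_plan([spec])  # a no-argument callable
executor.execute_plan(plan)
assert (target == source).all()
```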
25 changes: 24 additions & 1 deletion rechunker/executors/util.py
@@ -1,7 +1,9 @@
import itertools
import math

from typing import Iterator, Tuple
from typing import Iterator, NamedTuple, Tuple

from rechunker.types import CopySpec, ReadableArray, WriteableArray


def chunk_keys(
@@ -19,3 +21,24 @@ def chunk_keys(
yield tuple(
slice(c * i, min(c * (i + 1), s)) for i, s, c in zip(indices, shape, chunks)
)


class DirectCopySpec(NamedTuple):
"""Specification of how to directly copy between two arrays."""

source: ReadableArray
target: WriteableArray
chunks: Tuple[int, ...]


def split_into_direct_copies(spec: CopySpec) -> Tuple[DirectCopySpec]:
"""Convert a rechunked copy into a list of direct copies."""
if spec.intermediate.array is None:
return (DirectCopySpec(spec.read.array, spec.write.array, spec.read.chunks,),)
else:
return (
DirectCopySpec(spec.read.array, spec.intermediate.array, spec.read.chunks,),
DirectCopySpec(
spec.intermediate.array, spec.write.array, spec.write.chunks,
),
)
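
A quick illustration of what `chunk_keys` yields (values worked out by hand from the code above): one tuple of slices per chunk, with the ragged final chunk clipped to the array shape.

```python
from rechunker.executors.util import chunk_keys

list(chunk_keys((5, 4), (2, 4)))
# [(slice(0, 2), slice(0, 4)),
#  (slice(2, 4), slice(0, 4)),
#  (slice(4, 5), slice(0, 4))]
```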
