From 7ae74355077ba62461d93197c09803dd81f224ba Mon Sep 17 00:00:00 2001
From: Stephan Hoyer
Date: Sat, 22 May 2021 16:11:25 -0700
Subject: [PATCH 01/12] Add calculate_stage_chunks

---
 rechunker/__init__.py   |  1 +
 rechunker/algorithm.py  | 69 ++++++++++++++++++++++++++++++++++-------
 tests/test_algorithm.py | 23 +++++++++++++-
 3 files changed, 80 insertions(+), 13 deletions(-)

diff --git a/rechunker/__init__.py b/rechunker/__init__.py
index 44018dc..099e343 100644
--- a/rechunker/__init__.py
+++ b/rechunker/__init__.py
@@ -4,4 +4,5 @@
 except ImportError:
     __version__ = "unknown"

+from .algorithm import rechunking_plan
 from .api import Rechunked, rechunk
diff --git a/rechunker/algorithm.py b/rechunker/algorithm.py
index d4abf55..5149b34 100644
--- a/rechunker/algorithm.py
+++ b/rechunker/algorithm.py
@@ -75,6 +75,59 @@ def consolidate_chunks(
     return tuple(new_chunks)


+def _calculate_shared_chunks(
+    read_chunks: Sequence[int], write_chunks: Sequence[int]
+) -> Tuple[int, ...]:
+    # Intermediate chunks are the smallest possible chunks which fit
+    # into both read_chunks and write_chunks.
+    # Example:
+    #   read_chunks: (20, 5)
+    #   target_chunks: (4, 25)
+    #   intermediate_chunks: (4, 5)
+    # We don't need to check their memory usage: they are guaranteed to be smaller
+    # than both read and write chunks.
+    return tuple(
+        min(c_read, c_target) for c_read, c_target in zip(read_chunks, write_chunks)
+    )
+
+
+def calculate_stage_chunks(
+    read_chunks, write_chunks, stage_count=1,
+):
+    """
+    Calculate chunks after each stage of a multi-stage rechunking.
+
+    Each stage consists of a "split" step followed by a "consolidate" step.
+
+    The strategy used here is to progressively enlarge or shrink chunks along
+    each dimension by the same multiple in each stage. This should roughly
+    minimize the total number of arrays resulting from "split" steps in a
+    multi-stage pipeline. It also keeps the total number of elements in each
+    chunk constant, up to rounding error, so memory usage should also remain
+    constant.
+
+    Examples::
+
+        >>> calculate_stage_chunks((1_000_000, 1), (1, 1_000_000), stage_count=2)
+        [(1000, 1000)]
+        >>> calculate_stage_chunks((1_000_000, 1), (1, 1_000_000), stage_count=3)
+        [(10000, 100), (100, 10000)]
+        >>> calculate_stage_chunks((1_000_000, 1), (1, 1_000_000), stage_count=4)
+        [(31623, 32), (1000, 1000), (32, 31623)]
+
+    TODO: consider more sophisticated algorithms.
+    """
+    stage_chunks = []
+    for stage in range(1, stage_count):
+        power = stage / stage_count
+        chunks = tuple(
+            round(r ** (1 - power) * w ** power)
+            for r, w in zip(read_chunks, write_chunks)
+        )
+        stage_chunks.append(chunks)
+    return stage_chunks
+
+
 def rechunking_plan(
     shape: Sequence[int],
     source_chunks: Sequence[int],
     target_chunks: Sequence[int],
     itemsize: int,
@@ -85,6 +138,8 @@
     consolidate_writes: bool = True,
 ) -> Tuple[Tuple[int, ...], Tuple[int, ...], Tuple[int, ...]]:
     """
+    Calculate a plan for rechunking arrays.
+
     Parameters
     ----------
     shape : Tuple
         Array shape
@@ -149,16 +204,6 @@
     else:
         read_chunks = tuple(source_chunks)

-    # Intermediate chunks are the smallest possible chunks which fit
-    # into both read_chunks and write_chunks.
-    # Example:
-    #   read_chunks: (20, 5)
-    #   target_chunks: (4, 25)
-    #   intermediate_chunks: (4, 5)
-    # We don't need to check their memory usage: they are guaranteed to be smaller
-    # than both read and write chunks.
-    intermediate_chunks = [
-        min(c_read, c_target) for c_read, c_target in zip(read_chunks, write_chunks)
-    ]
+    intermediate_chunks = _calculate_shared_chunks(read_chunks, write_chunks)

-    return read_chunks, tuple(intermediate_chunks), write_chunks
+    return read_chunks, intermediate_chunks, write_chunks
diff --git a/tests/test_algorithm.py b/tests/test_algorithm.py
index fd6189f..2b50885 100644
--- a/tests/test_algorithm.py
+++ b/tests/test_algorithm.py
@@ -5,7 +5,11 @@
 import pytest
 from hypothesis import assume, given

-from rechunker.algorithm import consolidate_chunks, rechunking_plan
+from rechunker.algorithm import (
+    consolidate_chunks,
+    rechunking_plan,
+    calculate_stage_chunks,
+)
 from rechunker.compat import prod

@@ -85,6 +89,23 @@ def test_consolidate_chunks_4D(shape, chunks, itemsize, max_mem, expected):
     assert chunk_mem <= max_mem


+@pytest.mark.parametrize(
+    "read_chunks, write_chunks, stage_count, expected",
+    [
+        ((100, 1), (1, 100), 1, []),
+        ((100, 1), (1, 100), 2, [(10, 10)]),
+        ((100, 1), (1, 100), 3, [(22, 5), (5, 22)]),
+        ((1_000_000, 1), (1, 1_000_000), 2, [(1000, 1000)]),
+        ((1_000_000, 1), (1, 1_000_000), 3, [(10000, 100), (100, 10000)]),
+        ((1_000_000, 1), (1, 1_000_000), 4, [(31623, 32), (1000, 1000), (32, 31623)]),
+        ((10, 10), (1, 100), 2, [(3, 32)]),
+    ],
+)
+def test_calculate_stage_chunks(read_chunks, write_chunks, stage_count, expected):
+    actual = calculate_stage_chunks(read_chunks, write_chunks, stage_count)
+    assert actual == expected
+
+
 def _verify_plan_correctness(
     source_chunks,
     read_chunks,

From 9efbdff219af48381f1a45ee33e35b7b0d14960b Mon Sep 17 00:00:00 2001
From: Stephan Hoyer
Date: Sat, 22 May 2021 18:34:23 -0700
Subject: [PATCH 02/12] Multi-stage rechunking algorithm

---
 rechunker/__init__.py   |   2 +-
 rechunker/algorithm.py  | 145 ++++++++++++++++++++++++++++------------
 tests/test_algorithm.py | 117 +++++++++++++++++++++++---------
 3 files changed, 192 insertions(+), 72 deletions(-)

diff --git a/rechunker/__init__.py b/rechunker/__init__.py
index 099e343..f1a538b 100644
--- a/rechunker/__init__.py
+++ b/rechunker/__init__.py
@@ -4,5 +4,5 @@
 except ImportError:
     __version__ = "unknown"

-from .algorithm import rechunking_plan
+from .algorithm import rechunking_plan, multistage_rechunking_plan
 from .api import Rechunked, rechunk
diff --git a/rechunker/algorithm.py b/rechunker/algorithm.py
index 5149b34..60c2ec1 100644
--- a/rechunker/algorithm.py
+++ b/rechunker/algorithm.py
@@ -1,6 +1,7 @@
 """Core rechunking algorithm stuff."""
 from typing import List, Optional, Sequence, Tuple
+from math import floor

 from rechunker.compat import prod

@@ -92,8 +93,11 @@ def _calculate_shared_chunks(


 def calculate_stage_chunks(
-    read_chunks, write_chunks, stage_count=1,
-):
+    read_chunks: Tuple[int, ...],
+    write_chunks: Tuple[int, ...],
+    stage_count: int = 1,
+    epsilon: float = 1e-8,
+) -> List[Tuple[int, ...]]:
     """
     Calculate chunks after each stage of a multi-stage rechunking.

     Each stage consists of a "split" step followed by a "consolidate" step.

     The strategy used here is to progressively enlarge or shrink chunks along
     each dimension by the same multiple in each stage. This should roughly
     minimize the total number of arrays resulting from "split" steps in a
     multi-stage pipeline. It also keeps the total number of elements in each
     chunk constant, up to rounding error, so memory usage should remain
     constant.
Examples:: @@ -120,48 +124,32 @@ def calculate_stage_chunks( stage_chunks = [] for stage in range(1, stage_count): power = stage / stage_count + # Add a small floating-point epsilon so we don't inadvertently + # round-down even chunk-sizes. chunks = tuple( - round(r ** (1 - power) * w ** power) - for r, w in zip(read_chunks, write_chunks) + floor(rc ** (1 - power) * wc ** power + epsilon) + for rc, wc in zip(read_chunks, write_chunks) ) stage_chunks.append(chunks) return stage_chunks -def rechunking_plan( +# not a tight upper bound, but ensures that the loop in +# multistage_rechunking_plan always terminates. +MAX_STAGES = 100 + + +def multistage_rechunking_plan( shape: Sequence[int], source_chunks: Sequence[int], target_chunks: Sequence[int], itemsize: int, + min_mem: int, max_mem: int, consolidate_reads: bool = True, consolidate_writes: bool = True, -) -> Tuple[Tuple[int, ...], Tuple[int, ...], Tuple[int, ...]]: - """ - Calculate a plan for rechunking arrays. - - Parameters - ---------- - shape : Tuple - Array shape - source_chunks : Tuple - Original chunk shape (must be in form (5, 10, 20), no irregular chunks) - target_chunks : Tuple - Target chunk shape (must be in form (5, 10, 20), no irregular chunks) - itemsize: int - Number of bytes used to represent a single array element - max_mem : Int - Maximum permissible chunk memory size, measured in units of itemsize - consolidate_reads: bool, optional - Whether to apply read chunk consolidation - consolidate_writes: bool, optional - Whether to apply write chunk consolidation - - Returns - ------- - new_chunks : tuple - The new chunks, size guaranteed to be <= mam_mem - """ +) -> List[Tuple[Tuple[int, ...], Tuple[int, ...], Tuple[int, ...]]]: + """A rechunking plan that can use multiple split/consolidate steps.""" ndim = len(shape) if len(source_chunks) != ndim: raise ValueError(f"source_chunks {source_chunks} must have length {ndim}") @@ -180,23 +168,27 @@ def rechunking_plan( f"Target chunk memory ({target_chunk_mem}) exceeds max_mem ({max_mem})" ) + if max_mem < min_mem: # basic sanity test + raise ValueError( + "max_mem ({max_mem}) cannot be smaller than min_mem ({min_mem})" + ) + if consolidate_writes: write_chunks = consolidate_chunks(shape, target_chunks, itemsize, max_mem) else: write_chunks = tuple(target_chunks) if consolidate_reads: - read_chunk_limits: List[Optional[int]] - read_chunk_limits = [] # - for n_ax, (sc, wc) in enumerate(zip(source_chunks, write_chunks)): - read_chunk_lim: Optional[int] + read_chunk_limits: List[Optional[int]] = [] + for sc, wc in zip(source_chunks, write_chunks): + limit: Optional[int] if wc > sc: # consolidate reads over this axis, up to the write chunk size - read_chunk_lim = wc + limit = wc else: # don't consolidate reads over this axis - read_chunk_lim = None - read_chunk_limits.append(read_chunk_lim) + limit = None + read_chunk_limits.append(limit) read_chunks = consolidate_chunks( shape, source_chunks, itemsize, max_mem, read_chunk_limits @@ -204,6 +196,77 @@ def rechunking_plan( else: read_chunks = tuple(source_chunks) - intermediate_chunks = _calculate_shared_chunks(read_chunks, write_chunks) + # increase the number of stages until min_mem is exceeded + for stage_count in range(MAX_STAGES): + + stage_chunks = calculate_stage_chunks(read_chunks, write_chunks, stage_count) + pre_chunks = [read_chunks] + stage_chunks + post_chunks = stage_chunks + [write_chunks] + + int_chunks = [ + _calculate_shared_chunks(pre, post) + for pre, post in zip(pre_chunks, post_chunks) + ] + if all(itemsize * 
prod(chunks) >= min_mem for chunks in int_chunks):
+            # success!
+            return list(zip(pre_chunks, int_chunks, post_chunks))
+
+    raise AssertionError(
+        "Failed to find a feasible multi-staging rechunking scheme satisfying "
+        f"min_mem ({min_mem}) and max_mem ({max_mem}) constraints. "
+        "Please file a bug report on GitHub: "
+        "https://github.com/pangeo-data/rechunker/issues\n\n"
+        "Include the following debugging info:\n"
+        f"shape={shape}, source_chunks={source_chunks}, "
+        f"target_chunks={target_chunks}, itemsize={itemsize}, "
+        f"min_mem={min_mem}, max_mem={max_mem}, "
+        f"consolidate_reads={consolidate_reads}, "
+        f"consolidate_writes={consolidate_writes}"
+    )
+
+
+def rechunking_plan(
+    shape: Sequence[int],
+    source_chunks: Sequence[int],
+    target_chunks: Sequence[int],
+    itemsize: int,
+    max_mem: int,
+    consolidate_reads: bool = True,
+    consolidate_writes: bool = True,
+) -> Tuple[Tuple[int, ...], Tuple[int, ...], Tuple[int, ...]]:
+    """
+    Calculate a plan for rechunking arrays.
+
+    Parameters
+    ----------
+    shape : Tuple
+        Array shape
+    source_chunks : Tuple
+        Original chunk shape (must be in form (5, 10, 20), no irregular chunks)
+    target_chunks : Tuple
+        Target chunk shape (must be in form (5, 10, 20), no irregular chunks)
+    itemsize: int
+        Number of bytes used to represent a single array element
+    max_mem : Int
+        Maximum permissible chunk memory size, measured in units of itemsize
+    consolidate_reads: bool, optional
+        Whether to apply read chunk consolidation
+    consolidate_writes: bool, optional
+        Whether to apply write chunk consolidation
-    return read_chunks, intermediate_chunks, write_chunks
+    Returns
+    -------
+    new_chunks : tuple
+        The new chunks, size guaranteed to be <= max_mem
+    """
+    (stage,) = multistage_rechunking_plan(
+        shape,
+        source_chunks,
+        target_chunks,
+        itemsize=itemsize,
+        min_mem=itemsize,
+        max_mem=max_mem,
+        consolidate_writes=consolidate_writes,
+        consolidate_reads=consolidate_reads,
+    )
+    return stage
diff --git a/tests/test_algorithm.py b/tests/test_algorithm.py
index 2b50885..88c4808 100644
--- a/tests/test_algorithm.py
+++ b/tests/test_algorithm.py
@@ -2,6 +2,7 @@
 """Tests for `rechunker` package."""

 import hypothesis.strategies as st
+from unittest.mock import patch
 import pytest
 from hypothesis import assume, given

@@ -9,6 +10,7 @@
     consolidate_chunks,
     rechunking_plan,
     calculate_stage_chunks,
+    multistage_rechunking_plan,
 )
 from rechunker.compat import prod

@@ -94,11 +96,11 @@ def test_consolidate_chunks_4D(shape, chunks, itemsize, max_mem, expected):
     [
         ((100, 1), (1, 100), 1, []),
         ((100, 1), (1, 100), 2, [(10, 10)]),
-        ((100, 1), (1, 100), 3, [(22, 5), (5, 22)]),
+        ((100, 1), (1, 100), 3, [(21, 4), (4, 21)]),
         ((1_000_000, 1), (1, 1_000_000), 2, [(1000, 1000)]),
         ((1_000_000, 1), (1, 1_000_000), 3, [(10000, 100), (100, 10000)]),
-        ((1_000_000, 1), (1, 1_000_000), 4, [(31623, 32), (1000, 1000), (32, 31623)]),
-        ((10, 10), (1, 100), 2, [(3, 32)]),
+        ((1_000_000, 1), (1, 1_000_000), 4, [(31622, 31), (1000, 1000), (31, 31622)]),
+        ((10, 10), (1, 100), 2, [(3, 31)]),
     ],
 )
 def test_calculate_stage_chunks(read_chunks, write_chunks, stage_count, expected):
@@ -106,18 +108,19 @@ def test_calculate_stage_chunks(read_chunks, write_chunks, stage_count, expected
     assert actual == expected


-def _verify_plan_correctness(
+def _verify_single_stage_plan_correctness(
     source_chunks,
     read_chunks,
     int_chunks,
     write_chunks,
     target_chunks,
     itemsize,
+    min_mem,
     max_mem,
 ):
-    assert itemsize * prod(read_chunks) <= max_mem
-    assert itemsize * prod(int_chunks) <= max_mem
-
assert itemsize * prod(write_chunks) <= max_mem + assert min_mem <= itemsize * prod(read_chunks) <= max_mem + assert min_mem <= itemsize * prod(int_chunks) <= max_mem + assert min_mem <= itemsize * prod(write_chunks) <= max_mem for sc, rc, ic, wc, tc in zip( source_chunks, read_chunks, int_chunks, write_chunks, target_chunks ): @@ -127,6 +130,21 @@ def _verify_plan_correctness( # todo: check for write overlaps +def _verify_multistage_plan_correctness( + stages, source_chunks, target_chunks, itemsize, min_mem, max_mem, +): + for sc, rc in zip(source_chunks, stages[0][0]): + assert rc >= sc + for tc, wc in zip(target_chunks, stages[-1][-1]): + assert wc >= tc + for read_chunks, int_chunks, write_chunks in stages: + assert min_mem <= itemsize * prod(read_chunks) <= max_mem + assert min_mem <= itemsize * prod(int_chunks) <= max_mem + assert min_mem <= itemsize * prod(write_chunks) <= max_mem + for rc, ic, wc in zip(read_chunks, int_chunks, write_chunks): + assert ic == min(rc, wc) + + @pytest.mark.parametrize( ( "shape, itemsize, source_chunks, target_chunks, max_mem, read_chunks_expected, " @@ -157,13 +175,15 @@ def test_rechunking_plan_1D( assert read_chunks == read_chunks_expected assert int_chunks == intermediate_chunks_expected assert write_chunks == write_chunks_expected - _verify_plan_correctness( + min_mem = itemsize + _verify_single_stage_plan_correctness( source_chunks, read_chunks, int_chunks, write_chunks, target_chunks, itemsize, + min_mem, max_mem, ) @@ -196,17 +216,52 @@ def test_rechunking_plan_2d( assert read_chunks == read_chunks_expected assert int_chunks == intermediate_chunks_expected assert write_chunks == write_chunks_expected - _verify_plan_correctness( + min_mem = itemsize + _verify_single_stage_plan_correctness( source_chunks, read_chunks, int_chunks, write_chunks, target_chunks, itemsize, + min_mem, max_mem, ) +@pytest.mark.parametrize( + "shape, source_chunks, target_chunks, itemsize, min_mem, max_mem, expected", + [ + ((100, 100), (100, 1), (1, 100), 1, 1, 100, [((100, 1), (1, 1), (1, 100))],), + ( + (100, 100), + (100, 1), + (1, 100), + 1, + 10, + 100, + [((100, 1), (10, 1), (10, 10)), ((10, 10), (1, 10), (1, 100)),], + ), + ], +) +def test_multistage_rechunking_plan( + shape, source_chunks, target_chunks, itemsize, min_mem, max_mem, expected, +): + stages = multistage_rechunking_plan( + shape, source_chunks, target_chunks, itemsize, min_mem, max_mem + ) + assert stages == expected + + +@patch("rechunker.algorithm.MAX_STAGES", 1) +def test_multistage_rechunking_plan_fails(): + with pytest.raises( + AssertionError, + match="Failed to find a feasible multi-staging rechunking scheme", + ): + multistage_rechunking_plan((100, 100), (100, 1), (1, 100), 1, 10, 100) + + @st.composite def shapes_chunks_maxmem(draw, ndim=3, itemsize=4, max_len=10_000): """Generate the data we need to test rechunking_plan.""" @@ -224,29 +279,35 @@ def shapes_chunks_maxmem(draw, ndim=3, itemsize=4, max_len=10_000): target_chunks.append(tc) source_chunk_mem = itemsize * prod(source_chunks) target_chunk_mem = itemsize * prod(target_chunks) - min_mem = max(source_chunk_mem, target_chunk_mem) - return (tuple(shape), tuple(source_chunks), tuple(target_chunks), min_mem) + orig_mem = max(source_chunk_mem, target_chunk_mem) + return (tuple(shape), tuple(source_chunks), tuple(target_chunks), orig_mem) @st.composite def shapes_chunks_maxmem_for_ndim(draw): ndim = draw(st.integers(min_value=1, max_value=5)) itemsize = 4 - shape, source_chunks, target_chunks, min_mem = draw( + shape, source_chunks, 
target_chunks, orig_mem = draw( shapes_chunks_maxmem(ndim=ndim, itemsize=4, max_len=10_000) ) - max_mem = min_mem * 10 - return shape, source_chunks, target_chunks, max_mem, itemsize + max_mem = orig_mem * 10 + min_mem = draw( + st.integers( + min_value=itemsize, + max_value=min(itemsize * max(prod(shape) // 4, 1), 5 * orig_mem), + ) + ) + return shape, source_chunks, target_chunks, min_mem, max_mem, itemsize @given(shapes_chunks_maxmem_for_ndim()) def test_rechunking_plan_hypothesis(inputs): - shape, source_chunks, target_chunks, max_mem, itemsize = inputs - # print(shape, source_chunks, target_chunks, max_mem) + shape, source_chunks, target_chunks, min_mem, max_mem, itemsize = inputs + print(shape, source_chunks, target_chunks, min_mem, max_mem) - args = shape, source_chunks, target_chunks, itemsize, max_mem - read_chunks, int_chunks, write_chunks = rechunking_plan(*args) - # print(" plan: ", read_chunks, int_chunks, write_chunks) + args = shape, source_chunks, target_chunks, itemsize, min_mem, max_mem + stages = multistage_rechunking_plan(*args) + print(" plan: ", stages) # this should be guaranteed by the test source_chunk_mem = itemsize * prod(source_chunks) @@ -255,16 +316,12 @@ def test_rechunking_plan_hypothesis(inputs): assert target_chunk_mem <= max_mem ndim = len(shape) - assert len(read_chunks) == ndim - assert len(int_chunks) == ndim - assert len(write_chunks) == ndim + for stage in stages: + read_chunks, int_chunks, write_chunks = stage + assert len(read_chunks) == ndim + assert len(int_chunks) == ndim + assert len(write_chunks) == ndim - _verify_plan_correctness( - source_chunks, - read_chunks, - int_chunks, - write_chunks, - target_chunks, - itemsize, - max_mem, + _verify_multistage_plan_correctness( + stages, source_chunks, target_chunks, itemsize, min_mem, max_mem, ) From 910dd945e60c4c6bfc64e1562eac4db9d5d57917 Mon Sep 17 00:00:00 2001 From: Stephan Hoyer Date: Sat, 22 May 2021 22:00:40 -0700 Subject: [PATCH 03/12] sort imports --- rechunker/__init__.py | 2 +- rechunker/algorithm.py | 2 +- tests/test_algorithm.py | 7 ++++--- 3 files changed, 6 insertions(+), 5 deletions(-) diff --git a/rechunker/__init__.py b/rechunker/__init__.py index f1a538b..a006adb 100644 --- a/rechunker/__init__.py +++ b/rechunker/__init__.py @@ -4,5 +4,5 @@ except ImportError: __version__ = "unknown" -from .algorithm import rechunking_plan, multistage_rechunking_plan +from .algorithm import multistage_rechunking_plan, rechunking_plan from .api import Rechunked, rechunk diff --git a/rechunker/algorithm.py b/rechunker/algorithm.py index 60c2ec1..7b68a36 100644 --- a/rechunker/algorithm.py +++ b/rechunker/algorithm.py @@ -1,7 +1,7 @@ """Core rechunking algorithm stuff.""" +from math import floor from typing import List, Optional, Sequence, Tuple -from math import floor from rechunker.compat import prod diff --git a/tests/test_algorithm.py b/tests/test_algorithm.py index 88c4808..f0b744f 100644 --- a/tests/test_algorithm.py +++ b/tests/test_algorithm.py @@ -1,16 +1,17 @@ #!/usr/bin/env python """Tests for `rechunker` package.""" -import hypothesis.strategies as st from unittest.mock import patch + +import hypothesis.strategies as st import pytest from hypothesis import assume, given from rechunker.algorithm import ( - consolidate_chunks, - rechunking_plan, calculate_stage_chunks, + consolidate_chunks, multistage_rechunking_plan, + rechunking_plan, ) from rechunker.compat import prod From 8ee480cbb47d49ee7951ecaee4ffb7bf410e5962 Mon Sep 17 00:00:00 2001 From: Stephan Hoyer Date: Sun, 23 May 2021 
15:09:18 -0700 Subject: [PATCH 04/12] Stop using more stages when IO increases --- rechunker/algorithm.py | 66 ++++++++++++++++++++++++++++++++++++----- rechunker/compat.py | 25 +++++++++++----- tests/test_algorithm.py | 61 ++++++++++++++++++++++++++++++++++--- 3 files changed, 134 insertions(+), 18 deletions(-) diff --git a/rechunker/algorithm.py b/rechunker/algorithm.py index 7b68a36..7a6fabe 100644 --- a/rechunker/algorithm.py +++ b/rechunker/algorithm.py @@ -1,8 +1,9 @@ """Core rechunking algorithm stuff.""" -from math import floor +import warnings +from math import ceil, floor from typing import List, Optional, Sequence, Tuple -from rechunker.compat import prod +from rechunker.compat import lcm, prod def consolidate_chunks( @@ -134,11 +135,38 @@ def calculate_stage_chunks( return stage_chunks +def _count_num_splits(source_chunk: int, target_chunk: int, size: int) -> int: + multiple = lcm(source_chunk, target_chunk) + splits_per_lcm = multiple // source_chunk + multiple // target_chunk - 1 + lcm_count, remainder = divmod(size, multiple) + if remainder: + splits_in_remainder = ( + ceil(remainder / source_chunk) + ceil(remainder / target_chunk) - 1 + ) + else: + splits_in_remainder = 0 + return lcm_count * splits_per_lcm + splits_in_remainder + + +def calculate_single_stage_io_ops( + shape: Sequence[int], in_chunks: Sequence[int], out_chunks: Sequence[int] +) -> int: + """Estimate the number of irregular chunks required for rechunking.""" + return prod(map(_count_num_splits, in_chunks, out_chunks, shape)) + + # not a tight upper bound, but ensures that the loop in # multistage_rechunking_plan always terminates. MAX_STAGES = 100 +_MultistagePlan = List[Tuple[Tuple[int, ...], Tuple[int, ...], Tuple[int, ...]]] + + +class ExcessiveIOWarning(Warning): + pass + + def multistage_rechunking_plan( shape: Sequence[int], source_chunks: Sequence[int], @@ -148,7 +176,7 @@ def multistage_rechunking_plan( max_mem: int, consolidate_reads: bool = True, consolidate_writes: bool = True, -) -> List[Tuple[Tuple[int, ...], Tuple[int, ...], Tuple[int, ...]]]: +) -> _MultistagePlan: """A rechunking plan that can use multiple split/consolidate steps.""" ndim = len(shape) if len(source_chunks) != ndim: @@ -196,8 +224,11 @@ def multistage_rechunking_plan( else: read_chunks = tuple(source_chunks) + prev_io_ops: Optional[float] = None + prev_plan: Optional[_MultistagePlan] = None + # increase the number of stages until min_mem is exceeded - for stage_count in range(MAX_STAGES): + for stage_count in range(1, MAX_STAGES): stage_chunks = calculate_stage_chunks(read_chunks, write_chunks, stage_count) pre_chunks = [read_chunks] + stage_chunks @@ -207,9 +238,30 @@ def multistage_rechunking_plan( _calculate_shared_chunks(pre, post) for pre, post in zip(pre_chunks, post_chunks) ] - if all(itemsize * prod(chunks) >= min_mem for chunks in int_chunks): - # success! - return list(zip(pre_chunks, int_chunks, post_chunks)) + plan = list(zip(pre_chunks, int_chunks, post_chunks)) + + int_mem = min(itemsize * prod(chunks) for chunks in int_chunks) + if int_mem >= min_mem: + return plan # success! + + io_ops = sum( + calculate_single_stage_io_ops(shape, pre, post) + for pre, post in zip(pre_chunks, post_chunks) + ) + if prev_io_ops is not None and io_ops > prev_io_ops: + warnings.warn( + "Search for multi-stage rechunking plan terminated before " + "achieving the minimum memory requirement due to increasing IO " + f"requirements. Smallest intermediates have size {int_mem}." 
+ f"Consider decreasing min_mem ({min_mem}) or increasing " + f"({max_mem}) to find a more efficient plan.", + category=ExcessiveIOWarning, + ) + assert prev_plan is not None + return prev_plan + + prev_io_ops = io_ops + prev_plan = plan raise AssertionError( "Failed to find a feasible multi-staging rechunking scheme satisfying " diff --git a/rechunker/compat.py b/rechunker/compat.py index 6438a21..ac4e003 100644 --- a/rechunker/compat.py +++ b/rechunker/compat.py @@ -1,13 +1,24 @@ +import math import operator from functools import reduce -from typing import Sequence +from typing import Iterable, TypeVar +T = TypeVar("T", int, float) -def prod(iterable: Sequence[int]) -> int: - """Implementation of `math.prod()` all Python versions.""" - try: - from math import prod as mathprod # type: ignore # Python 3.8 - return mathprod(iterable) - except ImportError: +try: + from math import prod # type: ignore # Python 3.8 +except ImportError: + + def prod(iterable: Iterable[T]) -> T: # type: ignore + """Implementation of `math.prod()` for all Python versions.""" return reduce(operator.mul, iterable, 1) + + +try: + from math import lcm # type: ignore # Python 3.9 +except ImportError: + + def lcm(a: int, b: int) -> int: # type: ignore + """Implementation of `math.lcm()` for all Python versions.""" + return abs(a * b) // math.gcd(a, b) diff --git a/tests/test_algorithm.py b/tests/test_algorithm.py index f0b744f..60b431c 100644 --- a/tests/test_algorithm.py +++ b/tests/test_algorithm.py @@ -1,6 +1,7 @@ #!/usr/bin/env python """Tests for `rechunker` package.""" +import warnings from unittest.mock import patch import hypothesis.strategies as st @@ -8,6 +9,8 @@ from hypothesis import assume, given from rechunker.algorithm import ( + ExcessiveIOWarning, + calculate_single_stage_io_ops, calculate_stage_chunks, consolidate_chunks, multistage_rechunking_plan, @@ -109,6 +112,31 @@ def test_calculate_stage_chunks(read_chunks, write_chunks, stage_count, expected assert actual == expected +@pytest.mark.parametrize( + "shape, in_chunks, out_chunks, expected", + [ + ((6,), (1,), (6,), 6), + ((10,), (1,), (6,), 10), + ((6,), (2,), (3,), 4), + ((24,), (2,), (3,), 16), + ((10,), (4,), (5,), 4), + ((100,), (4,), (5,), 40), + ((100, 100), (1, 100), (100, 1), 10_000), + ((100, 100), (1, 10), (10, 1), 10_000), + ((100, 100), (20, 20), (25, 25), (5 + 3) ** 2), + ((50, 50), (20, 20), (25, 25), ((5 + 3) // 2) ** 2), + ((100,), (43,), (100,), 3), + ((100,), (43,), (51,), 4), + ((100,), (43,), (40,), 5), + ((100,), (43,), (10,), 12), + ((100,), (43,), (1,), 100), + ], +) +def test_calculate_single_stage_io_ops(shape, in_chunks, out_chunks, expected): + actual = calculate_single_stage_io_ops(shape, in_chunks, out_chunks) + assert actual == expected + + def _verify_single_stage_plan_correctness( source_chunks, read_chunks, @@ -132,7 +160,13 @@ def _verify_single_stage_plan_correctness( def _verify_multistage_plan_correctness( - stages, source_chunks, target_chunks, itemsize, min_mem, max_mem, + stages, + source_chunks, + target_chunks, + itemsize, + min_mem, + max_mem, + excessive_io=False, ): for sc, rc in zip(source_chunks, stages[0][0]): assert rc >= sc @@ -140,7 +174,11 @@ def _verify_multistage_plan_correctness( assert wc >= tc for read_chunks, int_chunks, write_chunks in stages: assert min_mem <= itemsize * prod(read_chunks) <= max_mem - assert min_mem <= itemsize * prod(int_chunks) <= max_mem + assert itemsize * prod(int_chunks) <= max_mem + if excessive_io: + assert min_mem >= itemsize * prod(int_chunks) + else: + assert 
min_mem <= itemsize * prod(int_chunks) assert min_mem <= itemsize * prod(write_chunks) <= max_mem for rc, ic, wc in zip(read_chunks, int_chunks, write_chunks): assert ic == min(rc, wc) @@ -254,6 +292,13 @@ def test_multistage_rechunking_plan( assert stages == expected +def test_multistage_rechunking_plan_warns(): + with pytest.warns( + ExcessiveIOWarning, match="Search for multi-stage rechunking plan terminated", + ): + multistage_rechunking_plan((100, 100), (100, 1), (1, 100), 1, 90, 100) + + @patch("rechunker.algorithm.MAX_STAGES", 1) def test_multistage_rechunking_plan_fails(): with pytest.raises( @@ -307,7 +352,9 @@ def test_rechunking_plan_hypothesis(inputs): print(shape, source_chunks, target_chunks, min_mem, max_mem) args = shape, source_chunks, target_chunks, itemsize, min_mem, max_mem - stages = multistage_rechunking_plan(*args) + with warnings.catch_warnings(record=True) as w_list: + stages = multistage_rechunking_plan(*args) + excessive_io = any(issubclass(w.category, ExcessiveIOWarning) for w in w_list) print(" plan: ", stages) # this should be guaranteed by the test @@ -324,5 +371,11 @@ def test_rechunking_plan_hypothesis(inputs): assert len(write_chunks) == ndim _verify_multistage_plan_correctness( - stages, source_chunks, target_chunks, itemsize, min_mem, max_mem, + stages, + source_chunks, + target_chunks, + itemsize, + min_mem, + max_mem, + excessive_io=excessive_io, ) From 6a8db586f50eaf9c7733db596e4056558cd2a62f Mon Sep 17 00:00:00 2001 From: Stephan Hoyer Date: Sun, 23 May 2021 18:05:05 -0700 Subject: [PATCH 05/12] Hypothesis test for IO ops count --- tests/test_algorithm.py | 24 ++++++++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/tests/test_algorithm.py b/tests/test_algorithm.py index 60b431c..c6cee27 100644 --- a/tests/test_algorithm.py +++ b/tests/test_algorithm.py @@ -5,6 +5,7 @@ from unittest.mock import patch import hypothesis.strategies as st +import numpy as np import pytest from hypothesis import assume, given @@ -137,6 +138,29 @@ def test_calculate_single_stage_io_ops(shape, in_chunks, out_chunks, expected): assert actual == expected +@st.composite +def io_ops_chunks(draw, max_len=1000): + size = draw(st.integers(min_value=1, max_value=max_len)) + source = draw(st.integers(min_value=1, max_value=max_len)) + target = draw(st.integers(min_value=1, max_value=max_len)) + return (size, source, target) + + +@given(io_ops_chunks()) +def test_calculate_single_stage_io_ops_hypothesis(inputs): + size, source, target = inputs + + calculated = calculate_single_stage_io_ops((size,), (source,), (target,)) + + table = np.empty(shape=(size, 2), dtype=int) + for i in range(size): + table[i, 0] = i // source + table[i, 1] = i // target + actual = np.unique(table, axis=0).shape[0] + + assert calculated == actual + + def _verify_single_stage_plan_correctness( source_chunks, read_chunks, From 3fb5b91fe1dabe113db42345a6239f3f9df815b6 Mon Sep 17 00:00:00 2001 From: Stephan Hoyer Date: Mon, 24 May 2021 10:02:24 -0700 Subject: [PATCH 06/12] more test coverage --- tests/test_algorithm.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/tests/test_algorithm.py b/tests/test_algorithm.py index c6cee27..63dce5d 100644 --- a/tests/test_algorithm.py +++ b/tests/test_algorithm.py @@ -332,6 +332,13 @@ def test_multistage_rechunking_plan_fails(): multistage_rechunking_plan((100, 100), (100, 1), (1, 100), 1, 10, 100) +def test_rechunking_plan_invalid_min_mem(): + with pytest.raises( + ValueError, match="cannot be smaller than min_mem", + ): + 
multistage_rechunking_plan((100, 100), (100, 1), (1, 100), 1, 101, 100)
+
+
 @st.composite
 def shapes_chunks_maxmem(draw, ndim=3, itemsize=4, max_len=10_000):
     """Generate the data we need to test rechunking_plan."""

From fbd0f731423410aac6adbf34ca29b3ab521c0be5 Mon Sep 17 00:00:00 2001
From: Stephan Hoyer
Date: Mon, 21 Nov 2022 11:40:34 -0800
Subject: [PATCH 07/12] Fixes after merging in master

---
 rechunker/algorithm.py  |   4 +-
 tests/test_algorithm.py |  19 ++-
 tests/test_rechunk.py   | 320 ++++++++++++++++++++++++++++++----------
 3 files changed, 252 insertions(+), 91 deletions(-)

diff --git a/rechunker/algorithm.py b/rechunker/algorithm.py
index 300c1b9..bad0208 100644
--- a/rechunker/algorithm.py
+++ b/rechunker/algorithm.py
@@ -4,7 +4,7 @@
 from math import ceil, floor
 from typing import List, Optional, Sequence, Tuple

-from rechunker.compat import prod
+from rechunker.compat import lcm, prod

 logger = logging.getLogger(__name__)

@@ -274,7 +274,7 @@ def multistage_rechunking_plan(
             warnings.warn(
                 "Search for multi-stage rechunking plan terminated before "
                 "achieving the minimum memory requirement due to increasing IO "
-                f"requirements. Smallest intermediates have size {int_mem}."
+                f"requirements. Smallest intermediates have size {int_mem}. "
                 f"Consider decreasing min_mem ({min_mem}) or increasing "
                 f"max_mem ({max_mem}) to find a more efficient plan.",
                 category=ExcessiveIOWarning,
diff --git a/tests/test_algorithm.py b/tests/test_algorithm.py
index 6e8b1b9..5c3b5bc 100644
--- a/tests/test_algorithm.py
+++ b/tests/test_algorithm.py
@@ -5,7 +5,6 @@
 from unittest.mock import patch

 import hypothesis.strategies as st
-import numpy as np
 import pytest
 from hypothesis import assume, given

@@ -105,7 +104,7 @@ def test_consolidate_chunks_4D(shape, chunks, itemsize, max_mem, expected):
     assert chunk_mem <= max_mem


-def _verify_plan_correctness(
+def _verify_single_stage_plan_correctness(
     shape,
     source_chunks,
     read_chunks,
     int_chunks,
     write_chunks,
     target_chunks,
     itemsize,
     min_mem,
     max_mem,
 ):
-    assert itemsize * prod(read_chunks) <= max_mem
-    assert itemsize * prod(int_chunks) <= max_mem
-    assert itemsize * prod(write_chunks) <= max_mem
+    assert min_mem <= itemsize * prod(read_chunks) <= max_mem
+    assert min_mem <= itemsize * prod(int_chunks) <= max_mem
+    assert min_mem <= itemsize * prod(write_chunks) <= max_mem
     for n, sc, rc, ic, wc, tc in zip(
         shape, source_chunks, read_chunks, int_chunks, write_chunks, target_chunks
     ):
@@ -132,6 +131,7 @@


 def _verify_multistage_plan_correctness(
+    shape,
     stages,
     source_chunks,
     target_chunks,
@@ -142,8 +142,9 @@
 ):
     for sc, rc in zip(source_chunks, stages[0][0]):
         assert rc >= sc
-    for tc, wc in zip(target_chunks, stages[-1][-1]):
+    for n, tc, wc in zip(shape, target_chunks, stages[-1][-1]):
         assert wc >= tc
+        assert (wc == n) or (wc % tc == 0)
     for read_chunks, int_chunks, write_chunks in stages:
         assert min_mem <= itemsize * prod(read_chunks) <= max_mem
         assert itemsize * prod(int_chunks) <= max_mem
@@ -186,7 +187,9 @@ def test_rechunking_plan_1D(
     assert read_chunks == read_chunks_expected
     assert int_chunks == intermediate_chunks_expected
     assert write_chunks == write_chunks_expected
-    _verify_plan_correctness(
+    min_mem = itemsize
+    _verify_single_stage_plan_correctness(
+        shape,
         source_chunks,
         read_chunks,
         int_chunks,
@@ -369,8 +372,8 @@ def test_rechunking_plan_hypothesis(inputs):
         assert len(write_chunks) == ndim

     _verify_multistage_plan_correctness(
-        stages,
         shape,
+        stages,
         source_chunks,
         target_chunks,
itemsize, diff --git a/tests/test_rechunk.py b/tests/test_rechunk.py index 3350809..f5d8820 100644 --- a/tests/test_rechunk.py +++ b/tests/test_rechunk.py @@ -5,10 +5,9 @@ import dask import dask.array as dsa import dask.core -import fsspec import numpy +import numpy as np import pytest -import xarray import zarr from rechunker import api @@ -29,7 +28,6 @@ def requires_import(module, *args): requires_beam = partial(requires_import, "apache_beam") requires_prefect = partial(requires_import, "prefect") -requires_pywren = partial(requires_import, "pywren_ibm_cloud") @pytest.fixture(params=[(8000, 200), {"y": 8000, "x": 200}]) @@ -42,6 +40,151 @@ def test_invalid_executor(): api._get_executor("unknown") +@pytest.fixture(scope="session") +def chunk_ds(): + xarray = pytest.importorskip("xarray") + + lon = numpy.arange(-180, 180) + lat = numpy.arange(-90, 90) + time = numpy.arange(365) + ds = xarray.Dataset( + data_vars=dict( + aaa=( + ["lon", "lat", "time"], + numpy.random.randint(0, 101, (len(lon), len(lat), len(time))), + ) + ), + coords=dict( + lon=lon, + lat=lat, + time=time, + ), + ) + return ds + + +def example_dataset(shape): + # TODO: simplify the creation of datasets here + # TODO: See https://github.com/pangeo-data/rechunker/pull/93#discussion_r713939185 + # TODO: Maybe it is best to refactor tests to use `chunk_ds` + xarray = pytest.importorskip("xarray") + + a = numpy.arange(numpy.prod(shape)).reshape(shape).astype("f4") + a[-1] = numpy.nan + ds = xarray.Dataset( + dict( + a=xarray.DataArray( + a, dims=["x", "y"], attrs={"a1": 1, "a2": [1, 2, 3], "a3": "x"} + ), + b=xarray.DataArray(numpy.ones(shape[0]), dims=["x"]), + c=xarray.DataArray(numpy.ones(shape[1]), dims=["y"]), + ), + coords=dict( + cx=xarray.DataArray(numpy.ones(shape[0]), dims=["x"]), + cy=xarray.DataArray(numpy.ones(shape[1]), dims=["y"]), + ), + attrs={"a1": 1, "a2": [1, 2, 3], "a3": "x"}, + ) + return ds + + +@pytest.mark.parametrize( + "target_chunks,expected", + [ + pytest.param( + dict(lon=10), + dict(aaa=(10, 180, 365), lon=(10,), lat=(180,), time=(365,)), + id="just lon chunk", + ), + pytest.param( + dict(lat=10), + dict(aaa=(360, 10, 365), lon=(360,), lat=(10,), time=(365,)), + id="just lat chunk", + ), + pytest.param( + dict(time=10), + dict(aaa=(360, 180, 10), lon=(360,), lat=(180,), time=(10,)), + id="just time chunk", + ), + pytest.param( + dict(lon=10, lat=10, time=10), + dict(aaa=(10, 10, 10), lon=(10,), lat=(10,), time=(10,)), + id="all dimensions - equal chunks", + ), + pytest.param( + dict(lon=10, lat=20, time=30), + dict(aaa=(10, 20, 30), lon=(10,), lat=(20,), time=(30,)), + id="all dimensions - different chunks", + ), + pytest.param( + dict(lon=1000), + dict(aaa=(360, 180, 365), lon=(360,), lat=(180,), time=(365,)), + id="lon chunk greater than size", + ), + pytest.param( + dict(lat=1000), + dict(aaa=(360, 180, 365), lon=(360,), lat=(180,), time=(365,)), + id="lat chunk greater than size", + ), + pytest.param( + dict(time=1000), + dict(aaa=(360, 180, 365), lon=(360,), lat=(180,), time=(365,)), + id="time chunk greater than size", + ), + pytest.param( + dict(lon=1000, lat=1000, time=1000), + dict(aaa=(360, 180, 365), lon=(360,), lat=(180,), time=(365,)), + id="all chunks greater than size", + ), + ], +) +def test_parse_target_chunks_from_dim_chunks(chunk_ds, target_chunks, expected) -> None: + result = api.parse_target_chunks_from_dim_chunks( + ds=chunk_ds, target_chunks=target_chunks + ) + assert expected == result + + +@pytest.mark.parametrize( + "dask_chunks, dim, target_chunks, expected", + [ + 
pytest.param( + None, + "lon", + dict(lon=10), + 10, + id="small lon chunks numpy array", + ), + pytest.param( + None, + "lon", + dict(lon=10), + 10, + id="small lon chunks dask array", + ), + pytest.param( + None, + "time", + dict(time=400), + 365, + id="time chunks exceed len", + ), + pytest.param( + {"time": 1}, + "time", + dict(time=-1), + 365, + id="negative time chunks dask array", + ), + ], +) +def test_get_dim_chunk(dask_chunks, chunk_ds, dim, target_chunks, expected): + if dask_chunks: + chunk_ds = chunk_ds.chunk(dask_chunks) + chunk = api.get_dim_chunk(chunk_ds.aaa, dim, target_chunks) + assert chunk == expected + + @pytest.mark.parametrize("shape", [(100, 50)]) @pytest.mark.parametrize("source_chunks", [(10, 50)]) @pytest.mark.parametrize( @@ -49,7 +192,7 @@ def test_invalid_executor(): [{"a": (20, 10), "b": (20,)}, {"a": {"x": 20, "y": 10}, "b": {"x": 20}}], ) @pytest.mark.parametrize("max_mem", ["10MB"]) -@pytest.mark.parametrize("executor", ["dask", "python", "prefect"]) +@pytest.mark.parametrize("executor", ["dask", "python", requires_prefect("prefect")]) @pytest.mark.parametrize("target_store", ["target.zarr", "mapper.target.zarr"]) @pytest.mark.parametrize("temp_store", ["temp.zarr", "mapper.temp.zarr"]) def test_rechunk_dataset( @@ -62,30 +205,17 @@ def test_rechunk_dataset( target_store, temp_store, ): + xarray = pytest.importorskip("xarray") + if target_store.startswith("mapper"): + fsspec = pytest.importorskip("fsspec") target_store = fsspec.get_mapper(str(tmp_path) + target_store) temp_store = fsspec.get_mapper(str(tmp_path) + temp_store) else: target_store = str(tmp_path / target_store) temp_store = str(tmp_path / temp_store) - a = numpy.arange(numpy.prod(shape)).reshape(shape).astype("f4") - a[-1] = numpy.nan - ds = xarray.Dataset( - dict( - a=xarray.DataArray( - a, dims=["x", "y"], attrs={"a1": 1, "a2": [1, 2, 3], "a3": "x"} - ), - b=xarray.DataArray(numpy.ones(shape[0]), dims=["x"]), - c=xarray.DataArray(numpy.ones(shape[1]), dims=["y"]), - ), - coords=dict( - cx=xarray.DataArray(numpy.ones(shape[0]), dims=["x"]), - cy=xarray.DataArray(numpy.ones(shape[1]), dims=["y"]), - ), - attrs={"a1": 1, "a2": [1, 2, 3], "a3": "x"}, - ) - ds = ds.chunk(chunks=dict(zip(["x", "y"], source_chunks))) + ds = example_dataset(shape).chunk(chunks=dict(zip(["x", "y"], source_chunks))) options = dict( a=dict( compressor=zarr.Blosc(cname="zstd"), @@ -107,10 +237,6 @@ def test_rechunk_dataset( with dask.config.set(scheduler="single-threaded"): rechunked.execute() - # check zarr store directly - # zstore = zarr.open_group(target_store) - # print(zstore.tree()) - # Validate encoded variables dst = xarray.open_zarr(target_store, decode_cf=False) assert dst.a.dtype == options["a"]["dtype"] @@ -131,6 +257,71 @@ def test_rechunk_dataset( assert ds.attrs == dst.attrs +@pytest.mark.parametrize("shape", [(100, 50)]) +@pytest.mark.parametrize("source_chunks", [(10, 50), (100, 5)]) +@pytest.mark.parametrize( + "target_chunks", + [ + {"x": 20}, # This should leave y chunks untouched + {"x": 20, "y": 100_000}, + {"x": 20, "y": -1}, + ], +) +@pytest.mark.parametrize("max_mem", ["10MB"]) +def test_rechunk_dataset_dimchunks( + tmp_path, + shape, + source_chunks, + target_chunks, + max_mem, +): + xarray = pytest.importorskip("xarray") + + temp_store = "temp.zarr" + target_store = "target.zarr" + target_store = str(tmp_path / target_store) + temp_store = str(tmp_path / temp_store) + + ds = example_dataset(shape).chunk(chunks=dict(zip(["x", "y"], source_chunks))) + options = dict( + a=dict( + 
compressor=zarr.Blosc(cname="zstd"), + dtype="int32", + scale_factor=0.1, + _FillValue=-9999, + ) + ) + rechunked = api.rechunk( + ds, + target_chunks=target_chunks, + max_mem=max_mem, + target_store=target_store, + target_options=options, + temp_store=temp_store, + ) + assert isinstance(rechunked, api.Rechunked) + with dask.config.set(scheduler="single-threaded"): + rechunked.execute() + + # Validate decoded variables + dst = xarray.open_zarr(target_store, decode_cf=True) + target_chunks_expected = [ + target_chunks.get("x", source_chunks[0]), + target_chunks.get("y", source_chunks[1]), + ] + if target_chunks_expected[1] < 0 or target_chunks_expected[1] > len(ds.y): + target_chunks_expected[1] = len(ds.y) + + target_chunks_expected = tuple(target_chunks_expected) + + assert dst.a.data.chunksize == target_chunks_expected + assert dst.b.data.chunksize == target_chunks_expected[:1] + assert dst.c.data.chunksize == target_chunks_expected[1:] + + xarray.testing.assert_equal(ds.compute(), dst.compute()) + assert ds.attrs == dst.attrs + + @pytest.mark.parametrize("shape", [(8000, 8000)]) @pytest.mark.parametrize("source_chunks", [(200, 8000)]) @pytest.mark.parametrize("dtype", ["f4"]) @@ -142,7 +333,6 @@ def test_rechunk_dataset( "python", requires_beam("beam"), requires_prefect("prefect"), - requires_pywren("pywren"), ], ) @pytest.mark.parametrize( @@ -198,8 +388,8 @@ def test_rechunk_array( result = rechunked.execute() assert isinstance(result, zarr.Array) - a_tar = dsa.from_zarr(target_array) - assert dsa.equal(a_tar, 1).all().compute() + a_tar = np.asarray(result) + np.testing.assert_equal(a_tar, 1) @pytest.mark.parametrize("shape", [(8000, 8000)]) @@ -207,7 +397,13 @@ def test_rechunk_array( @pytest.mark.parametrize("dtype", ["f4"]) @pytest.mark.parametrize("max_mem", [25600000]) @pytest.mark.parametrize( - "target_chunks", [(200, 8000), (800, 8000), (8000, 200), (400, 8000),], + "target_chunks", + [ + (200, 8000), + (800, 8000), + (8000, 200), + (400, 8000), + ], ) def test_rechunk_dask_array( tmp_path, shape, source_chunks, dtype, target_chunks, max_mem @@ -242,7 +438,6 @@ def test_rechunk_dask_array( "python", requires_beam("beam"), requires_prefect("prefect"), - requires_pywren("pywren"), ], ) @pytest.mark.parametrize("source_store", ["source.zarr", "mapper.source.zarr"]) @@ -250,6 +445,7 @@ def test_rechunk_dask_array( @pytest.mark.parametrize("temp_store", ["temp.zarr", "mapper.temp.zarr"]) def test_rechunk_group(tmp_path, executor, source_store, target_store, temp_store): if source_store.startswith("mapper"): + fsspec = pytest.importorskip("fsspec") store_source = fsspec.get_mapper(str(tmp_path) + source_store) target_store = fsspec.get_mapper(str(tmp_path) + target_store) temp_store = fsspec.get_mapper(str(tmp_path) + temp_store) @@ -292,6 +488,8 @@ def test_rechunk_group(tmp_path, executor, source_store, target_store, temp_stor def sample_xarray_dataset(): + xarray = pytest.importorskip("xarray") + return xarray.Dataset( dict( a=xarray.DataArray( @@ -361,7 +559,11 @@ def rechunk_args(tmp_path, request): target_chunks = (8000, 200) args.update( - {"source": array, "target_chunks": target_chunks, "max_mem": max_mem,} + { + "source": array, + "target_chunks": target_chunks, + "max_mem": max_mem, + } ) return args @@ -384,6 +586,8 @@ def test_repr_html(rechunked): def _is_collection(source): + xarray = pytest.importorskip("xarray") + assert isinstance( source, (dask.array.Array, zarr.core.Array, zarr.hierarchy.Group, xarray.Dataset), @@ -444,6 +648,8 @@ def rechunk(compressor): def 
test_rechunk_invalid_option(rechunk_args): + xarray = pytest.importorskip("xarray") + if isinstance(rechunk_args["source"], xarray.Dataset): # Options are essentially unbounded for Xarray (for CF encoding params), # so check only options with special error cases @@ -516,54 +722,6 @@ def test_no_intermediate_fused(tmp_path): rechunked = api.rechunk(source_array, target_chunks, max_mem, target_store) - num_tasks = len([v for v in rechunked.plan.dask.values() if dask.core.istask(v)]) + # rechunked.plan is a list of dask delayed objects + num_tasks = len([v for v in rechunked.plan[0].dask.values() if dask.core.istask(v)]) assert num_tasks < 20 # less than if no fuse - - -def test_pywren_function_executor(tmp_path): - pytest.importorskip("pywren_ibm_cloud") - from rechunker.executors.pywren import ( - PywrenExecutor, - pywren_local_function_executor, - ) - - # Create a Pywren function exectutor that we manage ourselves - # and pass in to rechunker's PywrenExecutor - with pywren_local_function_executor() as function_executor: - - executor = PywrenExecutor(function_executor) - - shape = (8000, 8000) - source_chunks = (200, 8000) - dtype = "f4" - max_mem = 25600000 - target_chunks = (400, 8000) - - ### Create source array ### - store_source = str(tmp_path / "source.zarr") - source_array = zarr.ones( - shape, chunks=source_chunks, dtype=dtype, store=store_source - ) - - ### Create targets ### - target_store = str(tmp_path / "target.zarr") - temp_store = str(tmp_path / "temp.zarr") - - rechunked = api.rechunk( - source_array, - target_chunks, - max_mem, - target_store, - temp_store=temp_store, - executor=executor, - ) - assert isinstance(rechunked, api.Rechunked) - - target_array = zarr.open(target_store) - - assert target_array.chunks == tuple(target_chunks) - - result = rechunked.execute() - assert isinstance(result, zarr.Array) - a_tar = dsa.from_zarr(target_array) - assert dsa.equal(a_tar, 1).all().compute() From 5371fed88146737fa352d93e6779ef0d078fc3a5 Mon Sep 17 00:00:00 2001 From: Stephan Hoyer Date: Mon, 21 Nov 2022 12:25:46 -0800 Subject: [PATCH 08/12] restore accidentally removed tests --- tests/test_algorithm.py | 66 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 66 insertions(+) diff --git a/tests/test_algorithm.py b/tests/test_algorithm.py index 5c3b5bc..e40e4a9 100644 --- a/tests/test_algorithm.py +++ b/tests/test_algorithm.py @@ -5,6 +5,7 @@ from unittest.mock import patch import hypothesis.strategies as st +import numpy as np import pytest from hypothesis import assume, given @@ -104,6 +105,71 @@ def test_consolidate_chunks_4D(shape, chunks, itemsize, max_mem, expected): assert chunk_mem <= max_mem +@pytest.mark.parametrize( + "read_chunks, write_chunks, stage_count, expected", + [ + ((100, 1), (1, 100), 1, []), + ((100, 1), (1, 100), 2, [(10, 10)]), + ((100, 1), (1, 100), 3, [(21, 4), (4, 21)]), + ((1_000_000, 1), (1, 1_000_000), 2, [(1000, 1000)]), + ((1_000_000, 1), (1, 1_000_000), 3, [(10000, 100), (100, 10000)]), + ((1_000_000, 1), (1, 1_000_000), 4, [(31622, 31), (1000, 1000), (31, 31622)]), + ((10, 10), (1, 100), 2, [(3, 31)]), + ], +) +def test_calculate_stage_chunks(read_chunks, write_chunks, stage_count, expected): + actual = calculate_stage_chunks(read_chunks, write_chunks, stage_count) + assert actual == expected + + +@pytest.mark.parametrize( + "shape, in_chunks, out_chunks, expected", + [ + ((6,), (1,), (6,), 6), + ((10,), (1,), (6,), 10), + ((6,), (2,), (3,), 4), + ((24,), (2,), (3,), 16), + ((10,), (4,), (5,), 4), + ((100,), (4,), (5,), 40), + ((100, 100), 
(1, 100), (100, 1), 10_000), + ((100, 100), (1, 10), (10, 1), 10_000), + ((100, 100), (20, 20), (25, 25), (5 + 3) ** 2), + ((50, 50), (20, 20), (25, 25), ((5 + 3) // 2) ** 2), + ((100,), (43,), (100,), 3), + ((100,), (43,), (51,), 4), + ((100,), (43,), (40,), 5), + ((100,), (43,), (10,), 12), + ((100,), (43,), (1,), 100), + ], +) +def test_calculate_single_stage_io_ops(shape, in_chunks, out_chunks, expected): + actual = calculate_single_stage_io_ops(shape, in_chunks, out_chunks) + assert actual == expected + + +@st.composite +def io_ops_chunks(draw, max_len=1000): + size = draw(st.integers(min_value=1, max_value=max_len)) + source = draw(st.integers(min_value=1, max_value=max_len)) + target = draw(st.integers(min_value=1, max_value=max_len)) + return (size, source, target) + + +@given(io_ops_chunks()) +def test_calculate_single_stage_io_ops_hypothesis(inputs): + size, source, target = inputs + + calculated = calculate_single_stage_io_ops((size,), (source,), (target,)) + + table = np.empty(shape=(size, 2), dtype=int) + for i in range(size): + table[i, 0] = i // source + table[i, 1] = i // target + actual = np.unique(table, axis=0).shape[0] + + assert calculated == actual + + def _verify_single_stage_plan_correctness( shape, source_chunks, From 7d3a4d8834b8b43351832fb7eb1227c4ea28bc04 Mon Sep 17 00:00:00 2001 From: Stephan Hoyer Date: Mon, 21 Nov 2022 12:45:43 -0800 Subject: [PATCH 09/12] remove rechunker.compat.prod (unnecessary for Python 3.8+) --- rechunker/algorithm.py | 4 ++-- rechunker/compat.py | 18 ++---------------- tests/test_algorithm.py | 2 +- tests/test_compat.py | 11 ----------- 4 files changed, 5 insertions(+), 30 deletions(-) delete mode 100644 tests/test_compat.py diff --git a/rechunker/algorithm.py b/rechunker/algorithm.py index bad0208..555874e 100644 --- a/rechunker/algorithm.py +++ b/rechunker/algorithm.py @@ -1,10 +1,10 @@ """Core rechunking algorithm stuff.""" import logging import warnings -from math import ceil, floor +from math import ceil, floor, prod from typing import List, Optional, Sequence, Tuple -from rechunker.compat import lcm, prod +from rechunker.compat import lcm logger = logging.getLogger(__name__) diff --git a/rechunker/compat.py b/rechunker/compat.py index ac4e003..1403084 100644 --- a/rechunker/compat.py +++ b/rechunker/compat.py @@ -1,19 +1,4 @@ import math -import operator -from functools import reduce -from typing import Iterable, TypeVar - -T = TypeVar("T", int, float) - - -try: - from math import prod # type: ignore # Python 3.8 -except ImportError: - - def prod(iterable: Iterable[T]) -> T: # type: ignore - """Implementation of `math.prod()` for all Python versions.""" - return reduce(operator.mul, iterable, 1) - try: from math import lcm # type: ignore # Python 3.9 @@ -21,4 +6,5 @@ def prod(iterable: Iterable[T]) -> T: # type: ignore def lcm(a: int, b: int) -> int: # type: ignore """Implementation of `math.lcm()` for all Python versions.""" - return abs(a * b) // math.gcd(a, b) + # https://stackoverflow.com/a/51716959/809705 + return a * b // math.gcd(a, b) diff --git a/tests/test_algorithm.py b/tests/test_algorithm.py index e40e4a9..2a95ec6 100644 --- a/tests/test_algorithm.py +++ b/tests/test_algorithm.py @@ -1,6 +1,7 @@ #!/usr/bin/env python """Tests for `rechunker` package.""" +from math import prod import warnings from unittest.mock import patch @@ -17,7 +18,6 @@ multistage_rechunking_plan, rechunking_plan, ) -from rechunker.compat import prod @pytest.mark.parametrize("shape, chunks", [((8, 8), (1, 2))]) diff --git a/tests/test_compat.py 
b/tests/test_compat.py
deleted file mode 100644
index ab20008..0000000
--- a/tests/test_compat.py
+++ /dev/null
@@ -1,11 +0,0 @@
-import numpy as np
-
-from rechunker.compat import prod
-
-
-def test_prod():
-    assert prod(()) == 1
-    assert prod((2,)) == 2
-    assert prod((2, 3)) == 6
-    n = np.iinfo(np.int64).max
-    assert prod((n, 2)) == n * 2

From 63568f0dc6c62f9762f5da44338796a2a8950d03 Mon Sep 17 00:00:00 2001
From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com>
Date: Mon, 21 Nov 2022 20:46:02 +0000
Subject: [PATCH 10/12] [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci
---
 tests/test_algorithm.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/tests/test_algorithm.py b/tests/test_algorithm.py
index 2a95ec6..8b49515 100644
--- a/tests/test_algorithm.py
+++ b/tests/test_algorithm.py
@@ -1,8 +1,8 @@
 #!/usr/bin/env python
 """Tests for `rechunker` package."""

-from math import prod
 import warnings
+from math import prod
 from unittest.mock import patch

 import hypothesis.strategies as st
 import pytest
 from hypothesis import assume, given

 from rechunker.algorithm import (

From f55e0b6ee4165986f6279a9472c059dba586572f Mon Sep 17 00:00:00 2001
From: Stephan Hoyer
Date: Mon, 21 Nov 2022 16:42:36 -0800
Subject: [PATCH 11/12] fixes per review

---
 rechunker/algorithm.py  | 66 ++++++++++++++++++++++++++---------------
 tests/test_algorithm.py |  7 +++--
 2 files changed, 47 insertions(+), 26 deletions(-)

diff --git a/rechunker/algorithm.py b/rechunker/algorithm.py
index 555874e..373014b 100644
--- a/rechunker/algorithm.py
+++ b/rechunker/algorithm.py
@@ -4,6 +4,7 @@
 from math import ceil, floor, prod
 from typing import List, Optional, Sequence, Tuple

+import numpy as np
 from rechunker.compat import lcm

 logger = logging.getLogger(__name__)

@@ -113,7 +114,6 @@ def calculate_stage_chunks(
     read_chunks: Tuple[int, ...],
     write_chunks: Tuple[int, ...],
     stage_count: int = 1,
-    epsilon: float = 1e-8,
 ) -> List[Tuple[int, ...]]:
     """
     Calculate chunks after each stage of a multi-stage rechunking.

     Each stage consists of a "split" step followed by a "consolidate" step.

     The strategy used here is to progressively enlarge or shrink chunks along
-    each dimension by the same multiple in each stage. This should roughly
-    minimize the total number of arrays resulting from "split" steps in a
-    multi-stage pipeline. It also keeps the total number of elements in each
-    chunk constant, up to rounding error, so memory usage should remain
+    each dimension by the same multiple in each stage (geometric spacing). This
+    should roughly minimize the total number of arrays resulting from "split"
+    steps in a multi-stage pipeline. It also keeps the total number of elements
+    in each chunk constant, up to rounding error, so memory usage should remain
     constant.

     Examples::

@@ -136,22 +136,35 @@
     >>> calculate_stage_chunks((1_000_000, 1), (1, 1_000_000), stage_count=4)
     [(31623, 32), (1000, 1000), (32, 31623)]

-    TODO: consider more sophisticated algorithms.
+    TODO: consider more sophisticated algorithms. In particular, exact geometric
+    spacing often requires irregular intermediate chunk sizes, which (currently)
+    cannot be stored in Zarr arrays.
     """
-    stage_chunks = []
-    for stage in range(1, stage_count):
-        power = stage / stage_count
-        # Add a small floating-point epsilon so we don't inadvertently
-        # round-down even chunk-sizes.
-        chunks = tuple(
-            floor(rc ** (1 - power) * wc**power + epsilon)
-            for rc, wc in zip(read_chunks, write_chunks)
-        )
-        stage_chunks.append(chunks)
-    return stage_chunks
+    approx_stages = np.geomspace(read_chunks, write_chunks, num=stage_count + 1)
+    return [tuple(floor(c) for c in stage) for stage in approx_stages[1:-1]]
+
+
+def _count_intermediate_chunks(source_chunk: int, target_chunk: int, size: int) -> int:
+    """Count intermediate chunks required for rechunking along a dimension.
+
+    Intermediate chunks must divide both the source and target chunks, and in
+    general do not need to have a regular size. The number of intermediate
+    chunks is proportional to the number of required read/write operations.
+
+    For example, suppose we want to rechunk an array of size 20 from size 5
+    chunks to size 7 chunks. We can draw out how the array elements are divided:
+    0 1 2 3 4|5 6 7 8 9|10 11 12 13 14|15 16 17 18 19 (4 chunks)
+    0 1 2 3 4 5 6|7 8 9 10 11 12 13|14 15 16 17 18 19 (3 chunks)
+
+    To transfer these chunks, we would need to divide the array into irregular
+    intermediate chunks that fit into both the source and target:
+    0 1 2 3 4|5 6|7 8 9|10 11 12 13|14|15 16 17 18 19 (6 chunks)
+
+    This matches what ``_count_intermediate_chunks()`` calculates::
+
+    >>> _count_intermediate_chunks(5, 7, 20)
+    6
+    """
     multiple = lcm(source_chunk, target_chunk)
     splits_per_lcm = multiple // source_chunk + multiple // target_chunk - 1
     lcm_count, remainder = divmod(size, multiple)
     if remainder:
@@ -167,8 +180,8 @@
 def calculate_single_stage_io_ops(
     shape: Sequence[int], in_chunks: Sequence[int], out_chunks: Sequence[int]
 ) -> int:
-    """Estimate the number of irregular chunks required for rechunking."""
-    return prod(map(_count_num_splits, in_chunks, out_chunks, shape))
+    """Count the number of read/write operations required for rechunking."""
+    return prod(map(_count_intermediate_chunks, in_chunks, out_chunks, shape))


 # not a tight upper bound, but ensures that the loop in
@@ -193,7 +206,12 @@ def multistage_rechunking_plan(
     consolidate_reads: bool = True,
     consolidate_writes: bool = True,
 ) -> _MultistagePlan:
-    """A rechunking plan that can use multiple split/consolidate steps."""
+    """Calculate a rechunking plan that can use multiple split/consolidate steps.
+
+    For best results, max_mem should be significantly larger than min_mem (e.g.,
+    10x). Otherwise an excessive number of rechunking steps will be required.
+ """ + ndim = len(shape) if len(source_chunks) != ndim: raise ValueError(f"source_chunks {source_chunks} must have length {ndim}") @@ -212,9 +230,9 @@ def multistage_rechunking_plan( f"Target chunk memory ({target_chunk_mem}) exceeds max_mem ({max_mem})" ) - if max_mem < min_mem: # basic sanity test + if max_mem < min_mem: # basic sanity check raise ValueError( - "max_mem ({max_mem}) cannot be smaller than min_mem ({min_mem})" + f"max_mem ({max_mem}) cannot be smaller than min_mem ({min_mem})" ) if consolidate_writes: @@ -321,7 +339,7 @@ def rechunking_plan( Target chunk shape (must be in form (5, 10, 20), no irregular chunks) itemsize: int Number of bytes used to represent a single array element - max_mem : Int + max_mem : int Maximum permissible chunk memory size, measured in units of itemsize consolidate_reads: bool, optional Whether to apply read chunk consolidation diff --git a/tests/test_algorithm.py b/tests/test_algorithm.py index 8b49515..0eca83f 100644 --- a/tests/test_algorithm.py +++ b/tests/test_algorithm.py @@ -125,16 +125,19 @@ def test_calculate_stage_chunks(read_chunks, write_chunks, stage_count, expected @pytest.mark.parametrize( "shape, in_chunks, out_chunks, expected", [ + # simple 1d cases ((6,), (1,), (6,), 6), ((10,), (1,), (6,), 10), ((6,), (2,), (3,), 4), ((24,), (2,), (3,), 16), ((10,), (4,), (5,), 4), ((100,), (4,), (5,), 40), + # simple 2d cases ((100, 100), (1, 100), (100, 1), 10_000), ((100, 100), (1, 10), (10, 1), 10_000), - ((100, 100), (20, 20), (25, 25), (5 + 3) ** 2), - ((50, 50), (20, 20), (25, 25), ((5 + 3) // 2) ** 2), + ((100, 100), (20, 20), (25, 25), 8**2), + ((50, 50), (20, 20), (25, 25), 4**2), + # edge cases where one chunk size is 43 (a prime) ((100,), (43,), (100,), 3), ((100,), (43,), (51,), 4), ((100,), (43,), (40,), 5), From 847766f588a64f81b817703d97c27a3a21b76960 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Tue, 22 Nov 2022 00:43:07 +0000 Subject: [PATCH 12/12] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- rechunker/algorithm.py | 1 + 1 file changed, 1 insertion(+) diff --git a/rechunker/algorithm.py b/rechunker/algorithm.py index 373014b..095b902 100644 --- a/rechunker/algorithm.py +++ b/rechunker/algorithm.py @@ -5,6 +5,7 @@ from typing import List, Optional, Sequence, Tuple import numpy as np + from rechunker.compat import lcm logger = logging.getLogger(__name__)