WIP: Dynamic rechunking option for StoreToZarr #546

Closed. Wants to merge 63 commits (changes shown from 28 commits).
Commits:

6346d41 first implementation not working (jbusecke, Jul 15, 2023)
089dc7a Save progress from the hack (jbusecke, Jul 15, 2023)
5fadc03 New even divisor algo + passing tests (jbusecke, Jul 16, 2023)
d0f3bcb [pre-commit.ci] auto fixes from pre-commit.com hooks (pre-commit-ci[bot], Jul 16, 2023)
1903ca7 Remove commented out old version (jbusecke, Jul 16, 2023)
62d46ad merge commit (jbusecke, Jul 16, 2023)
243c5c3 [pre-commit.ci] auto fixes from pre-commit.com hooks (pre-commit-ci[bot], Jul 16, 2023)
4ba4066 Bugfix now scales with ds size (jbusecke, Jul 18, 2023)
9375136 [pre-commit.ci] auto fixes from pre-commit.com hooks (pre-commit-ci[bot], Jul 18, 2023)
6b89214 rename difference to similarity (jbusecke, Jul 18, 2023)
f6e86d6 Merge branch 'dynamic_chunks_2' of github.com:jbusecke/pangeo-forge-r… (jbusecke, Jul 18, 2023)
b2c48bf implemented + tested dask.utils.parse_bytes + docstring (jbusecke, Jul 18, 2023)
b99c4ef [pre-commit.ci] auto fixes from pre-commit.com hooks (pre-commit-ci[bot], Jul 18, 2023)
bc980f7 Implemented review + renaming + test pass locally (jbusecke, Jul 25, 2023)
29097e8 Merge branch 'dynamic_chunks_2' of github.com:jbusecke/pangeo-forge-r… (jbusecke, Jul 25, 2023)
f3aeaa5 [pre-commit.ci] auto fixes from pre-commit.com hooks (pre-commit-ci[bot], Jul 25, 2023)
c913d3b nightly commit (jbusecke, Jul 25, 2023)
9a845a1 Added dynamic chunking and checks to StoreToZarr (jbusecke, Jul 26, 2023)
39ff7e6 Merge branch 'dynamic_chunks_2' of github.com:jbusecke/pangeo-forge-r… (jbusecke, Jul 26, 2023)
f608686 some cleaning up + docstring adds (jbusecke, Jul 26, 2023)
e3d61eb [pre-commit.ci] auto fixes from pre-commit.com hooks (pre-commit-ci[bot], Jul 26, 2023)
d0fce99 Fix flake8 for new tests (jbusecke, Jul 26, 2023)
ee45258 Restored default behavior if no chunking input is given (jbusecke, Jul 26, 2023)
e97c492 fixed mypy issues (jbusecke, Jul 26, 2023)
f5bcea8 Add mypy cast comment (jbusecke, Jul 26, 2023)
427dbe7 Add final test (jbusecke, Jul 26, 2023)
91eb6a6 Update dynamic_target_chunks.py (jbusecke, Jul 28, 2023)
a5a182a assign target chunks as singelton (jbusecke, Jul 29, 2023)
707a9ca simplify AsSingleton call (jbusecke, Aug 2, 2023)
4ad2a69 Go back to the old logic, but fix typo (jbusecke, Aug 2, 2023)
8df16ba Print dynamically determined chunks (jbusecke, Aug 3, 2023)
11de660 Update transforms.py (jbusecke, Aug 5, 2023)
b8ead76 implement logic for extra and missing dims + tests (jbusecke, Aug 5, 2023)
eecd8ed Add default ratio and allow extra dims input on storetozarr (jbusecke, Aug 6, 2023)
c28ac56 Merge branch 'dynamic_chunks_2' of github.com:jbusecke/pangeo-forge-r… (jbusecke, Aug 6, 2023)
7347c1b Fill docstring of StoreToZarr (jbusecke, Aug 6, 2023)
96c57e8 [pre-commit.ci] auto fixes from pre-commit.com hooks (pre-commit-ci[bot], Aug 6, 2023)
edb4099 Update transforms.py (jbusecke, Aug 6, 2023)
377133b Rework docstring for dynamic_target_chunks_from_schema (jbusecke, Aug 6, 2023)
575bec2 [pre-commit.ci] auto fixes from pre-commit.com hooks (pre-commit-ci[bot], Aug 6, 2023)
9334e61 Update pangeo_forge_recipes/dynamic_target_chunks.py (jbusecke, Aug 6, 2023)
5062318 Update pangeo_forge_recipes/transforms.py (jbusecke, Aug 6, 2023)
d0f2cf6 Update pangeo_forge_recipes/transforms.py (jbusecke, Aug 6, 2023)
44aceaa Update pangeo_forge_recipes/transforms.py (jbusecke, Aug 6, 2023)
ca680c3 Update test_transforms.py (jbusecke, Aug 6, 2023)
8d32404 [pre-commit.ci] auto fixes from pre-commit.com hooks (pre-commit-ci[bot], Aug 6, 2023)
53b0ae9 Fix tests (jbusecke, Aug 6, 2023)
3de3168 fix store to zarr call to _get_target_chunks (jbusecke, Aug 6, 2023)
db0551f fix flake 8 issues (jbusecke, Aug 6, 2023)
27bfd36 fix matched error message in test (jbusecke, Aug 6, 2023)
3ca76df Fix another test (jbusecke, Aug 6, 2023)
4838c5f Add logging step printing the schema (jbusecke, Aug 18, 2023)
59432fd Merge branch 'main' into dynamic_chunks_2 (jbusecke, Aug 18, 2023)
d9115f7 Restor cast import (jbusecke, Aug 18, 2023)
ec8ab95 Try to force test in-line after finished store (jbusecke, Aug 18, 2023)
25823e6 more tinkering with store return (jbusecke, Aug 18, 2023)
ad79726 Reverting changes to target_store return (jbusecke, Aug 18, 2023)
2cc6700 Added fallback algorithm for non-even divisions (jbusecke, Aug 22, 2023)
db6767b Attempt to add logging to dyn chunk logic (jbusecke, Aug 22, 2023)
fa05c9c Increase range of scaling factors (jbusecke, Aug 22, 2023)
082979b More tests + slight algo refactor (jbusecke, Aug 22, 2023)
e40eb55 Merge remote-tracking branch 'upstream/main' into dynamic_chunks_2 (jbusecke, Aug 23, 2023)
774fff8 Merge remote-tracking branch 'upstream/main' into dynamic_chunks_2 (jbusecke, Sep 1, 2023)
141 changes: 141 additions & 0 deletions pangeo_forge_recipes/dynamic_target_chunks.py
@@ -0,0 +1,141 @@
import itertools
from typing import Dict, List, Union

import numpy as np
import xarray as xr
from dask.utils import parse_bytes

from pangeo_forge_recipes.aggregation import XarraySchema, schema_to_template_ds


def get_memory_size(ds: xr.Dataset, chunks: Dict[str, int]) -> int:
"""Returns an estimate of memory size based on input chunks.
Currently this applies the chunks input to the dataset, then
iterates through the variables and returns the maximum.
"""
ds_single_chunk = ds.isel({dim: slice(0, chunk) for dim, chunk in chunks.items()})
mem_size = max([ds_single_chunk[var].nbytes for var in ds_single_chunk.data_vars])
return mem_size


def similarity(a: np.ndarray, b: np.ndarray) -> np.ndarray:
return np.sqrt(np.sum((a - b) ** 2))


def normalize(a: np.ndarray) -> np.ndarray:
"""Convert to a unit vector"""
return a / np.sqrt(np.sum(a**2))


def even_divisor_chunks(n: int) -> List[int]:
"""Returns values that evenly divide n"""
divisors = []
for i in range(1, n + 1):
if n % i == 0:
divisors.append(n // i)
return divisors


def _maybe_parse_bytes(target_chunk_size: Union[str, int]) -> int:
if isinstance(target_chunk_size, str):
return parse_bytes(target_chunk_size)
else:
return target_chunk_size


def dynamic_target_chunks_from_schema(
schema: XarraySchema,
target_chunk_size: Union[int, str],
target_chunks_aspect_ratio: Dict[str, int],
size_tolerance: float,
) -> dict[str, int]:
    """Determine chunk sizes based on a desired chunk size (the maximum size of any
    variable in the dataset) and the ratio of total chunks along each dimension of
    the dataset. The algorithm finds even divisors, keeps the combinations that
    produce chunk sizes close to the target, and from that set returns the chunks
    that most closely match the desired aspect ratio of total chunks along the
    given dimensions.

Parameters
----------
schema : XarraySchema
Schema of the input dataset
target_chunk_size : Union[int, str]
Desired chunk size (defined as the max size of any variable in the dataset with
chosen chunks). Can be provided as integer (bytes) or a string like '100MB'.
    target_chunks_aspect_ratio : Dict[str, int]
        Dictionary mapping dimension names to the desired aspect ratio of the total
        number of chunks along each dimension. A value of -1 keeps that dimension
        unchunked.
    size_tolerance : float
        Chunk size tolerance. Resulting chunk size will be within
        [target_chunk_size*(1-size_tolerance),
        target_chunk_size*(1+size_tolerance)]

Returns
-------
dict[str, int]
Target chunk dictionary. Can be passed directly to `ds.chunk()`
"""
target_chunk_size = _maybe_parse_bytes(target_chunk_size)

ds = schema_to_template_ds(schema)

if set(target_chunks_aspect_ratio.keys()) != set(ds.dims):
raise ValueError(
f"target_chunks_aspect_ratio must contain all dimensions in dataset. "
f"Got {target_chunks_aspect_ratio.keys()} but expected {list(ds.dims.keys())}"
)

dims, shape = zip(*ds.dims.items())
ratio = [target_chunks_aspect_ratio[dim] for dim in dims]

possible_chunks = []
for s, r, dim in zip(shape, ratio, dims):
if r > 0:
# Get a list of all the even divisors
possible_chunks.append(even_divisor_chunks(s))
elif r == -1:
# Always keep this dimension unchunked
possible_chunks.append([s])
else:
raise ValueError(
f"Ratio value can only be larger than 0 or -1. Got {r} for dimension {dim}"
)

    combinations = list(itertools.product(*possible_chunks))
# Check the size of each combination on the dataset
combination_sizes = [
get_memory_size(ds, {dim: chunk for dim, chunk in zip(dims, c)}) for c in combinations
]

# And select a subset with some form of tolerance based on the size requirement
tolerance = size_tolerance * target_chunk_size
combinations_filtered = [
c for c, s in zip(combinations, combination_sizes) if abs(s - target_chunk_size) < tolerance
]

# If there are no matches in the range, the user has to increase the tolerance for this to work.
if len(combinations_filtered) == 0:
raise ValueError(
(
"Could not find any chunk combinations satisfying "
"the size constraint. Consider increasing tolerance"
)
)

    # Now that we have combinations in the memory size range we want, we can check
    # which is closest to our desired chunk ratio.
    # We can think of this as comparing the angle of two vectors.
    # To compare them we need to normalize (we don't care about the amplitude here)

# convert each combination into an array of resulting chunks per dimension, then normalize
ratio_combinations = [normalize(np.array(shape) / np.array(c)) for c in combinations_filtered]

    # Find the closest fit between normalized ratios,
    # measured as the Euclidean distance between the vectors
ratio_normalized = normalize(np.array(ratio))
ratio_similarity = [similarity(ratio_normalized, r) for r in ratio_combinations]

    # sort by the most similar (smallest value of ratio_similarity)
combinations_sorted = [c for _, c in sorted(zip(ratio_similarity, combinations_filtered))]

# Return the chunk combination with the closest fit
optimal_combination = combinations_sorted[0]

return {dim: chunk for dim, chunk in zip(dims, optimal_combination)}
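The selection procedure above (enumerate even divisors, filter combinations by chunk size, then pick the combination whose chunk-count vector best matches the aspect ratio) can be re-derived standalone on a toy two-dimensional array. This is a simplified sketch using only numpy and itertools; the shape, itemsize, target size, and aspect ratio are made-up illustration values, not defaults from the PR.

```python
import itertools

import numpy as np


def even_divisor_chunks(n):
    # All chunk lengths that divide n evenly, largest first (mirrors the helper above)
    return [n // i for i in range(1, n + 1) if n % i == 0]


def normalize(a):
    # Unit vector: only the direction (i.e. the ratio) matters
    return a / np.sqrt(np.sum(a**2))


# Toy setup (hypothetical numbers): one float64 variable of shape (x=100, y=100)
dims, shape = ("x", "y"), (100, 100)
itemsize = 8                     # bytes per element
target_chunk_size = 1e4          # 10 kB target chunk
size_tolerance = 0.2
aspect_ratio = {"x": 1, "y": 2}  # want twice as many chunks along y as along x

# 1. Enumerate every combination of even divisors
combos = list(itertools.product(*[even_divisor_chunks(s) for s in shape]))

# 2. Keep combinations whose chunk memory footprint is close to the target
sizes = [np.prod(c) * itemsize for c in combos]
tol = size_tolerance * target_chunk_size
filtered = [c for c, s in zip(combos, sizes) if abs(s - target_chunk_size) < tol]

# 3. Pick the combination whose chunk-count vector best matches the aspect ratio
ratio = normalize(np.array([aspect_ratio[d] for d in dims]))


def distance(c):
    return np.sqrt(np.sum((normalize(np.array(shape) / np.array(c)) - ratio) ** 2))


best = min(filtered, key=distance)
print(dict(zip(dims, best)))  # {'x': 50, 'y': 25} -> 2 chunks along x, 4 along y
```

For this toy input the size filter leaves only (50, 25) and (25, 50); (50, 25) yields chunk counts (2, 4), an exact 1:2 match, so it wins. The real function operates on an xarray template dataset built from the schema, but the arithmetic is the same.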
80 changes: 73 additions & 7 deletions pangeo_forge_recipes/transforms.py
Expand Up @@ -4,12 +4,13 @@
from dataclasses import dataclass, field

# from functools import wraps
from typing import Dict, List, Optional, Tuple, TypeVar
from typing import Dict, List, Optional, Tuple, TypeVar, Union, cast

import apache_beam as beam

from .aggregation import XarraySchema, dataset_to_schema, schema_to_zarr
from .combiners import CombineMultiZarrToZarr, CombineXarraySchemas
from .dynamic_target_chunks import dynamic_target_chunks_from_schema
from .openers import open_url, open_with_kerchunk, open_with_xarray
from .patterns import CombineOp, Dimension, FileType, Index, augment_index_with_start_stop
from .rechunking import combine_fragments, split_fragment
Expand Down Expand Up @@ -292,7 +293,6 @@ def expand(self, pcoll: beam.PCollection) -> beam.PCollection:

@dataclass
class StoreDatasetFragments(beam.PTransform):

target_store: beam.PCollection # side input

def expand(self, pcoll: beam.PCollection) -> beam.PCollection:
Expand Down Expand Up @@ -383,20 +383,86 @@ class StoreToZarr(beam.PTransform, ZarrWriterMixin):
:param combine_dims: The dimensions to combine
:param target_chunks: Dictionary mapping dimension names to chunks sizes.
If a dimension is a not named, the chunks will be inferred from the data.
    :param target_chunks_aspect_ratio: Dictionary mapping dimension names to desired
      aspect ratio of total number of chunks along each dimension.
    :param target_chunk_size: Desired single chunk size. Can be provided as
      integer (bytes) or as a str like '100MB' etc.
    :param size_tolerance: Chunk size tolerance. Resulting chunk size will be within
      [target_chunk_size*(1-size_tolerance),
      target_chunk_size*(1+size_tolerance)], by default 0.2
"""

# TODO: make it so we don't have to explicitly specify combine_dims
# Could be inferred from the pattern instead
combine_dims: List[Dimension]
target_chunks: Dict[str, int] = field(default_factory=dict)
target_chunks: Optional[Dict[str, int]] = field(default=None)
target_chunks_aspect_ratio: Optional[Dict[str, int]] = field(default=None)
target_chunk_size: Optional[Union[str, int]] = field(
default=None
) # ? Should we provide a default?
size_tolerance: float = 0.2

def __post_init__(self):
# if none of the chunking parameters is specified, set the default behavior
# of target_chunks={}
if all(
a is None
for a in (self.target_chunks, self.target_chunk_size, self.target_chunks_aspect_ratio)
):
self.target_chunks = {}

# check that not both static and dynamic chunking are specified
if self.target_chunks is not None and (
self.target_chunk_size is not None or self.target_chunks_aspect_ratio is not None
):
raise ValueError(
(
"Cannot specify both target_chunks and "
"target_chunk_size or target_chunks_aspect_ratio."
)
)

# if dynamic chunking is specified, make sure both target_chunk_size
# and target_chunks_aspect_ratio are specified
if self.target_chunks is None and not (
self.target_chunk_size is not None and self.target_chunks_aspect_ratio is not None
):
raise ValueError(
(
"Must specify both target_chunk_size and "
"target_chunks_aspect_ratio to enable dynamic chunking."
)
)

def determine_target_chunks(self, schema: XarraySchema) -> Dict[str, int]:
# if dynamic chunking is chosen, set the objects target_chunks here
# We need to explicitly cast the types here because we know
# that for the combinations chosen here (dynamic vs static chunking)
# the type of the input is never None (our __post_init__ method takes
# care of raising errors if this is the case).
if self.target_chunks_aspect_ratio is not None:
target_chunks = dynamic_target_chunks_from_schema(
schema,
target_chunk_size=cast(Union[int, str], self.target_chunk_size),
target_chunks_aspect_ratio=cast(Dict[str, int], self.target_chunks_aspect_ratio),
size_tolerance=self.size_tolerance,
)
else:
target_chunks = cast(Dict[str, int], self.target_chunks)
return target_chunks

def expand(self, datasets: beam.PCollection) -> beam.PCollection:
schema = datasets | DetermineSchema(combine_dims=self.combine_dims)
indexed_datasets = datasets | IndexItems(schema=schema)
rechunked_datasets = indexed_datasets | Rechunk(
target_chunks=self.target_chunks, schema=schema
        target_chunks = beam.pvalue.AsSingleton(
schema
| beam.Map(
self.determine_target_chunks
) # Would beam.pvalue.AsSingleton(self.determine_target_chunks(schema)) work?
Review discussion:

Member: Or `target_chunks = self.determine_target_chunks(beam.pvalue.AsSingleton(schema))`?

Contributor Author: Oh right, that makes a lot more sense. Changed.

Contributor Author: Oh wait, getting this over at the CMIP6 recipes.

Contributor Author: I now settled on `target_chunks = beam.pvalue.AsSingleton(schema | beam.Map(self._get_target_chunks))`, which seems to work, while just applying `AsSingleton` to the schema did not.

)
indexed_datasets = datasets | IndexItems(schema=schema)
rechunked_datasets = indexed_datasets | Rechunk(target_chunks=target_chunks, schema=schema)
target_store = schema | PrepareZarrTarget(
target=self.get_full_target(), target_chunks=self.target_chunks
target=self.get_full_target(), target_chunks=target_chunks
)
return rechunked_datasets | StoreDatasetFragments(target_store=target_store)
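The three-way validation in `__post_init__` above (default to static `{}` chunking when nothing is given, forbid mixing static and dynamic options, and require both dynamic options together) can be sketched as a standalone dataclass. `ChunkingConfig` is a hypothetical stand-in for `StoreToZarr`'s chunking arguments, not part of the library:

```python
from dataclasses import dataclass
from typing import Dict, Optional, Union


@dataclass
class ChunkingConfig:
    # Hypothetical stand-in for StoreToZarr's three chunking arguments
    target_chunks: Optional[Dict[str, int]] = None
    target_chunks_aspect_ratio: Optional[Dict[str, int]] = None
    target_chunk_size: Optional[Union[str, int]] = None

    def __post_init__(self):
        dynamic = (self.target_chunk_size, self.target_chunks_aspect_ratio)
        if self.target_chunks is None and all(a is None for a in dynamic):
            # No chunking input at all: restore the old default behavior
            self.target_chunks = {}
        elif self.target_chunks is not None and any(a is not None for a in dynamic):
            # Static and dynamic chunking are mutually exclusive
            raise ValueError(
                "Cannot specify both target_chunks and "
                "target_chunk_size or target_chunks_aspect_ratio."
            )
        elif self.target_chunks is None and not all(a is not None for a in dynamic):
            # Dynamic chunking needs both the size target and the aspect ratio
            raise ValueError(
                "Must specify both target_chunk_size and "
                "target_chunks_aspect_ratio to enable dynamic chunking."
            )
```

With this shape, `ChunkingConfig()` silently falls back to `target_chunks = {}`, while a partial dynamic spec such as `ChunkingConfig(target_chunk_size="100MB")` fails fast at construction time rather than mid-pipeline.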
121 changes: 121 additions & 0 deletions tests/test_dynamic_target_chunks.py
@@ -0,0 +1,121 @@
from typing import Dict

import dask.array as dsa
import pytest
import xarray as xr

from pangeo_forge_recipes.aggregation import dataset_to_schema
from pangeo_forge_recipes.dynamic_target_chunks import dynamic_target_chunks_from_schema


def _create_ds(dims_shape: Dict[str, int]) -> xr.Dataset:
return xr.DataArray(
dsa.random.random(list(dims_shape.values())),
dims=list(dims_shape.keys()),
).to_dataset(name="data")


class TestDynamicTargetChunks:
Review comment (Member): Thanks for the thorough tests. Question: is there a reason you chose to use pytest's test class approach here? I'm not categorically opposed to it, but AFAICT the tests added by this PR would be the only class-based tests in pangeo-forge-recipes, so for stylistic consistency maybe better to write them in the function-based style, unless there's a specific reason not to?

@pytest.mark.parametrize(
("dims_shape", "target_chunks_aspect_ratio", "expected_target_chunks"),
[
# make sure that for the same dataset we get smaller chunksize along
# a dimension if the ratio is larger
(
{"x": 200, "y": 200, "z": 200},
{"x": 1, "y": 1, "z": 10},
{"x": 50, "y": 100, "z": 25},
),
(
{"x": 200, "y": 200, "z": 200},
{"x": 10, "y": 1, "z": 1},
{"x": 25, "y": 50, "z": 100},
),
# test the special case where we want to just chunk along a single dimension
(
{"x": 100, "y": 300, "z": 400},
{"x": -1, "y": -1, "z": 1},
{"x": 100, "y": 300, "z": 4},
),
],
)
def test_dynamic_rechunking(
self, dims_shape, target_chunks_aspect_ratio, expected_target_chunks
):
ds = _create_ds(dims_shape)
schema = dataset_to_schema(ds)
target_chunks = dynamic_target_chunks_from_schema(
schema, 1e6, target_chunks_aspect_ratio=target_chunks_aspect_ratio, size_tolerance=0.2
)
print(target_chunks)
for dim, chunks in expected_target_chunks.items():
assert target_chunks[dim] == chunks

def test_nbytes_str_input(self):
ds = _create_ds({"x": 100, "y": 100, "z": 100})
schema = dataset_to_schema(ds)
target_chunks_aspect_ratio = {"x": 1, "y": 1, "z": 1}
target_chunks_int = dynamic_target_chunks_from_schema(
schema, 1e6, target_chunks_aspect_ratio=target_chunks_aspect_ratio, size_tolerance=0.2
)
target_chunks_str = dynamic_target_chunks_from_schema(
schema, "1MB", target_chunks_aspect_ratio=target_chunks_aspect_ratio, size_tolerance=0.2
)
for dim in target_chunks_aspect_ratio.keys():
assert target_chunks_int[dim] == target_chunks_str[dim]

def test_dynamic_rechunking_maintain_ratio(self):
"""Confirm that for a given ratio with two differently sized datasets we
maintain a constant ratio between total number of chunks"""
ds_equal = _create_ds({"x": 64, "y": 64})
ds_long = _create_ds({"x": 64, "y": 256})

for ds in [ds_equal, ds_long]:
print(ds)
schema = dataset_to_schema(ds)
target_chunks = dynamic_target_chunks_from_schema(
schema, 1e4, target_chunks_aspect_ratio={"x": 1, "y": 4}, size_tolerance=0.2
)
ds_rechunked = ds.chunk(target_chunks)
assert len(ds_rechunked.chunks["y"]) / len(ds_rechunked.chunks["x"]) == 4

@pytest.mark.parametrize(
"target_chunks_aspect_ratio", [{"x": 1, "y": -1, "z": 10}, {"x": 6, "y": -1, "z": 2}]
) # always keep y unchunked, and vary the others
@pytest.mark.parametrize("target_chunk_nbytes", [1e6, 1e7])
def test_dynamic_skip_dimension(self, target_chunks_aspect_ratio, target_chunk_nbytes):
ds = _create_ds({"x": 100, "y": 200, "z": 300})
# Mark dimension as 'not-to-chunk' with -1
schema = dataset_to_schema(ds)
target_chunks = dynamic_target_chunks_from_schema(
schema,
target_chunk_nbytes,
target_chunks_aspect_ratio=target_chunks_aspect_ratio,
size_tolerance=0.2,
)
assert target_chunks["y"] == len(ds["y"])

def test_dynamic_rechunking_error_dimension_missing(self):
# make sure that an error is raised if some dimension is not specified
ds = _create_ds({"x": 100, "y": 200, "z": 300})
schema = dataset_to_schema(ds)

with pytest.raises(
ValueError, match="target_chunks_aspect_ratio must contain all dimensions in dataset."
):
dynamic_target_chunks_from_schema(
schema, 1e6, target_chunks_aspect_ratio={"x": 1, "z": 10}, size_tolerance=0.2
)

def test_dynamic_rechunking_error_dimension_wrong(self):
ds = _create_ds({"x": 100, "y": 200, "z": 300})
schema = dataset_to_schema(ds)
with pytest.raises(
ValueError, match="target_chunks_aspect_ratio must contain all dimensions in dataset."
):
dynamic_target_chunks_from_schema(
schema,
1e6,
target_chunks_aspect_ratio={"x": 1, "y_wrong": 1, "z": 10},
size_tolerance=0.2,
)
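As `test_nbytes_str_input` above exercises, `target_chunk_size` is accepted both as an integer byte count and as a string like `'1MB'`; `_maybe_parse_bytes` delegates the string case to `dask.utils.parse_bytes`. A minimal, simplified stand-in for that parsing, covering only a handful of units for illustration (the real dask function handles many more formats):

```python
import re


def parse_bytes_sketch(s):
    # Simplified stand-in for dask.utils.parse_bytes plus the int passthrough
    # that _maybe_parse_bytes adds; hypothetical, not the real implementation.
    if isinstance(s, (int, float)):
        return int(s)
    units = {
        "B": 1, "KB": 10**3, "MB": 10**6, "GB": 10**9, "TB": 10**12,
        "KIB": 2**10, "MIB": 2**20, "GIB": 2**30,
    }
    m = re.fullmatch(r"\s*([\d.]+)\s*([A-Za-z]+)\s*", s)
    num, unit = float(m.group(1)), m.group(2).upper()
    return int(num * units[unit])


print(parse_bytes_sketch("100MB"))  # 100000000
print(parse_bytes_sketch(1024))     # 1024
```

This is why the `1e6` and `"1MB"` cases in the test produce identical target chunks: both normalize to the same byte count before the divisor search runs.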