Skip to content

Commit

Permalink
Refactor resample_blocks
Browse files Browse the repository at this point in the history
  • Loading branch information
mraspaud committed May 31, 2022
1 parent 5e64519 commit 87f1b7c
Showing 1 changed file with 36 additions and 31 deletions.
67 changes: 36 additions & 31 deletions pyresample/resampler.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,36 +150,37 @@ def _create_cache_filename(self, cache_dir=None, prefix='',
return os.path.join(cache_dir, prefix + hash_str + fmt)


def resample_blocks(funk, src_area, src_arrays, dst_area,
def resample_blocks(func, src_area, src_arrays, dst_area,
dst_arrays=(), chunk_size=None, dtype=None, name=None, fill_value=None, **kwargs):
"""Resample dask arrays blockwise.
Resample_blocks applies a function blockwise to transform data from a source
area domain to a destination area domain.
Args:
funk: A callable to apply on the input data. This function is passed a block of src_arrays,
dst_array in that order, followed by the kwargs, which include the fill_value. If the function has a
block_info keyword argument, block information is passed to it that provides the source area, destination
area, position of source and destination blocks relative to respectively src_area and dst_area.
func: A callable to apply on the input data. This function is passed a block of src_arrays,
dst_arrays in that order, followed by the kwargs, which include the fill_value. If the callable accepts a
`block_info` keyword argument, block information is passed to it. Block information provides the source
area, destination area, position of source and destination blocks relative to respectively `src_area` and
`dst_area`.
src_area: a source geo definition.
dst_area: a destination geo definition. If the same as the source definition, a ValueError is raised.
funk: a function to use. If func has a block_info keyword argument, the chunk info is passed, as in map_blocks
src_arrays: data to use. When split into smaller bit to pass to funk, they are split across the x and y
src_arrays: data to use. When split into smaller bit to pass to func, they are split across the x and y
dimensions, but not across the other dimensions, so all the dimensions of the smaller arrays will be using
only one chunk!
dst_arrays: arrays to use that are already in dst_area space. If the array has more than 2 dimensions,
the last two are expected to be y, x.
chunk_size: the chunks size(s) to use in the dst_area space. This has to be provided since it is not guaranteed
that we can get this information from the other arguments. Moreover, this needs to be an iterable of k
elements if the resulting array of funk is to have a different number of dimensions than the input array.
elements if the resulting array of func is to have a different number of dimensions (k) than the input
array.
dtype: the dtype the resulting array is going to have. Has to be provided.
kwargs: any other keyword arguments that will be passed on to funk.
kwargs: any other keyword arguments that will be passed on to func.
Principle of operations:
Resample_blocks works by iterating over chunks on the dst_area domain. For each chunk, the corresponding slice
of the src_area domain is computed and the input src_arrays are cut accordingly to pass to funk. To know more
of the src_area domain is computed and the input src_arrays are cut accordingly to pass to func. To know more
about how the slicing is performed, refer to the :class:Slicer class and subclasses.
Examples:
Expand All @@ -201,7 +202,7 @@ def resample_blocks(funk, src_area, src_arrays, dst_area,
raise ValueError("Source and destination areas are identical."
" Should you be running `map_blocks` instead of `resample_blocks`?")

name = _create_dask_name(name, funk,
name = _create_dask_name(name, func,
src_area, src_arrays,
dst_area, dst_arrays,
fill_value, dtype, chunk_size, kwargs)
Expand All @@ -216,33 +217,36 @@ def resample_blocks(funk, src_area, src_arrays, dst_area,
position = dst_block_info["chunk-location"]
dst_block_info["shape"] = output_shape
try:
smaller_src_arrays, src_area_crop, src_block_info = crop_data_around_area(src_area, src_arrays,
dst_area_chunk)
_check_resolution_mismatch(src_area_crop, dtype)
cropped_src_arrays, cropped_src_area, src_block_info = crop_data_around_area(src_area, src_arrays,
dst_area_chunk)
_check_resolution_mismatch(cropped_src_area, dtype)
except IncompatibleAreas: # no relevant data matching
task = (np.full, dst_block_info["chunk-shape"], fill_value)
src_dependencies = []
else:
task = _create_task(funk, smaller_src_arrays, src_block_info, dst_arrays, dst_block_info, position,
fill_value, dependencies, kwargs)

task, src_dependencies = _create_task(func,
cropped_src_arrays, src_block_info,
dst_arrays, dst_block_info,
position,
fill_value, kwargs)
dask_graph[(name, *position)] = task
dependencies.extend(src_dependencies)

for dst_array in dst_arrays:
dependencies.append(dst_array)
dependencies.extend(dst_arrays)

dask_graph = HighLevelGraph.from_collections(name, dask_graph, dependencies=dependencies)
return da.Array(dask_graph, name, chunks=dst_chunks, dtype=dtype, shape=output_shape)


def _create_dask_name(name, funk, src_area, src_arrays, dst_area, dst_arrays, fill_value, dtype, chunks, kwargs):
def _create_dask_name(name, func, src_area, src_arrays, dst_area, dst_arrays, fill_value, dtype, chunks, kwargs):
if name is not None:
name = f"{name}"
else:
from dask.base import tokenize
from dask.utils import funcname
token = tokenize(funk, hash(src_area), *src_arrays, hash(dst_area), *dst_arrays,
token = tokenize(func, hash(src_area), *src_arrays, hash(dst_area), *dst_arrays,
fill_value, dtype, chunks, **kwargs)
name = f"{funcname(funk)}-{token}"
name = f"{funcname(func)}-{token}"
return name


Expand All @@ -265,25 +269,26 @@ def _check_resolution_mismatch(src_area_crop, dtype):
"resampler for the job.")


def _create_task(funk, smaller_src_arrays, src_block_info, dst_arrays, dst_block_info, position, fill_value,
dependencies, kwargs):
def _create_task(func, smaller_src_arrays, src_block_info, dst_arrays, dst_block_info, position, fill_value,
kwargs):
"""Create a task for resample_blocks."""
from dask.utils import has_keyword
dependencies = []
args = []
for smaller_data in smaller_src_arrays:
args.append((smaller_data.name, *([0] * smaller_data.ndim)))
dependencies.append(smaller_data)
for dst_array in dst_arrays:
dst_position = [0] * (dst_array.ndim - 2) + list(position[-2:])
args.append((dst_array.name, *dst_position))
funk_kwargs = kwargs.copy()
funk_kwargs['fill_value'] = fill_value
if has_keyword(funk, "block_info"):
funk_kwargs["block_info"] = {0: src_block_info,
func_kwargs = kwargs.copy()
func_kwargs['fill_value'] = fill_value
if has_keyword(func, "block_info"):
func_kwargs["block_info"] = {0: src_block_info,
None: dst_block_info}
pfunk = partial(funk, **funk_kwargs)
task = (pfunk, *args)
return task
pfunc = partial(func, **func_kwargs)
task = (pfunc, *args)
return task, dependencies


def crop_data_around_area(source_geo_def, src_arrays, target_geo_def):
Expand Down

0 comments on commit 87f1b7c

Please sign in to comment.