API: provides method to compute unknown array chunks sizes (dask#5312)
stsievert authored and TomAugspurger committed Sep 7, 2019
1 parent 6e7bb0a commit 0ba2223
Showing 8 changed files with 267 additions and 22 deletions.
91 changes: 83 additions & 8 deletions dask/array/core.py
@@ -76,6 +76,15 @@
tensordot_lookup.register((object, np.ndarray), np.tensordot)
einsum_lookup.register((object, np.ndarray), np.einsum)

unknown_chunk_message = (
"\n\n"
"A possible solution: "
"https://docs.dask.org/en/latest/array-chunks.html#unknown-chunks\n"
"Summary: to compute chunks sizes, use\n\n"
" x.compute_chunk_sizes() # for Dask Array `x`\n"
" ddf.to_dask_array(lengths=True) # for Dask DataFrame `ddf`"
)


class PerformanceWarning(Warning):
""" A warning given when bad chunking may cause poor performance """
@@ -933,7 +942,8 @@ def blockdims_from_blockshape(shape, chunks):
raise TypeError("Must supply shape= keyword argument")
if np.isnan(sum(shape)) or np.isnan(sum(chunks)):
raise ValueError(
"Array chunk sizes are unknown. shape: %s, chunks: %s" % (shape, chunks)
"Array chunk sizes are unknown. shape: %s, chunks: %s%s"
% (shape, chunks, unknown_chunk_message)
)
if not all(map(is_integer, chunks)):
raise ValueError("chunks can only contain integers.")
@@ -1080,6 +1090,51 @@ def numblocks(self):
def npartitions(self):
return reduce(mul, self.numblocks, 1)

def compute_chunk_sizes(self):
"""
Compute the chunk sizes for a Dask array. This is especially useful
when the chunk sizes are unknown (e.g., when indexing one Dask array
with another).

Notes
-----
This function modifies the Dask array in-place.

Examples
--------
>>> import dask.array as da
>>> import numpy as np
>>> x = da.from_array([-2, -1, 0, 1, 2], chunks=2)
>>> x.chunks
((2, 2, 1),)
>>> y = x[x <= 0]
>>> y.chunks
((nan, nan, nan),)
>>> y.compute_chunk_sizes() # in-place computation
dask.array<getitem, shape=(3,), dtype=int64, chunksize=(2,), chunktype=numpy.ndarray>
>>> y.chunks
((2, 1, 0),)
"""
x = self
chunk_shapes = x.map_blocks(
_get_chunk_shape,
dtype=int,
chunks=tuple(len(c) * (1,) for c in x.chunks) + ((x.ndim,),),
new_axis=x.ndim,
)

c = []
for i in range(x.ndim):
s = x.ndim * [0] + [i]
s[i] = slice(None)
s = tuple(s)

c.append(tuple(chunk_shapes[s]))

x._chunks = compute(tuple(c))[0]
return x

@property
def shape(self):
return tuple(map(sum, self.chunks))
@@ -1096,11 +1151,14 @@ def _get_chunks(self):
return self._chunks

def _set_chunks(self, chunks):
- raise TypeError(
+ msg = (
"Can not set chunks directly\n\n"
"Please use the rechunk method instead:\n"
- " x.rechunk(%s)" % str(chunks)
+ " x.rechunk({})\n\n"
+ "If trying to avoid unknown chunks, use\n"
+ " x.compute_chunk_sizes()"
)
+ raise TypeError(msg.format(chunks))

chunks = property(_get_chunks, _set_chunks, "chunks property")

@@ -2473,7 +2531,8 @@ def auto_chunks(chunks, shape, limit, dtype, previous_chunks=None):
and np.isnan(x).any()
):
raise ValueError(
"Can not perform automatic rechunking with unknown (nan) chunk sizes"
"Can not perform automatic rechunking with unknown "
"(nan) chunk sizes.%s" % unknown_chunk_message
)

limit = max(1, limit)
@@ -2564,6 +2623,11 @@ def round_to(c, s):
return c // s * s


def _get_chunk_shape(a):
s = np.asarray(a.shape, dtype=int)
return s[len(s) * (None,) + (slice(None),)]


def from_array(
x,
chunks="auto",
@@ -2787,13 +2851,17 @@ def to_zarr(
ValueError
If ``arr`` has unknown chunk sizes, which is not supported by Zarr.

See Also
--------
dask.array.Array.compute_chunk_sizes
"""
import zarr

if np.isnan(arr.shape).any():
raise ValueError(
"Saving a dask array with unknown chunk sizes is not "
"currently supported by Zarr"
"currently supported by Zarr.%s" % unknown_chunk_message
)

if isinstance(url, zarr.Array):
@@ -2969,7 +3037,11 @@ def common_blockdim(blockdims):
return max(blockdims, key=first)

if np.isnan(sum(map(sum, blockdims))):
raise ValueError("Arrays chunk sizes are unknown: %s", blockdims)
raise ValueError(
"Arrays chunk sizes (%s) are unknown.\n\n"
"A possible solution:\n"
" x.compute_chunk_sizes()" % blockdims
)

if len(set(map(sum, non_trivial_dims))) > 1:
raise ValueError("Chunks do not add up to same value", blockdims)
@@ -3365,8 +3437,11 @@ def concatenate(seq, axis=0, allow_unknown_chunksizes=False):
if any(map(np.isnan, seq2[0].shape)):
raise ValueError(
"Tried to concatenate arrays with unknown"
" shape %s. To force concatenation pass"
" allow_unknown_chunksizes=True." % str(seq2[0].shape)
" shape %s.\n\nTwo solutions:\n"
" 1. Force concatenation pass"
" allow_unknown_chunksizes=True.\n"
" 2. Compute shapes with "
"[x.compute_chunk_sizes() for x in seq]" % str(seq2[0].shape)
)
raise ValueError("Shapes do not align: %s", [x.shape for x in seq2])

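For context, a minimal sketch of the workflow these core.py changes enable, using only the API shown in this diff (the array values are illustrative):

import dask.array as da
import numpy as np

x = da.from_array(np.linspace(-1, 1, num=50), chunks=10)
y = x[x < 0]                 # boolean indexing produces unknown (nan) chunks
assert np.isnan(y.shape[0])  # shape stays unknown until computed

y.compute_chunk_sizes()      # computes the sizes in-place and returns the array
assert y.chunks == ((10, 10, 5, 0, 0),)  # 25 negative values in total

Since the method returns the (mutated) array, calls can be chained, e.g. y.compute_chunk_sizes().rechunk(5).
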
5 changes: 4 additions & 1 deletion dask/array/rechunk.py
@@ -142,7 +142,10 @@ def _old_to_new(old_chunks, new_chunks):
if not sums == sums2:
raise ValueError("Cannot change dimensions from %r to %r" % (sums, sums2))
if not n_missing == n_missing2:
raise ValueError("Chunks must be unchanging along unknown dimensions")
raise ValueError(
"Chunks must be unchanging along unknown dimensions.\n\n"
"A possible solution:\n x.compute_chunk_sizes()"
)

old_to_new = [_intersect_1d(_breakpoints(cm[0], cm[1])) for cm in zip(cmo, cmn)]
for idx, missing in enumerate(n_missing):
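
A short sketch of the rechunk error path this hunk improves; the exception text comes from the patch above, the rest is illustrative:

import dask.array as da
import numpy as np

x = da.from_array(np.linspace(-1, 1, num=20), chunks=5)
y = x[x < 0]                 # unknown chunks along axis 0
try:
    y.rechunk(10)            # changing chunks along an unknown dimension
except ValueError as e:
    assert "compute_chunk_sizes" in str(e)

y.compute_chunk_sizes()      # make the chunk sizes concrete
y = y.rechunk(10)            # rechunking now succeeds
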
6 changes: 4 additions & 2 deletions dask/array/reductions.py
@@ -953,8 +953,10 @@ def arg_reduction(x, chunk, combine, agg, axis=None, split_every=None, out=None)
if len(chunks) > 1 and np.isnan(chunks).any():
raise ValueError(
"Arg-reductions do not work with arrays that have "
"unknown chunksizes. At some point in your computation "
"this array lost chunking information"
"unknown chunksizes. At some point in your computation "
"this array lost chunking information.\n\n"
"A possible solution is with \n"
" x.compute_chunk_sizes()"
)

# Map chunk across all blocks
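
The arg-reduction guard works the same way; a sketch mirroring the new tests at the bottom of this commit:

import dask.array as da
import numpy as np

x = da.from_array(np.linspace(-1, 1, num=50), chunks=10)
y = x[x < 0]
try:
    da.argmin(y)             # arg-reductions require known chunk sizes
except ValueError as e:
    assert "compute_chunk_sizes" in str(e)

y.compute_chunk_sizes()
idx = da.argmin(y).compute() # 0 here: the minimum, -1.0, is the first element
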
5 changes: 4 additions & 1 deletion dask/array/reshape.py
@@ -171,7 +171,10 @@ def reshape(x, shape):
shape = tuple(missing_size if s == -1 else s for s in shape)

if np.isnan(sum(x.shape)):
raise ValueError("Array chunk size or shape is unknown. shape: %s", x.shape)
raise ValueError(
"Array chunk size or shape is unknown. shape: %s\n\n"
"Possible solution with x.compute_chunk_sizes()" % x.shape
)

if reduce(mul, shape, 1) != x.size:
raise ValueError("total size of new array must be unchanged")
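
The reshape path follows the same pattern; a sketch matching the new reshape test (25 negative values reshaped to 5x5):

import dask.array as da
import numpy as np

x = da.from_array(np.linspace(-1, 1, num=50), chunks=10)
y = x[x < 0]                 # 25 elements, but the shape reads (nan,)
try:
    da.reshape(y, (5, 5))
except ValueError as e:
    assert "compute_chunk_sizes" in str(e)

y.compute_chunk_sizes()
z = da.reshape(y, (5, 5))    # reshape now sees a concrete shape
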
6 changes: 5 additions & 1 deletion dask/array/slicing.py
@@ -277,11 +277,15 @@ def slice_slices_and_integers(out_name, in_name, blockdims, index):
_slice_1d
"""
from .core import unknown_chunk_message

shape = tuple(cached_cumsum(dim, initial_zero=True)[-1] for dim in blockdims)

for dim, ind in zip(shape, index):
if np.isnan(dim) and ind != slice(None, None, None):
raise ValueError("Arrays chunk sizes are unknown: %s", shape)
raise ValueError(
"Arrays chunk sizes are unknown: %s%s" % (shape, unknown_chunk_message)
)

assert all(isinstance(ind, (slice, Integral)) for ind in index)
assert len(index) == len(blockdims)
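
Slicing along a dimension of unknown size now raises the same guidance; a sketch mirroring the new slicing test:

import dask.array as da
import numpy as np

x = da.from_array(np.linspace(-1, 1, num=100).reshape(10, 10), chunks=(5, 5))
y = x[x.sum(axis=0) < 0]     # unknown chunks along axis 0 only
try:
    y[:3, :]                 # slice along the unknown dimension
except ValueError as e:
    assert "compute_chunk_sizes" in str(e)

y.compute_chunk_sizes()
z = y[:3, :]                 # works once the sizes are known
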
5 changes: 4 additions & 1 deletion dask/array/svg.py
@@ -21,7 +21,10 @@ def svg(chunks, size=200, **kwargs):
"""
shape = tuple(map(sum, chunks))
if np.isnan(shape).any(): # don't support unknown sizes
raise NotImplementedError("Can't generate SVG with unknown chunk sizes")
raise NotImplementedError(
"Can't generate SVG with unknown chunk sizes.\n\n"
" A possible solution is with x.compute_chunk_sizes()"
)
if not all(shape):
raise NotImplementedError("Can't generate SVG with 0-length dimensions")
if len(chunks) == 0:
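
And for SVG output, a sketch based on the new svg test; to_svg draws the chunk grid, so it needs concrete sizes (that the return value is SVG markup is an assumption here):

import dask.array as da
import numpy as np

x = da.from_array(np.linspace(-1, 1, num=50), chunks=10)
y = x[x < 0]
try:
    y.to_svg()
except NotImplementedError as e:
    assert "compute_chunk_sizes" in str(e)

y.compute_chunk_sizes()
svg_text = y.to_svg()        # renders the (10, 10, 5, 0, 0) chunk grid
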
129 changes: 129 additions & 0 deletions dask/array/tests/test_array_core.py
@@ -8,6 +8,7 @@
import os
import sys
import time
from io import StringIO
from distutils.version import LooseVersion
import operator
from operator import add, sub, getitem
@@ -19,6 +20,7 @@

import dask
import dask.array as da
import dask.dataframe
from dask.base import tokenize, compute_as_if_collection
from dask.delayed import Delayed, delayed
from dask.utils import ignoring, tmpfile, tmpdir, key_split
@@ -4102,3 +4104,130 @@ def test_from_array_meta():
meta = sparse.COO.from_numpy(x)
y = da.from_array(x, meta=meta)
assert isinstance(y._meta, sparse.COO)


def test_compute_chunk_sizes():
x = da.from_array(np.linspace(-1, 1, num=50), chunks=10)
y = x[x < 0]
assert np.isnan(y.shape[0])
assert y.chunks == ((np.nan,) * 5,)

z = y.compute_chunk_sizes()
assert y is z
assert z.chunks == ((10, 10, 5, 0, 0),)
assert len(z) == 25


def test_compute_chunk_sizes_2d_array():
X = np.linspace(-1, 1, num=9 * 4).reshape(9, 4)
X = da.from_array(X, chunks=(3, 4))
idx = X.sum(axis=1) > 0
Y = X[idx]

# This is very similar to the DataFrame->Array conversion
assert np.isnan(Y.shape[0]) and Y.shape[1] == 4
assert Y.chunks == ((np.nan, np.nan, np.nan), (4,))

Z = Y.compute_chunk_sizes()
assert Y is Z
assert Z.chunks == ((0, 1, 3), (4,))
assert Z.shape == (4, 4)


def test_compute_chunk_sizes_3d_array(N=8):
X = np.linspace(-1, 2, num=8 * 8 * 8).reshape(8, 8, 8)
X = da.from_array(X, chunks=(4, 4, 4))
idx = X.sum(axis=0).sum(axis=0) > 0
Y = X[idx]
idx = X.sum(axis=1).sum(axis=1) < 0
Y = Y[:, idx]
idx = X.sum(axis=2).sum(axis=1) > 0.1
Y = Y[:, :, idx]

# Checking to make sure shapes are different on outputs
assert Y.compute().shape == (8, 3, 5)
assert X.compute().shape == (8, 8, 8)

assert Y.chunks == ((np.nan, np.nan),) * 3
assert all(np.isnan(s) for s in Y.shape)
Z = Y.compute_chunk_sizes()
assert Z is Y
assert Z.shape == (8, 3, 5)
assert Z.chunks == ((4, 4), (3, 0), (1, 4))


def _known(num=50):
return da.from_array(np.linspace(-1, 1, num=num), chunks=10)


@pytest.fixture()
def unknown():
x = _known()
y = x[x < 0]
assert y.chunks == ((np.nan,) * 5,)
return y


def test_compute_chunk_sizes_warning_fixes_rechunk(unknown):
y = unknown
with pytest.raises(ValueError, match="compute_chunk_sizes"):
y.rechunk("auto")
y.compute_chunk_sizes()
y.rechunk("auto")


def test_compute_chunk_sizes_warning_fixes_to_zarr(unknown):
y = unknown
with pytest.raises(ValueError, match="compute_chunk_sizes"):
with StringIO() as f:
y.to_zarr(f)
y.compute_chunk_sizes()

with pytest.raises(ValueError, match="irregular chunking"):
with StringIO() as f:
y.to_zarr(f)


def test_compute_chunk_sizes_warning_fixes_to_svg(unknown):
y = unknown
with pytest.raises(NotImplementedError, match="compute_chunk_sizes"):
y.to_svg()
y.compute_chunk_sizes()
y.to_svg()


def test_compute_chunk_sizes_warning_fixes_concatenate():
x = _known(num=100).reshape(10, 10)
idx = x.sum(axis=0) > 0
y1 = x[idx]
y2 = x[idx]
with pytest.raises(ValueError, match="compute_chunk_sizes"):
da.concatenate((y1, y2), axis=1)
y1.compute_chunk_sizes()
y2.compute_chunk_sizes()
da.concatenate((y1, y2), axis=1)


def test_compute_chunk_sizes_warning_fixes_reduction(unknown):
y = unknown
with pytest.raises(ValueError, match="compute_chunk_sizes"):
da.argmin(y)
y.compute_chunk_sizes()
da.argmin(y)


def test_compute_chunk_sizes_warning_fixes_reshape(unknown):
y = unknown
with pytest.raises(ValueError, match="compute_chunk_sizes"):
da.reshape(y, (5, 5))
y.compute_chunk_sizes()
da.reshape(y, (5, 5))


def test_compute_chunk_sizes_warning_fixes_slicing():
x = _known(num=100).reshape(10, 10)
y = x[x.sum(axis=0) < 0]
with pytest.raises(ValueError, match="compute_chunk_sizes"):
y[:3, :]
y.compute_chunk_sizes()
y[:3, :]
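
Finally, the DataFrame route that unknown_chunk_message recommends; a sketch assuming pandas is available (the partition sizes are whatever from_pandas chooses):

import dask.dataframe as dd
import numpy as np
import pandas as pd

ddf = dd.from_pandas(pd.DataFrame({"a": range(10)}), npartitions=3)

arr = ddf.to_dask_array()                # row chunk sizes are unknown (nan)
assert all(np.isnan(c) for c in arr.chunks[0])

arr2 = ddf.to_dask_array(lengths=True)   # computes partition lengths up front
assert not any(np.isnan(c) for c in arr2.chunks[0])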
