Commit 1b79c42

Merge branch 'master' of github.com:dask/dask into bag-map-partitions-repeat

2 parents d33b128 + d6880be
mrocklin committed Feb 19, 2019
Showing 16 changed files with 268 additions and 43 deletions.
3 changes: 2 additions & 1 deletion dask/array/__init__.py
@@ -55,7 +55,8 @@
 from ..base import compute
 from .optimization import optimize
 from .creation import (arange, linspace, meshgrid, indices, diag, eye,
-                       triu, tril, fromfunction, tile, repeat, pad)
+                       triu, tril, fromfunction, tile, repeat, pad,
+                       diagonal)
 from .gufunc import apply_gufunc, gufunc, as_gufunc
 from .utils import assert_eq

15 changes: 4 additions & 11 deletions dask/array/core.py
@@ -2428,7 +2428,7 @@ def unify_chunks(*args, **kwargs):
     if not args:
         return {}, []

-    arginds = [(asarray(a) if ind is not None else a, ind)
+    arginds = [(asanyarray(a) if ind is not None else a, ind)
                for a, ind in partition(2, args)]  # [x, ij, y, jk]
     args = list(concat(arginds))  # [(x, ij), (y, jk)]
     warn = kwargs.get('warn', True)
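The asarray-to-asanyarray swap preserves ndarray subclasses during chunk unification, so mixing a dask array with a NumPy masked array no longer drops the mask (exercised by test_arithmetic_results_in_masked below). A minimal sketch of the effect, assuming this commit is applied:

>>> import numpy as np
>>> import dask.array as da
>>> x = da.from_array(np.arange(4), chunks=2)
>>> m = np.ma.masked_array(np.arange(4), mask=[True, False, False, True])
>>> isinstance((x + m).compute(), np.ma.MaskedArray)
True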
@@ -2934,19 +2934,12 @@ def asarray(a, **kwargs):
     >>> da.asarray(y)
     dask.array<array, shape=(2, 3), dtype=int64, chunksize=(2, 3)>
     """
-    def frame_types():
-        try:
-            import dask.dataframe as dd
-            return (dd.Series, dd.DataFrame)
-        except ImportError:
-            return ()
-
     if isinstance(a, Array):
         return a
-    if isinstance(a, (list, tuple)) and any(isinstance(i, Array) for i in a):
-        a = stack(a)
-    elif isinstance(a, frame_types()):
+    elif hasattr(a, 'to_dask_array'):
         return a.to_dask_array()
+    elif isinstance(a, (list, tuple)) and any(isinstance(i, Array) for i in a):
+        a = stack(a)
     elif not isinstance(getattr(a, 'shape', None), Iterable):
         a = np.asarray(a)
     return from_array(a, chunks=a.shape, getitem=getter_inline, **kwargs)
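With the duck-typed hasattr check, asarray no longer needs to import dask.dataframe to recognize its types: any object exposing a to_dask_array method is converted through it. A quick sketch, assuming dask.dataframe is installed:

>>> import pandas as pd
>>> import dask.dataframe as dd
>>> import dask.array as da
>>> s = dd.from_pandas(pd.Series([1, 2, 3]), npartitions=2)
>>> isinstance(da.asarray(s), da.Array)  # dispatches via s.to_dask_array()
True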
72 changes: 70 additions & 2 deletions dask/array/creation.py
@@ -1,8 +1,8 @@
 from __future__ import absolute_import, division, print_function

-from functools import partial, wraps
+from functools import partial, wraps, reduce
 from itertools import product
-from operator import add
+from operator import add, getitem
 from numbers import Integral, Number

 import numpy as np
@@ -16,6 +16,7 @@
                    stack, concatenate, block,
                    broadcast_to, broadcast_arrays)
 from .wrap import empty, ones, zeros, full
+from .utils import AxisError


 def empty_like(a, dtype=None, chunks=None):
@@ -501,6 +502,73 @@ def diag(v):
    return Array(graph, name, (chunks_1d, chunks_1d), dtype=v.dtype)


@wraps(np.diagonal)
def diagonal(a, offset=0, axis1=0, axis2=1):
    name = 'diagonal-' + tokenize(a, offset, axis1, axis2)

    if a.ndim < 2:
        # NumPy uses `diag` as we do here.
        raise ValueError("diag requires an array of at least two dimensions")

    def _axis_fmt(axis, name, ndim):
        if axis < 0:
            t = ndim + axis
            if t < 0:
                msg = "{}: axis {} is out of bounds for array of dimension {}"
                raise AxisError(msg.format(name, axis, ndim))
            axis = t
        return axis

    axis1 = _axis_fmt(axis1, "axis1", a.ndim)
    axis2 = _axis_fmt(axis2, "axis2", a.ndim)

    if axis1 == axis2:
        raise ValueError("axis1 and axis2 cannot be the same")

    a = asarray(a)

    if axis1 > axis2:
        axis1, axis2 = axis2, axis1
        offset = -offset

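    # Length of the diagonal that a dim1-by-dim2 block contributes at the
    # given offset, clipped to zero when the offset misses the block.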
    def _diag_len(dim1, dim2, offset):
        return max(0, min(min(dim1, dim2), dim1 + offset, dim2 - offset))

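    # For each pair of chunks along (axis1, axis2), record the offset k of
    # the global diagonal relative to that block and the length of the
    # diagonal segment falling inside it.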
    diag_chunks = []
    chunk_offsets = []
    cum1 = [0] + list(np.cumsum(a.chunks[axis1]))[:-1]
    cum2 = [0] + list(np.cumsum(a.chunks[axis2]))[:-1]
    for co1, c1 in zip(cum1, a.chunks[axis1]):
        chunk_offsets.append([])
        for co2, c2 in zip(cum2, a.chunks[axis2]):
            k = offset + co1 - co2
            diag_chunks.append(_diag_len(c1, c2, k))
            chunk_offsets[-1].append(k)

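    # One task per (axis1, axis2) chunk pair: walk the nested key lists down
    # to each block and apply np.diagonal blockwise with its per-block offset.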
    dsk = {}
    idx_set = set(range(a.ndim)) - set([axis1, axis2])
    n1 = len(a.chunks[axis1])
    n2 = len(a.chunks[axis2])
    for idx in product(*(range(len(a.chunks[i])) for i in idx_set)):
        for i, (i1, i2) in enumerate(product(range(n1), range(n2))):
            tsk = reduce(getitem, idx[:axis1], a.__dask_keys__())[i1]
            tsk = reduce(getitem, idx[axis1:axis2 - 1], tsk)[i2]
            tsk = reduce(getitem, idx[axis2 - 1:], tsk)
            k = chunk_offsets[i1][i2]
            dsk[(name,) + idx + (i,)] = (np.diagonal, tsk, k, axis1, axis2)

    left_shape = tuple(a.shape[i] for i in idx_set)
    right_shape = (_diag_len(a.shape[axis1], a.shape[axis2], offset),)
    shape = left_shape + right_shape

    left_chunks = tuple(a.chunks[i] for i in idx_set)
    right_chunks = (tuple(diag_chunks),)
    chunks = left_chunks + right_chunks

    graph = HighLevelGraph.from_collections(name, dsk, dependencies=[a])
    return Array(graph, name, shape=shape, chunks=chunks, dtype=a.dtype)

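The new function mirrors np.diagonal chunk-by-chunk. A short doctest-style sketch (hypothetical session, mirroring the tests below):

>>> import numpy as np
>>> import dask.array as da
>>> x = np.arange(64).reshape((8, 8))
>>> d = da.from_array(x, chunks=3)
>>> np.array_equal(da.diagonal(d, offset=1).compute(), np.diagonal(x, offset=1))
True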

def triu(m, k=0):
"""
Upper triangle of an array with elements above the `k`-th diagonal zeroed.
85 changes: 84 additions & 1 deletion dask/array/tests/test_creation.py
@@ -7,7 +7,7 @@

 import dask
 import dask.array as da
-from dask.array.utils import assert_eq, same_keys
+from dask.array.utils import assert_eq, same_keys, AxisError


@pytest.mark.parametrize(
@@ -378,6 +378,89 @@ def test_diag():
    assert_eq(da.diag(d), np.diag(x))


def test_diagonal():
    v = np.arange(11)
    with pytest.raises(ValueError):
        da.diagonal(v)

    v = np.arange(4).reshape((2, 2))
    with pytest.raises(ValueError):
        da.diagonal(v, axis1=0, axis2=0)

    with pytest.raises(AxisError):
        da.diagonal(v, axis1=-4)

    with pytest.raises(AxisError):
        da.diagonal(v, axis2=-4)

    v = np.arange(4 * 5 * 6).reshape((4, 5, 6))
    v = da.from_array(v, chunks=2)
    assert_eq(da.diagonal(v), np.diagonal(v))
    # Empty diagonal.
    assert_eq(da.diagonal(v, offset=10), np.diagonal(v, offset=10))
    assert_eq(da.diagonal(v, offset=-10), np.diagonal(v, offset=-10))

    with pytest.raises(ValueError):
        da.diagonal(v, axis1=-2)

    # Negative axis.
    assert_eq(da.diagonal(v, axis1=-1), np.diagonal(v, axis1=-1))
    assert_eq(da.diagonal(v, offset=1, axis1=-1),
              np.diagonal(v, offset=1, axis1=-1))

    # Heterogeneous chunks.
    v = np.arange(2 * 3 * 4 * 5 * 6).reshape((2, 3, 4, 5, 6))
    v = da.from_array(v, chunks=(1, (1, 2), (1, 2, 1), (2, 1, 2), (5, 1)))

    assert_eq(da.diagonal(v), np.diagonal(v))
    assert_eq(da.diagonal(v, offset=2, axis1=3, axis2=1),
              np.diagonal(v, offset=2, axis1=3, axis2=1))

    assert_eq(da.diagonal(v, offset=-2, axis1=3, axis2=1),
              np.diagonal(v, offset=-2, axis1=3, axis2=1))

    assert_eq(da.diagonal(v, offset=-2, axis1=3, axis2=4),
              np.diagonal(v, offset=-2, axis1=3, axis2=4))

    assert_eq(da.diagonal(v, 1), np.diagonal(v, 1))
    assert_eq(da.diagonal(v, -1), np.diagonal(v, -1))
    # Positional arguments.
    assert_eq(da.diagonal(v, 1, 2, 1), np.diagonal(v, 1, 2, 1))

    v = np.arange(2 * 3 * 4 * 5 * 6).reshape((2, 3, 4, 5, 6))
    assert_eq(da.diagonal(v, axis1=1, axis2=3),
              np.diagonal(v, axis1=1, axis2=3))
    assert_eq(da.diagonal(v, offset=1, axis1=1, axis2=3),
              np.diagonal(v, offset=1, axis1=1, axis2=3))

    assert_eq(da.diagonal(v, offset=1, axis1=3, axis2=1),
              np.diagonal(v, offset=1, axis1=3, axis2=1))

    assert_eq(da.diagonal(v, offset=-5, axis1=3, axis2=1),
              np.diagonal(v, offset=-5, axis1=3, axis2=1))

    assert_eq(da.diagonal(v, offset=-6, axis1=3, axis2=1),
              np.diagonal(v, offset=-6, axis1=3, axis2=1))

    assert_eq(da.diagonal(v, offset=-6, axis1=-3, axis2=1),
              np.diagonal(v, offset=-6, axis1=-3, axis2=1))

    v = da.from_array(v, chunks=2)
    assert_eq(da.diagonal(v, offset=1, axis1=3, axis2=1),
              np.diagonal(v, offset=1, axis1=3, axis2=1))
    assert_eq(da.diagonal(v, offset=-1, axis1=3, axis2=1),
              np.diagonal(v, offset=-1, axis1=3, axis2=1))

    v = np.arange(384).reshape((8, 8, 6))
    assert_eq(da.diagonal(v, offset=-1, axis1=2),
              np.diagonal(v, offset=-1, axis1=2))

    v = da.from_array(v, chunks=(4, 4, 2))
    assert_eq(da.diagonal(v, offset=-1, axis1=2),
              np.diagonal(v, offset=-1, axis1=2))

@pytest.mark.parametrize('dtype', [None, 'f8', 'i8'])
@pytest.mark.parametrize('func, kwargs', [
(lambda x, y: x + y, {}),
14 changes: 14 additions & 0 deletions dask/array/tests/test_masked.py
@@ -348,3 +348,17 @@ def test_average_weights_with_masked_array():
    da_avg = da.ma.average(d_a, weights=d_weights, axis=1)

    assert_eq(np_avg, da_avg)


def test_arithmetic_results_in_masked():
    mask = np.array([[True, False],
                     [True, True],
                     [False, True]])
    x = np.arange(6).reshape((3, 2))
    masked = np.ma.array(x, mask=mask)
    dx = da.from_array(x, chunks=(2, 2))

    res = dx + masked
    sol = x + masked
    assert_eq(res, sol)
    assert isinstance(res.compute(), np.ma.masked_array)
19 changes: 10 additions & 9 deletions dask/bag/core.py
@@ -343,7 +343,7 @@ def __setstate__(self, state):
         self.dask, self.key = state

     def apply(self, func):
-        name = 'apply-{0}-{1}'.format(funcname(func), tokenize(self, func))
+        name = '{0}-{1}'.format(funcname(func), tokenize(self, func, 'apply'))
         dsk = {name: (func, self.key)}
         graph = HighLevelGraph.from_collections(name, dsk, dependencies=[self])
         return Item(graph, name)
@@ -538,8 +538,8 @@ def starmap(self, func, **kwargs):
         >>> b.starmap(myadd, z=max_second).compute()
         [13, 17, 21, 25, 29]
         """
-        name = 'starmap-{0}-{1}'.format(funcname(func),
-                                        tokenize(self, func, kwargs))
+        name = '{0}-{1}'.format(funcname(func),
+                                tokenize(self, func, 'starmap', **kwargs))
         dependencies = [self]
         if kwargs:
             kwargs, collections = unpack_scalar_dask_kwargs(kwargs)
@@ -727,7 +727,8 @@ def to_avro(self, filename, schema, name_function=None,
         return to_avro(self, filename, schema, name_function, storage_options,
                        codec, sync_interval, metadata, compute, **kwargs)

-    def fold(self, binop, combine=None, initial=no_default, split_every=None):
+    def fold(self, binop, combine=None, initial=no_default, split_every=None,
+             out_type=Item):
         """ Parallelizable reduction

         Fold is like the builtin function ``reduce`` except that it works in
@@ -777,11 +778,11 @@ def fold(self, binop, combine=None, initial=no_default, split_every=None):
         if initial is not no_default:
             return self.reduction(curry(_reduce, binop, initial=initial),
                                   curry(_reduce, combine),
-                                  split_every=split_every)
+                                  split_every=split_every, out_type=out_type)
         else:
             from toolz.curried import reduce
             return self.reduction(reduce(binop), reduce(combine),
-                                  split_every=split_every)
+                                  split_every=split_every, out_type=out_type)

     def frequencies(self, split_every=None, sort=False):
         """ Count number of occurrences of each distinct element.
@@ -1848,7 +1849,7 @@ def bag_map(func, *args, **kwargs):
     >>> db.map(myadd, b, b.max()).compute()
     [4, 5, 6, 7, 8]
     """
-    name = 'map-%s-%s' % (funcname(func), tokenize(func, args, kwargs))
+    name = '%s-%s' % (funcname(func), tokenize(func, 'map', *args, **kwargs))
     dsk = {}
     dependencies = []
@@ -1941,8 +1942,8 @@ def map_partitions(func, *args, **kwargs):
     single graph, and then computes everything at once, and in some cases
     may be more efficient.
     """
-    name = 'map-partitions-%s-%s' % (funcname(func),
-                                     tokenize(func, args, kwargs))
+    name = '%s-%s' % (funcname(func),
+                      tokenize(func, 'map-partitions', *args, **kwargs))
     dsk = {}
     dependencies = []
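Across apply, starmap, bag_map, and map_partitions, the function name now leads the key and the old prefix moves into the token, so dask.utils.key_split reports the user's function instead of a generic 'map'. A small sketch, mirroring test_map_keynames below:

>>> import dask.bag as db
>>> from dask.utils import key_split
>>> def inc(x):
...     return x + 1
>>> 'inc' in {key_split(k) for k in db.from_sequence([1, 2, 3]).map(inc).__dask_graph__()}
True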
17 changes: 17 additions & 0 deletions dask/bag/tests/test_bag.py
@@ -218,6 +218,15 @@ def binop(acc, x):
    assert set(e.fold(add, initial=[]).compute(scheduler='sync')) == set([1, 2, 3])


def test_fold_bag():
    def binop(tot, x):
        tot.add(x)
        return tot

    c = b.fold(binop, combine=set.union, initial=set(), out_type=Bag)
    assert isinstance(c, Bag)
    assert_eq(c, list(set(range(5))))


def test_distinct():
    assert sorted(b.distinct()) == [0, 1, 2, 3, 4]
    assert b.distinct().name == b.distinct().name
@@ -1317,3 +1326,11 @@ def append_str(partition, s):
              ['afoo', 'bfoo', 'cfoo'])
    assert_eq(mybag.map_partitions(append_str, dask.delayed("foo")),
              ['afoo', 'bfoo', 'cfoo'])


def test_map_keynames():
    b = db.from_sequence([1, 2, 3])
    d = dict(b.map(inc).__dask_graph__())
    assert 'inc' in map(dask.utils.key_split, d)

    assert (set(b.map(inc).__dask_graph__()) !=
            set(b.map_partitions(inc).__dask_graph__()))
