Skip to content

Commit

Permalink
Merge branch 'master' of github.com:dask/dask into merge-dispatch
Browse files Browse the repository at this point in the history
  • Loading branch information
mrocklin committed Feb 26, 2019
2 parents a37c4cd + e0006dc commit a6fcc36
Show file tree
Hide file tree
Showing 4 changed files with 117 additions and 63 deletions.
114 changes: 53 additions & 61 deletions dask/array/reductions.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
from ..compatibility import getargspec, builtins
from ..base import tokenize
from ..highlevelgraph import HighLevelGraph
from ..utils import ignoring, funcname, Dispatch
from ..utils import ignoring, funcname, Dispatch, deepmap
from .. import config

# Generic functions to support chunks of different types
Expand Down Expand Up @@ -331,26 +331,26 @@ def nannumel(x, **kwargs):
def mean_chunk(x, sum=chunk.sum, numel=numel, dtype='f8', **kwargs):
n = numel(x, dtype=dtype, **kwargs)
total = sum(x, dtype=dtype, **kwargs)
empty = empty_lookup.dispatch(type(n))
result = empty(n.shape, dtype=[('total', total.dtype), ('n', n.dtype)])
result['n'] = n
result['total'] = total
return result
return {'n': n, 'total': total}


def mean_combine(pair, sum=chunk.sum, numel=numel, dtype='f8', **kwargs):
n = sum(pair['n'], **kwargs)
total = sum(pair['total'], **kwargs)
empty = empty_lookup.dispatch(type(n))
result = empty(n.shape, dtype=pair.dtype)
result['n'] = n
result['total'] = total
return result
def mean_combine(pairs, sum=chunk.sum, numel=numel, dtype='f8', axis=None, **kwargs):
if not isinstance(pairs, list):
pairs = [pairs]
ns = deepmap(lambda pair: pair['n'], pairs)
totals = deepmap(lambda pair: pair['total'], pairs)
n = _concatenate2(ns, axes=axis).sum(axis=axis, **kwargs)
total = _concatenate2(totals, axes=axis).sum(axis=axis, **kwargs)
return {'n': n, 'total': total}


def mean_agg(pair, dtype='f8', **kwargs):
return divide(pair['total'].sum(dtype=dtype, **kwargs),
pair['n'].sum(dtype=dtype, **kwargs), dtype=dtype)
def mean_agg(pairs, dtype='f8', axis=None, **kwargs):
ns = deepmap(lambda pair: pair['n'], pairs)
totals = deepmap(lambda pair: pair['total'], pairs)
n = _concatenate2(ns, axes=axis).sum(axis=axis, dtype=dtype, **kwargs)
total = _concatenate2(totals, axes=axis).sum(axis=axis, dtype=dtype, **kwargs)

return divide(total, n, dtype=dtype)


@wraps(chunk.mean)
Expand All @@ -361,7 +361,7 @@ def mean(a, axis=None, dtype=None, keepdims=False, split_every=None, out=None):
dt = getattr(np.mean(np.empty(shape=(1,), dtype=a.dtype)), 'dtype', object)
return reduction(a, mean_chunk, mean_agg, axis=axis, keepdims=keepdims,
dtype=dt, split_every=split_every, combine=mean_combine,
out=out)
out=out, concatenate=False)


def nanmean(a, axis=None, dtype=None, keepdims=False, split_every=None,
Expand All @@ -373,6 +373,7 @@ def nanmean(a, axis=None, dtype=None, keepdims=False, split_every=None,
return reduction(a, partial(mean_chunk, sum=chunk.nansum, numel=nannumel),
mean_agg, axis=axis, keepdims=keepdims, dtype=dt,
split_every=split_every, out=out,
concatenate=False,
combine=partial(mean_combine, sum=chunk.nansum, numel=nannumel))


Expand All @@ -384,70 +385,58 @@ def moment_chunk(A, order=2, sum=chunk.sum, numel=numel, dtype='f8', **kwargs):
total = sum(A, dtype=dtype, **kwargs)
n = numel(A, **kwargs).astype(np.int64)
u = total / n
empty = empty_lookup.dispatch(type(n))
M = empty(n.shape + (order - 1,), dtype=dtype)
for i in range(2, order + 1):
M[..., i - 2] = sum((A - u)**i, dtype=dtype, **kwargs)
result = empty(n.shape, dtype=[('total', total.dtype),
('n', n.dtype),
('M', M.dtype, (order - 1,))])
result['total'] = total
result['n'] = n
result['M'] = M
return result
xs = [sum((A - u)**i, dtype=dtype, **kwargs) for i in range(2, order + 1)]
M = np.stack(xs, axis=-1)
return {'total': total, 'n': n, 'M': M}


def _moment_helper(Ms, ns, inner_term, order, sum, kwargs):
M = Ms[..., order - 2].sum(**kwargs) + sum(ns * inner_term ** order, **kwargs)
def _moment_helper(Ms, ns, inner_term, order, sum, axis, kwargs):
M = Ms[..., order - 2].sum(axis=axis, **kwargs) + sum(ns * inner_term ** order, axis=axis, **kwargs)
for k in range(1, order - 1):
coeff = factorial(order) / (factorial(k) * factorial(order - k))
M += coeff * sum(Ms[..., order - k - 2] * inner_term**k, **kwargs)
M += coeff * sum(Ms[..., order - k - 2] * inner_term**k, axis=axis, **kwargs)
return M


def moment_combine(data, order=2, ddof=0, dtype='f8', sum=np.sum, **kwargs):
def moment_combine(pairs, order=2, ddof=0, dtype='f8', sum=np.sum, axis=None, **kwargs):
if not isinstance(pairs, list):
pairs = [pairs]
totals = _concatenate2(deepmap(lambda pair: pair['total'], pairs), axes=axis)
ns = _concatenate2(deepmap(lambda pair: pair['n'], pairs), axes=axis)
Ms = _concatenate2(deepmap(lambda pair: pair['M'], pairs), axes=axis)

kwargs['dtype'] = dtype
kwargs['keepdims'] = True

totals = data['total']
ns = data['n']
Ms = data['M']
total = totals.sum(**kwargs)
n = sum(ns, **kwargs)
total = totals.sum(axis=axis, **kwargs)
n = ns.sum(axis=axis, **kwargs)
mu = divide(total, n, dtype=dtype)
inner_term = divide(totals, ns, dtype=dtype) - mu
empty = empty_lookup.dispatch(type(n))
M = empty(n.shape + (order - 1,), dtype=dtype)

for o in range(2, order + 1):
M[..., o - 2] = _moment_helper(Ms, ns, inner_term, o, sum, kwargs)

result = empty(n.shape, dtype=[('total', total.dtype),
('n', n.dtype),
('M', Ms.dtype, (order - 1,))])
result['total'] = total
result['n'] = n
result['M'] = M
return result

xs = [_moment_helper(Ms, ns, inner_term, o, sum, axis, kwargs) for o in range(2, order + 1)]
M = np.stack(xs, axis=-1)
return {'total': total, 'n': n, 'M': M}


def moment_agg(data, order=2, ddof=0, dtype='f8', sum=np.sum, **kwargs):
totals = data['total']
ns = data['n']
Ms = data['M']
def moment_agg(pairs, order=2, ddof=0, dtype='f8', sum=np.sum, axis=None, **kwargs):
if not isinstance(pairs, list):
pairs = [pairs]
totals = _concatenate2(deepmap(lambda pair: pair['total'], pairs), axes=axis)
ns = _concatenate2(deepmap(lambda pair: pair['n'], pairs), axes=axis)
Ms = _concatenate2(deepmap(lambda pair: pair['M'], pairs), axes=axis)

kwargs['dtype'] = dtype
# To properly handle ndarrays, the original dimensions need to be kept for
# part of the calculation.
keepdim_kw = kwargs.copy()
keepdim_kw['keepdims'] = True

n = sum(ns, **keepdim_kw)
mu = divide(totals.sum(**keepdim_kw), n, dtype=dtype)
n = ns.sum(axis=axis, **keepdim_kw)
mu = divide(totals.sum(axis=axis, **keepdim_kw), n, dtype=dtype)
inner_term = divide(totals, ns, dtype=dtype) - mu

M = _moment_helper(Ms, ns, inner_term, order, sum, kwargs)
return divide(M, sum(n, **kwargs) - ddof, dtype=dtype)
M = _moment_helper(Ms, ns, inner_term, order, sum, axis, kwargs)
return divide(M, n.sum(axis=axis, **kwargs) - ddof, dtype=dtype)


def moment(a, order, axis=None, dtype=None, keepdims=False, ddof=0,
Expand All @@ -471,6 +460,7 @@ def moment(a, order, axis=None, dtype=None, keepdims=False, ddof=0,
partial(moment_agg, order=order, ddof=ddof),
axis=axis, keepdims=keepdims,
dtype=dt, split_every=split_every, out=out,
concatenate=False,
combine=partial(moment_combine, order=order))


Expand All @@ -483,7 +473,8 @@ def var(a, axis=None, dtype=None, keepdims=False, ddof=0, split_every=None,
dt = getattr(np.var(np.ones(shape=(1,), dtype=a.dtype)), 'dtype', object)
return reduction(a, moment_chunk, partial(moment_agg, ddof=ddof), axis=axis,
keepdims=keepdims, dtype=dt, split_every=split_every,
combine=moment_combine, name='var', out=out)
combine=moment_combine, name='var', out=out,
concatenate=False)


def nanvar(a, axis=None, dtype=None, keepdims=False, ddof=0, split_every=None,
Expand All @@ -495,7 +486,8 @@ def nanvar(a, axis=None, dtype=None, keepdims=False, ddof=0, split_every=None,
return reduction(a, partial(moment_chunk, sum=chunk.nansum, numel=nannumel),
partial(moment_agg, sum=np.nansum, ddof=ddof), axis=axis,
keepdims=keepdims, dtype=dt, split_every=split_every,
combine=partial(moment_combine, sum=np.nansum), out=out)
combine=partial(moment_combine, sum=np.nansum), out=out,
concatenate=False)


with ignoring(AttributeError):
Expand Down
4 changes: 4 additions & 0 deletions docs/release-procedure.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@ co-released. We may want to check their status while releasing

Release per project:

* Raise an issue in the https://github.com/dask/dask issue tracker signalling
your intent to release and the motivation. Let that issue collect comments
for a day to ensure that other maintainers are comfortable with releasing.

* Update release notes in docs/source/changelog.rst

* Commit
Expand Down
61 changes: 59 additions & 2 deletions docs/source/changelog.rst
Original file line number Diff line number Diff line change
@@ -1,6 +1,59 @@
Changelog
=========

1.1.2 / 2019-02-25
------------------

Array
+++++

- Fix another unicode/mixed-type edge case in normalize_array (:pr:`4489`) `Marco Neumann`_
- Add dask.array.diagonal (:pr:`4431`) `Danilo Horta`_
- Call asanyarray in unify_chunks (:pr:`4506`) `Jim Crist`_
- Modify moment chunk functions to return dicts (:pr:`4519`) `Peter Andreas Entschev`_


Bag
+++

- Don't inline output keys in dask.bag (:pr:`4464`) `Jim Crist`_
- Ensure that bag.from_sequence always includes at least one partition (:pr:`4475`) `Anderson Banihirwe`_
- Implement out_type for bag.fold (:pr:`4502`) `Matthew Rocklin`_
- Remove map from bag keynames (:pr:`4500`) `Matthew Rocklin`_
- Avoid itertools.repeat in map_partitions (:pr:`4507`) `Matthew Rocklin`_


DataFrame
+++++++++

- Fix relative path parsing on windows when using fastparquet (:pr:`4445`) `Janne Vuorela`_
- Fix bug in pyarrow and hdfs (:pr:`4453`) (:pr:`4455`) `Michał Jastrzębski`_
- df getitem with integer slices is not implemented (:pr:`4466`) `Jim Crist`_
- Replace cudf-specific code with dask-cudf import (:pr:`4470`) `Matthew Rocklin`_
- Avoid groupby.agg(callable) in groupby-var (:pr:`4482`) `Matthew Rocklin`_
- Consider uint types as numerical in check_meta (:pr:`4485`) `Marco Neumann`_
- Fix some typos in groupby comments (:pr:`4494`) `Daniel Saxton`_
- Add error message around set_index(inplace=True) (:pr:`4501`) `Matthew Rocklin`_
- meta_nonempty works with categorical index (:pr:`4505`) `Jim Crist`_
- Add module name to expected meta error message (:pr:`4499`) `Matthew Rocklin`_
- groupby-nunique works on empty chunk (:pr:`4504`) `Jim Crist`_
- Propagate index metadata if not specified (:pr:`4509`) `Jim Crist`_

Documentation
+++++++++++++

- Update docs to use ``from_zarr`` (:pr:`4472`) `John A Kirkham`_
- DOC: add section of `Using Other S3-Compatible Services` for remote-data-services (:pr:`4405`) `Aploium`_
- Fix header level of section in changelog (:pr:`4483`) `Bruce Merry`_
- Add quotes to pip install [skip-ci] (:pr:`4508`) `James Bourbeau`_

Core
++++

- Extend started_cbs AFTER state is initialized (:pr:`4460`) `Marco Neumann`_
- Fix bug in HTTPFile._fetch_range with headers (:pr:`4479`) (:pr:`4480`) `Ross Petchler`_
- Repeat optimize_blockwise for diamond fusion (:pr:`4492`) `Matthew Rocklin`_


1.1.1 / 2019-01-31
------------------
Expand Down Expand Up @@ -36,7 +89,7 @@ Documentation
Core
++++

- Work around psutil 5.5.0 not allowing pickling Process objects `Dimplexion`_
- Work around psutil 5.5.0 not allowing pickling Process objects `Janne Vuorela`_


1.1.0 / 2019-01-18
Expand Down Expand Up @@ -1776,4 +1829,8 @@ Other
.. _`Johnnie Gray`: https://github.com/jcmgray
.. _`Roma Sokolov`: https://github.com/little-arhat
.. _`Daniel Severo`: https://github.com/daniel-severo
.. _`Dimplexion`: https://github.com/Dimplexion
.. _`Michał Jastrzębski`: https://github.com/inc0
.. _`Janne Vuorela`: https://github.com/Dimplexion
.. _`Ross Petchler`: https://github.com/rpetchler
.. _`Aploium`: https://github.com/aploium
.. _`Peter Andreas Entschev`: https://github.com/pentschev
1 change: 1 addition & 0 deletions docs/source/dataframe-api.rst
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ Dataframe
DataFrame.npartitions
DataFrame.partitions
DataFrame.pow
DataFrame.prod
DataFrame.quantile
DataFrame.query
DataFrame.radd
Expand Down

0 comments on commit a6fcc36

Please sign in to comment.