Commit 996bab8

Merge branch 'master' into more-register-cudf

mrocklin committed Jan 22, 2019
2 parents 7566d58 + 7989ebd
Showing 33 changed files with 1,228 additions and 71 deletions.
2 changes: 0 additions & 2 deletions appveyor.yml
@@ -12,8 +12,6 @@ environment:
# Since appveyor is quite slow, we only use a single configuration
- PYTHON: "3.6"
ARCH: "64"
NUMPY: "1.12.1"
PANDAS: "0.19.2"
CONDA_ENV: testenv

init:
19 changes: 9 additions & 10 deletions continuous_integration/appveyor/setup_conda_environment.cmd
@@ -22,19 +22,18 @@ call activate %CONDA_ENV%
@rem Pin matrix items
@rem Please see PR ( https://github.com/dask/dask/pull/2185 ) for details.
copy NUL %CONDA_PREFIX%\conda-meta\pinned
echo numpy %NUMPY% >> %CONDA_PREFIX%\conda-meta\pinned
echo pandas %PANDAS% >> %CONDA_PREFIX%\conda-meta\pinned

@rem Install optional dependencies for tests
%CONDA_INSTALL% numpy pandas cloudpickle distributed
%CONDA_INSTALL% bcolz bokeh h5py ipython lz4 psutil pytables s3fs scipy pyarrow fastparquet

%PIP_INSTALL% git+https://github.com/dask/partd --upgrade
%PIP_INSTALL% git+https://github.com/dask/cachey --upgrade
%PIP_INSTALL% git+https://github.com/dask/distributed --upgrade
%PIP_INSTALL% git+https://github.com/mrocklin/sparse --upgrade
%PIP_INSTALL% blosc --upgrade
%PIP_INSTALL% moto
%CONDA_INSTALL% bcolz bokeh h5py ipython lz4 psutil pytables s3fs scipy
%CONDA_INSTALL% -c conda-forge fastparquet snappy

%PIP_INSTALL% --no-deps --upgrade locket git+https://github.com/dask/partd
%PIP_INSTALL% --no-deps --upgrade heapdict git+https://github.com/dask/cachey
%PIP_INSTALL% --upgrade git+https://github.com/dask/distributed
%PIP_INSTALL% --no-deps git+https://github.com/pydata/sparse
%PIP_INSTALL% --no-deps --upgrade blosc --upgrade
%PIP_INSTALL% --no-deps moto Jinja2 boto boto3 botocore cryptography requests xmltodict six werkzeug PyYAML pytz python-dateutil python-jose mock docker jsondiff==1.1.2 aws-xray-sdk responses idna cfn-lint

if %PYTHON% LSS 3.0 (%PIP_INSTALL% backports.lzma mock)

5 changes: 3 additions & 2 deletions continuous_integration/travis/install.sh
@@ -51,6 +51,7 @@ conda install -q -c conda-forge \
h5py \
ipython \
lz4 \
numba \
partd \
psutil \
pytables \
@@ -62,10 +63,10 @@ conda install -q -c conda-forge \
sqlalchemy \
toolz

pip install --upgrade --no-deps git+https://github.com/dask/partd
pip install --upgrade --no-deps locket git+https://github.com/dask/partd
pip install --upgrade --no-deps git+https://github.com/dask/zict
pip install --upgrade --no-deps git+https://github.com/dask/distributed
pip install --upgrade --no-deps git+https://github.com/mrocklin/sparse
pip install --upgrade --no-deps git+https://github.com/pydata/sparse
pip install --upgrade --no-deps git+https://github.com/dask/s3fs

if [[ $PYTHONOPTIMIZE != '2' ]] && [[ $NUMPY > '1.11.0' ]] && [[ $NUMPY < '1.14.0' ]]; then
9 changes: 8 additions & 1 deletion dask/array/backends.py
@@ -1,4 +1,4 @@
from .core import tensordot_lookup, concatenate_lookup
from .core import tensordot_lookup, concatenate_lookup, einsum_lookup


@tensordot_lookup.register_lazy('cupy')
@@ -8,6 +8,13 @@ def register_cupy():
concatenate_lookup.register(cupy.ndarray, cupy.concatenate)
tensordot_lookup.register(cupy.ndarray, cupy.tensordot)

@einsum_lookup.register(cupy.ndarray)
def _cupy_einsum(*args, **kwargs):
# NB: cupy does not accept `order` or `casting` kwargs - ignore
kwargs.pop('casting', None)
kwargs.pop('order', None)
return cupy.einsum(*args, **kwargs)


@tensordot_lookup.register_lazy('sparse')
@concatenate_lookup.register_lazy('sparse')
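
As a rough illustration of the dispatch pattern this hunk uses (a sketch, not part of the commit; MyArray is a hypothetical chunk type), registering an einsum implementation for a custom array type looks like this:

    import numpy as np
    from dask.array.core import einsum_lookup

    class MyArray(np.ndarray):
        """Hypothetical chunk type, used only for illustration."""
        pass

    @einsum_lookup.register(MyArray)
    def _my_einsum(*args, **kwargs):
        # A real backend would drop unsupported kwargs here, as the cupy
        # registration above does with 'casting' and 'order'.
        return np.einsum(*args, **kwargs)

da.einsum then looks up the implementation per chunk type at compute time, so cupy-backed dask arrays route to cupy.einsum without any change to the graph-building code.
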
11 changes: 0 additions & 11 deletions dask/array/chunk.py
@@ -277,17 +277,6 @@ def view(x, dtype, order='C'):
return x.T.view(dtype).T


def einsum(*operands, **kwargs):
subscripts = kwargs.pop('subscripts')
ncontract_inds = kwargs.pop('ncontract_inds')
dtype = kwargs.pop('kernel_dtype')
chunk = np.einsum(subscripts, *operands, dtype=dtype, **kwargs)

# Avoid concatenate=True in blockwise by adding 1's
# for the contracted dimensions
return chunk.reshape(chunk.shape + (1,) * ncontract_inds)


def slice_with_int_dask_array(x, idx, offset, x_size, axis):
""" Chunk function of `slice_with_int_dask_array_on_axis`.
Slice one chunk of x by one chunk of idx.
2 changes: 2 additions & 0 deletions dask/array/core.py
@@ -57,8 +57,10 @@

concatenate_lookup = Dispatch('concatenate')
tensordot_lookup = Dispatch('tensordot')
einsum_lookup = Dispatch('einsum')
concatenate_lookup.register((object, np.ndarray), np.concatenate)
tensordot_lookup.register((object, np.ndarray), np.tensordot)
einsum_lookup.register((object, np.ndarray), np.einsum)


class PerformanceWarning(Warning):
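
A quick sketch of how the new einsum_lookup dispatcher resolves an implementation (assuming only the registrations shown in this diff):

    import numpy as np
    from dask.array.core import einsum_lookup

    impl = einsum_lookup.dispatch(np.ndarray)      # np.einsum, per the registration above
    out = impl('ij,jk->ik', np.eye(2), np.eye(2))  # plain NumPy einsum call
    print(out)                                     # 2x2 identity
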
17 changes: 14 additions & 3 deletions dask/array/einsumfuncs.py
@@ -6,13 +6,24 @@
import numpy as np
from numpy.compat import basestring

from .core import blockwise, asarray
from . import chunk
from .core import blockwise, asarray, einsum_lookup

einsum_symbols = 'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ'
einsum_symbols_set = set(einsum_symbols)


def chunk_einsum(*operands, **kwargs):
subscripts = kwargs.pop('subscripts')
ncontract_inds = kwargs.pop('ncontract_inds')
dtype = kwargs.pop('kernel_dtype')
einsum = einsum_lookup.dispatch(type(operands[0]))
chunk = einsum(subscripts, *operands, dtype=dtype, **kwargs)

# Avoid concatenate=True in blockwise by adding 1's
# for the contracted dimensions
return chunk.reshape(chunk.shape + (1,) * ncontract_inds)


# This function duplicates numpy's _parse_einsum_input() function
# See https://github.com/numpy/numpy/blob/master/LICENSE.txt
# or NUMPY_LICENSE.txt within this directory
@@ -230,7 +241,7 @@ def einsum(*operands, **kwargs):

# Introduce the contracted indices into the blockwise product
# so that we get numpy arrays, not lists
result = blockwise(chunk.einsum, tuple(outputs) + tuple(contract_inds),
result = blockwise(chunk_einsum, tuple(outputs) + tuple(contract_inds),
*(a for ap in zip(ops, inputs) for a in ap),
# blockwise parameters
adjust_chunks={ind: 1 for ind in contract_inds}, dtype=dtype,
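
A worked example of the reshape trick in chunk_einsum (a sketch; the numbers are arbitrary): contracting all three indices of a chunk yields a 0-d result, and the appended singleton axes let blockwise keep one size-1 block per contracted dimension instead of requiring concatenate=True.

    import numpy as np

    a = np.ones((2, 3, 4))
    chunk = np.einsum('ijk,ijk', a, a)          # full contraction, shape ()
    ncontract_inds = 3                          # i, j and k were contracted
    chunk = np.reshape(chunk, np.shape(chunk) + (1,) * ncontract_inds)
    print(chunk.shape)                          # (1, 1, 1)
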
1 change: 1 addition & 0 deletions dask/array/tests/test_cupy.py
@@ -36,6 +36,7 @@
lambda x: x > 0.5,
lambda x: x.rechunk((4, 4, 4)),
lambda x: x.rechunk((2, 2, 1)),
lambda x: da.einsum("ijk,ijk", x, x)
]


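Roughly what the new test case exercises, pulled out of the parametrized list (a sketch; requires a GPU and cupy, and mirrors the style of the surrounding test module):

    import cupy
    import dask.array as da
    from dask.array.utils import assert_eq

    c = cupy.random.random((2, 3, 4))
    dc = da.from_array(c, chunks=(1, 2, 2), asarray=False)   # cupy-backed
    dn = da.from_array(c.get(), chunks=(1, 2, 2))            # numpy-backed
    # the two graphs should agree once cupy.einsum is dispatched per chunk
    assert_eq(da.einsum("ijk,ijk", dc, dc), da.einsum("ijk,ijk", dn, dn))
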
9 changes: 9 additions & 0 deletions dask/array/tests/test_sparse.py
@@ -8,6 +8,12 @@
from dask.array.utils import assert_eq

sparse = pytest.importorskip('sparse')
if sparse:
# Test failures on older versions of Numba.
# Conda-Forge provides 0.35.0 on windows right now, causing failures like
# searchsorted() got an unexpected keyword argument 'side'
pytest.importorskip("numba", minversion="0.40.0")


if LooseVersion(np.__version__) < '1.11.2':
pytestmark = pytest.mark.skip
@@ -79,6 +85,7 @@ def test_tensordot():
da.tensordot(xx, yy, axes=((1, 2), (1, 0))))


@pytest.mark.xfail(reason="upstream change", strict=False)
@pytest.mark.parametrize('func', functions)
def test_mixed_concatenate(func):
x = da.random.random((2, 3, 4), chunks=(1, 2, 2))
@@ -96,6 +103,7 @@ def test_mixed_concatenate(func):
assert_eq(dd, ss)


@pytest.mark.xfail(reason="upstream change", strict=False)
@pytest.mark.parametrize('func', functions)
def test_mixed_random(func):
d = da.random.random((4, 3, 4), chunks=(1, 2, 2))
@@ -110,6 +118,7 @@ def test_mixed_random(func):
assert_eq(dd, ss)


@pytest.mark.xfail(reason="upstream change", strict=False)
def test_mixed_output_type():
y = da.random.random((10, 10), chunks=(5, 5))
y[y < 0.8] = 0
16 changes: 12 additions & 4 deletions dask/base.py
@@ -14,7 +14,7 @@
from toolz.functoolz import Compose

from .compatibility import (apply, long, unicode, Iterator, is_dataclass,
dataclass_fields, Mapping)
dataclass_fields, Mapping, cPickle)
from .context import thread_state
from .core import flatten, quote, get as simple_get
from .hashing import hash_buffer_hex
@@ -729,10 +729,18 @@ def normalize_array(x):
x.shape, x.strides, offset)
if x.dtype.hasobject:
try:
data = hash_buffer_hex('-'.join(x.flat).encode('utf-8'))
# string fast-path
data = hash_buffer_hex('-'.join(x.flat).encode(encoding='utf-8', errors='surrogatepass'))
except UnicodeDecodeError:
# bytes fast-path
data = hash_buffer_hex(b'-'.join(x.flat))
except TypeError:
data = hash_buffer_hex(b'-'.join([unicode(item).encode('utf-8') for item in
x.flat]))
# object data w/o fast-path, use fast cPickle
try:
data = hash_buffer_hex(cPickle.dumps(x, cPickle.HIGHEST_PROTOCOL))
except Exception:
# pickling not supported, use UUID4-based fallback
data = uuid.uuid4().hex
else:
try:
data = hash_buffer_hex(x.ravel(order='K').view('i1'))
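
A sketch of what the revised object-dtype branch means for tokenize (assumes this change; the UUID fallback is intentionally non-deterministic):

    import numpy as np
    from dask.base import tokenize

    strings = np.array(['a', 'b'], dtype=object)          # string fast path
    other = np.array([{'a': 1}, {'b': 2}], dtype=object)  # no fast path -> cPickle

    # Tokens stay deterministic as long as the data is picklable; object
    # arrays that cannot be pickled fall back to a random UUID and so never
    # share a token.
    assert tokenize(strings) == tokenize(strings)
    assert tokenize(other) == tokenize(other)
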
5 changes: 5 additions & 0 deletions dask/compatibility.py
@@ -111,6 +111,8 @@ def reraise(exc, tb=None):
raise exc.with_traceback(tb)
raise exc

import pickle as cPickle

else:
import __builtin__ as builtins
import copy_reg as copyreg
@@ -303,6 +305,9 @@ def writelines(self, *args, **kwargs):
pass


import cPickle


def getargspec(func):
"""Version of inspect.getargspec that works with partial and warps."""
if isinstance(func, functools.partial):
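
The alias gives callers a single fast-pickle entry point on both Python lines; a minimal sketch of the usage pattern dask/base.py relies on above:

    from dask.compatibility import cPickle   # pickle on Py3, cPickle on Py2

    payload = cPickle.dumps({'x': 1}, cPickle.HIGHEST_PROTOCOL)
    assert cPickle.loads(payload) == {'x': 1}
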
26 changes: 17 additions & 9 deletions dask/config.py
@@ -147,22 +147,30 @@ def collect_yaml(paths=paths):
for path in paths:
if os.path.exists(path):
if os.path.isdir(path):
file_paths.extend(sorted([
os.path.join(path, p)
for p in os.listdir(path)
if os.path.splitext(p)[1].lower() in ('.json', '.yaml', '.yml')
]))
try:
file_paths.extend(sorted([
os.path.join(path, p)
for p in os.listdir(path)
if os.path.splitext(p)[1].lower() in ('.json', '.yaml', '.yml')
]))
except OSError:
# Ignore permission errors
pass
else:
file_paths.append(path)

configs = []

# Parse yaml files
for path in file_paths:
with open(path) as f:
data = yaml.load(f.read()) or {}
data = normalize_nested_keys(data)
configs.append(data)
try:
with open(path) as f:
data = yaml.load(f.read()) or {}
data = normalize_nested_keys(data)
configs.append(data)
except (OSError, IOError):
# Ignore permission errors
pass

return configs

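A minimal sketch of the behaviour these guards are after (hypothetical path; run as a non-root user, since root can list the directory regardless):

    import os
    from dask.config import collect_yaml

    unreadable = '/tmp/dask-config-example'   # hypothetical directory
    os.makedirs(unreadable, exist_ok=True)
    os.chmod(unreadable, 0)                   # drop read/execute permission

    # Before this change the PermissionError from os.listdir() propagated;
    # now the directory is silently skipped.
    assert collect_yaml(paths=[unreadable]) == []
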
4 changes: 4 additions & 0 deletions dask/dataframe/categorical.py
@@ -245,6 +245,10 @@ def remove_unused_categories(self):

# Reorder to keep cat:code relationship, filtering unused (-1)
ordered, mask = present.reindex(meta_cat.categories)
if mask is None:
# PANDAS-23963: old and new categories match.
return self._series

new_categories = ordered[mask != -1]
meta = meta_cat.set_categories(new_categories, ordered=meta_cat.ordered)
return self._series.map_partitions(self._delegate_method, 'cat',
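
For reference, the pandas behaviour the new guard handles: Index.reindex returns None for the indexer when the target already matches, so there is nothing left to filter (a sketch, assuming a recent pandas):

    import pandas as pd

    present = pd.Index(['a', 'b'])
    ordered, mask = present.reindex(pd.Index(['a', 'b']))
    print(mask)   # None -> every category is already present and in use
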
81 changes: 78 additions & 3 deletions dask/dataframe/core.py
@@ -1,3 +1,4 @@
# -*- coding: utf-8 -*-
from __future__ import absolute_import, division, print_function

from functools import wraps, partial
@@ -748,7 +749,8 @@ def reduction(self, chunk, aggregate=None, combine=None, meta=no_default,
Aggregate both the sum and count of a Series at the same time:
>>> def sum_and_count(x):
... return pd.Series({'sum': x.sum(), 'count': x.count()})
... return pd.Series({'count': x.count(), 'sum': x.sum()},
... index=['count', 'sum'])
>>> res = ddf.x.reduction(sum_and_count, aggregate=lambda x: x.sum())
>>> res.compute()
count 50
@@ -761,7 +763,8 @@ def reduction(self, chunk, aggregate=None, combine=None, meta=no_default,
index, and sum each group to get the final result.
>>> def sum_and_count(x):
... return pd.DataFrame({'sum': x.sum(), 'count': x.count()})
... return pd.DataFrame({'count': x.count(), 'sum': x.sum()},
... columns=['count', 'sum'])
>>> res = ddf.reduction(sum_and_count,
... aggregate=lambda x: x.groupby(level=0).sum())
>>> res.compute()
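
The doctest tweak above pins the output order explicitly: a Series or DataFrame built from a plain dict leaves the row/column order to dict and pandas sorting behaviour, so the rendered doctest output could differ across Python and pandas versions. A small illustration (not part of the commit):

    import pandas as pd

    # The explicit index fixes the row order regardless of dict key order:
    print(pd.Series({'sum': 3, 'count': 2}, index=['count', 'sum']))
    # count    2
    # sum      3
    # dtype: int64
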
@@ -2844,10 +2847,82 @@ def drop(self, labels, axis=0, errors='raise'):
return self.map_partitions(M.drop, labels, axis=axis, errors=errors)
raise NotImplementedError("Drop currently only works for axis=1")

@derived_from(pd.DataFrame)
def merge(self, right, how='inner', on=None, left_on=None, right_on=None,
left_index=False, right_index=False, suffixes=('_x', '_y'),
indicator=False, npartitions=None, shuffle=None):
"""Merge the DataFrame with another DataFrame
This will merge the two datasets, either on the indices, on a certain
column in each dataset, or on the index of one dataset and a column of
the other.
Parameters
----------
right: dask.dataframe.DataFrame
how : {'left', 'right', 'outer', 'inner'}, default: 'inner'
How to handle the operation of the two objects:
- left: use calling frame's index (or column if on is specified)
- right: use other frame's index
- outer: form union of calling frame's index (or column if on is
specified) with other frame's index, and sort it
lexicographically
- inner: form intersection of calling frame's index (or column if
on is specified) with other frame's index, preserving the order
of the calling frame's index
on : label or list
Column or index level names to join on. These must be found in both
DataFrames. If on is None and not merging on indexes then this
defaults to the intersection of the columns in both DataFrames.
left_on : label or list, or array-like
Column to join on in the left DataFrame. Unlike in pandas, arrays and
lists are only supported if their length is 1.
right_on : label or list, or array-like
Column to join on in the right DataFrame. Unlike in pandas, arrays and
lists are only supported if their length is 1.
left_index : boolean, default False
Use the index from the left DataFrame as the join key.
right_index : boolean, default False
Use the index from the right DataFrame as the join key.
suffixes : 2-length sequence (tuple, list, ...)
Suffix to apply to overlapping column names in the left and
right side, respectively
indicator : boolean or string, default False
If True, adds a column to output DataFrame called "_merge" with
information on the source of each row. If string, column with
information on source of each row will be added to output DataFrame,
and column will be named value of string. Information column is
Categorical-type and takes on a value of "left_only" for observations
whose merge key only appears in `left` DataFrame, "right_only" for
observations whose merge key only appears in `right` DataFrame,
and "both" if the observation’s merge key is found in both.
npartitions: int, None, or 'auto'
The ideal number of output partitions. This is only utilised when
performing a hash_join (merging on columns only). If `None`
npartitions = max(lhs.npartitions, rhs.npartitions)
shuffle: {'disk', 'tasks'}, optional
Either ``'disk'`` for single-node operation or ``'tasks'`` for
distributed operation. Will be inferred by your current scheduler.
Notes
-----
There are three ways to join dataframes:
1. Joining on indices. In this case the divisions are
aligned using the function ``dask.dataframe.multi.align_partitions``.
Afterwards, each partition is merged with the pandas merge function.
2. Joining one on index and one on column. In this case the divisions of
the dataframe merged by index (:math:`d_i`) are used to divide the
column-merged dataframe (:math:`d_c`) using
``dask.dataframe.multi.rearrange_by_divisions``. In this case the
merged dataframe (:math:`d_m`) has the exact same divisions
as (:math:`d_i`). This can lead to issues if you merge multiple rows from
(:math:`d_c`) to one row in (:math:`d_i`).
3. Joining both on columns. In this case a hash join is performed using
``dask.dataframe.multi.hash_join``.
"""

if not is_dataframe_like(right):
raise ValueError('right must be DataFrame')
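
A hedged usage sketch of the method documented above (a column-column join, so the npartitions/shuffle arguments apply; the data is made up):

    import pandas as pd
    import dask.dataframe as dd

    left = dd.from_pandas(
        pd.DataFrame({'k': [1, 2, 3], 'x': [10, 20, 30]}), npartitions=2)
    right = dd.from_pandas(
        pd.DataFrame({'k': [2, 3, 4], 'y': ['b', 'c', 'd']}), npartitions=2)

    # Hash join on a column; npartitions and shuffle only matter on this path.
    joined = left.merge(right, on='k', how='inner', npartitions=4, shuffle='tasks')
    print(joined.compute())
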
(Diff truncated: the remaining changed files are not shown.)