From 7566d58f6d05d619b6205902c85f0df14e9f6c7b Mon Sep 17 00:00:00 2001 From: Matthew Rocklin Date: Thu, 17 Jan 2019 07:39:51 -0800 Subject: [PATCH] Add explicit backends.py files in dask.array/dataframe This collects lazily registered functions for cupy, sparse, scipy, and cudf. --- dask/array/__init__.py | 2 +- dask/array/backends.py | 33 +++++++++++++++++++++++++++++++++ dask/array/core.py | 32 -------------------------------- dask/array/utils.py | 2 +- dask/dataframe/__init__.py | 2 +- dask/dataframe/backends.py | 37 +++++++++++++++++++++++++++++++++++++ dask/dataframe/core.py | 24 ------------------------ dask/dataframe/methods.py | 9 --------- 8 files changed, 73 insertions(+), 68 deletions(-) create mode 100644 dask/array/backends.py create mode 100644 dask/dataframe/backends.py diff --git a/dask/array/__init__.py b/dask/array/__init__.py index b4f5444fe8a..139dde18754 100644 --- a/dask/array/__init__.py +++ b/dask/array/__init__.py @@ -47,7 +47,7 @@ from .reductions import nanprod, nancumprod, nancumsum with ignoring(ImportError): from . import ma - from . import random, linalg, overlap, fft + from . import random, linalg, overlap, fft, backends from .overlap import map_overlap from .wrap import ones, zeros, empty, full from .creation import ones_like, zeros_like, empty_like, full_like diff --git a/dask/array/backends.py b/dask/array/backends.py new file mode 100644 index 00000000000..55de0d17987 --- /dev/null +++ b/dask/array/backends.py @@ -0,0 +1,33 @@ +from .core import tensordot_lookup, concatenate_lookup + + +@tensordot_lookup.register_lazy('cupy') +@concatenate_lookup.register_lazy('cupy') +def register_cupy(): + import cupy + concatenate_lookup.register(cupy.ndarray, cupy.concatenate) + tensordot_lookup.register(cupy.ndarray, cupy.tensordot) + + +@tensordot_lookup.register_lazy('sparse') +@concatenate_lookup.register_lazy('sparse') +def register_sparse(): + import sparse + concatenate_lookup.register(sparse.COO, sparse.concatenate) + tensordot_lookup.register(sparse.COO, sparse.tensordot) + + +@concatenate_lookup.register_lazy('scipy') +def register_scipy_sparse(): + import scipy.sparse + + def _concatenate(L, axis=0): + if axis == 0: + return scipy.sparse.vstack(L) + elif axis == 1: + return scipy.sparse.hstack(L) + else: + msg = ("Can only concatenate scipy sparse matrices for axis in " + "{0, 1}. Got %s" % axis) + raise ValueError(msg) + concatenate_lookup.register(scipy.sparse.spmatrix, _concatenate) diff --git a/dask/array/core.py b/dask/array/core.py index bdb31a8334c..accf963f636 100644 --- a/dask/array/core.py +++ b/dask/array/core.py @@ -61,38 +61,6 @@ tensordot_lookup.register((object, np.ndarray), np.tensordot) -@tensordot_lookup.register_lazy('cupy') -@concatenate_lookup.register_lazy('cupy') -def register_cupy(): - import cupy - concatenate_lookup.register(cupy.ndarray, cupy.concatenate) - tensordot_lookup.register(cupy.ndarray, cupy.tensordot) - - -@tensordot_lookup.register_lazy('sparse') -@concatenate_lookup.register_lazy('sparse') -def register_sparse(): - import sparse - concatenate_lookup.register(sparse.COO, sparse.concatenate) - tensordot_lookup.register(sparse.COO, sparse.tensordot) - - -@concatenate_lookup.register_lazy('scipy') -def register_scipy_sparse(): - import scipy.sparse - - def _concatenate(L, axis=0): - if axis == 0: - return scipy.sparse.vstack(L) - elif axis == 1: - return scipy.sparse.hstack(L) - else: - msg = ("Can only concatenate scipy sparse matrices for axis in " - "{0, 1}. Got %s" % axis) - raise ValueError(msg) - concatenate_lookup.register(scipy.sparse.spmatrix, _concatenate) - - class PerformanceWarning(Warning): """ A warning given when bad chunking may cause poor performance """ diff --git a/dask/array/utils.py b/dask/array/utils.py index 7f289ff4d26..408ef820581 100644 --- a/dask/array/utils.py +++ b/dask/array/utils.py @@ -22,7 +22,7 @@ def normalize_to_array(x): - if 'cupy' in str(type(x)): + if 'cupy' in str(type(x)): # TODO: avoid explicit reference to cupy return x.get() else: return x diff --git a/dask/dataframe/__init__.py b/dask/dataframe/__init__.py index 396875f7a4d..ee6c0bbbf95 100644 --- a/dask/dataframe/__init__.py +++ b/dask/dataframe/__init__.py @@ -11,7 +11,7 @@ read_fwf) from .optimize import optimize from .multi import merge, concat - from . import rolling + from . import rolling, backends from ..base import compute from .reshape import get_dummies, pivot_table, melt from .utils import assert_eq diff --git a/dask/dataframe/backends.py b/dask/dataframe/backends.py new file mode 100644 index 00000000000..b400395bab6 --- /dev/null +++ b/dask/dataframe/backends.py @@ -0,0 +1,37 @@ +from .methods import concat_dispatch +from .core import get_parallel_type, meta_nonempty, make_meta + + +###################################### +# cuDF: Pandas Dataframes on the GPU # +###################################### + + +@concat_dispatch.register_lazy('cudf') +@get_parallel_type.register_lazy('cudf') +@meta_nonempty.register_lazy('cudf') +@make_meta.register_lazy('cudf') +def _register_cudf(): + import cudf + import dask_cudf + get_parallel_type.register(cudf.DataFrame, lambda _: dask_cudf.DataFrame) + get_parallel_type.register(cudf.Series, lambda _: dask_cudf.Series) + get_parallel_type.register(cudf.Index, lambda _: dask_cudf.Index) + + @meta_nonempty.register((cudf.DataFrame, cudf.Series, cudf.Index)) + def _(x): + y = meta_nonempty(x.to_pandas()) # TODO: add iloc[:5] + return cudf.from_pandas(y) + + @make_meta.register((cudf.Series, cudf.DataFrame)) + def _(x): + return x.head(0) + + @make_meta.register(cudf.Index) + def _(x): + return x[:0] + + concat_dispatch.register( + (cudf.DataFrame, cudf.Series, cudf.Index), + cudf.concat + ) diff --git a/dask/dataframe/core.py b/dask/dataframe/core.py index ac4088d1ae2..4c38ce8cd36 100644 --- a/dask/dataframe/core.py +++ b/dask/dataframe/core.py @@ -4495,30 +4495,6 @@ def get_parallel_type_frame(o): return get_parallel_type(o._meta) -@get_parallel_type.register_lazy('cudf') -@meta_nonempty.register_lazy('cudf') -@make_meta.register_lazy('cudf') -def _register_cudf(): - import cudf - import dask_cudf - get_parallel_type.register(cudf.DataFrame, lambda _: dask_cudf.DataFrame) - get_parallel_type.register(cudf.Series, lambda _: dask_cudf.Series) - get_parallel_type.register(cudf.Index, lambda _: dask_cudf.Index) - - @meta_nonempty.register((cudf.DataFrame, cudf.Series, cudf.Index)) - def _(x): - y = meta_nonempty(x.to_pandas()) # TODO: add iloc[:5] - return cudf.from_pandas(y) - - @make_meta.register((cudf.Series, cudf.DataFrame)) - def _(x): - return x.head(0) - - @make_meta.register(cudf.Index) - def _(x): - return x[:0] - - def parallel_types(): return tuple(k for k, v in get_parallel_type._lookup.items() if v is not get_parallel_type_object) diff --git a/dask/dataframe/methods.py b/dask/dataframe/methods.py index c8f2c2ed0e3..49f7f9b5b56 100644 --- a/dask/dataframe/methods.py +++ b/dask/dataframe/methods.py @@ -247,15 +247,6 @@ def _get_level_values(x, n): concat_dispatch = Dispatch('concat') -@concat_dispatch.register_lazy('cudf') -def register_cudf(): - import cudf - concat_dispatch.register( - (cudf.DataFrame, cudf.Series, cudf.Index), - cudf.concat - ) - - def concat(dfs, axis=0, join='outer', uniform=False, filter_warning=True): """Concatenate, handling some edge cases: