Added dask data interface #974

Merged
merged 8 commits on Nov 16, 2016
@@ -46,6 +46,12 @@
except ImportError:
pass

try:
from .dask import DaskInterface
datatypes.append('dask')
except ImportError:
pass

from ..dimension import Dimension
from ..element import Element
from ..spaces import HoloMap, DynamicMap
@@ -311,7 +317,7 @@ def __getitem__(self, slices):
selection = dict(zip(self.dimensions(label=True), slices))
data = self.select(**selection)
if value_select:
- if len(data) == 1:
+ if data.shape[0] == 1:
return data[value_select][0]
else:
return data.reindex(vdims=[value_select])
@@ -0,0 +1,241 @@
from __future__ import absolute_import

try:
from itertools import izip as zip
except ImportError:
pass

import numpy as np
import pandas as pd
import dask.dataframe as dd
from dask.dataframe import DataFrame
from dask.dataframe.core import Scalar

from .. import util
from ..element import Element
from ..ndmapping import NdMapping, item_check
from .interface import Interface
from .pandas import PandasInterface


class DaskInterface(PandasInterface):
"""
The DaskInterface allows a Dataset object to wrap a dask
DataFrame object. Using dask allows loading data lazily
and performing out-of-core operations on the data, making
it possible to work on datasets larger than memory.
The DaskInterface covers almost the complete API exposed
by the PandasInterface with three notable exceptions:
1) Sorting is not supported and any attempt at sorting will
   be ignored with a warning.
2) Dask does not easily support adding a new column to an existing
   dataframe unless it is a scalar; add_dimension will therefore
   error when supplied a non-scalar value.
3) Not all functions can be easily applied to a dask dataframe so
   some functions applied with aggregate and reduce will not work.
"""

types = (DataFrame,)

datatype = 'dask'

default_partitions = 100

@classmethod
def init(cls, eltype, data, kdims, vdims):
data, kdims, vdims = PandasInterface.init(eltype, data, kdims, vdims)
if not isinstance(data, DataFrame):
data = dd.from_pandas(data, npartitions=cls.default_partitions, sort=False)
return data, kdims, vdims

@classmethod
def shape(cls, dataset):
return (len(dataset.data), len(dataset.data.columns))

@classmethod
def range(cls, columns, dimension):
column = columns.data[columns.get_dimension(dimension).name]
if column.dtype.kind == 'O':
column = np.sort(column[column.notnull()].compute())
return column[0], column[-1]
else:
return dd.compute(column.min(), column.max())

@classmethod
def sort(cls, columns, by=[]):
columns.warning('Dask dataframes do not support sorting')
return columns.data

@classmethod
def values(cls, columns, dim, expanded=True, flat=True):
data = columns.data[dim]
if not expanded:
data = data.unique()
return data.compute().values

@classmethod
def select_mask(cls, dataset, selection):
"""
Given a Dataset object and a dictionary with dimension keys and
selection keys (i.e. tuple ranges, slices, sets, lists or literals)
return a boolean mask over the rows in the Dataset object that
have been selected.
"""
select_mask = None
for dim, k in selection.items():
if isinstance(k, tuple):
k = slice(*k)
masks = []
series = dataset.data[dim]
if isinstance(k, slice):
if k.start is not None:
masks.append(k.start <= series)
if k.stop is not None:
masks.append(series < k.stop)
elif isinstance(k, (set, list)):
iter_slc = None
for ik in k:
mask = series == ik
if iter_slc is None:
iter_slc = mask
else:
iter_slc |= mask
masks.append(iter_slc)
elif callable(k):
masks.append(k(series))
else:
masks.append(series == k)
for mask in masks:
if select_mask is not None:
select_mask &= mask
else:
select_mask = mask
return select_mask

@classmethod
def select(cls, columns, selection_mask=None, **selection):
df = columns.data
if selection_mask is not None:
return df[selection_mask]
selection_mask = cls.select_mask(columns, selection)
indexed = cls.indexed(columns, selection)
df = df if selection_mask is None else df[selection_mask]
if indexed and len(df) == 1:
return df[columns.vdims[0].name].compute().iloc[0]
return df

@classmethod
def groupby(cls, columns, dimensions, container_type, group_type, **kwargs):
index_dims = [columns.get_dimension(d) for d in dimensions]
element_dims = [kdim for kdim in columns.kdims
if kdim not in index_dims]

group_kwargs = {}
if group_type != 'raw' and issubclass(group_type, Element):
group_kwargs = dict(util.get_param_values(columns),
kdims=element_dims)
group_kwargs.update(kwargs)

data = []
groupby = columns.data.groupby(dimensions)
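# The unique group keys have to be materialized eagerly here, since a
# dask groupby does not support iterating over its groups lazily;
# get_group() is then called once per key on the lazy groupby object.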
ind_array = columns.data[dimensions].compute().values
indices = (tuple(ind) for ind in ind_array)
for coord in util.unique_iterator(indices):
if any(isinstance(c, float) and np.isnan(c) for c in coord):
continue
if len(coord) == 1:
coord = coord[0]
group = group_type(groupby.get_group(coord), **group_kwargs)
data.append((coord, group))
if issubclass(container_type, NdMapping):
with item_check(False):
return container_type(data, kdims=index_dims)
else:
return container_type(data)

@classmethod
def aggregate(cls, columns, dimensions, function, **kwargs):
data = columns.data
cols = [d.name for d in columns.kdims if d in dimensions]
vdims = columns.dimensions('value', True)
dtypes = data.dtypes
numeric = [c for c, dtype in zip(dtypes.index, dtypes.values)
if dtype.kind in 'iufc' and c in vdims]
reindexed = data[cols+numeric]

inbuilts = {'amin': 'min', 'amax': 'max', 'mean': 'mean',
'std': 'std', 'sum': 'sum', 'var': 'var'}
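# Reductions with a dask built-in equivalent are dispatched to the lazy
# dask method; other callables fall back to groupby.apply, which may not
# work for every function.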
if len(dimensions):
groups = reindexed.groupby(cols, sort=False)
if (function.__name__ in inbuilts):
agg = getattr(groups, inbuilts[function.__name__])()
else:
agg = groups.apply(function)
return agg.reset_index()
else:
if (function.__name__ in inbuilts):
agg = getattr(reindexed, inbuilts[function.__name__])()
else:
raise NotImplementedError
return pd.DataFrame(agg.compute()).T

@classmethod
def unpack_scalar(cls, columns, data):
"""
Given a columns object and data in the appropriate format for
the interface, return a simple scalar.
"""
if len(data.columns) > 1 or len(data) != 1:
return data
if isinstance(data, dd.DataFrame):
data = data.compute()
return data.iat[0,0]

@classmethod
def sample(cls, columns, samples=[]):
data = columns.data
dims = columns.dimensions('key', label=True)
mask = None
for sample in samples:
if np.isscalar(sample): sample = [sample]
for i, (c, v) in enumerate(zip(dims, sample)):
dim_mask = data[c]==v
if mask is None:
mask = dim_mask
else:
mask |= dim_mask
return data[mask]

@classmethod
def add_dimension(cls, columns, dimension, dim_pos, values, vdim):
data = columns.data
if dimension.name not in data.columns:
if not np.isscalar(values):
err = ('Dask dataframes do not support assigning '
       'non-scalar values.')
raise NotImplementedError(err)
data = data.assign(**{dimension.name: values})
return data

@classmethod
def concat(cls, columns_objs):
cast_objs = cls.cast(columns_objs)
return dd.concat([col.data for col in cast_objs])

@classmethod
def dframe(cls, columns, dimensions):
return columns.data.compute()

@classmethod
def length(cls, dataset):
"""
The length of a dask dataframe is not known without computing it,
so always return 1 for performance; use shape to compute the
actual dataframe shape.
"""
return 1



Interface.register(DaskInterface)
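As a rough illustration of how the interface described in the DaskInterface docstring is meant to be used, here is a minimal sketch (the column names, partition count and operations are hypothetical; once 'dask' is registered, the Dataset constructor should pick the DaskInterface automatically for a dask DataFrame):

import numpy as np
import pandas as pd
import dask.dataframe as dd
import holoviews as hv

# Build a small dask DataFrame with hypothetical columns 'x' and 'y'.
pdf = pd.DataFrame({'x': np.arange(1000), 'y': np.random.rand(1000)})
ddf = dd.from_pandas(pdf, npartitions=4)

# Wrapping the dask DataFrame keeps selections and aggregations lazy
# and out-of-core where the interface supports it.
ds = hv.Dataset(ddf, kdims=['x'], vdims=['y'])
subset = ds.select(x=(100, 200))   # range selection via a boolean mask
agg = ds.aggregate('x', np.mean)   # np.mean maps to the dask 'mean' built-in

Sorting and non-scalar add_dimension calls are the operations the docstring above flags as unsupported.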
@@ -79,6 +79,16 @@ def init(cls, eltype, data, kdims, vdims):
return data, {'kdims':kdims, 'vdims':vdims}, {}


@classmethod
def validate(cls, dataset):
not_found = [d for d in dataset.dimensions(label=True)
if d not in dataset.data.columns]
if not_found:
raise ValueError("Supplied data does not contain the specified "
                 "dimensions; the following dimensions were "
                 "not found: %s" % repr(not_found))


@classmethod
def range(cls, columns, dimension):
column = columns.data[columns.get_dimension(dimension).name]
@@ -125,9 +135,10 @@ def aggregate(cls, columns, dimensions, function, **kwargs):
data = columns.data
cols = [d.name for d in columns.kdims if d in dimensions]
vdims = columns.dimensions('value', True)
- reindexed = data.reindex(columns=cols+vdims)
+ reindexed = data[cols+vdims]
if len(dimensions):
- return reindexed.groupby(cols, sort=False).aggregate(function, **kwargs).reset_index()
+ grouped = reindexed.groupby(cols, sort=False)
+ return grouped.aggregate(function, **kwargs).reset_index()
else:
agg = reindexed.apply(function, **kwargs)
return pd.DataFrame.from_items([(col, [v]) for col, v in
@@ -185,11 +196,9 @@ def select(cls, columns, selection_mask=None, **selection):
@classmethod
def values(cls, columns, dim, expanded=True, flat=True):
data = columns.data[dim]
- if util.dd and isinstance(data, util.dd.Series):
-     data = data.compute()
if not expanded:
-     return util.unique_array(data)
- return np.array(data)
+     return data.unique()
+ return data.values


@classmethod
@@ -447,7 +447,7 @@ def compare_bounds(cls, el1, el2, msg='Bounds'):
@classmethod
def compare_dataset(cls, el1, el2, msg='Dataset'):
cls.compare_dimensioned(el1, el2)
- if len(el1) != len(el2):
+ if el1.shape[0] != el2.shape[0]:
raise AssertionError("%s not of matching length." % msg)
dimension_data = [(d, el1[d], el2[d]) for d in el1.dimensions()]
for dim, d1, d2 in dimension_data:
@@ -8,27 +8,30 @@
import xarray as xr
import datashader as ds
import datashader.transfer_functions as tf
import dask.dataframe as dd

from datashader.core import bypixel
from datashader.pandas import pandas_pipeline
from datashader.dask import dask_pipeline
from datashape.dispatch import dispatch
from datashape import discover as dsdiscover

from ..core import (ElementOperation, Element, Dimension, NdOverlay,
Overlay, CompositeOverlay, Dataset)
- from ..core.data import ArrayInterface, PandasInterface
+ from ..core.data import ArrayInterface, PandasInterface, DaskInterface
from ..core.util import get_param_values, basestring
from ..element import GridImage, Path, Curve, Contours, RGB
from ..streams import RangeXY

DF_INTERFACES = [PandasInterface, DaskInterface]

@dispatch(Element)
def discover(dataset):
"""
Allows datashader to correctly discover the dtypes of the data
in a holoviews Element.
"""
- if dataset.interface in [PandasInterface, ArrayInterface]:
+ if dataset.interface in DF_INTERFACES:
return dsdiscover(dataset.data)
else:
return dsdiscover(dataset.dframe())
@@ -54,8 +57,13 @@ def dataset_pipeline(dataset, schema, canvas, glyph, summary):
vdims = [dataset.get_dimension(column)(name) if column
else Dimension('Count')]

- agg = pandas_pipeline(dataset.data, schema, canvas,
-                       glyph, summary)
+ if dataset.interface is PandasInterface:
+     agg = pandas_pipeline(dataset.data, schema, canvas,
+                           glyph, summary)
+ elif dataset.interface is DaskInterface:
+     agg = dask_pipeline(dataset.data, schema, canvas,
+                         glyph, summary)

agg = agg.rename({'x_axis': kdims[0].name,
'y_axis': kdims[1].name})
return agg
@@ -125,7 +133,7 @@ def get_agg_data(cls, obj, category=None):
kdims = obj.kdims
vdims = obj.vdims
x, y = obj.dimensions(label=True)[:2]
- is_df = lambda x: isinstance(x, Dataset) and x.interface is PandasInterface
+ is_df = lambda x: isinstance(x, Dataset) and x.interface in DF_INTERFACES
if isinstance(obj, Path):
glyph = 'line'
for p in obj.data:
@@ -145,12 +153,19 @@ def get_agg_data(cls, obj, category=None):
elif isinstance(obj, Element):
glyph = 'line' if isinstance(obj, Curve) else 'points'
paths.append(obj.data if is_df(obj) else obj.dframe())
- if glyph == 'line':
-     empty = paths[0][:1].copy()
-     empty.loc[0, :] = (np.NaN,) * empty.shape[1]
-     paths = [elem for path in paths for elem in (path, empty)][:-1]
if len(paths) > 1:
-     df = pd.concat(paths).reset_index(drop=True)
+ if glyph == 'line':
+     path = paths[0][:1]
+     if isinstance(path, dd.DataFrame):
+         path = path.compute()
+     empty = path.copy()
+     empty.loc[0, :] = (np.NaN,) * empty.shape[1]
+     paths = [elem for path in paths for elem in (path, empty)][:-1]
+ datasets = [Dataset(p) for p in paths]
+ if isinstance(paths[0], dd.DataFrame):

@jlstevens (Contributor) commented on Nov 15, 2016:
isinstance checks over data formats are just the sort of thing interfaces are supposed to handle for you. I am hoping we can get rid of these isinstance checks, perhaps by using the appropriate utility to select the right interface based on the data type (i.e. whatever dataframe type it happens to be)?
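A minimal sketch of what such interface-based dispatch might look like (hypothetical; the exact utility HoloViews exposes for this may differ):

datasets = [Dataset(p) for p in paths]
# Let the interface already resolved for each Dataset handle the
# concatenation, instead of switching on the dataframe type here.
df = datasets[0].interface.concat(datasets)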

+     df = DaskInterface.concat(datasets)
+ else:
+     df = PandasInterface.concat(datasets)
else:
df = paths[0]
if category and df[category].dtype.name != 'category':