Skip to content

Commit

Permalink
support link_selections with Dask DataFrames (#4044)
Browse files Browse the repository at this point in the history
  • Loading branch information
jonmmease authored and philippjfr committed Oct 13, 2019
1 parent b5088cc commit 1f46bb6
Show file tree
Hide file tree
Showing 2 changed files with 240 additions and 74 deletions.
239 changes: 177 additions & 62 deletions holoviews/tests/util/testtransform.py
Expand Up @@ -5,6 +5,9 @@
from __future__ import division

import numpy as np
import pandas as pd
import dask.dataframe as dd
import dask.array as da

from holoviews.core.data import Dataset
from holoviews.element.comparison import ComparisonTestCase
Expand All @@ -14,156 +17,268 @@
class TestDimTransforms(ComparisonTestCase):

def setUp(self):
self.linear_ints = np.arange(1, 11)
self.linear_floats = np.arange(1, 11)/10.
self.negative = -self.linear_floats
self.repeating = ['A', 'B', 'C', 'A', 'B', 'C', 'A', 'B', 'C', 'A']
self.linear_ints = pd.Series(np.arange(1, 11))
self.linear_floats = pd.Series(np.arange(1, 11)/10.)
self.negative = pd.Series(-self.linear_floats)
self.repeating = pd.Series(
['A', 'B', 'C', 'A', 'B', 'C', 'A', 'B', 'C', 'A']
)
self.dataset = Dataset(
(self.linear_ints, self.linear_floats, self.negative, self.repeating),
['int', 'float', 'negative', 'categories']
)

ddf = dd.from_pandas(self.dataset.data, npartitions=2)
self.dataset_dask = self.dataset.clone(data=ddf)

# Assertion helpers

def check_apply(self, expr, expected, skip_dask=False):
if np.isscalar(expected):
# Pandas input
self.assertEqual(
expr.apply(self.dataset, keep_index=False), expected
)
self.assertEqual(
expr.apply(self.dataset, keep_index=True), expected
)

# Dask input
self.assertEqual(
expr.apply(self.dataset_dask, keep_index=False), expected
)
self.assertEqual(
expr.apply(self.dataset_dask, keep_index=True), expected
)
return

# Make sure expected is a pandas Series
self.assertIsInstance(expected, pd.Series)

# Check using dataset backed by pandas DataFrame
# keep_index=False
np.testing.assert_equal(
expr.apply(self.dataset),
expected.values
)
# keep_index=True
pd.testing.assert_series_equal(
expr.apply(self.dataset, keep_index=True),
expected,
check_names=False
)

if skip_dask:
return

# Check using dataset backed by Dask DataFrame
expected_dask = dd.from_pandas(expected, npartitions=2)

# keep_index=False, compute=False
da.assert_eq(
expr.apply(self.dataset_dask, compute=False), expected_dask.values
)
# keep_index=True, compute=False
dd.assert_eq(
expr.apply(self.dataset_dask, keep_index=True, compute=False),
expected_dask,
check_names=False
)
# keep_index=False, compute=True
np.testing.assert_equal(
expr.apply(self.dataset_dask, compute=True),
expected_dask.values.compute()
)
# keep_index=True, compute=True
pd.testing.assert_series_equal(
expr.apply(self.dataset_dask, keep_index=True, compute=True),
expected_dask.compute(),
check_names=False
)

# Unary operators

def test_abs_transform(self):
self.assertEqual(abs(dim('negative')).apply(self.dataset), self.linear_floats)
expr = abs(dim('negative'))
self.check_apply(expr, self.linear_floats)

def test_neg_transform(self):
self.assertEqual(-dim('negative').apply(self.dataset), self.linear_floats)
expr = -dim('negative')
self.check_apply(expr, self.linear_floats)

# Binary operators

def test_add_transform(self):
self.assertEqual((dim('float')+1).apply(self.dataset), self.linear_floats+1)
expr = dim('float') + 1
self.check_apply(expr, self.linear_floats+1)

def test_div_transform(self):
self.assertEqual((dim('int')/10.).apply(self.dataset), self.linear_floats)
expr = dim('int') / 10.
self.check_apply(expr, self.linear_floats)

def test_floor_div_transform(self):
self.assertEqual((dim('int')//2).apply(self.dataset), self.linear_ints//2)
expr = dim('int') // 2
self.check_apply(expr, self.linear_ints//2)

def test_mod_transform(self):
self.assertEqual((dim('int')%2).apply(self.dataset), self.linear_ints%2)
expr = dim('int') % 2
self.check_apply(expr, self.linear_ints % 2)

def test_mul_transform(self):
self.assertEqual((dim('float')*10.).apply(self.dataset), self.linear_ints)
expr = dim('float') * 10.
self.check_apply(expr, self.linear_ints.astype('float64'))

def test_pow_transform(self):
self.assertEqual((dim('int')**2).apply(self.dataset), self.linear_ints**2)
expr = dim('int') ** 2
self.check_apply(expr, self.linear_ints ** 2)

def test_sub_transform(self):
self.assertEqual((dim('int')-10).apply(self.dataset), self.linear_ints-10)
expr = dim('int') - 10
self.check_apply(expr, self.linear_ints - 10)

# Reverse binary operators

def test_radd_transform(self):
self.assertEqual((1+dim('float')).apply(self.dataset), 1+self.linear_floats)
expr = 1 + dim('float')
self.check_apply(expr, 1 + self.linear_floats)

def test_rdiv_transform(self):
self.assertEqual((10./dim('int')).apply(self.dataset), 10./self.linear_ints)
expr = 10. / dim('int')
self.check_apply(expr, 10. / self.linear_ints)

def test_rfloor_div_transform(self):
self.assertEqual((2//dim('int')).apply(self.dataset), 2//self.linear_ints)
expr = 2 // dim('int')
self.check_apply(expr, 2 // self.linear_ints)

def test_rmod_transform(self):
self.assertEqual((2%dim('int')).apply(self.dataset), 2%self.linear_ints)
expr = 2 % dim('int')
self.check_apply(expr, 2 % self.linear_ints)

def test_rmul_transform(self):
self.assertEqual((10.*dim('float')).apply(self.dataset), self.linear_ints)
expr = 10. * dim('float')
self.check_apply(expr, self.linear_ints.astype('float64'))

def test_rsub_transform(self):
self.assertEqual((10-dim('int')).apply(self.dataset), 10-self.linear_ints)
expr = 10 - dim('int')
self.check_apply(expr, 10 - self.linear_ints)

# NumPy operations

def test_ufunc_transform(self):
self.assertEqual(np.sin(dim('float')).apply(self.dataset), np.sin(self.linear_floats))
expr = np.sin(dim('float'))
self.check_apply(expr, np.sin(self.linear_floats))

def test_astype_transform(self):
self.assertEqual(dim('int').astype(str).apply(self.dataset),
self.linear_ints.astype(str))
expr = dim('int').astype('float64')
self.check_apply(expr, self.linear_ints.astype('float64'))

def test_cumsum_transform(self):
self.assertEqual(dim('float').cumsum().apply(self.dataset),
self.linear_floats.cumsum())
expr = dim('float').cumsum()
self.check_apply(expr, self.linear_floats.cumsum())

def test_max_transform(self):
self.assertEqual(dim('float').max().apply(self.dataset),
self.linear_floats.max())
expr = dim('float').max()
self.check_apply(expr, self.linear_floats.max())

def test_min_transform(self):
self.assertEqual(dim('float').min().apply(self.dataset),
self.linear_floats.min())
expr = dim('float').min()
self.check_apply(expr, self.linear_floats.min())

def test_round_transform(self):
self.assertEqual(dim('float').round().apply(self.dataset),
self.linear_floats.round())
expr = dim('float').round()
self.check_apply(expr, self.linear_floats.round())

def test_sum_transform(self):
self.assertEqual(dim('float').sum().apply(self.dataset),
self.linear_floats.sum())
expr = dim('float').sum()
self.check_apply(expr, self.linear_floats.sum())

def test_std_transform(self):
self.assertEqual(dim('float').std().apply(self.dataset),
self.linear_floats.std())
expr = dim('float').std()
self.check_apply(expr, self.linear_floats.std(ddof=0))

def test_var_transform(self):
self.assertEqual(dim('float').var().apply(self.dataset),
self.linear_floats.var())
expr = dim('float').var()
self.check_apply(expr, self.linear_floats.var(ddof=0))

def test_log_transform(self):
self.assertEqual(dim('float').log().apply(self.dataset),
np.log(self.linear_floats))
expr = dim('float').log()
self.check_apply(expr, np.log(self.linear_floats))

def test_log10_transform(self):
self.assertEqual(dim('float').log10().apply(self.dataset),
np.log10(self.linear_floats))
expr = dim('float').log10()
self.check_apply(expr, np.log10(self.linear_floats))

# Custom functions

def test_norm_transform(self):
self.assertEqual(dim('int').norm().apply(self.dataset),
(self.linear_ints-1)/9.)
expr = dim('int').norm()
self.check_apply(expr, (self.linear_ints-1)/9.)

def test_bin_transform(self):
self.assertEqual(dim('int').bin([0, 5, 10]).apply(self.dataset),
np.array([2.5, 2.5, 2.5, 2.5, 2.5, 7.5, 7.5, 7.5, 7.5, 7.5]))
expr = dim('int').bin([0, 5, 10])
expected = pd.Series(
[2.5, 2.5, 2.5, 2.5, 2.5, 7.5, 7.5, 7.5, 7.5, 7.5]
)
self.check_apply(expr, expected)

def test_bin_transform_with_labels(self):
self.assertEqual(dim('int').bin([0, 5, 10], ['A', 'B']).apply(self.dataset),
np.array(['A', 'A', 'A', 'A', 'A', 'B', 'B', 'B', 'B', 'B']))
expr = dim('int').bin([0, 5, 10], ['A', 'B'])
expected = pd.Series(
['A', 'A', 'A', 'A', 'A', 'B', 'B', 'B', 'B', 'B']
)
self.check_apply(expr, expected)

def test_categorize_transform_list(self):
self.assertEqual(dim('categories').categorize(['circle', 'square', 'triangle']).apply(self.dataset),
np.array((['circle', 'square', 'triangle']*3)+['circle']))
expr = dim('categories').categorize(['circle', 'square', 'triangle'])
expected = pd.Series(
(['circle', 'square', 'triangle']*3)+['circle']
)
# We skip dask because results will depend on partition structure
self.check_apply(expr, expected, skip_dask=True)

def test_categorize_transform_dict(self):
self.assertEqual(dim('categories').categorize({'A': 'circle', 'B': 'square', 'C': 'triangle'}).apply(self.dataset),
np.array((['circle', 'square', 'triangle']*3)+['circle']))
expr = dim('categories').categorize(
{'A': 'circle', 'B': 'square', 'C': 'triangle'}
)
expected = pd.Series(
(['circle', 'square', 'triangle'] * 3) + ['circle']
)
# We don't skip dask because results are now stable across partitions
self.check_apply(expr, expected)

def test_categorize_transform_dict_with_default(self):
self.assertEqual(dim('categories').categorize({'A': 'circle', 'B': 'square'}, default='triangle').apply(self.dataset),
np.array((['circle', 'square', 'triangle']*3)+['circle']))
expr = dim('categories').categorize(
{'A': 'circle', 'B': 'square'}, default='triangle'
)
expected = pd.Series(
(['circle', 'square', 'triangle'] * 3) + ['circle']
)
# We don't skip dask because results are stable across partitions
self.check_apply(expr, expected)

# Numpy functions

def test_digitize(self):
self.assertEqual(dim('int').digitize([1, 5, 10]).apply(self.dataset),
np.array([1, 1, 1, 1, 2, 2, 2, 2, 2, 3]))
expr = dim('int').digitize([1, 5, 10])
expected = pd.Series(np.array([1, 1, 1, 1, 2, 2, 2, 2, 2, 3]))
self.check_apply(expr, expected)

def test_isin(self):
self.assertEqual(dim('int').digitize([1, 5, 10]).isin([1, 3]).apply(self.dataset),
np.array([1, 1, 1, 1, 0, 0, 0, 0, 0, 1], dtype='bool'))
expr = dim('int').digitize([1, 5, 10]).isin([1, 3])
expected = pd.Series(
np.array([1, 1, 1, 1, 0, 0, 0, 0, 0, 1], dtype='bool')
)
self.check_apply(expr, expected)

# Complex expressions

def test_multi_operator_expression(self):
self.assertEqual((((dim('float')-2)*3)**2).apply(self.dataset),
((self.linear_floats-2)*3)**2)
expr = (((dim('float')-2)*3)**2)
self.check_apply(expr, ((self.linear_floats-2)*3)**2)

def test_multi_dim_expression(self):
self.assertEqual((dim('int')-dim('float')).apply(self.dataset),
self.linear_ints-self.linear_floats)
expr = dim('int')-dim('float')
self.check_apply(expr, self.linear_ints-self.linear_floats)

# Repr method

Expand Down

0 comments on commit 1f46bb6

Please sign in to comment.