diff --git a/streamz/dataframe/aggregations.py b/streamz/dataframe/aggregations.py
index 74c2baa2..4ed18f90 100644
--- a/streamz/dataframe/aggregations.py
+++ b/streamz/dataframe/aggregations.py
@@ -4,6 +4,7 @@
 from numbers import Number
 
 import numpy as np
+import pandas as pd
 
 from .utils import is_series_like, is_index_like, get_dataframe_package
 
@@ -202,10 +203,10 @@ def diff_loc(dfs, new, window=None):
     """
     dfs = deque(dfs)
     dfs.append(new)
-    mx = max(df.index.max() for df in dfs)
+    mx = pd.Timestamp(max(df.index.max() for df in dfs))
     mn = mx - window
     old = []
-    while dfs[0].index.min() < mn:
+    while pd.Timestamp(dfs[0].index.min()) < mn:
         o = dfs[0].loc[:mn]
         old.append(o)  # TODO: avoid copy if fully lost
         dfs[0] = dfs[0].iloc[len(o):]
@@ -347,8 +348,6 @@ def windowed_groupby_accumulator(acc, new, diff=None, window=None, agg=None, gro
     for o, og in zip(old, old_groupers):
         if 'groupers' in acc:
             assert len(o) == len(og)
-            if hasattr(og, 'index'):
-                assert (o.index == og.index).all()
         if len(o):
             state, result = agg.on_old(state, o, grouper=og)
             size_state, _ = size.on_old(size_state, o, grouper=og)
@@ -407,11 +406,13 @@ class GroupbySum(GroupbyAggregation):
     def on_new(self, acc, new, grouper=None):
         g = self.grouped(new, grouper=grouper)
         result = acc.add(g.sum(), fill_value=0)
+        result.index.name = acc.index.name
         return result, result
 
     def on_old(self, acc, old, grouper=None):
         g = self.grouped(old, grouper=grouper)
         result = acc.sub(g.sum(), fill_value=0)
+        result.index.name = acc.index.name
         return result, result
 
     def initial(self, new, grouper=None):
@@ -427,12 +428,14 @@ def on_new(self, acc, new, grouper=None):
         g = self.grouped(new, grouper=grouper)
         result = acc.add(g.count(), fill_value=0)
         result = result.astype(int)
+        result.index.name = acc.index.name
         return result, result
 
     def on_old(self, acc, old, grouper=None):
         g = self.grouped(old, grouper=grouper)
         result = acc.sub(g.count(), fill_value=0)
         result = result.astype(int)
+        result.index.name = acc.index.name
         return result, result
 
     def initial(self, new, grouper=None):
@@ -448,12 +451,14 @@ def on_new(self, acc, new, grouper=None):
         g = self.grouped(new, grouper=grouper)
         result = acc.add(g.size(), fill_value=0)
         result = result.astype(int)
+        result.index.name = acc.index.name
         return result, result
 
     def on_old(self, acc, old, grouper=None):
         g = self.grouped(old, grouper=grouper)
         result = acc.sub(g.size(), fill_value=0)
         result = result.astype(int)
+        result.index.name = acc.index.name
         return result, result
 
     def initial(self, new, grouper=None):
@@ -467,10 +472,12 @@ def initial(self, new, grouper=None):
 class ValueCounts(Aggregation):
     def on_new(self, acc, new, grouper=None):
         result = acc.add(new.value_counts(), fill_value=0).astype(int)
+        result.index.name = acc.index.name
         return result, result
 
     def on_old(self, acc, new, grouper=None):
         result = acc.sub(new.value_counts(), fill_value=0).astype(int)
+        result.index.name = acc.index.name
         return result, result
 
     def initial(self, new, grouper=None):
@@ -483,7 +490,8 @@ def on_new(self, acc, new, grouper=None):
         g = self.grouped(new, grouper=grouper)
         totals = totals.add(g.sum(), fill_value=0)
         counts = counts.add(g.count(), fill_value=0)
-
+        totals.index.name = acc[0].index.name
+        counts.index.name = acc[1].index.name
         return (totals, counts), totals / counts
 
     def on_old(self, acc, old, grouper=None):
@@ -491,7 +499,8 @@ def on_old(self, acc, old, grouper=None):
         g = self.grouped(old, grouper=grouper)
         totals = totals.sub(g.sum(), fill_value=0)
         counts = counts.sub(g.count(), fill_value=0)
-
+        totals.index.name = acc[0].index.name
+        counts.index.name = acc[1].index.name
         return (totals, counts), totals / counts
 
     def initial(self, new, grouper=None):
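Editor's note on the aggregation changes above: every `add`/`sub` with `fill_value` is now followed by an explicit re-assignment of `result.index.name`, because the aligned result can otherwise come back without the grouper's index name (a cudf behaviour this patch works around; `diff_loc` similarly normalises `index.max()`/`index.min()` through `pd.Timestamp` before comparing against the window edge). A minimal sketch of the pattern, using pandas stand-ins so it runs without cudf; the series contents are illustrative only:

```python
import pandas as pd

# Accumulated per-group sums, keyed by the grouper column "x".
acc = pd.Series([3.0, 4.0], index=pd.Index([0, 1], name="x"))
# Per-group sums for a newly arrived batch.
batch = pd.Series([5.0], index=pd.Index([0], name="x"))

result = acc.add(batch, fill_value=0)
# Mirrors GroupbySum.on_new above: re-attach the grouper's index name, which
# cudf (per this patch) can drop during the aligned add/sub.
result.index.name = acc.index.name
```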
diff --git a/streamz/dataframe/tests/test_cudf.py b/streamz/dataframe/tests/test_cudf_dataframes.py
similarity index 50%
rename from streamz/dataframe/tests/test_cudf.py
rename to streamz/dataframe/tests/test_cudf_dataframes.py
index 28efa49d..a557a848 100644
--- a/streamz/dataframe/tests/test_cudf.py
+++ b/streamz/dataframe/tests/test_cudf_dataframes.py
@@ -1,25 +1,24 @@
 """
 Tests for cudf DataFrames:
 
-All tests have been cloned from the test_dataframes module in the same folder.
-Some of these tests pass with cudf, and others are marked with xfail
-where a pandas-like method is not yet implemented in cudf.
-But these tests should pass as and when cudf rolls out more pandas-like methods.
+All tests have been cloned from the test_dataframes module in the streamz
+repository. Some of these tests pass with cudf, and others are marked with
+xfail, where a pandas-like method is not yet implemented in cudf. But these
+tests should pass as and when cudf rolls out more pandas-like methods.
 """
 from __future__ import division, print_function
 
+import json
 import operator
 
-import pytest
-from dask.dataframe.utils import assert_eq
 import numpy as np
 import pandas as pd
+import pytest
+from dask.dataframe.utils import assert_eq
+from distributed import Client
 
 from streamz import Stream
-from streamz.dataframe import DataFrame, Series, DataFrames, Aggregation
 from streamz.dask import DaskStream
-
-from distributed import Client
-
+from streamz.dataframe import Aggregation, DataFrame, DataFrames, Series
 
 cudf = pytest.importorskip("cudf")
@@ -42,14 +41,14 @@ def stream(request, client):  # flake8: noqa
 
 
 def test_identity(stream):
-    df = cudf.DataFrame({'x': [1, 2, 3], 'y': [4, 5, 6]})
+    df = cudf.DataFrame({"x": [1, 2, 3], "y": [4, 5, 6]})
     sdf = DataFrame(example=df, stream=stream)
     L = sdf.stream.gather().sink_to_list()
 
    sdf.emit(df)
 
     assert L[0] is df
-    assert list(sdf.example.columns) == ['x', 'y']
+    assert list(sdf.example.columns) == ["x", "y"]
 
     x = sdf.x
     assert isinstance(x, Series)
@@ -62,7 +61,7 @@ def test_identity(stream):
 
 
 def test_dtype(stream):
-    df = cudf.DataFrame({'x': [1, 2, 3], 'y': [4, 5, 6]})
+    df = cudf.DataFrame({"x": [1, 2, 3], "y": [4, 5, 6]})
     sdf = DataFrame(example=df, stream=stream)
 
     assert str(sdf.dtypes) == str(df.dtypes)
@@ -71,11 +70,11 @@ def test_dtype(stream):
 
 
 def test_attributes():
-    df = cudf.DataFrame({'x': [1, 2, 3], 'y': [4, 5, 6]})
+    df = cudf.DataFrame({"x": [1, 2, 3], "y": [4, 5, 6]})
     sdf = DataFrame(example=df)
 
-    assert getattr(sdf,'x',-1) != -1
-    assert getattr(sdf,'z',-1) == -1
+    assert getattr(sdf, "x", -1) != -1
+    assert getattr(sdf, "z", -1) == -1
 
     sdf.x
     with pytest.raises(AttributeError):
@@ -83,7 +82,7 @@ def test_exceptions(stream):
-    df = cudf.DataFrame({'x': [1, 2, 3], 'y': [4, 5, 6]})
+    df = cudf.DataFrame({"x": [1, 2, 3], "y": [4, 5, 6]})
     sdf = DataFrame(example=df, stream=stream)
     with pytest.raises(TypeError):
         sdf.emit(1)
 
     with pytest.raises(IndexError):
         sdf.emit(cudf.DataFrame())
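The tests in this module all follow the same emit/collect pattern: build a streaming `DataFrame` from an example frame, push batches in with `emit`, and collect every recomputed result with `sink_to_list`. A short sketch with pandas (the cudf tests swap in `cudf.DataFrame` and, for the Dask-backed fixture, add `.gather()` before `sink_to_list()`):

```python
import pandas as pd
from streamz.dataframe import DataFrame

df = pd.DataFrame({"x": [1, 2, 3], "y": [4, 5, 6]})
sdf = DataFrame(example=df)              # example frame fixes columns/dtypes
L = sdf.x.sum().stream.sink_to_list()    # every recomputed sum lands in L

sdf.emit(df)                             # L == [6]
sdf.emit(df)                             # L == [6, 12]
```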
@@ -93,15 +92,9 @@
 
 
 @pytest.mark.parametrize('func', [
-    pytest.param(lambda x: x.sum(),
-                 marks=pytest.mark.xfail(
-                     reason="'Series' object does not support item assignment")),
-    pytest.param(lambda x: x.mean(),
-                 marks=pytest.mark.xfail(
-                     reason="'Series' object does not support item assignment")),
-    lambda x: x.count(),
-    pytest.param(lambda x: x.size,
-                 marks=pytest.mark.xfail(reason="Not implemented"))
+    lambda x: x.sum(),
+    lambda x: x.mean(),
+    lambda x: x.count()
 ])
 def test_reductions(stream, func):
     df = cudf.DataFrame({'x': [1, 2, 3], 'y': [4, 5, 6]})
@@ -120,29 +113,32 @@
     assert_eq(x_out[-1], func(cudf.concat([df, df]).x))
 
 
-@pytest.mark.parametrize('op', [
-    operator.add,
-    operator.and_,
-    operator.eq,
-    operator.floordiv,
-    operator.ge,
-    operator.gt,
-    operator.le,
-    operator.lshift,
-    operator.lt,
-    operator.mod,
-    operator.mul,
-    operator.ne,
-    operator.or_,
-    operator.pow,
-    operator.rshift,
-    operator.sub,
-    operator.truediv,
-    operator.xor,
-])
-@pytest.mark.parametrize('getter', [lambda df: df, lambda df: df.x])
+@pytest.mark.parametrize(
+    "op",
+    [
+        operator.add,
+        operator.and_,
+        operator.eq,
+        operator.floordiv,
+        operator.ge,
+        operator.gt,
+        operator.le,
+        operator.lshift,
+        operator.lt,
+        operator.mod,
+        operator.mul,
+        operator.ne,
+        operator.or_,
+        operator.pow,
+        operator.rshift,
+        operator.sub,
+        operator.truediv,
+        operator.xor,
+    ],
+)
+@pytest.mark.parametrize("getter", [lambda df: df, lambda df: df.x])
 def test_binary_operators(op, getter, stream):
-    df = cudf.DataFrame({'x': [1, 2, 3], 'y': [4, 5, 6]})
+    df = cudf.DataFrame({"x": [1, 2, 3], "y": [4, 5, 6]})
     try:
         left = op(getter(df), 2)
         right = op(2, getter(df))
@@ -159,18 +155,21 @@
     assert_eq(r[0], right)
 
 
-@pytest.mark.parametrize('op', [
-    operator.abs,
-    operator.inv,
-    operator.invert,
-    operator.neg,
-    lambda x: x.map(lambda x: x + 1),
-    lambda x: x.reset_index(),
-    lambda x: x.astype(float),
-])
-@pytest.mark.parametrize('getter', [lambda df: df, lambda df: df.x])
+@pytest.mark.parametrize(
+    "op",
+    [
+        operator.abs,
+        operator.inv,
+        operator.invert,
+        operator.neg,
+        lambda x: x.map(lambda x: x + 1),
+        lambda x: x.reset_index(),
+        lambda x: x.astype(float),
+    ],
+)
+@pytest.mark.parametrize("getter", [lambda df: df, lambda df: df.x])
 def test_unary_operators(op, getter):
-    df = cudf.DataFrame({'x': [1, 2, 3], 'y': [4, 5, 6]})
+    df = cudf.DataFrame({"x": [1, 2, 3], "y": [4, 5, 6]})
     try:
         expected = op(getter(df))
     except Exception:
@@ -184,13 +183,17 @@
     assert_eq(b[0], expected)
 
 
-@pytest.mark.parametrize('func', [
-    lambda df: df.query('x > 1 and x < 4'),
-    pytest.param(lambda df: df.x.value_counts().nlargest(2),
-                 marks=pytest.mark.xfail(reason="`cudf.Series.add` is'nt implemented"))
-])
+@pytest.mark.parametrize(
+    "func",
+    [
+        lambda df: df.query("x > 1 and x < 4"),
+        pytest.param(lambda df: df.x.value_counts().nlargest(2)
+                     .astype(int), marks=pytest.mark.xfail(
+                         reason="Index name lost in _getattr_"))
+    ],
+)
 def test_dataframe_simple(func):
-    df = cudf.DataFrame({'x': [1, 2, 3], 'y': [4, 5, 6]})
+    df = cudf.DataFrame({"x": [1, 2, 3], "y": [4, 5, 6]})
     expected = func(df)
 
     a = DataFrame(example=df)
@@ -202,13 +205,13 @@
 
 
 def test_set_index():
-    df = cudf.DataFrame({'x': [1, 2, 3], 'y': [4, 5, 6]})
+    df = cudf.DataFrame({"x": [1, 2, 3], "y": [4, 5, 6]})
     a = DataFrame(example=df)
 
-    b = a.set_index('x').stream.sink_to_list()
+    b = a.set_index("x").stream.sink_to_list()
     a.emit(df)
-    assert_eq(b[0], df.set_index('x'))
+    assert_eq(b[0], df.set_index("x"))
 
     b = a.set_index(a.y + 1).stream.sink_to_list()
     a.emit(df)
@@ -216,7 +219,7 @@
 
 
 def test_binary_stream_operators(stream):
-    df = cudf.DataFrame({'x': [1, 2, 3], 'y': [4, 5, 6]})
+    df = cudf.DataFrame({"x": [1, 2, 3], "y": [4, 5, 6]})
 
     expected = df.x + df.y
@@ -229,7 +232,7 @@
 
 
 def test_index(stream):
-    df = cudf.DataFrame({'x': [1, 2, 3], 'y': [4, 5, 6]})
+    df = cudf.DataFrame({"x": [1, 2, 3], "y": [4, 5, 6]})
     a = DataFrame(example=df, stream=stream)
     b = a.index + 5
     L = b.stream.gather().sink_to_list()
@@ -242,7 +245,7 @@
 
 
 def test_pair_arithmetic(stream):
-    df = cudf.DataFrame({'x': list(range(10)), 'y': [1] * 10})
+    df = cudf.DataFrame({"x": list(range(10)), "y": [1] * 10})
 
     a = DataFrame(example=df.iloc[:0], stream=stream)
     L = ((a.x + a.y) * 2).stream.gather().sink_to_list()
@@ -255,7 +258,7 @@
 
 
 def test_getitem(stream):
-    df = cudf.DataFrame({'x': list(range(10)), 'y': [1] * 10})
+    df = cudf.DataFrame({"x": list(range(10)), "y": [1] * 10})
 
     a = DataFrame(example=df.iloc[:0], stream=stream)
     L = a[a.x > 4].stream.gather().sink_to_list()
@@ -267,29 +270,20 @@
     assert_eq(cudf.concat(L), df[df.x > 4])
 
 
-@pytest.mark.xfail(reason="`cudf.DataFrame.add` is not implemented")
 @pytest.mark.parametrize('agg', [
     lambda x: x.sum(),
     lambda x: x.mean(),
-    lambda x: x.count(),
-    lambda x: x.var(ddof=1),
-    lambda x: x.std(),
-    pytest.param(lambda x: x.var(ddof=0),
-                 marks=pytest.mark.xfail(reason="unknown"))
 ])
 @pytest.mark.parametrize('grouper', [lambda a: a.x % 3,
                                      lambda a: 'x',
                                      lambda a: a.index % 2,
                                      lambda a: ['x']])
-@pytest.mark.parametrize('indexer', [lambda g: g.y,
-                                     lambda g: g,
+@pytest.mark.parametrize('indexer', [lambda g: g,
                                      lambda g: g[['y']],
-                                     pytest.param(lambda g: g[['x', 'y']],
-                                                  marks=pytest.mark.xfail(
-                                                      reason="Indexer column matches grouper"))
-                                     ])
+                                     lambda g: g[['x', 'y']]])
 def test_groupby_aggregate(agg, grouper, indexer, stream):
-    df = cudf.DataFrame({'x': (np.arange(10) // 2).astype(float), 'y': [1.0, 2.0] * 5})
+    df = cudf.DataFrame({'x': (np.arange(10) // 2).astype(float),
+                         'y': [1.0, 2.0] * 5})
 
     a = DataFrame(example=df.iloc[:0], stream=stream)
@@ -303,18 +297,25 @@ def f(x):
     a.emit(df.iloc[7:])
 
     first = df.iloc[:3]
-    assert assert_eq(L[0], f(first))
-    assert assert_eq(L[-1], f(df))
+    g = f(first)
+
+    h = f(df)
+
+    assert assert_eq(L[0], g)
+    assert assert_eq(L[-1], h)
 
 
-@pytest.mark.xfail(reason="`cudf.Series.add` is not implemented")
+@pytest.mark.xfail(
+    reason="AttributeError: 'StringColumn' object"
+    "has no attribute 'value_counts'"
+)
 def test_value_counts(stream):
-    s = cudf.Series([1, 2, 1])
+    s = cudf.Series(["a", "b", "a"])
 
     a = Series(example=s, stream=stream)
 
     b = a.value_counts()
-    assert b._stream_type == 'updating'
+    assert b._stream_type == "updating"
 
     result = b.stream.gather().sink_to_list()
 
     a.emit(s)
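test_groupby_aggregate above builds its expected values with `f(x) = agg(indexer(x.groupby(grouper(x))))` applied to plain cudf frames. The streaming side of that pattern, sketched here with pandas purely for illustration:

```python
import pandas as pd
from streamz.dataframe import DataFrame

df = pd.DataFrame({"x": [0.0, 0.0, 1.0, 1.0], "y": [1.0, 2.0, 3.0, 4.0]})
sdf = DataFrame(example=df.iloc[:0])

# Streaming groupby aggregation: recomputed on every emitted batch.
L = sdf.groupby(sdf.x).y.sum().stream.sink_to_list()

sdf.emit(df)   # L[-1]: y summed per x group -> 3.0 for x=0.0, 7.0 for x=1.0
sdf.emit(df)   # totals double, since the accumulator is cumulative
```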
@@ -323,16 +324,44 @@
     assert_eq(result[-1], cudf.concat([s, s]).value_counts())
 
 
-@pytest.mark.xfail(reason="'Series' object does not support item assignment")
+def test_repr(stream):
+    df = cudf.DataFrame({'x': (np.arange(10) // 2).astype(float),
+                         'y': [1.0] * 10})
+    a = DataFrame(example=df, stream=stream)
+
+    text = repr(a)
+    assert type(a).__name__ in text
+    assert 'x' in text
+    assert 'y' in text
+
+    text = repr(a.x)
+    assert type(a.x).__name__ in text
+    assert 'x' in text
+
+    text = repr(a.x.sum())
+    assert type(a.x.sum()).__name__ in text
+
+
+def test_repr_html(stream):
+    df = cudf.DataFrame({'x': (np.arange(10) // 2).astype(float),
+                         'y': [1.0] * 10})
+    a = DataFrame(example=df, stream=stream)
+
+    for x in [a, a.y, a.y.mean()]:
+        html = x._repr_html_()
+        assert type(x).__name__ in html
+        assert '1' in html
+
+
 def test_setitem(stream):
-    df = cudf.DataFrame({'x': list(range(10)), 'y': [1] * 10})
+    df = cudf.DataFrame({"x": list(range(10)), "y": [1] * 10})
 
     sdf = DataFrame(example=df.iloc[:0], stream=stream)
     stream = sdf.stream
 
-    sdf['z'] = sdf['x'] * 2
-    sdf['a'] = 10
-    sdf[['c', 'd']] = sdf[['x', 'y']]
+    sdf["z"] = sdf["x"] * 2
+    sdf["a"] = 10
+    sdf[["c", "d"]] = sdf[["x", "y"]]
 
     L = sdf.mean().stream.gather().sink_to_list()
@@ -340,19 +369,20 @@
     stream.emit(df.iloc[3:7])
     stream.emit(df.iloc[7:])
 
-    df['z'] = df['x'] * 2
-    df['a'] = 10
-    df[['c', 'd']] = df[['x', 'y']]
+    df["z"] = df["x"] * 2
+    df["a"] = 10
+    df["c"] = df["x"]
+    df["d"] = df["y"]
 
     assert_eq(L[-1], df.mean())
 
 
 def test_setitem_overwrites(stream):
-    df = cudf.DataFrame({'x': list(range(10))})
+    df = cudf.DataFrame({"x": list(range(10))})
     sdf = DataFrame(example=df.iloc[:0], stream=stream)
     stream = sdf.stream
 
-    sdf['x'] = sdf['x'] * 2
+    sdf["x"] = sdf["x"] * 2
 
     L = sdf.stream.gather().sink_to_list()
@@ -363,37 +393,58 @@
     assert_eq(L[-1], df.iloc[7:] * 2)
 
 
-@pytest.mark.parametrize('kwargs,op', [
-    ({}, 'sum'),
-    ({}, 'mean'),
-    pytest.param({}, 'min'),
-    pytest.param({}, 'median', marks=pytest.mark.xfail(reason="Not implemented for rolling objects")),
-    pytest.param({}, 'max'),
-    pytest.param({}, 'var', marks=pytest.mark.xfail(reason="Not implemented for rolling objects")),
-    pytest.param({}, 'count'),
-    pytest.param({'ddof': 0}, 'std', marks=pytest.mark.xfail(reason="Not implemented for rolling objects")),
-    pytest.param({'quantile': 0.5}, 'quantile', marks=pytest.mark.xfail(reason="Not implemented for rolling objects")),
-    pytest.param({'arg': {'A': 'sum', 'B': 'min'}}, 'aggregate', marks=pytest.mark.xfail(reason="Not implemented"))
-])
-@pytest.mark.parametrize('window', [
-    pytest.param(2),
-    7,
-    pytest.param('3h'),
-    pd.Timedelta('200 minutes')
-])
-@pytest.mark.parametrize('m', [
-    2,
-    pytest.param(5)
-])
-@pytest.mark.parametrize('pre_get,post_get', [
-    (lambda df: df, lambda df: df),
-    (lambda df: df.x, lambda x: x),
-    (lambda df: df, lambda df: df.x)
-])
+@pytest.mark.parametrize(
+    "kwargs,op",
+    [
+        ({}, "sum"),
+        ({}, "mean"),
+        pytest.param({}, "min"),
+        pytest.param(
+            {},
+            "median",
+            marks=pytest.mark.xfail(reason="Unavailable for rolling objects"),
+        ),
+        pytest.param({}, "max"),
+        pytest.param(
+            {},
+            "var",
+            marks=pytest.mark.xfail(reason="Unavailable for rolling objects"),
+        ),
+        pytest.param({}, "count"),
+        pytest.param(
+            {"ddof": 0},
+            "std",
+            marks=pytest.mark.xfail(reason="Unavailable for rolling objects"),
+        ),
+        pytest.param(
+            {"quantile": 0.5},
+            "quantile",
+            marks=pytest.mark.xfail(reason="Unavailable for rolling objects"),
+        ),
+        pytest.param(
+            {"arg": {"A": "sum", "B": "min"}},
+            "aggregate",
+            marks=pytest.mark.xfail(reason="Unavailable for rolling objects"),
+        ),
+    ],
+)
+@pytest.mark.parametrize(
+    "window", [pytest.param(2), 7, pytest.param("3h"),
+               pd.Timedelta("200 minutes")]
+)
+@pytest.mark.parametrize("m", [2, pytest.param(5)])
+@pytest.mark.parametrize(
+    "pre_get,post_get",
+    [
+        (lambda df: df, lambda df: df),
+        (lambda df: df.x, lambda x: x),
+        (lambda df: df, lambda df: df.x),
+    ],
+)
 def test_rolling_count_aggregations(op, window, m, pre_get, post_get, kwargs,
-                                    stream):
-    index = pd.DatetimeIndex(start='2000-01-01', end='2000-01-03', freq='1h')
-    df = cudf.DataFrame({'x': np.arange(len(index))}, index=index)
+                                    stream):
+    index = pd.DatetimeIndex(start="2000-01-01", end="2000-01-03", freq="1h")
+    df = cudf.DataFrame({"x": np.arange(len(index))}, index=index)
 
     expected = getattr(post_get(pre_get(df).rolling(window)), op)(**kwargs)
@@ -403,7 +454,7 @@ def test_rolling_count_aggregations(op, window, m, pre_get, post_get, kwargs,
     assert len(L) == 0
 
     for i in range(0, len(df), m):
-        sdf.emit(df.iloc[i: i + m])
+        sdf.emit(df.iloc[i:i + m])
 
     assert len(L) > 1
@@ -411,7 +462,7 @@ def test_rolling_count_aggregations(op, window, m, pre_get, post_get, kwargs,
 
 
 def test_stream_to_dataframe(stream):
-    df = cudf.DataFrame({'x': [1, 2, 3], 'y': [4, 5, 6]})
+    df = cudf.DataFrame({"x": [1, 2, 3], "y": [4, 5, 6]})
     source = stream
     L = source.to_dataframe(example=df).x.sum().stream.gather().sink_to_list()
@@ -422,21 +473,38 @@
     assert L == [6, 12, 18]
 
 
+def test_integration_from_stream(stream):
+    source = stream
+    sdf = (
+        source.partition(4)
+        .to_batch(example=['{"x": 0, "y": 0}'])
+        .map(json.loads)
+        .to_dataframe()
+    )
+    result = sdf.groupby(sdf.x).y.sum().mean()
+    L = result.stream.gather().sink_to_list()
+
+    for i in range(12):
+        source.emit(json.dumps({"x": i % 3, "y": i}))
+
+    assert L == [2, 28 / 3, 22.0]
+
+
 def test_to_frame(stream):
-    df = cudf.DataFrame({'x': [1, 2, 3], 'y': [4, 5, 6]})
+    df = cudf.DataFrame({"x": [1, 2, 3], "y": [4, 5, 6]})
     sdf = DataFrame(example=df, stream=stream)
 
     assert sdf.to_frame() is sdf
 
     a = sdf.x.to_frame()
     assert isinstance(a, DataFrame)
-    assert list(a.columns) == ['x']
+    assert list(a.columns) == ["x"]
 
 
-@pytest.mark.parametrize('op', ['cumsum', 'cummax', 'cumprod', 'cummin'])
-@pytest.mark.parametrize('getter', [lambda df: df, lambda df: df.x])
+@pytest.mark.parametrize("op", ["cumsum", "cummax", "cumprod", "cummin"])
+@pytest.mark.parametrize("getter", [lambda df: df, lambda df: df.x])
 def test_cumulative_aggregations(op, getter, stream):
-    df = cudf.DataFrame({'x': list(range(10)), 'y': [1] * 10})
+    df = cudf.DataFrame({"x": list(range(10)), "y": [1] * 10})
     expected = getattr(getter(df), op)()
 
     sdf = DataFrame(example=df, stream=stream)
@@ -444,7 +512,7 @@ def test_cumulative_aggregations(op, getter, stream):
     L = getattr(getter(sdf), op)().stream.gather().sink_to_list()
 
     for i in range(0, 10, 3):
-        sdf.emit(df.iloc[i: i + 3])
+        sdf.emit(df.iloc[i:i + 3])
     sdf.emit(df.iloc[:0])
 
     assert len(L) > 1
@@ -453,10 +521,10 @@ def test_cumulative_aggregations(op, getter, stream):
 
 
 def test_display(stream):
-    pytest.importorskip('ipywidgets')
-    pytest.importorskip('IPython')
+    pytest.importorskip("ipywidgets")
+    pytest.importorskip("IPython")
 
-    df = cudf.DataFrame({'x': [1, 2, 3], 'y': [4, 5, 6]})
+    df = cudf.DataFrame({"x": [1, 2, 3], "y": [4, 5, 6]})
     sdf = DataFrame(example=df, stream=stream)
 
     s = sdf.x.sum()
@@ -465,7 +533,7 @@ def test_display(stream):
 
 
 def test_tail(stream):
-    df = cudf.DataFrame({'x': [1, 2, 3], 'y': [4, 5, 6]})
+    df = cudf.DataFrame({"x": [1, 2, 3], "y": [4, 5, 6]})
     sdf = DataFrame(example=df, stream=stream)
 
     L = sdf.tail(2).stream.gather().sink_to_list()
@@ -481,12 +549,12 @@
 def test_example_type_error_message():
     try:
         DataFrame(example=[123])
     except Exception as e:
-        assert 'DataFrame' in str(e)
-        assert '[123]' in str(e)
+        assert "DataFrame" in str(e)
+        assert "[123]" in str(e)
 
 
 def test_dataframes(stream):
-    df = cudf.DataFrame({'x': [1, 2, 3], 'y': [4, 5, 6]})
+    df = cudf.DataFrame({"x": [1, 2, 3], "y": [4, 5, 6]})
     sdf = DataFrames(example=df, stream=stream)
     L = sdf.x.sum().stream.gather().sink_to_list()
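The expected values in test_integration_from_stream above can be checked by hand: the stream partitions twelve JSON messages into batches of four, sums `y` within each `x = i % 3` group, and averages the group sums. A plain-Python check (no cudf or streamz needed):

```python
# Reproduce [2, 28 / 3, 22.0] from the emitted (x, y) pairs.
emitted = [(i % 3, i) for i in range(12)]

def mean_of_group_sums(upto):
    sums = {}
    for x, y in emitted[:upto]:
        sums[x] = sums.get(x, 0) + y
    return sum(sums.values()) / len(sums)

assert [mean_of_group_sums(n) for n in (4, 8, 12)] == [2, 28 / 3, 22.0]
```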
@@ -496,16 +564,17 @@
     assert L == [6, 6]
 
 
-def test_aggregate_updating(stream):
-    df = cudf.DataFrame({'x': [1, 2, 3], 'y': [4, 5, 6]})
+def test_groupby_aggregate_updating(stream):
+    df = cudf.DataFrame({"x": [1, 2, 3], "y": [4, 5, 6]})
     sdf = DataFrame(example=df, stream=stream)
 
-    assert sdf.x.sum()._stream_type == 'updating'
-    assert (sdf.x.sum() + 1)._stream_type == 'updating'
+    assert sdf.groupby("x").y.mean()._stream_type == "updating"
+    assert sdf.x.sum()._stream_type == "updating"
+    assert (sdf.x.sum() + 1)._stream_type == "updating"
 
 
 def test_window_sum(stream):
-    df = cudf.DataFrame({'x': [1, 2, 3], 'y': [4, 5, 6]})
+    df = cudf.DataFrame({"x": [1, 2, 3], "y": [4, 5, 6]})
     sdf = DataFrame(example=df, stream=stream)
     L = sdf.window(n=4).x.sum().stream.gather().sink_to_list()
@@ -517,7 +586,6 @@ def test_window_sum(stream):
     assert L == [6, 9, 9]
 
 
-@pytest.mark.xfail(reason="'Series' object does not support item assignment")
 def test_window_sum_dataframe(stream):
     df = cudf.DataFrame({'x': [1, 2, 3], 'y': [4, 5, 6]})
     sdf = DataFrame(example=df, stream=stream)
@@ -534,36 +602,127 @@
     assert_eq(L[2], cudf.Series([9, 21], index=['x', 'y']))
 
 
+@pytest.mark.parametrize(
+    "func",
+    [
+        lambda x: x.sum(),
+        lambda x: x.mean(),
+        lambda x: x.count(),
+        lambda x: x.var(ddof=1),
+        lambda x: x.std(ddof=1),
+        lambda x: x.var(ddof=0),
+    ],
+)
+@pytest.mark.parametrize("n", [2, 4])
+@pytest.mark.parametrize("getter", [lambda df: df.x])
+def test_windowing_n(func, n, getter):
+    df = cudf.DataFrame({"x": list(range(10)), "y": [1, 2] * 5})
+
+    sdf = DataFrame(example=df)
+    L = func(getter(sdf).window(n=n)).stream.gather().sink_to_list()
+
+    for i in range(0, 10, 3):
+        sdf.emit(df.iloc[i:i + 3])
+    sdf.emit(df.iloc[:0])
+
+    assert len(L) == 5
+
+    assert_eq(L[0], func(getter(df).iloc[max(0, 3 - n):3]))
+    assert_eq(L[-1], func(getter(df).iloc[len(df) - n:]))
+
+
 @pytest.mark.parametrize('func', [
     lambda x: x.sum(),
     lambda x: x.mean(),
-    lambda x: x.count(),
-    lambda x: x.var(ddof=1),
-    lambda x: x.std(ddof=1),
-    lambda x: x.var(ddof=0),
 ])
-@pytest.mark.parametrize('n', [2, 4])
+@pytest.mark.parametrize('value', ['10h', '1d'])
 @pytest.mark.parametrize('getter', [
+    lambda df: df,
     lambda df: df.x,
 ])
-def test_windowing_n(func, n, getter):
-    df = cudf.DataFrame({'x': list(range(10)), 'y': [1, 2] * 5})
+@pytest.mark.parametrize('grouper', [lambda a: 'y',
+                                     lambda a: a.index,
+                                     lambda a: ['y']])
+@pytest.mark.parametrize('indexer', [lambda g: g,
+                                     lambda g: g[['x']],
+                                     lambda g: g[['x', 'y']]])
+def test_groupby_windowing_value(func, value, getter, grouper, indexer):
+    index = pd.DatetimeIndex(start='2000-01-01', end='2000-01-03', freq='1h')
+    df = cudf.DataFrame({'x': np.arange(len(index), dtype=float),
+                         'y': np.arange(len(index), dtype=float) % 2},
+                        index=index)
+
+    value = pd.Timedelta(value)
 
     sdf = DataFrame(example=df)
-    L = func(getter(sdf).window(n=n)).stream.gather().sink_to_list()
 
-    for i in range(0, 10, 3):
-        sdf.emit(df.iloc[i: i + 3])
+    def f(x):
+        return func(indexer(x.groupby(grouper(x))))
+
+    L = f(sdf.window(value)).stream.gather().sink_to_list()
+
+    diff = 13
+    for i in range(0, len(index), diff):
+        sdf.emit(df.iloc[i: i + diff])
+
+    assert len(L) == 4
+
+    first = df.iloc[:diff]
+    lost = first.loc[first.index.min() + value:]
+    first = first.iloc[len(lost):]
+
+    g = f(first)
+    assert_eq(L[0], g)
+
+    last = df.loc[index.max() - value + pd.Timedelta('1s'):]
+    h = f(last)
+    assert_eq(L[-1], h)
+
+
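test_groupby_windowing_value above reconstructs the window contents by hand: after the first batch it drops as many of the oldest rows as there are rows at or beyond `index.min() + value`, and for the final result it keeps everything within `value` of the newest timestamp (the extra `1s` nudges past the left edge because `.loc` slicing is inclusive). A small sketch of that bookkeeping with pandas, under the same hourly index the test uses:

```python
import numpy as np
import pandas as pd

index = pd.date_range("2000-01-01", "2000-01-03", freq="1h")
df = pd.DataFrame({"x": np.arange(len(index), dtype=float)}, index=index)
value = pd.Timedelta("10h")

# First emitted batch of 13 rows: the window keeps only the trailing 10 hours.
first = df.iloc[:13]
lost = first.loc[first.index.min() + value:]   # rows that have aged out
first = first.iloc[len(lost):]                 # what the window still holds

# Final window: everything within `value` of the newest timestamp.
last = df.loc[index.max() - value + pd.Timedelta("1s"):]
```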
+@pytest.mark.parametrize('func', [
+    lambda x: x.sum(),
+    lambda x: x.mean(),
+])
+@pytest.mark.parametrize('n', [1, 4])
+@pytest.mark.parametrize('getter', [
+    lambda df: df,
+    lambda df: df.x,
+])
+@pytest.mark.parametrize('grouper', [lambda a: a.x % 3,
+                                     lambda a: 'y',
+                                     lambda a: a.index % 2,
+                                     lambda a: ['y']])
+@pytest.mark.parametrize('indexer', [lambda g: g,
+                                     lambda g: g[['x', 'y']]])
+def test_groupby_windowing_n(func, n, getter, grouper, indexer):
+    df = cudf.DataFrame({'x': np.arange(10, dtype=float), 'y': [1.0, 2.0] * 5})
+
+    sdf = DataFrame(example=df)
+
+    def f(x):
+        return func(indexer(x.groupby(grouper(x))))
+
+    L = f(sdf.window(n=n)).stream.gather().sink_to_list()
+
+    diff = 3
+    for i in range(0, 10, diff):
+        sdf.emit(df.iloc[i: i + diff])
     sdf.emit(df.iloc[:0])
 
     assert len(L) == 5
 
-    assert_eq(L[0], func(getter(df).iloc[max(0, 3 - n): 3]))
-    assert_eq(L[-1], func(getter(df).iloc[len(df) - n:]))
+    first = df.iloc[max(0, diff - n): diff]
+
+    g = f(first)
+    assert_eq(L[0], g)
+
+    last = df.iloc[len(df) - n:]
+    h = f(last)
+    assert_eq(L[-1], h)
 
 
 def test_window_full():
-    df = cudf.DataFrame({'x': np.arange(10, dtype=float), 'y': [1.0, 2.0] * 5})
+    df = cudf.DataFrame({"x": np.arange(10, dtype=float), "y": [1.0, 2.0] * 5})
 
     sdf = DataFrame(example=df)
@@ -579,7 +738,7 @@ def test_window_full():
 
 
 def test_custom_aggregation():
-    df = cudf.DataFrame({'x': np.arange(10, dtype=float), 'y': [1.0, 2.0] * 5})
+    df = cudf.DataFrame({"x": np.arange(10, dtype=float), "y": [1.0, 2.0] * 5})
 
     class Custom(Aggregation):
         def initial(self, new):
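The diff ends partway through test_custom_aggregation. For context, a custom aggregation plugs into the same interface the classes in aggregations.py implement: `initial(new)` builds the starting state, and `on_new(acc, new)` / `on_old(acc, old)` return a `(state, result)` pair as batches enter or leave a window. A hedged sketch of such an aggregation, assuming the `aggregate` hook the upstream tests use; the `RunningCount` class below is illustrative, not the body of the truncated test:

```python
import pandas as pd
from streamz.dataframe import Aggregation, DataFrame

class RunningCount(Aggregation):
    """Toy aggregation that tracks how many rows have been seen."""

    def initial(self, new):
        return 0                      # starting state, before any batch

    def on_new(self, acc, new):
        acc = acc + len(new)          # a batch arrived
        return acc, acc               # (new state, emitted result)

    def on_old(self, acc, old):
        acc = acc - len(old)          # rows fell out of a window
        return acc, acc

df = pd.DataFrame({"x": [1.0, 2.0], "y": [3.0, 4.0]})
sdf = DataFrame(example=df)
L = sdf.aggregate(RunningCount()).stream.sink_to_list()
sdf.emit(df)                          # L == [2]
```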