diff --git a/streamz/dataframe/tests/test_cudf.py b/streamz/dataframe/tests/test_cudf.py index 1cb89943..28efa49d 100644 --- a/streamz/dataframe/tests/test_cudf.py +++ b/streamz/dataframe/tests/test_cudf.py @@ -1,21 +1,25 @@ """ -Tests for cudf DataFrame -All these tests are taken from test_dataframes module in the same folder. -Some of these tests pass with cudf as they are, and others are marked xfail -where a pandas like method is not implemented yet in cudf. -But these tests should pass as cudf implement more pandas like methods. +Tests for cudf DataFrames: +All tests have been cloned from the test_dataframes module in the same folder. +Some of these tests pass with cudf, and others are marked with xfail +where a pandas-like method is not yet implemented in cudf. +But these tests should pass as and when cudf rolls out more pandas-like methods. """ from __future__ import division, print_function -import numpy as np -import pytest import operator + +import pytest from dask.dataframe.utils import assert_eq -from distributed import Client +import numpy as np +import pandas as pd from streamz import Stream -from streamz.dask import DaskStream from streamz.dataframe import DataFrame, Series, DataFrames, Aggregation +from streamz.dask import DaskStream + +from distributed import Client + cudf = pytest.importorskip("cudf") @@ -70,8 +74,8 @@ def test_attributes(): df = cudf.DataFrame({'x': [1, 2, 3], 'y': [4, 5, 6]}) sdf = DataFrame(example=df) - assert 'x' in dir(sdf) - assert 'z' not in dir(sdf) + assert getattr(sdf,'x',-1) != -1 + assert getattr(sdf,'z',-1) == -1 sdf.x with pytest.raises(AttributeError): @@ -359,6 +363,53 @@ def test_setitem_overwrites(stream): assert_eq(L[-1], df.iloc[7:] * 2) +@pytest.mark.parametrize('kwargs,op', [ + ({}, 'sum'), + ({}, 'mean'), + pytest.param({}, 'min'), + pytest.param({}, 'median', marks=pytest.mark.xfail(reason="Not implemented for rolling objects")), + pytest.param({}, 'max'), + pytest.param({}, 'var', marks=pytest.mark.xfail(reason="Not implemented for rolling objects")), + pytest.param({}, 'count'), + pytest.param({'ddof': 0}, 'std', marks=pytest.mark.xfail(reason="Not implemented for rolling objects")), + pytest.param({'quantile': 0.5}, 'quantile', marks=pytest.mark.xfail(reason="Not implemented for rolling objects")), + pytest.param({'arg': {'A': 'sum', 'B': 'min'}}, 'aggregate', marks=pytest.mark.xfail(reason="Not implemented")) +]) +@pytest.mark.parametrize('window', [ + pytest.param(2), + 7, + pytest.param('3h'), + pd.Timedelta('200 minutes') +]) +@pytest.mark.parametrize('m', [ + 2, + pytest.param(5) +]) +@pytest.mark.parametrize('pre_get,post_get', [ + (lambda df: df, lambda df: df), + (lambda df: df.x, lambda x: x), + (lambda df: df, lambda df: df.x) +]) +def test_rolling_count_aggregations(op, window, m, pre_get, post_get, kwargs, + stream): + index = pd.DatetimeIndex(start='2000-01-01', end='2000-01-03', freq='1h') + df = cudf.DataFrame({'x': np.arange(len(index))}, index=index) + + expected = getattr(post_get(pre_get(df).rolling(window)), op)(**kwargs) + + sdf = DataFrame(example=df, stream=stream) + roll = getattr(post_get(pre_get(sdf).rolling(window)), op)(**kwargs) + L = roll.stream.gather().sink_to_list() + assert len(L) == 0 + + for i in range(0, len(df), m): + sdf.emit(df.iloc[i: i + m]) + + assert len(L) > 1 + + assert_eq(cudf.concat(L), expected) + + def test_stream_to_dataframe(stream): df = cudf.DataFrame({'x': [1, 2, 3], 'y': [4, 5, 6]}) source = stream @@ -382,24 +433,6 @@ def test_to_frame(stream): assert list(a.columns) == ['x'] -def test_instantiate_with_dict(stream): - df = cudf.DataFrame({'x': [1, 2, 3], 'y': [4, 5, 6]}) - sdf = DataFrame(example=df, stream=stream) - - sdf2 = DataFrame({'a': sdf.x, 'b': sdf.x * 2, - 'c': sdf.y % 2}) - L = sdf2.stream.gather().sink_to_list() - assert len(sdf2.columns) == 3 - - sdf.emit(df) - sdf.emit(df) - - assert len(L) == 2 - for x in L: - assert_eq(x[['a', 'b', 'c']], - cudf.DataFrame({'a': df.x, 'b': df.x * 2, 'c': df.y % 2})) - - @pytest.mark.parametrize('op', ['cumsum', 'cummax', 'cumprod', 'cummin']) @pytest.mark.parametrize('getter', [lambda df: df, lambda df: df.x]) def test_cumulative_aggregations(op, getter, stream): diff --git a/streamz/dataframe/tests/test_dataframe_utils.py b/streamz/dataframe/tests/test_dataframe_utils.py index d730a1cf..617a5734 100644 --- a/streamz/dataframe/tests/test_dataframe_utils.py +++ b/streamz/dataframe/tests/test_dataframe_utils.py @@ -6,11 +6,6 @@ import numpy as np -def test_utils_is_dataframe_like(): - test_utils_dataframe = pytest.importorskip('dask.dataframe.tests.test_utils_dataframe') - test_utils_dataframe.test_is_dataframe_like() - - def test_utils_get_base_frame_type_pandas(): with pytest.raises(TypeError): get_base_frame_type("DataFrame", is_dataframe_like, None) diff --git a/streamz/dataframe/tests/test_dataframes.py b/streamz/dataframe/tests/test_dataframes.py index b08b4acf..a0294597 100644 --- a/streamz/dataframe/tests/test_dataframes.py +++ b/streamz/dataframe/tests/test_dataframes.py @@ -69,8 +69,8 @@ def test_attributes(): df = pd.DataFrame({'x': [1, 2, 3], 'y': [4, 5, 6]}) sdf = DataFrame(example=df) - assert 'x' in dir(sdf) - assert 'z' not in dir(sdf) + assert getattr(sdf,'x',-1) != -1 + assert getattr(sdf,'z',-1) == -1 sdf.x with pytest.raises(AttributeError): diff --git a/streamz/tests/test_dask.py b/streamz/tests/test_dask.py index a53c4286..d4da4fc4 100644 --- a/streamz/tests/test_dask.py +++ b/streamz/tests/test_dask.py @@ -14,7 +14,7 @@ from distributed.utils_test import gen_cluster, inc, cluster, loop, slowinc # noqa: F401 -@gen_cluster(client=True, check_new_threads=False) +@gen_cluster(client=True) def test_map(c, s, a, b): source = Stream(asynchronous=True) futures = scatter(source).map(inc)