Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 62 additions & 29 deletions streamz/dataframe/tests/test_cudf.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,25 @@
"""
Tests for cudf DataFrame
All these tests are taken from test_dataframes module in the same folder.
Some of these tests pass with cudf as they are, and others are marked xfail
where a pandas like method is not implemented yet in cudf.
But these tests should pass as cudf implement more pandas like methods.
Tests for cudf DataFrames:
All tests have been cloned from the test_dataframes module in the same folder.
Some of these tests pass with cudf, and others are marked with xfail
where a pandas-like method is not yet implemented in cudf.
But these tests should pass as and when cudf rolls out more pandas-like methods.
"""
from __future__ import division, print_function

import numpy as np
import pytest
import operator

import pytest
from dask.dataframe.utils import assert_eq
from distributed import Client
import numpy as np
import pandas as pd

from streamz import Stream
from streamz.dask import DaskStream
from streamz.dataframe import DataFrame, Series, DataFrames, Aggregation
from streamz.dask import DaskStream

from distributed import Client


cudf = pytest.importorskip("cudf")

Expand Down Expand Up @@ -70,8 +74,8 @@ def test_attributes():
df = cudf.DataFrame({'x': [1, 2, 3], 'y': [4, 5, 6]})
sdf = DataFrame(example=df)

assert 'x' in dir(sdf)
assert 'z' not in dir(sdf)
assert getattr(sdf,'x',-1) != -1
assert getattr(sdf,'z',-1) == -1

sdf.x
with pytest.raises(AttributeError):
Expand Down Expand Up @@ -359,6 +363,53 @@ def test_setitem_overwrites(stream):
assert_eq(L[-1], df.iloc[7:] * 2)


@pytest.mark.parametrize('kwargs,op', [
({}, 'sum'),
({}, 'mean'),
pytest.param({}, 'min'),
pytest.param({}, 'median', marks=pytest.mark.xfail(reason="Not implemented for rolling objects")),
pytest.param({}, 'max'),
pytest.param({}, 'var', marks=pytest.mark.xfail(reason="Not implemented for rolling objects")),
pytest.param({}, 'count'),
pytest.param({'ddof': 0}, 'std', marks=pytest.mark.xfail(reason="Not implemented for rolling objects")),
pytest.param({'quantile': 0.5}, 'quantile', marks=pytest.mark.xfail(reason="Not implemented for rolling objects")),
pytest.param({'arg': {'A': 'sum', 'B': 'min'}}, 'aggregate', marks=pytest.mark.xfail(reason="Not implemented"))
])
@pytest.mark.parametrize('window', [
pytest.param(2),
7,
pytest.param('3h'),
pd.Timedelta('200 minutes')
])
@pytest.mark.parametrize('m', [
2,
pytest.param(5)
])
@pytest.mark.parametrize('pre_get,post_get', [
(lambda df: df, lambda df: df),
(lambda df: df.x, lambda x: x),
(lambda df: df, lambda df: df.x)
])
def test_rolling_count_aggregations(op, window, m, pre_get, post_get, kwargs,
stream):
index = pd.DatetimeIndex(start='2000-01-01', end='2000-01-03', freq='1h')
df = cudf.DataFrame({'x': np.arange(len(index))}, index=index)

expected = getattr(post_get(pre_get(df).rolling(window)), op)(**kwargs)

sdf = DataFrame(example=df, stream=stream)
roll = getattr(post_get(pre_get(sdf).rolling(window)), op)(**kwargs)
L = roll.stream.gather().sink_to_list()
assert len(L) == 0

for i in range(0, len(df), m):
sdf.emit(df.iloc[i: i + m])

assert len(L) > 1

assert_eq(cudf.concat(L), expected)


def test_stream_to_dataframe(stream):
df = cudf.DataFrame({'x': [1, 2, 3], 'y': [4, 5, 6]})
source = stream
Expand All @@ -382,24 +433,6 @@ def test_to_frame(stream):
assert list(a.columns) == ['x']


def test_instantiate_with_dict(stream):
df = cudf.DataFrame({'x': [1, 2, 3], 'y': [4, 5, 6]})
sdf = DataFrame(example=df, stream=stream)

sdf2 = DataFrame({'a': sdf.x, 'b': sdf.x * 2,
'c': sdf.y % 2})
L = sdf2.stream.gather().sink_to_list()
assert len(sdf2.columns) == 3

sdf.emit(df)
sdf.emit(df)

assert len(L) == 2
for x in L:
assert_eq(x[['a', 'b', 'c']],
cudf.DataFrame({'a': df.x, 'b': df.x * 2, 'c': df.y % 2}))


@pytest.mark.parametrize('op', ['cumsum', 'cummax', 'cumprod', 'cummin'])
@pytest.mark.parametrize('getter', [lambda df: df, lambda df: df.x])
def test_cumulative_aggregations(op, getter, stream):
Expand Down
5 changes: 0 additions & 5 deletions streamz/dataframe/tests/test_dataframe_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,6 @@
import numpy as np


def test_utils_is_dataframe_like():
test_utils_dataframe = pytest.importorskip('dask.dataframe.tests.test_utils_dataframe')
test_utils_dataframe.test_is_dataframe_like()


def test_utils_get_base_frame_type_pandas():
with pytest.raises(TypeError):
get_base_frame_type("DataFrame", is_dataframe_like, None)
Expand Down
4 changes: 2 additions & 2 deletions streamz/dataframe/tests/test_dataframes.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,8 @@ def test_attributes():
df = pd.DataFrame({'x': [1, 2, 3], 'y': [4, 5, 6]})
sdf = DataFrame(example=df)

assert 'x' in dir(sdf)
assert 'z' not in dir(sdf)
assert getattr(sdf,'x',-1) != -1
assert getattr(sdf,'z',-1) == -1

sdf.x
with pytest.raises(AttributeError):
Expand Down
2 changes: 1 addition & 1 deletion streamz/tests/test_dask.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from distributed.utils_test import gen_cluster, inc, cluster, loop, slowinc # noqa: F401


@gen_cluster(client=True, check_new_threads=False)
@gen_cluster(client=True)
def test_map(c, s, a, b):
source = Stream(asynchronous=True)
futures = scatter(source).map(inc)
Expand Down