From 19f3f9abec4116238f3e5fea43f9e76777313bba Mon Sep 17 00:00:00 2001 From: chinmaychandak Date: Mon, 26 Aug 2019 23:42:31 +0000 Subject: [PATCH 1/7] Checkpointing feature added --- examples/gpu-dataframes.ipynb | 125 ---------------------------------- streamz/sources.py | 53 ++++++++++++-- 2 files changed, 48 insertions(+), 130 deletions(-) delete mode 100644 examples/gpu-dataframes.ipynb diff --git a/examples/gpu-dataframes.ipynb b/examples/gpu-dataframes.ipynb deleted file mode 100644 index 89b61c58..00000000 --- a/examples/gpu-dataframes.ipynb +++ /dev/null @@ -1,125 +0,0 @@ -{ - "cells": [ - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "from streamz.dataframe import DataFrame\n", - "import cudf" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "# Basic example" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "\n", - "cu_df = cudf.DataFrame({'x': np.arange(10, dtype=float)+10, 'y': [1.0, 2.0] * 5})\n", - "\n", - "sdf = DataFrame(example=cu_df)\n", - "\n", - "L = sdf.window(n=15).x.sum().stream.sink_to_list()" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "sdf.emit(cu_df.iloc[:8])\n", - "sdf.emit(cu_df)\n", - "sdf.emit(cu_df)" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "print(L[0])\n", - "print(L[1])\n", - "print(L[2])" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "# Advanced example\n", - "The following pipeline reads json encoded strings from Kafka in batches and process them on GPUs and write the result back to a different Kafka topic. 
This pipeline can be easily extended to run on Dask Stream as well.\n", - "Note: Uses cudf 0.8" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "# read messages from kafka and create a stream\n", - "\n", - "consume_topic = \"my-topic\"\n", - "produce_topic = \"my-out-topic\"\n", - "bootstrap_servers = 'localhost:9092'\n", - "consumer_conf = {'bootstrap.servers': bootstrap_servers,\n", - " 'group.id': 'group-123', 'session.timeout.ms': 600}\n", - "producer_conf = {'bootstrap.servers': bootstrap_servers}\n", - "\n", - "stream = Stream.from_kafka_batched(consume_topic, consumer_conf, poll_interval='10s',\n", - " npartitions=10, asynchronous=True)" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "# convert batch of encoded json strings to gpu dataframes\n", - "cudf_stream = stream\\\n", - " .map(lambda msgs: \"\\n\".join([msg.decode('utf-8') for msg in msgs]))\\\n", - " .map(cudf.read_json, lines=True)\n", - "\n", - "# create a streamz dataframe from the above stream and sample dataframe\n", - "cudf_example = cudf.DataFrame({'x': np.arange(10, dtype=float)+10, 'y': [1.0, 2.0] * 5})\n", - "stdf = DataFrame(cudf_stream, example=cudf_example)\n", - "\n", - "# perform aggregation and write to kafka\n", - "stdf.window(n=15).x.mean().stream.to_kafka(produce_topic, producer_conf)\n" - ] - } - ], - "metadata": { - "kernelspec": { - "display_name": "Python 3", - "language": "python", - "name": "python3" - }, - "language_info": { - "codemirror_mode": { - "name": "ipython", - "version": 3 - }, - "file_extension": ".py", - "mimetype": "text/x-python", - "name": "python", - "nbconvert_exporter": "python", - "pygments_lexer": "ipython3", - "version": "3.6.3" - } - }, - "nbformat": 4, - "nbformat_minor": 2 -} diff --git a/streamz/sources.py b/streamz/sources.py index 68b1f5f1..690c915d 100644 --- a/streamz/sources.py +++ b/streamz/sources.py @@ -1,4 +1,5 @@ from glob import glob +import json import os import time @@ -453,11 +454,12 @@ def _close_consumer(self): class FromKafkaBatched(Stream): """Base class for both local and cluster-based batched kafka processing""" def __init__(self, topic, consumer_params, poll_interval='1s', - npartitions=1, **kwargs): + npartitions=1, checkpointing=None, **kwargs): self.consumer_params = consumer_params self.topic = topic self.npartitions = npartitions self.positions = [0] * npartitions + self.checkpointing = checkpointing self.poll_interval = convert_interval(poll_interval) self.stopped = True @@ -470,6 +472,14 @@ def poll_kafka(self): try: while not self.stopped: out = [] + + latest_checkpoint = {} + if self.checkpointing is not None: + if os.path.exists(self.checkpointing): + with open(self.checkpointing, 'r') as fr: + latest_checkpoint = json.loads(fr.readlines()[-1]) + fr.close() + for partition in range(self.npartitions): tp = ck.TopicPartition(self.topic, partition, 0) try: @@ -477,11 +487,19 @@ def poll_kafka(self): tp, timeout=0.1) except (RuntimeError, ck.KafkaException): continue + current_position = self.positions[partition] + group = self.consumer_params['group.id'] + + if group in latest_checkpoint.keys(): + if self.topic in latest_checkpoint[group].keys(): + if str(partition) in latest_checkpoint[group][self.topic].keys(): + current_position = latest_checkpoint[group][self.topic][str(partition)] + lowest = max(current_position, low) if high > lowest: out.append((self.consumer_params, self.topic, partition, - lowest, high - 1)) 
+ lowest, high - 1, self.checkpointing)) self.positions[partition] = high for part in out: @@ -507,7 +525,8 @@ def start(self): @Stream.register_api(staticmethod) def from_kafka_batched(topic, consumer_params, poll_interval='1s', - npartitions=1, start=False, dask=False, **kwargs): + npartitions=1, start=False, dask=False, + checkpointing=None, **kwargs): """ Get messages from Kafka in batches Uses the confluent-kafka library, @@ -549,7 +568,8 @@ def from_kafka_batched(topic, consumer_params, poll_interval='1s', kwargs['loop'] = default_client().loop source = FromKafkaBatched(topic, consumer_params, poll_interval=poll_interval, - npartitions=npartitions, **kwargs) + npartitions=npartitions, + checkpointing=checkpointing, **kwargs) if dask: source = source.scatter() @@ -559,7 +579,27 @@ def from_kafka_batched(topic, consumer_params, poll_interval='1s', return source.starmap(get_message_batch) -def get_message_batch(kafka_params, topic, partition, low, high, timeout=None): +def add_checkpoint(group, checkpoint, path): + topic = checkpoint.topic + partition = checkpoint.partition + offset = checkpoint.offset + latest_checkpoint = {} + if os.path.exists(path): + with open(path, 'r') as fr: + latest_checkpoint = json.loads(fr.readlines()[-1]) + fr.close() + if group not in latest_checkpoint.keys(): + latest_checkpoint[group] = {} + if topic not in latest_checkpoint[group].keys(): + latest_checkpoint[group][topic] = {} + latest_checkpoint[group][topic][partition] = offset + print(latest_checkpoint) + with open(path, 'a+') as fw: + fw.write(json.dumps(latest_checkpoint) + '\n') + fw.close() + + +def get_message_batch(kafka_params, topic, partition, low, high, checkpointing, timeout=None): """Fetch a batch of kafka messages in given topic/partition This will block until messages are available, or timeout is reached. 
@@ -583,5 +623,8 @@ def get_message_batch(kafka_params, topic, partition, low, high, timeout=None): if timeout is not None and time.time() - t0 > timeout: break finally: + if checkpointing is not None: + checkpoint = consumer.commit(asynchronous=False) + add_checkpoint(kafka_params['group.id'], checkpoint[0], checkpointing) consumer.close() return out From 52ef68e4796341d64badc2a10f3fa618a7718c94 Mon Sep 17 00:00:00 2001 From: chinmaychandak Date: Tue, 27 Aug 2019 21:29:32 +0000 Subject: [PATCH 2/7] Add directory structure for checkpointing --- streamz/sources.py | 26 ++++++++++++++++++++------ 1 file changed, 20 insertions(+), 6 deletions(-) diff --git a/streamz/sources.py b/streamz/sources.py index 690c915d..4b20db39 100644 --- a/streamz/sources.py +++ b/streamz/sources.py @@ -475,8 +475,12 @@ def poll_kafka(self): latest_checkpoint = {} if self.checkpointing is not None: - if os.path.exists(self.checkpointing): - with open(self.checkpointing, 'r') as fr: + if not os.path.exists(self.checkpointing): + os.makedirs(self.checkpointing) + checkpoints_list = os.listdir(self.checkpointing) + if len(checkpoints_list) > 0: + previous_checkpoint = max(checkpoints_list) + with open(self.checkpointing + '/' + previous_checkpoint, 'r') as fr: latest_checkpoint = json.loads(fr.readlines()[-1]) fr.close() @@ -584,9 +588,14 @@ def add_checkpoint(group, checkpoint, path): partition = checkpoint.partition offset = checkpoint.offset latest_checkpoint = {} - if os.path.exists(path): - with open(path, 'r') as fr: - latest_checkpoint = json.loads(fr.readlines()[-1]) + previous_checkpoint = None + if not os.path.exists(path): + os.makedirs(path) + checkpoints_list = os.listdir(path) + if len(checkpoints_list) > 0: + previous_checkpoint = max(checkpoints_list) + with open(path + '/' + previous_checkpoint, 'r') as fr: + latest_checkpoint = json.loads(fr.readlines()[0]) fr.close() if group not in latest_checkpoint.keys(): latest_checkpoint[group] = {} @@ -594,7 +603,12 @@ def add_checkpoint(group, checkpoint, path): latest_checkpoint[group][topic] = {} latest_checkpoint[group][topic][partition] = offset print(latest_checkpoint) - with open(path, 'a+') as fw: + if previous_checkpoint is None: + new_checkpoint = '1.txt' + else: + previous_batch = int(previous_checkpoint.split('.')[0]) + new_checkpoint = str(previous_batch + 1) + '.txt' + with open(path + '/' + new_checkpoint, 'a+') as fw: fw.write(json.dumps(latest_checkpoint) + '\n') fw.close() From fd84269e9c1750c51ce7ff169358611611015b4f Mon Sep 17 00:00:00 2001 From: chinmaychandak Date: Tue, 27 Aug 2019 21:35:12 +0000 Subject: [PATCH 3/7] Added cleanup of old checkpoints --- streamz/sources.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/streamz/sources.py b/streamz/sources.py index 4b20db39..22aa5e43 100644 --- a/streamz/sources.py +++ b/streamz/sources.py @@ -597,6 +597,8 @@ def add_checkpoint(group, checkpoint, path): with open(path + '/' + previous_checkpoint, 'r') as fr: latest_checkpoint = json.loads(fr.readlines()[0]) fr.close() + if len(checkpoints_list) == 5: + os.system('rm -rf ' + path + '/' + min(checkpoints_list)) if group not in latest_checkpoint.keys(): latest_checkpoint[group] = {} if topic not in latest_checkpoint[group].keys(): From 9b7a7101a9040b3f19b3cd0f76078509cbfd74a4 Mon Sep 17 00:00:00 2001 From: chinmaychandak Date: Tue, 27 Aug 2019 22:19:58 +0000 Subject: [PATCH 4/7] Fix directory structure for checkpointing (2) --- streamz/sources.py | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git 
a/streamz/sources.py b/streamz/sources.py index 22aa5e43..7aba12cf 100644 --- a/streamz/sources.py +++ b/streamz/sources.py @@ -477,10 +477,13 @@ def poll_kafka(self): if self.checkpointing is not None: if not os.path.exists(self.checkpointing): os.makedirs(self.checkpointing) - checkpoints_list = os.listdir(self.checkpointing) + topic_path = self.checkpointing + '/' + self.topic + if not os.path.exists(topic_path): + os.makedirs(topic_path) + checkpoints_list = os.listdir(topic_path) if len(checkpoints_list) > 0: previous_checkpoint = max(checkpoints_list) - with open(self.checkpointing + '/' + previous_checkpoint, 'r') as fr: + with open(topic_path + '/' + previous_checkpoint, 'r') as fr: latest_checkpoint = json.loads(fr.readlines()[-1]) fr.close() @@ -496,9 +499,8 @@ def poll_kafka(self): group = self.consumer_params['group.id'] if group in latest_checkpoint.keys(): - if self.topic in latest_checkpoint[group].keys(): - if str(partition) in latest_checkpoint[group][self.topic].keys(): - current_position = latest_checkpoint[group][self.topic][str(partition)] + if str(partition) in latest_checkpoint[group].keys(): + current_position = latest_checkpoint[group][str(partition)] lowest = max(current_position, low) if high > lowest: @@ -589,6 +591,9 @@ def add_checkpoint(group, checkpoint, path): offset = checkpoint.offset latest_checkpoint = {} previous_checkpoint = None + if not os.path.exists(path): + os.makedirs(path) + path = path + '/' + topic if not os.path.exists(path): os.makedirs(path) checkpoints_list = os.listdir(path) @@ -601,9 +606,7 @@ def add_checkpoint(group, checkpoint, path): os.system('rm -rf ' + path + '/' + min(checkpoints_list)) if group not in latest_checkpoint.keys(): latest_checkpoint[group] = {} - if topic not in latest_checkpoint[group].keys(): - latest_checkpoint[group][topic] = {} - latest_checkpoint[group][topic][partition] = offset + latest_checkpoint[group][partition] = offset print(latest_checkpoint) if previous_checkpoint is None: new_checkpoint = '1.txt' From b0f35012c792f9ac7d364cf29d6148251de4d377 Mon Sep 17 00:00:00 2001 From: chinmaychandak Date: Fri, 6 Sep 2019 17:23:47 +0000 Subject: [PATCH 5/7] Remove cudf tests, since they would now live to RAPIDS/cudf/custreamz. --- .../dataframe/tests/test_cudf_dataframes.py | 769 ------------------ streamz/sources.py | 1 + 2 files changed, 1 insertion(+), 769 deletions(-) delete mode 100644 streamz/dataframe/tests/test_cudf_dataframes.py diff --git a/streamz/dataframe/tests/test_cudf_dataframes.py b/streamz/dataframe/tests/test_cudf_dataframes.py deleted file mode 100644 index a557a848..00000000 --- a/streamz/dataframe/tests/test_cudf_dataframes.py +++ /dev/null @@ -1,769 +0,0 @@ -""" -Tests for cudf DataFrames: -All tests have been cloned from the test_dataframes module in the streamz -repository. Some of these tests pass with cudf, and others are marked with -xfail, where a pandas-like method is not yet implemented in cudf. But these -tests should pass as and when cudf rolls out more pandas-like methods. 
-""" -from __future__ import division, print_function - -import json -import operator - -import numpy as np -import pandas as pd -import pytest -from dask.dataframe.utils import assert_eq -from distributed import Client - -from streamz import Stream -from streamz.dask import DaskStream -from streamz.dataframe import Aggregation, DataFrame, DataFrames, Series - -cudf = pytest.importorskip("cudf") - - -@pytest.fixture(scope="module") -def client(): - client = Client(processes=False, asynchronous=False) - try: - yield client - finally: - client.close() - - -@pytest.fixture(params=['core', 'dask']) -def stream(request, client): # flake8: noqa - if request.param == 'core': - return Stream() - else: - return DaskStream() - - -def test_identity(stream): - df = cudf.DataFrame({"x": [1, 2, 3], "y": [4, 5, 6]}) - sdf = DataFrame(example=df, stream=stream) - L = sdf.stream.gather().sink_to_list() - - sdf.emit(df) - - assert L[0] is df - assert list(sdf.example.columns) == ["x", "y"] - - x = sdf.x - assert isinstance(x, Series) - L2 = x.stream.gather().sink_to_list() - assert not L2 - - sdf.emit(df) - assert isinstance(L2[0], cudf.Series) - assert assert_eq(L2[0], df.x) - - -def test_dtype(stream): - df = cudf.DataFrame({"x": [1, 2, 3], "y": [4, 5, 6]}) - sdf = DataFrame(example=df, stream=stream) - - assert str(sdf.dtypes) == str(df.dtypes) - assert sdf.x.dtype == df.x.dtype - assert sdf.index.dtype == df.index.dtype - - -def test_attributes(): - df = cudf.DataFrame({"x": [1, 2, 3], "y": [4, 5, 6]}) - sdf = DataFrame(example=df) - - assert getattr(sdf, "x", -1) != -1 - assert getattr(sdf, "z", -1) == -1 - - sdf.x - with pytest.raises(AttributeError): - sdf.z - - -def test_exceptions(stream): - df = cudf.DataFrame({"x": [1, 2, 3], "y": [4, 5, 6]}) - sdf = DataFrame(example=df, stream=stream) - with pytest.raises(TypeError): - sdf.emit(1) - - with pytest.raises(IndexError): - sdf.emit(cudf.DataFrame()) - - -@pytest.mark.parametrize('func', [ - lambda x: x.sum(), - lambda x: x.mean(), - lambda x: x.count() -]) -def test_reductions(stream, func): - df = cudf.DataFrame({'x': [1, 2, 3], 'y': [4, 5, 6]}) - for example in [df, df.iloc[:0]]: - sdf = DataFrame(example=example, stream=stream) - - df_out = func(sdf).stream.gather().sink_to_list() - - x = sdf.x - x_out = func(x).stream.gather().sink_to_list() - - sdf.emit(df) - sdf.emit(df) - - assert_eq(df_out[-1], func(cudf.concat([df, df]))) - assert_eq(x_out[-1], func(cudf.concat([df, df]).x)) - - -@pytest.mark.parametrize( - "op", - [ - operator.add, - operator.and_, - operator.eq, - operator.floordiv, - operator.ge, - operator.gt, - operator.le, - operator.lshift, - operator.lt, - operator.mod, - operator.mul, - operator.ne, - operator.or_, - operator.pow, - operator.rshift, - operator.sub, - operator.truediv, - operator.xor, - ], -) -@pytest.mark.parametrize("getter", [lambda df: df, lambda df: df.x]) -def test_binary_operators(op, getter, stream): - df = cudf.DataFrame({"x": [1, 2, 3], "y": [4, 5, 6]}) - try: - left = op(getter(df), 2) - right = op(2, getter(df)) - except Exception: - return - - a = DataFrame(example=df, stream=stream) - li = op(getter(a), 2).stream.gather().sink_to_list() - r = op(2, getter(a)).stream.gather().sink_to_list() - - a.emit(df) - - assert_eq(li[0], left) - assert_eq(r[0], right) - - -@pytest.mark.parametrize( - "op", - [ - operator.abs, - operator.inv, - operator.invert, - operator.neg, - lambda x: x.map(lambda x: x + 1), - lambda x: x.reset_index(), - lambda x: x.astype(float), - ], -) -@pytest.mark.parametrize("getter", 
[lambda df: df, lambda df: df.x]) -def test_unary_operators(op, getter): - df = cudf.DataFrame({"x": [1, 2, 3], "y": [4, 5, 6]}) - try: - expected = op(getter(df)) - except Exception: - return - - a = DataFrame(example=df) - b = op(getter(a)).stream.sink_to_list() - - a.emit(df) - - assert_eq(b[0], expected) - - -@pytest.mark.parametrize( - "func", - [ - lambda df: df.query("x > 1 and x < 4"), - pytest.param(lambda df: df.x.value_counts().nlargest(2) - .astype(int), marks=pytest.mark.xfail( - reason="Index name lost in _getattr_")) - ], -) -def test_dataframe_simple(func): - df = cudf.DataFrame({"x": [1, 2, 3], "y": [4, 5, 6]}) - expected = func(df) - - a = DataFrame(example=df) - L = func(a).stream.sink_to_list() - - a.emit(df) - - assert_eq(L[0], expected) - - -def test_set_index(): - df = cudf.DataFrame({"x": [1, 2, 3], "y": [4, 5, 6]}) - - a = DataFrame(example=df) - - b = a.set_index("x").stream.sink_to_list() - a.emit(df) - assert_eq(b[0], df.set_index("x")) - - b = a.set_index(a.y + 1).stream.sink_to_list() - a.emit(df) - assert_eq(b[0], df.set_index(df.y + 1)) - - -def test_binary_stream_operators(stream): - df = cudf.DataFrame({"x": [1, 2, 3], "y": [4, 5, 6]}) - - expected = df.x + df.y - - a = DataFrame(example=df, stream=stream) - b = (a.x + a.y).stream.gather().sink_to_list() - - a.emit(df) - - assert_eq(b[0], expected) - - -def test_index(stream): - df = cudf.DataFrame({"x": [1, 2, 3], "y": [4, 5, 6]}) - a = DataFrame(example=df, stream=stream) - b = a.index + 5 - L = b.stream.gather().sink_to_list() - - a.emit(df) - a.emit(df) - - assert_eq(L[0], df.index + 5) - assert_eq(L[1], df.index + 5) - - -def test_pair_arithmetic(stream): - df = cudf.DataFrame({"x": list(range(10)), "y": [1] * 10}) - - a = DataFrame(example=df.iloc[:0], stream=stream) - L = ((a.x + a.y) * 2).stream.gather().sink_to_list() - - a.emit(df.iloc[:5]) - a.emit(df.iloc[5:]) - - assert len(L) == 2 - assert_eq(cudf.concat(L), (df.x + df.y) * 2) - - -def test_getitem(stream): - df = cudf.DataFrame({"x": list(range(10)), "y": [1] * 10}) - - a = DataFrame(example=df.iloc[:0], stream=stream) - L = a[a.x > 4].stream.gather().sink_to_list() - - a.emit(df.iloc[:5]) - a.emit(df.iloc[5:]) - - assert len(L) == 2 - assert_eq(cudf.concat(L), df[df.x > 4]) - - -@pytest.mark.parametrize('agg', [ - lambda x: x.sum(), - lambda x: x.mean(), -]) -@pytest.mark.parametrize('grouper', [lambda a: a.x % 3, - lambda a: 'x', - lambda a: a.index % 2, - lambda a: ['x']]) -@pytest.mark.parametrize('indexer', [lambda g: g, - lambda g: g[['y']], - lambda g: g[['x', 'y']]]) -def test_groupby_aggregate(agg, grouper, indexer, stream): - df = cudf.DataFrame({'x': (np.arange(10) // 2).astype(float), - 'y': [1.0, 2.0] * 5}) - - a = DataFrame(example=df.iloc[:0], stream=stream) - - def f(x): - return agg(indexer(x.groupby(grouper(x)))) - - L = f(a).stream.gather().sink_to_list() - - a.emit(df.iloc[:3]) - a.emit(df.iloc[3:7]) - a.emit(df.iloc[7:]) - - first = df.iloc[:3] - g = f(first) - - h = f(df) - - assert assert_eq(L[0], g) - assert assert_eq(L[-1], h) - - -@pytest.mark.xfail( - reason="AttributeError: 'StringColumn' object" - "has no attribute 'value_counts'" -) -def test_value_counts(stream): - s = cudf.Series(["a", "b", "a"]) - - a = Series(example=s, stream=stream) - - b = a.value_counts() - assert b._stream_type == "updating" - result = b.stream.gather().sink_to_list() - - a.emit(s) - a.emit(s) - - assert_eq(result[-1], cudf.concat([s, s]).value_counts()) - - -def test_repr(stream): - df = cudf.DataFrame({'x': (np.arange(10) // 
2).astype(float), - 'y': [1.0] * 10}) - a = DataFrame(example=df, stream=stream) - - text = repr(a) - assert type(a).__name__ in text - assert 'x' in text - assert 'y' in text - - text = repr(a.x) - assert type(a.x).__name__ in text - assert 'x' in text - - text = repr(a.x.sum()) - assert type(a.x.sum()).__name__ in text - - -def test_repr_html(stream): - df = cudf.DataFrame({'x': (np.arange(10) // 2).astype(float), - 'y': [1.0] * 10}) - a = DataFrame(example=df, stream=stream) - - for x in [a, a.y, a.y.mean()]: - html = x._repr_html_() - assert type(x).__name__ in html - assert '1' in html - - -def test_setitem(stream): - df = cudf.DataFrame({"x": list(range(10)), "y": [1] * 10}) - - sdf = DataFrame(example=df.iloc[:0], stream=stream) - stream = sdf.stream - - sdf["z"] = sdf["x"] * 2 - sdf["a"] = 10 - sdf[["c", "d"]] = sdf[["x", "y"]] - - L = sdf.mean().stream.gather().sink_to_list() - - stream.emit(df.iloc[:3]) - stream.emit(df.iloc[3:7]) - stream.emit(df.iloc[7:]) - - df["z"] = df["x"] * 2 - df["a"] = 10 - df["c"] = df["x"] - df["d"] = df["y"] - - assert_eq(L[-1], df.mean()) - - -def test_setitem_overwrites(stream): - df = cudf.DataFrame({"x": list(range(10))}) - sdf = DataFrame(example=df.iloc[:0], stream=stream) - stream = sdf.stream - - sdf["x"] = sdf["x"] * 2 - - L = sdf.stream.gather().sink_to_list() - - stream.emit(df.iloc[:3]) - stream.emit(df.iloc[3:7]) - stream.emit(df.iloc[7:]) - - assert_eq(L[-1], df.iloc[7:] * 2) - - -@pytest.mark.parametrize( - "kwargs,op", - [ - ({}, "sum"), - ({}, "mean"), - pytest.param({}, "min"), - pytest.param( - {}, - "median", - marks=pytest.mark.xfail(reason="Unavailable for rolling objects"), - ), - pytest.param({}, "max"), - pytest.param( - {}, - "var", - marks=pytest.mark.xfail(reason="Unavailable for rolling objects"), - ), - pytest.param({}, "count"), - pytest.param( - {"ddof": 0}, - "std", - marks=pytest.mark.xfail(reason="Unavailable for rolling objects"), - ), - pytest.param( - {"quantile": 0.5}, - "quantile", - marks=pytest.mark.xfail(reason="Unavailable for rolling objects"), - ), - pytest.param( - {"arg": {"A": "sum", "B": "min"}}, - "aggregate", - marks=pytest.mark.xfail(reason="Unavailable for rolling objects"), - ), - ], -) -@pytest.mark.parametrize( - "window", [pytest.param(2), 7, pytest.param("3h"), - pd.Timedelta("200 minutes")] -) -@pytest.mark.parametrize("m", [2, pytest.param(5)]) -@pytest.mark.parametrize( - "pre_get,post_get", - [ - (lambda df: df, lambda df: df), - (lambda df: df.x, lambda x: x), - (lambda df: df, lambda df: df.x), - ], -) -def test_rolling_count_aggregations(op, window, m, pre_get, post_get, kwargs, - stream): - index = pd.DatetimeIndex(start="2000-01-01", end="2000-01-03", freq="1h") - df = cudf.DataFrame({"x": np.arange(len(index))}, index=index) - - expected = getattr(post_get(pre_get(df).rolling(window)), op)(**kwargs) - - sdf = DataFrame(example=df, stream=stream) - roll = getattr(post_get(pre_get(sdf).rolling(window)), op)(**kwargs) - L = roll.stream.gather().sink_to_list() - assert len(L) == 0 - - for i in range(0, len(df), m): - sdf.emit(df.iloc[i:i + m]) - - assert len(L) > 1 - - assert_eq(cudf.concat(L), expected) - - -def test_stream_to_dataframe(stream): - df = cudf.DataFrame({"x": [1, 2, 3], "y": [4, 5, 6]}) - source = stream - L = source.to_dataframe(example=df).x.sum().stream.gather().sink_to_list() - - source.emit(df) - source.emit(df) - source.emit(df) - - assert L == [6, 12, 18] - - -def test_integration_from_stream(stream): - source = stream - sdf = ( - source.partition(4) - 
.to_batch(example=['{"x": 0, "y": 0}']) - .map(json.loads) - .to_dataframe() - ) - result = sdf.groupby(sdf.x).y.sum().mean() - L = result.stream.gather().sink_to_list() - - for i in range(12): - source.emit(json.dumps({"x": i % 3, "y": i})) - - assert L == [2, 28 / 3, 22.0] - - -def test_to_frame(stream): - df = cudf.DataFrame({"x": [1, 2, 3], "y": [4, 5, 6]}) - sdf = DataFrame(example=df, stream=stream) - - assert sdf.to_frame() is sdf - - a = sdf.x.to_frame() - assert isinstance(a, DataFrame) - assert list(a.columns) == ["x"] - - -@pytest.mark.parametrize("op", ["cumsum", "cummax", "cumprod", "cummin"]) -@pytest.mark.parametrize("getter", [lambda df: df, lambda df: df.x]) -def test_cumulative_aggregations(op, getter, stream): - df = cudf.DataFrame({"x": list(range(10)), "y": [1] * 10}) - expected = getattr(getter(df), op)() - - sdf = DataFrame(example=df, stream=stream) - - L = getattr(getter(sdf), op)().stream.gather().sink_to_list() - - for i in range(0, 10, 3): - sdf.emit(df.iloc[i:i + 3]) - sdf.emit(df.iloc[:0]) - - assert len(L) > 1 - - assert_eq(cudf.concat(L), expected) - - -def test_display(stream): - pytest.importorskip("ipywidgets") - pytest.importorskip("IPython") - - df = cudf.DataFrame({"x": [1, 2, 3], "y": [4, 5, 6]}) - sdf = DataFrame(example=df, stream=stream) - - s = sdf.x.sum() - - s._ipython_display_() - - -def test_tail(stream): - df = cudf.DataFrame({"x": [1, 2, 3], "y": [4, 5, 6]}) - sdf = DataFrame(example=df, stream=stream) - - L = sdf.tail(2).stream.gather().sink_to_list() - - sdf.emit(df) - sdf.emit(df) - - assert_eq(L[0], df.tail(2)) - assert_eq(L[1], df.tail(2)) - - -def test_example_type_error_message(): - try: - DataFrame(example=[123]) - except Exception as e: - assert "DataFrame" in str(e) - assert "[123]" in str(e) - - -def test_dataframes(stream): - df = cudf.DataFrame({"x": [1, 2, 3], "y": [4, 5, 6]}) - sdf = DataFrames(example=df, stream=stream) - L = sdf.x.sum().stream.gather().sink_to_list() - - sdf.emit(df) - sdf.emit(df) - - assert L == [6, 6] - - -def test_groupby_aggregate_updating(stream): - df = cudf.DataFrame({"x": [1, 2, 3], "y": [4, 5, 6]}) - sdf = DataFrame(example=df, stream=stream) - - assert sdf.groupby("x").y.mean()._stream_type == "updating" - assert sdf.x.sum()._stream_type == "updating" - assert (sdf.x.sum() + 1)._stream_type == "updating" - - -def test_window_sum(stream): - df = cudf.DataFrame({"x": [1, 2, 3], "y": [4, 5, 6]}) - sdf = DataFrame(example=df, stream=stream) - L = sdf.window(n=4).x.sum().stream.gather().sink_to_list() - - sdf.emit(df) - assert L == [6] - sdf.emit(df) - assert L == [6, 9] - sdf.emit(df) - assert L == [6, 9, 9] - - -def test_window_sum_dataframe(stream): - df = cudf.DataFrame({'x': [1, 2, 3], 'y': [4, 5, 6]}) - sdf = DataFrame(example=df, stream=stream) - L = sdf.window(n=4).sum().stream.gather().sink_to_list() - - sdf.emit(df) - assert_eq(L[0], cudf.Series([6, 15], index=['x', 'y'])) - sdf.emit(df) - assert_eq(L[0], cudf.Series([6, 15], index=['x', 'y'])) - assert_eq(L[1], cudf.Series([9, 21], index=['x', 'y'])) - sdf.emit(df) - assert_eq(L[0], cudf.Series([6, 15], index=['x', 'y'])) - assert_eq(L[1], cudf.Series([9, 21], index=['x', 'y'])) - assert_eq(L[2], cudf.Series([9, 21], index=['x', 'y'])) - - -@pytest.mark.parametrize( - "func", - [ - lambda x: x.sum(), - lambda x: x.mean(), - lambda x: x.count(), - lambda x: x.var(ddof=1), - lambda x: x.std(ddof=1), - lambda x: x.var(ddof=0), - ], -) -@pytest.mark.parametrize("n", [2, 4]) -@pytest.mark.parametrize("getter", [lambda df: df.x]) -def 
test_windowing_n(func, n, getter): - df = cudf.DataFrame({"x": list(range(10)), "y": [1, 2] * 5}) - - sdf = DataFrame(example=df) - L = func(getter(sdf).window(n=n)).stream.gather().sink_to_list() - - for i in range(0, 10, 3): - sdf.emit(df.iloc[i:i + 3]) - sdf.emit(df.iloc[:0]) - - assert len(L) == 5 - - assert_eq(L[0], func(getter(df).iloc[max(0, 3 - n):3])) - assert_eq(L[-1], func(getter(df).iloc[len(df) - n:])) - - -@pytest.mark.parametrize('func', [ - lambda x: x.sum(), - lambda x: x.mean(), -]) -@pytest.mark.parametrize('value', ['10h', '1d']) -@pytest.mark.parametrize('getter', [ - lambda df: df, - lambda df: df.x, -]) -@pytest.mark.parametrize('grouper', [lambda a: 'y', - lambda a: a.index, - lambda a: ['y']]) -@pytest.mark.parametrize('indexer', [lambda g: g, - lambda g: g[['x']], - lambda g: g[['x', 'y']]]) -def test_groupby_windowing_value(func, value, getter, grouper, indexer): - index = pd.DatetimeIndex(start='2000-01-01', end='2000-01-03', freq='1h') - df = cudf.DataFrame({'x': np.arange(len(index), dtype=float), - 'y': np.arange(len(index), dtype=float) % 2}, - index=index) - - value = pd.Timedelta(value) - - sdf = DataFrame(example=df) - - def f(x): - return func(indexer(x.groupby(grouper(x)))) - - L = f(sdf.window(value)).stream.gather().sink_to_list() - - diff = 13 - for i in range(0, len(index), diff): - sdf.emit(df.iloc[i: i + diff]) - - assert len(L) == 4 - - first = df.iloc[:diff] - lost = first.loc[first.index.min() + value:] - first = first.iloc[len(lost):] - - g = f(first) - assert_eq(L[0], g) - - last = df.loc[index.max() - value + pd.Timedelta('1s'):] - h = f(last) - assert_eq(L[-1], h) - - -@pytest.mark.parametrize('func', [ - lambda x: x.sum(), - lambda x: x.mean(), -]) -@pytest.mark.parametrize('n', [1, 4]) -@pytest.mark.parametrize('getter', [ - lambda df: df, - lambda df: df.x, -]) -@pytest.mark.parametrize('grouper', [lambda a: a.x % 3, - lambda a: 'y', - lambda a: a.index % 2, - lambda a: ['y']]) -@pytest.mark.parametrize('indexer', [lambda g: g, - lambda g: g[['x', 'y']]]) -def test_groupby_windowing_n(func, n, getter, grouper, indexer): - df = cudf.DataFrame({'x': np.arange(10, dtype=float), 'y': [1.0, 2.0] * 5}) - - sdf = DataFrame(example=df) - - def f(x): - return func(indexer(x.groupby(grouper(x)))) - - L = f(sdf.window(n=n)).stream.gather().sink_to_list() - - diff = 3 - for i in range(0, 10, diff): - sdf.emit(df.iloc[i: i + diff]) - sdf.emit(df.iloc[:0]) - - assert len(L) == 5 - - first = df.iloc[max(0, diff - n): diff] - - g = f(first) - assert_eq(L[0], g) - - last = df.iloc[len(df) - n:] - h = f(last) - assert_eq(L[-1], h) - - -def test_window_full(): - df = cudf.DataFrame({"x": np.arange(10, dtype=float), "y": [1.0, 2.0] * 5}) - - sdf = DataFrame(example=df) - - L = sdf.window(n=4).apply(lambda x: x).stream.sink_to_list() - - sdf.emit(df.iloc[:3]) - sdf.emit(df.iloc[3:8]) - sdf.emit(df.iloc[8:]) - - assert_eq(L[0], df.iloc[:3]) - assert_eq(L[1], df.iloc[4:8]) - assert_eq(L[2], df.iloc[-4:]) - - -def test_custom_aggregation(): - df = cudf.DataFrame({"x": np.arange(10, dtype=float), "y": [1.0, 2.0] * 5}) - - class Custom(Aggregation): - def initial(self, new): - return 0 - - def on_new(self, state, new): - return state + 1, state - - def on_old(self, state, new): - return state - 100, state - - sdf = DataFrame(example=df) - L = sdf.aggregate(Custom()).stream.sink_to_list() - - sdf.emit(df) - sdf.emit(df) - sdf.emit(df) - - assert L == [0, 1, 2] - - sdf = DataFrame(example=df) - L = sdf.window(n=5).aggregate(Custom()).stream.sink_to_list() - - 
sdf.emit(df) - sdf.emit(df) - sdf.emit(df) - - assert L == [1, -198, -397] diff --git a/streamz/sources.py b/streamz/sources.py index 7aba12cf..988b4f6d 100644 --- a/streamz/sources.py +++ b/streamz/sources.py @@ -602,6 +602,7 @@ def add_checkpoint(group, checkpoint, path): with open(path + '/' + previous_checkpoint, 'r') as fr: latest_checkpoint = json.loads(fr.readlines()[0]) fr.close() + #Only maintain the last 5 checkpoints if len(checkpoints_list) == 5: os.system('rm -rf ' + path + '/' + min(checkpoints_list)) if group not in latest_checkpoint.keys(): From 48072af55f2c26346388781ac7063a0a1afb11fe Mon Sep 17 00:00:00 2001 From: chinmaychandak Date: Mon, 30 Sep 2019 21:01:46 +0000 Subject: [PATCH 6/7] Revert checkpointing changes. --- examples/gpu-dataframes.ipynb | 125 --- .../dataframe/tests/test_cudf_dataframes.py | 769 ------------------ 2 files changed, 894 deletions(-) delete mode 100644 examples/gpu-dataframes.ipynb delete mode 100644 streamz/dataframe/tests/test_cudf_dataframes.py diff --git a/examples/gpu-dataframes.ipynb b/examples/gpu-dataframes.ipynb deleted file mode 100644 index 89b61c58..00000000 --- a/examples/gpu-dataframes.ipynb +++ /dev/null @@ -1,125 +0,0 @@ -{ - "cells": [ - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "from streamz.dataframe import DataFrame\n", - "import cudf" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "# Basic example" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "\n", - "cu_df = cudf.DataFrame({'x': np.arange(10, dtype=float)+10, 'y': [1.0, 2.0] * 5})\n", - "\n", - "sdf = DataFrame(example=cu_df)\n", - "\n", - "L = sdf.window(n=15).x.sum().stream.sink_to_list()" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "sdf.emit(cu_df.iloc[:8])\n", - "sdf.emit(cu_df)\n", - "sdf.emit(cu_df)" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "print(L[0])\n", - "print(L[1])\n", - "print(L[2])" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "# Advanced example\n", - "The following pipeline reads json encoded strings from Kafka in batches and process them on GPUs and write the result back to a different Kafka topic. 
This pipeline can be easily extended to run on Dask Stream as well.\n", - "Note: Uses cudf 0.8" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "# read messages from kafka and create a stream\n", - "\n", - "consume_topic = \"my-topic\"\n", - "produce_topic = \"my-out-topic\"\n", - "bootstrap_servers = 'localhost:9092'\n", - "consumer_conf = {'bootstrap.servers': bootstrap_servers,\n", - " 'group.id': 'group-123', 'session.timeout.ms': 600}\n", - "producer_conf = {'bootstrap.servers': bootstrap_servers}\n", - "\n", - "stream = Stream.from_kafka_batched(consume_topic, consumer_conf, poll_interval='10s',\n", - " npartitions=10, asynchronous=True)" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "# convert batch of encoded json strings to gpu dataframes\n", - "cudf_stream = stream\\\n", - " .map(lambda msgs: \"\\n\".join([msg.decode('utf-8') for msg in msgs]))\\\n", - " .map(cudf.read_json, lines=True)\n", - "\n", - "# create a streamz dataframe from the above stream and sample dataframe\n", - "cudf_example = cudf.DataFrame({'x': np.arange(10, dtype=float)+10, 'y': [1.0, 2.0] * 5})\n", - "stdf = DataFrame(cudf_stream, example=cudf_example)\n", - "\n", - "# perform aggregation and write to kafka\n", - "stdf.window(n=15).x.mean().stream.to_kafka(produce_topic, producer_conf)\n" - ] - } - ], - "metadata": { - "kernelspec": { - "display_name": "Python 3", - "language": "python", - "name": "python3" - }, - "language_info": { - "codemirror_mode": { - "name": "ipython", - "version": 3 - }, - "file_extension": ".py", - "mimetype": "text/x-python", - "name": "python", - "nbconvert_exporter": "python", - "pygments_lexer": "ipython3", - "version": "3.6.3" - } - }, - "nbformat": 4, - "nbformat_minor": 2 -} diff --git a/streamz/dataframe/tests/test_cudf_dataframes.py b/streamz/dataframe/tests/test_cudf_dataframes.py deleted file mode 100644 index a557a848..00000000 --- a/streamz/dataframe/tests/test_cudf_dataframes.py +++ /dev/null @@ -1,769 +0,0 @@ -""" -Tests for cudf DataFrames: -All tests have been cloned from the test_dataframes module in the streamz -repository. Some of these tests pass with cudf, and others are marked with -xfail, where a pandas-like method is not yet implemented in cudf. But these -tests should pass as and when cudf rolls out more pandas-like methods. 
-""" -from __future__ import division, print_function - -import json -import operator - -import numpy as np -import pandas as pd -import pytest -from dask.dataframe.utils import assert_eq -from distributed import Client - -from streamz import Stream -from streamz.dask import DaskStream -from streamz.dataframe import Aggregation, DataFrame, DataFrames, Series - -cudf = pytest.importorskip("cudf") - - -@pytest.fixture(scope="module") -def client(): - client = Client(processes=False, asynchronous=False) - try: - yield client - finally: - client.close() - - -@pytest.fixture(params=['core', 'dask']) -def stream(request, client): # flake8: noqa - if request.param == 'core': - return Stream() - else: - return DaskStream() - - -def test_identity(stream): - df = cudf.DataFrame({"x": [1, 2, 3], "y": [4, 5, 6]}) - sdf = DataFrame(example=df, stream=stream) - L = sdf.stream.gather().sink_to_list() - - sdf.emit(df) - - assert L[0] is df - assert list(sdf.example.columns) == ["x", "y"] - - x = sdf.x - assert isinstance(x, Series) - L2 = x.stream.gather().sink_to_list() - assert not L2 - - sdf.emit(df) - assert isinstance(L2[0], cudf.Series) - assert assert_eq(L2[0], df.x) - - -def test_dtype(stream): - df = cudf.DataFrame({"x": [1, 2, 3], "y": [4, 5, 6]}) - sdf = DataFrame(example=df, stream=stream) - - assert str(sdf.dtypes) == str(df.dtypes) - assert sdf.x.dtype == df.x.dtype - assert sdf.index.dtype == df.index.dtype - - -def test_attributes(): - df = cudf.DataFrame({"x": [1, 2, 3], "y": [4, 5, 6]}) - sdf = DataFrame(example=df) - - assert getattr(sdf, "x", -1) != -1 - assert getattr(sdf, "z", -1) == -1 - - sdf.x - with pytest.raises(AttributeError): - sdf.z - - -def test_exceptions(stream): - df = cudf.DataFrame({"x": [1, 2, 3], "y": [4, 5, 6]}) - sdf = DataFrame(example=df, stream=stream) - with pytest.raises(TypeError): - sdf.emit(1) - - with pytest.raises(IndexError): - sdf.emit(cudf.DataFrame()) - - -@pytest.mark.parametrize('func', [ - lambda x: x.sum(), - lambda x: x.mean(), - lambda x: x.count() -]) -def test_reductions(stream, func): - df = cudf.DataFrame({'x': [1, 2, 3], 'y': [4, 5, 6]}) - for example in [df, df.iloc[:0]]: - sdf = DataFrame(example=example, stream=stream) - - df_out = func(sdf).stream.gather().sink_to_list() - - x = sdf.x - x_out = func(x).stream.gather().sink_to_list() - - sdf.emit(df) - sdf.emit(df) - - assert_eq(df_out[-1], func(cudf.concat([df, df]))) - assert_eq(x_out[-1], func(cudf.concat([df, df]).x)) - - -@pytest.mark.parametrize( - "op", - [ - operator.add, - operator.and_, - operator.eq, - operator.floordiv, - operator.ge, - operator.gt, - operator.le, - operator.lshift, - operator.lt, - operator.mod, - operator.mul, - operator.ne, - operator.or_, - operator.pow, - operator.rshift, - operator.sub, - operator.truediv, - operator.xor, - ], -) -@pytest.mark.parametrize("getter", [lambda df: df, lambda df: df.x]) -def test_binary_operators(op, getter, stream): - df = cudf.DataFrame({"x": [1, 2, 3], "y": [4, 5, 6]}) - try: - left = op(getter(df), 2) - right = op(2, getter(df)) - except Exception: - return - - a = DataFrame(example=df, stream=stream) - li = op(getter(a), 2).stream.gather().sink_to_list() - r = op(2, getter(a)).stream.gather().sink_to_list() - - a.emit(df) - - assert_eq(li[0], left) - assert_eq(r[0], right) - - -@pytest.mark.parametrize( - "op", - [ - operator.abs, - operator.inv, - operator.invert, - operator.neg, - lambda x: x.map(lambda x: x + 1), - lambda x: x.reset_index(), - lambda x: x.astype(float), - ], -) -@pytest.mark.parametrize("getter", 
[lambda df: df, lambda df: df.x]) -def test_unary_operators(op, getter): - df = cudf.DataFrame({"x": [1, 2, 3], "y": [4, 5, 6]}) - try: - expected = op(getter(df)) - except Exception: - return - - a = DataFrame(example=df) - b = op(getter(a)).stream.sink_to_list() - - a.emit(df) - - assert_eq(b[0], expected) - - -@pytest.mark.parametrize( - "func", - [ - lambda df: df.query("x > 1 and x < 4"), - pytest.param(lambda df: df.x.value_counts().nlargest(2) - .astype(int), marks=pytest.mark.xfail( - reason="Index name lost in _getattr_")) - ], -) -def test_dataframe_simple(func): - df = cudf.DataFrame({"x": [1, 2, 3], "y": [4, 5, 6]}) - expected = func(df) - - a = DataFrame(example=df) - L = func(a).stream.sink_to_list() - - a.emit(df) - - assert_eq(L[0], expected) - - -def test_set_index(): - df = cudf.DataFrame({"x": [1, 2, 3], "y": [4, 5, 6]}) - - a = DataFrame(example=df) - - b = a.set_index("x").stream.sink_to_list() - a.emit(df) - assert_eq(b[0], df.set_index("x")) - - b = a.set_index(a.y + 1).stream.sink_to_list() - a.emit(df) - assert_eq(b[0], df.set_index(df.y + 1)) - - -def test_binary_stream_operators(stream): - df = cudf.DataFrame({"x": [1, 2, 3], "y": [4, 5, 6]}) - - expected = df.x + df.y - - a = DataFrame(example=df, stream=stream) - b = (a.x + a.y).stream.gather().sink_to_list() - - a.emit(df) - - assert_eq(b[0], expected) - - -def test_index(stream): - df = cudf.DataFrame({"x": [1, 2, 3], "y": [4, 5, 6]}) - a = DataFrame(example=df, stream=stream) - b = a.index + 5 - L = b.stream.gather().sink_to_list() - - a.emit(df) - a.emit(df) - - assert_eq(L[0], df.index + 5) - assert_eq(L[1], df.index + 5) - - -def test_pair_arithmetic(stream): - df = cudf.DataFrame({"x": list(range(10)), "y": [1] * 10}) - - a = DataFrame(example=df.iloc[:0], stream=stream) - L = ((a.x + a.y) * 2).stream.gather().sink_to_list() - - a.emit(df.iloc[:5]) - a.emit(df.iloc[5:]) - - assert len(L) == 2 - assert_eq(cudf.concat(L), (df.x + df.y) * 2) - - -def test_getitem(stream): - df = cudf.DataFrame({"x": list(range(10)), "y": [1] * 10}) - - a = DataFrame(example=df.iloc[:0], stream=stream) - L = a[a.x > 4].stream.gather().sink_to_list() - - a.emit(df.iloc[:5]) - a.emit(df.iloc[5:]) - - assert len(L) == 2 - assert_eq(cudf.concat(L), df[df.x > 4]) - - -@pytest.mark.parametrize('agg', [ - lambda x: x.sum(), - lambda x: x.mean(), -]) -@pytest.mark.parametrize('grouper', [lambda a: a.x % 3, - lambda a: 'x', - lambda a: a.index % 2, - lambda a: ['x']]) -@pytest.mark.parametrize('indexer', [lambda g: g, - lambda g: g[['y']], - lambda g: g[['x', 'y']]]) -def test_groupby_aggregate(agg, grouper, indexer, stream): - df = cudf.DataFrame({'x': (np.arange(10) // 2).astype(float), - 'y': [1.0, 2.0] * 5}) - - a = DataFrame(example=df.iloc[:0], stream=stream) - - def f(x): - return agg(indexer(x.groupby(grouper(x)))) - - L = f(a).stream.gather().sink_to_list() - - a.emit(df.iloc[:3]) - a.emit(df.iloc[3:7]) - a.emit(df.iloc[7:]) - - first = df.iloc[:3] - g = f(first) - - h = f(df) - - assert assert_eq(L[0], g) - assert assert_eq(L[-1], h) - - -@pytest.mark.xfail( - reason="AttributeError: 'StringColumn' object" - "has no attribute 'value_counts'" -) -def test_value_counts(stream): - s = cudf.Series(["a", "b", "a"]) - - a = Series(example=s, stream=stream) - - b = a.value_counts() - assert b._stream_type == "updating" - result = b.stream.gather().sink_to_list() - - a.emit(s) - a.emit(s) - - assert_eq(result[-1], cudf.concat([s, s]).value_counts()) - - -def test_repr(stream): - df = cudf.DataFrame({'x': (np.arange(10) // 
2).astype(float), - 'y': [1.0] * 10}) - a = DataFrame(example=df, stream=stream) - - text = repr(a) - assert type(a).__name__ in text - assert 'x' in text - assert 'y' in text - - text = repr(a.x) - assert type(a.x).__name__ in text - assert 'x' in text - - text = repr(a.x.sum()) - assert type(a.x.sum()).__name__ in text - - -def test_repr_html(stream): - df = cudf.DataFrame({'x': (np.arange(10) // 2).astype(float), - 'y': [1.0] * 10}) - a = DataFrame(example=df, stream=stream) - - for x in [a, a.y, a.y.mean()]: - html = x._repr_html_() - assert type(x).__name__ in html - assert '1' in html - - -def test_setitem(stream): - df = cudf.DataFrame({"x": list(range(10)), "y": [1] * 10}) - - sdf = DataFrame(example=df.iloc[:0], stream=stream) - stream = sdf.stream - - sdf["z"] = sdf["x"] * 2 - sdf["a"] = 10 - sdf[["c", "d"]] = sdf[["x", "y"]] - - L = sdf.mean().stream.gather().sink_to_list() - - stream.emit(df.iloc[:3]) - stream.emit(df.iloc[3:7]) - stream.emit(df.iloc[7:]) - - df["z"] = df["x"] * 2 - df["a"] = 10 - df["c"] = df["x"] - df["d"] = df["y"] - - assert_eq(L[-1], df.mean()) - - -def test_setitem_overwrites(stream): - df = cudf.DataFrame({"x": list(range(10))}) - sdf = DataFrame(example=df.iloc[:0], stream=stream) - stream = sdf.stream - - sdf["x"] = sdf["x"] * 2 - - L = sdf.stream.gather().sink_to_list() - - stream.emit(df.iloc[:3]) - stream.emit(df.iloc[3:7]) - stream.emit(df.iloc[7:]) - - assert_eq(L[-1], df.iloc[7:] * 2) - - -@pytest.mark.parametrize( - "kwargs,op", - [ - ({}, "sum"), - ({}, "mean"), - pytest.param({}, "min"), - pytest.param( - {}, - "median", - marks=pytest.mark.xfail(reason="Unavailable for rolling objects"), - ), - pytest.param({}, "max"), - pytest.param( - {}, - "var", - marks=pytest.mark.xfail(reason="Unavailable for rolling objects"), - ), - pytest.param({}, "count"), - pytest.param( - {"ddof": 0}, - "std", - marks=pytest.mark.xfail(reason="Unavailable for rolling objects"), - ), - pytest.param( - {"quantile": 0.5}, - "quantile", - marks=pytest.mark.xfail(reason="Unavailable for rolling objects"), - ), - pytest.param( - {"arg": {"A": "sum", "B": "min"}}, - "aggregate", - marks=pytest.mark.xfail(reason="Unavailable for rolling objects"), - ), - ], -) -@pytest.mark.parametrize( - "window", [pytest.param(2), 7, pytest.param("3h"), - pd.Timedelta("200 minutes")] -) -@pytest.mark.parametrize("m", [2, pytest.param(5)]) -@pytest.mark.parametrize( - "pre_get,post_get", - [ - (lambda df: df, lambda df: df), - (lambda df: df.x, lambda x: x), - (lambda df: df, lambda df: df.x), - ], -) -def test_rolling_count_aggregations(op, window, m, pre_get, post_get, kwargs, - stream): - index = pd.DatetimeIndex(start="2000-01-01", end="2000-01-03", freq="1h") - df = cudf.DataFrame({"x": np.arange(len(index))}, index=index) - - expected = getattr(post_get(pre_get(df).rolling(window)), op)(**kwargs) - - sdf = DataFrame(example=df, stream=stream) - roll = getattr(post_get(pre_get(sdf).rolling(window)), op)(**kwargs) - L = roll.stream.gather().sink_to_list() - assert len(L) == 0 - - for i in range(0, len(df), m): - sdf.emit(df.iloc[i:i + m]) - - assert len(L) > 1 - - assert_eq(cudf.concat(L), expected) - - -def test_stream_to_dataframe(stream): - df = cudf.DataFrame({"x": [1, 2, 3], "y": [4, 5, 6]}) - source = stream - L = source.to_dataframe(example=df).x.sum().stream.gather().sink_to_list() - - source.emit(df) - source.emit(df) - source.emit(df) - - assert L == [6, 12, 18] - - -def test_integration_from_stream(stream): - source = stream - sdf = ( - source.partition(4) - 
.to_batch(example=['{"x": 0, "y": 0}']) - .map(json.loads) - .to_dataframe() - ) - result = sdf.groupby(sdf.x).y.sum().mean() - L = result.stream.gather().sink_to_list() - - for i in range(12): - source.emit(json.dumps({"x": i % 3, "y": i})) - - assert L == [2, 28 / 3, 22.0] - - -def test_to_frame(stream): - df = cudf.DataFrame({"x": [1, 2, 3], "y": [4, 5, 6]}) - sdf = DataFrame(example=df, stream=stream) - - assert sdf.to_frame() is sdf - - a = sdf.x.to_frame() - assert isinstance(a, DataFrame) - assert list(a.columns) == ["x"] - - -@pytest.mark.parametrize("op", ["cumsum", "cummax", "cumprod", "cummin"]) -@pytest.mark.parametrize("getter", [lambda df: df, lambda df: df.x]) -def test_cumulative_aggregations(op, getter, stream): - df = cudf.DataFrame({"x": list(range(10)), "y": [1] * 10}) - expected = getattr(getter(df), op)() - - sdf = DataFrame(example=df, stream=stream) - - L = getattr(getter(sdf), op)().stream.gather().sink_to_list() - - for i in range(0, 10, 3): - sdf.emit(df.iloc[i:i + 3]) - sdf.emit(df.iloc[:0]) - - assert len(L) > 1 - - assert_eq(cudf.concat(L), expected) - - -def test_display(stream): - pytest.importorskip("ipywidgets") - pytest.importorskip("IPython") - - df = cudf.DataFrame({"x": [1, 2, 3], "y": [4, 5, 6]}) - sdf = DataFrame(example=df, stream=stream) - - s = sdf.x.sum() - - s._ipython_display_() - - -def test_tail(stream): - df = cudf.DataFrame({"x": [1, 2, 3], "y": [4, 5, 6]}) - sdf = DataFrame(example=df, stream=stream) - - L = sdf.tail(2).stream.gather().sink_to_list() - - sdf.emit(df) - sdf.emit(df) - - assert_eq(L[0], df.tail(2)) - assert_eq(L[1], df.tail(2)) - - -def test_example_type_error_message(): - try: - DataFrame(example=[123]) - except Exception as e: - assert "DataFrame" in str(e) - assert "[123]" in str(e) - - -def test_dataframes(stream): - df = cudf.DataFrame({"x": [1, 2, 3], "y": [4, 5, 6]}) - sdf = DataFrames(example=df, stream=stream) - L = sdf.x.sum().stream.gather().sink_to_list() - - sdf.emit(df) - sdf.emit(df) - - assert L == [6, 6] - - -def test_groupby_aggregate_updating(stream): - df = cudf.DataFrame({"x": [1, 2, 3], "y": [4, 5, 6]}) - sdf = DataFrame(example=df, stream=stream) - - assert sdf.groupby("x").y.mean()._stream_type == "updating" - assert sdf.x.sum()._stream_type == "updating" - assert (sdf.x.sum() + 1)._stream_type == "updating" - - -def test_window_sum(stream): - df = cudf.DataFrame({"x": [1, 2, 3], "y": [4, 5, 6]}) - sdf = DataFrame(example=df, stream=stream) - L = sdf.window(n=4).x.sum().stream.gather().sink_to_list() - - sdf.emit(df) - assert L == [6] - sdf.emit(df) - assert L == [6, 9] - sdf.emit(df) - assert L == [6, 9, 9] - - -def test_window_sum_dataframe(stream): - df = cudf.DataFrame({'x': [1, 2, 3], 'y': [4, 5, 6]}) - sdf = DataFrame(example=df, stream=stream) - L = sdf.window(n=4).sum().stream.gather().sink_to_list() - - sdf.emit(df) - assert_eq(L[0], cudf.Series([6, 15], index=['x', 'y'])) - sdf.emit(df) - assert_eq(L[0], cudf.Series([6, 15], index=['x', 'y'])) - assert_eq(L[1], cudf.Series([9, 21], index=['x', 'y'])) - sdf.emit(df) - assert_eq(L[0], cudf.Series([6, 15], index=['x', 'y'])) - assert_eq(L[1], cudf.Series([9, 21], index=['x', 'y'])) - assert_eq(L[2], cudf.Series([9, 21], index=['x', 'y'])) - - -@pytest.mark.parametrize( - "func", - [ - lambda x: x.sum(), - lambda x: x.mean(), - lambda x: x.count(), - lambda x: x.var(ddof=1), - lambda x: x.std(ddof=1), - lambda x: x.var(ddof=0), - ], -) -@pytest.mark.parametrize("n", [2, 4]) -@pytest.mark.parametrize("getter", [lambda df: df.x]) -def 
test_windowing_n(func, n, getter): - df = cudf.DataFrame({"x": list(range(10)), "y": [1, 2] * 5}) - - sdf = DataFrame(example=df) - L = func(getter(sdf).window(n=n)).stream.gather().sink_to_list() - - for i in range(0, 10, 3): - sdf.emit(df.iloc[i:i + 3]) - sdf.emit(df.iloc[:0]) - - assert len(L) == 5 - - assert_eq(L[0], func(getter(df).iloc[max(0, 3 - n):3])) - assert_eq(L[-1], func(getter(df).iloc[len(df) - n:])) - - -@pytest.mark.parametrize('func', [ - lambda x: x.sum(), - lambda x: x.mean(), -]) -@pytest.mark.parametrize('value', ['10h', '1d']) -@pytest.mark.parametrize('getter', [ - lambda df: df, - lambda df: df.x, -]) -@pytest.mark.parametrize('grouper', [lambda a: 'y', - lambda a: a.index, - lambda a: ['y']]) -@pytest.mark.parametrize('indexer', [lambda g: g, - lambda g: g[['x']], - lambda g: g[['x', 'y']]]) -def test_groupby_windowing_value(func, value, getter, grouper, indexer): - index = pd.DatetimeIndex(start='2000-01-01', end='2000-01-03', freq='1h') - df = cudf.DataFrame({'x': np.arange(len(index), dtype=float), - 'y': np.arange(len(index), dtype=float) % 2}, - index=index) - - value = pd.Timedelta(value) - - sdf = DataFrame(example=df) - - def f(x): - return func(indexer(x.groupby(grouper(x)))) - - L = f(sdf.window(value)).stream.gather().sink_to_list() - - diff = 13 - for i in range(0, len(index), diff): - sdf.emit(df.iloc[i: i + diff]) - - assert len(L) == 4 - - first = df.iloc[:diff] - lost = first.loc[first.index.min() + value:] - first = first.iloc[len(lost):] - - g = f(first) - assert_eq(L[0], g) - - last = df.loc[index.max() - value + pd.Timedelta('1s'):] - h = f(last) - assert_eq(L[-1], h) - - -@pytest.mark.parametrize('func', [ - lambda x: x.sum(), - lambda x: x.mean(), -]) -@pytest.mark.parametrize('n', [1, 4]) -@pytest.mark.parametrize('getter', [ - lambda df: df, - lambda df: df.x, -]) -@pytest.mark.parametrize('grouper', [lambda a: a.x % 3, - lambda a: 'y', - lambda a: a.index % 2, - lambda a: ['y']]) -@pytest.mark.parametrize('indexer', [lambda g: g, - lambda g: g[['x', 'y']]]) -def test_groupby_windowing_n(func, n, getter, grouper, indexer): - df = cudf.DataFrame({'x': np.arange(10, dtype=float), 'y': [1.0, 2.0] * 5}) - - sdf = DataFrame(example=df) - - def f(x): - return func(indexer(x.groupby(grouper(x)))) - - L = f(sdf.window(n=n)).stream.gather().sink_to_list() - - diff = 3 - for i in range(0, 10, diff): - sdf.emit(df.iloc[i: i + diff]) - sdf.emit(df.iloc[:0]) - - assert len(L) == 5 - - first = df.iloc[max(0, diff - n): diff] - - g = f(first) - assert_eq(L[0], g) - - last = df.iloc[len(df) - n:] - h = f(last) - assert_eq(L[-1], h) - - -def test_window_full(): - df = cudf.DataFrame({"x": np.arange(10, dtype=float), "y": [1.0, 2.0] * 5}) - - sdf = DataFrame(example=df) - - L = sdf.window(n=4).apply(lambda x: x).stream.sink_to_list() - - sdf.emit(df.iloc[:3]) - sdf.emit(df.iloc[3:8]) - sdf.emit(df.iloc[8:]) - - assert_eq(L[0], df.iloc[:3]) - assert_eq(L[1], df.iloc[4:8]) - assert_eq(L[2], df.iloc[-4:]) - - -def test_custom_aggregation(): - df = cudf.DataFrame({"x": np.arange(10, dtype=float), "y": [1.0, 2.0] * 5}) - - class Custom(Aggregation): - def initial(self, new): - return 0 - - def on_new(self, state, new): - return state + 1, state - - def on_old(self, state, new): - return state - 100, state - - sdf = DataFrame(example=df) - L = sdf.aggregate(Custom()).stream.sink_to_list() - - sdf.emit(df) - sdf.emit(df) - sdf.emit(df) - - assert L == [0, 1, 2] - - sdf = DataFrame(example=df) - L = sdf.window(n=5).aggregate(Custom()).stream.sink_to_list() - - 
sdf.emit(df) - sdf.emit(df) - sdf.emit(df) - - assert L == [1, -198, -397] From cd2f2f398f2906de48c32038ae0f2d0d98efc3eb Mon Sep 17 00:00:00 2001 From: chinmaychandak Date: Mon, 30 Sep 2019 21:15:14 +0000 Subject: [PATCH 7/7] Revert checkpointing changes --- streamz/sources.py | 73 +++----------------------------- streamz/tests/test_kafka.py | 83 ------------------------------------- 2 files changed, 5 insertions(+), 151 deletions(-) diff --git a/streamz/sources.py b/streamz/sources.py index 988b4f6d..68b1f5f1 100644 --- a/streamz/sources.py +++ b/streamz/sources.py @@ -1,5 +1,4 @@ from glob import glob -import json import os import time @@ -454,12 +453,11 @@ def _close_consumer(self): class FromKafkaBatched(Stream): """Base class for both local and cluster-based batched kafka processing""" def __init__(self, topic, consumer_params, poll_interval='1s', - npartitions=1, checkpointing=None, **kwargs): + npartitions=1, **kwargs): self.consumer_params = consumer_params self.topic = topic self.npartitions = npartitions self.positions = [0] * npartitions - self.checkpointing = checkpointing self.poll_interval = convert_interval(poll_interval) self.stopped = True @@ -472,21 +470,6 @@ def poll_kafka(self): try: while not self.stopped: out = [] - - latest_checkpoint = {} - if self.checkpointing is not None: - if not os.path.exists(self.checkpointing): - os.makedirs(self.checkpointing) - topic_path = self.checkpointing + '/' + self.topic - if not os.path.exists(topic_path): - os.makedirs(topic_path) - checkpoints_list = os.listdir(topic_path) - if len(checkpoints_list) > 0: - previous_checkpoint = max(checkpoints_list) - with open(topic_path + '/' + previous_checkpoint, 'r') as fr: - latest_checkpoint = json.loads(fr.readlines()[-1]) - fr.close() - for partition in range(self.npartitions): tp = ck.TopicPartition(self.topic, partition, 0) try: @@ -494,18 +477,11 @@ def poll_kafka(self): tp, timeout=0.1) except (RuntimeError, ck.KafkaException): continue - current_position = self.positions[partition] - group = self.consumer_params['group.id'] - - if group in latest_checkpoint.keys(): - if str(partition) in latest_checkpoint[group].keys(): - current_position = latest_checkpoint[group][str(partition)] - lowest = max(current_position, low) if high > lowest: out.append((self.consumer_params, self.topic, partition, - lowest, high - 1, self.checkpointing)) + lowest, high - 1)) self.positions[partition] = high for part in out: @@ -531,8 +507,7 @@ def start(self): @Stream.register_api(staticmethod) def from_kafka_batched(topic, consumer_params, poll_interval='1s', - npartitions=1, start=False, dask=False, - checkpointing=None, **kwargs): + npartitions=1, start=False, dask=False, **kwargs): """ Get messages from Kafka in batches Uses the confluent-kafka library, @@ -574,8 +549,7 @@ def from_kafka_batched(topic, consumer_params, poll_interval='1s', kwargs['loop'] = default_client().loop source = FromKafkaBatched(topic, consumer_params, poll_interval=poll_interval, - npartitions=npartitions, - checkpointing=checkpointing, **kwargs) + npartitions=npartitions, **kwargs) if dask: source = source.scatter() @@ -585,41 +559,7 @@ def from_kafka_batched(topic, consumer_params, poll_interval='1s', return source.starmap(get_message_batch) -def add_checkpoint(group, checkpoint, path): - topic = checkpoint.topic - partition = checkpoint.partition - offset = checkpoint.offset - latest_checkpoint = {} - previous_checkpoint = None - if not os.path.exists(path): - os.makedirs(path) - path = path + '/' + topic - if not 
os.path.exists(path): - os.makedirs(path) - checkpoints_list = os.listdir(path) - if len(checkpoints_list) > 0: - previous_checkpoint = max(checkpoints_list) - with open(path + '/' + previous_checkpoint, 'r') as fr: - latest_checkpoint = json.loads(fr.readlines()[0]) - fr.close() - #Only maintain the last 5 checkpoints - if len(checkpoints_list) == 5: - os.system('rm -rf ' + path + '/' + min(checkpoints_list)) - if group not in latest_checkpoint.keys(): - latest_checkpoint[group] = {} - latest_checkpoint[group][partition] = offset - print(latest_checkpoint) - if previous_checkpoint is None: - new_checkpoint = '1.txt' - else: - previous_batch = int(previous_checkpoint.split('.')[0]) - new_checkpoint = str(previous_batch + 1) + '.txt' - with open(path + '/' + new_checkpoint, 'a+') as fw: - fw.write(json.dumps(latest_checkpoint) + '\n') - fw.close() - - -def get_message_batch(kafka_params, topic, partition, low, high, checkpointing, timeout=None): +def get_message_batch(kafka_params, topic, partition, low, high, timeout=None): """Fetch a batch of kafka messages in given topic/partition This will block until messages are available, or timeout is reached. @@ -643,8 +583,5 @@ def get_message_batch(kafka_params, topic, partition, low, high, checkpointing, if timeout is not None and time.time() - t0 > timeout: break finally: - if checkpointing is not None: - checkpoint = consumer.commit(asynchronous=False) - add_checkpoint(kafka_params['group.id'], checkpoint[0], checkpointing) consumer.close() return out diff --git a/streamz/tests/test_kafka.py b/streamz/tests/test_kafka.py index daef25ab..d66fabad 100644 --- a/streamz/tests/test_kafka.py +++ b/streamz/tests/test_kafka.py @@ -6,7 +6,6 @@ import requests import shlex import subprocess -import time from tornado import gen from ..core import Stream @@ -201,88 +200,6 @@ def test_kafka_batch(): stream.upstream.stopped = True -def test_kafka_batch_checkpointing(): - bootstrap_servers = 'localhost:9092' - ARGS = {'bootstrap.servers': bootstrap_servers, - 'group.id': 'streamz-test'} - ARGS1 = {'bootstrap.servers': bootstrap_servers, - 'group.id': 'streamz-test'} - ARGS2 = {'bootstrap.servers': bootstrap_servers, - 'group.id': 'streamz-test'} - ARGS3 = {'bootstrap.servers': bootstrap_servers, - 'group.id': 'streamz-test'} - with kafka_service() as kafka: - kafka, TOPIC = kafka - for i in range(10): - kafka.produce(TOPIC, b'value-%d' % i) - kafka.flush() - - stream = Stream.from_kafka_batched(TOPIC, ARGS, checkpointing='custreamz_checkpoints') - out = stream.sink_to_list() - stream.start() - wait_for(lambda: any(out) and out[-1][-1] == b'value-9', 10, period=0.2) - assert out[-1][-1] == b'value-9' - stream.upstream.stopped = True - - stream1 = Stream.from_kafka_batched(TOPIC, ARGS1, checkpointing=None) - out1 = stream1.sink_to_list() - stream1.start() - wait_for(lambda: any(out1) and out1[-1][-1] == b'value-9', 10, period=0.2) - assert out[-1][-1] == b'value-9' - stream1.upstream.stopped = True - - stream2 = Stream.from_kafka_batched(TOPIC, ARGS2, checkpointing='custreamz_checkpoints1') - out2 = stream2.sink_to_list() - stream2.start() - wait_for(lambda: any(out2) and out2[-1][-1] == b'value-9', 10, period=0.2) - assert out[-1][-1] == b'value-9' - stream2.upstream.stopped = True - - for i in range(10, 20): - kafka.produce(TOPIC, b'value-%d' % i) - kafka.flush() - - stream3 = Stream.from_kafka_batched(TOPIC, ARGS3, checkpointing='custreamz_checkpoints') - out3 = stream3.sink_to_list() - stream3.start() - wait_for(lambda: any(out3) and out3[-1][0] == 
b'value-10' and out3[-1][-1] == b'value-19', 10, period=0.2) - - for i in range(20, 25): - kafka.produce(TOPIC, b'value-%d' % i) - kafka.flush() - time.sleep(5) - checkpoints_list = os.listdir('custreamz_checkpoints/' + TOPIC) - assert len(checkpoints_list) == 3 - - for i in range(25, 30): - kafka.produce(TOPIC, b'value-%d' % i) - kafka.flush() - time.sleep(5) - checkpoints_list = os.listdir('custreamz_checkpoints/' + TOPIC) - assert len(checkpoints_list) == 4 - - for i in range(30, 35): - kafka.produce(TOPIC, b'value-%d' % i) - kafka.flush() - time.sleep(5) - checkpoints_list = os.listdir('custreamz_checkpoints/' + TOPIC) - assert len(checkpoints_list) == 5 - - for i in range(35, 40): - kafka.produce(TOPIC, b'value-%d' % i) - kafka.flush() - time.sleep(5) - checkpoints_list = os.listdir('custreamz_checkpoints/' + TOPIC) - assert len(checkpoints_list) == 5 - - for i in range(40, 45): - kafka.produce(TOPIC, b'value-%d' % i) - kafka.flush() - time.sleep(5) - checkpoints_list = os.listdir('custreamz_checkpoints/' + TOPIC) - assert len(checkpoints_list) == 5 - - @gen_cluster(client=True, timeout=60) def test_kafka_dask_batch(c, s, w1, w2): j = random.randint(0, 10000)
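
For reference, below is a minimal usage sketch (not part of the patch series itself) of the `checkpointing` argument introduced in patches 1-5 and reverted again in patches 6-7, modeled on the test added in streamz/tests/test_kafka.py. The argument names a local directory under which per-topic offset checkpoints are written; the broker address, group id, topic name, and directory below are illustrative values taken from the patches, not requirements of the API.

    from streamz import Stream

    # Example consumer configuration; 'localhost:9092' and the group id are
    # placeholder values, as used in the patch's tests.
    consumer_conf = {'bootstrap.servers': 'localhost:9092',
                     'group.id': 'streamz-test'}

    # 'custreamz_checkpoints' can be any writable directory; after each batch is
    # fetched, the committed offsets for each topic/partition are appended there
    # as JSON checkpoint files (only the most recent five are kept per topic).
    stream = Stream.from_kafka_batched('my-topic', consumer_conf,
                                       poll_interval='10s', npartitions=4,
                                       checkpointing='custreamz_checkpoints')
    out = stream.sink_to_list()
    stream.start()

On restart with the same group id and checkpoint directory, poll_kafka reads the newest checkpoint file for the topic and resumes each partition from the recorded offset instead of the in-memory positions, which is what the reverted feature aimed to provide.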