Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ install:
- conda update conda

# Install dependencies
- conda env create --name test-streamz --file ./conda/environments/streamz_dev.yml
- travis_wait 30 conda env create --name test-streamz --file ./conda/environments/streamz_dev.yml
- source activate test-streamz

- python setup.py install
Expand Down
82 changes: 66 additions & 16 deletions streamz/core.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from __future__ import absolute_import, division, print_function

from collections import deque
from collections import deque, defaultdict
from datetime import timedelta
import functools
import logging
Expand Down Expand Up @@ -951,6 +951,19 @@ def _check_end(self):
class partition(Stream):
""" Partition stream into tuples of equal size

Parameters
----------
n: int
Maximum partition size
timeout: int or float, optional
Number of seconds after which a partition will be emitted,
even if its size is less than ``n``. If ``None`` (default),
a partition will be emitted only when its size reaches ``n``.
key: hashable or callable, optional
Emit items with the same key together as a separate partition.
If ``key`` is callable, partition will be identified by ``key(x)``,
otherwise by ``x[key]``. Defaults to ``None``.

Examples
--------
>>> source = Stream()
Expand All @@ -960,30 +973,67 @@ class partition(Stream):
(0, 1, 2)
(3, 4, 5)
(6, 7, 8)

>>> source = Stream()
>>> source.partition(2, key=lambda x: x % 2).sink(print)
>>> for i in range(4):
... source.emit(i)
(0, 2)
(1, 3)

>>> from time import sleep
>>> source = Stream()
>>> source.partition(5, timeout=1).sink(print)
>>> for i in range(3):
... source.emit(i)
>>> sleep(1)
(0, 1, 2)
"""
_graphviz_shape = 'diamond'

def __init__(self, upstream, n, **kwargs):
def __init__(self, upstream, n, timeout=None, key=None, **kwargs):
self.n = n
self._buffer = []
self.metadata_buffer = []
Stream.__init__(self, upstream, **kwargs)
self._timeout = timeout
self._key = key
self._buffer = defaultdict(lambda: [])
self._metadata_buffer = defaultdict(lambda: [])
self._callbacks = {}
Stream.__init__(self, upstream, ensure_io_loop=True, **kwargs)

def _get_key(self, x):
if self._key is None:
return None
if callable(self._key):
return self._key(x)
return x[self._key]

@gen.coroutine
def _flush(self, key):
result, self._buffer[key] = self._buffer[key], []
metadata_result, self._metadata_buffer[key] = self._metadata_buffer[key], []
yield self._emit(tuple(result), list(metadata_result))
self._release_refs(metadata_result)

@gen.coroutine
def update(self, x, who=None, metadata=None):
self._retain_refs(metadata)
self._buffer.append(x)
key = self._get_key(x)
buffer = self._buffer[key]
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to create new variables buffer and metadata_buffer here?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cc: @jsmaupin for review because this PR is changing how metadata is being handled in partition.

Copy link
Copy Markdown
Contributor Author

@roveo roveo Nov 9, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to create new variables buffer and metadata_buffer here?

We really don't have to, but since they're referenced multiple times, I figured it'd be more readable than `self._metadata_buffer[key]`.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the cc. The metadata changes look okay to me.

metadata_buffer = self._metadata_buffer[key]
buffer.append(x)
if isinstance(metadata, list):
self.metadata_buffer.extend(metadata)
metadata_buffer.extend(metadata)
else:
self.metadata_buffer.append(metadata)
if len(self._buffer) == self.n:
result, self._buffer = self._buffer, []
metadata_result, self.metadata_buffer = self.metadata_buffer, []
ret = self._emit(tuple(result), list(metadata_result))
self._release_refs(metadata_result)
return ret
else:
return []
metadata_buffer.append(metadata)
if len(buffer) == self.n:
if self._timeout is not None and self.n > 1:
self._callbacks[key].cancel()
yield self._flush(key)
return
if len(buffer) == 1 and self._timeout is not None:
self._callbacks[key] = self.loop.call_later(
self._timeout, self._flush, key
)


@Stream.register_api()
Expand Down
4 changes: 2 additions & 2 deletions streamz/dataframe/tests/test_dataframes.py
Original file line number Diff line number Diff line change
Expand Up @@ -1008,9 +1008,9 @@ def test_windowed_groupby_aggs_with_start_state(stream):
out_df1 = pd.DataFrame({'name':['Alice', 'Bob', 'Linda', 'Tom'], 'amount':[50.0, 550.0, 100.0, 150.0]})
assert_eq(output1[-1][1].reset_index(), out_df1)


def test_dir(stream):
example = pd.DataFrame({'name': [], 'amount': []})
sdf = DataFrame(stream, example=example)
assert 'name' in dir(sdf)
assert 'amount' in dir(sdf)
assert 'amount' in dir(sdf)
59 changes: 59 additions & 0 deletions streamz/tests/test_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,65 @@ def test_partition():
assert L == [(0, 1), (2, 3), (4, 5), (6, 7), (8, 9)]


def test_partition_timeout():
    # A partial batch (5 of 10 items) must be flushed once the timeout fires.
    s = Stream()
    out = s.partition(10, timeout=0.01).sink_to_list()

    for item in range(5):
        s.emit(item)

    sleep(0.1)

    assert out == [(0, 1, 2, 3, 4)]


def test_partition_timeout_cancel():
    # Filling a partition before its timeout elapses cancels the pending
    # flush callback; a fresh timer then flushes the leftover element.
    s = Stream()
    out = s.partition(3, timeout=0.1).sink_to_list()

    for item in (0, 1, 2):
        s.emit(item)

    sleep(0.09)
    s.emit(3)
    sleep(0.02)

    assert out == [(0, 1, 2)]

    sleep(0.09)

    assert out == [(0, 1, 2), (3,)]


def test_partition_key():
    # With a non-callable key, items are grouped by x[key] — here x[0] —
    # and emitted as separate per-key batches of size two.
    s = Stream()
    out = s.partition(2, key=0).sink_to_list()

    for item in range(4):
        s.emit((item % 2, item))

    assert out == [((0, 0), (0, 2)), ((1, 1), (1, 3))]


def test_partition_key_callable():
    # With a callable key, items are grouped by key(x): evens and odds
    # accumulate independently and flush in pairs.
    s = Stream()
    out = s.partition(2, key=lambda v: v % 2).sink_to_list()

    for item in range(10):
        s.emit(item)

    assert out == [(0, 2), (1, 3), (4, 6), (5, 7)]


def test_partition_size_one():
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the goal of this test?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here:

https://github.com/roveo/streamz/blob/3747d8b37c909ff53b16f9a3723140698a0dd8a8/streamz/core.py#L1028-L1036

I didn't have the self.n > 1 condition at first, and self._callbacks[key].cancel() raised a KeyError when n=1. So it's kind of an edge-case test. If you think it's unnecessary, I'll remove it.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm thinking if we should add a check in partition (and update docstring) forcing n>1. If n=1, the user would be better off avoiding the use of partition altogether. What do you think @martindurant?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For the sake of test and debug, I think it's OK to allow a redundant partition with n=1.

# Edge case: n=1 with a timeout must not raise (no timer callback exists
# to cancel when the very first element already fills the partition).
s = Stream()

s.partition(1, timeout=.01).sink(lambda x: None)

for item in range(10):
    s.emit(item)


def test_sliding_window():
source = Stream()
L = source.sliding_window(2).sink_to_list()
Expand Down