Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions docs/source/api.rst
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ Stream
rate_limit
scatter
sink
slice
sliding_window
starmap
timed_window
Expand All @@ -40,6 +41,7 @@ Sources
.. autosummary::
filenames
from_kafka
from_process
from_textfile
from_socket

Expand Down
1 change: 1 addition & 0 deletions examples/fib_asyncio.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,5 @@ def run_asyncio_loop():
finally:
loop.close()


run_asyncio_loop()
3 changes: 2 additions & 1 deletion streamz/collection.py
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,8 @@ def _repr_html_(self):
return "<h5>%s - elements like<h5>\n%s" % (type(self).__name__, body)

def _ipython_display_(self, **kwargs):
    # Throttle notebook re-rendering to at most one update per half second,
    # then gather so remote/futures-backed elements display concrete values.
    throttled = self.stream.latest().rate_limit(0.5)
    return throttled.gather()._ipython_display_(**kwargs)

def emit(self, x):
self.verify(x)
Expand Down
99 changes: 94 additions & 5 deletions streamz/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -652,13 +652,19 @@ class accumulate(Stream):
This performs running or cumulative reductions, applying the function
to the previous total and the new element. The function should take
two arguments, the previous accumulated state and the next element and
it should return a new accumulated state.
it should return a new accumulated state,
- ``state = func(previous_state, new_value)`` (returns_state=False)
- ``state, result = func(previous_state, new_value)`` (returns_state=True)

where the new_state is passed to the next invocation. The state or result
is emitted downstream for the two cases.

Parameters
----------
func: callable
start: object
Initial value. Defaults to the first submitted element
Initial value, passed as the value of ``previous_state`` on the first
invocation. Defaults to the first submitted element
returns_state: boolean
If true then func should return both the state and the value to emit
If false then both values are the same, and func returns one value
Expand All @@ -667,14 +673,41 @@ class accumulate(Stream):

Examples
--------
A running total, producing triangular numbers

>>> source = Stream()
>>> source.accumulate(lambda acc, x: acc + x).sink(print)
>>> for i in range(5):
... source.emit(i)
0
1
3
6
10

A count of number of events (including the current one)

>>> source = Stream()
>>> source.accumulate(lambda acc, x: acc + 1, start=0).sink(print)
>>> for _ in range(5):
... source.emit(0)
1
2
3
4
5

Like the builtin "enumerate".

>>> source = Stream()
>>> source.accumulate(lambda acc, x: ((acc[0] + 1, x), (acc[0], x)),
... start=(0, 0), returns_state=True
... ).sink(print)
>>> for i in range(3):
... source.emit(0)
(0, 0)
(1, 0)
(2, 0)
"""
_graphviz_shape = 'box'

Expand Down Expand Up @@ -706,6 +739,54 @@ def update(self, x, who=None):
return self._emit(result)


@Stream.register_api()
class slice(Stream):
    """
    Get only some events in a stream by position. Works like list[] syntax.

    Events are counted from the first one received; an event is passed
    downstream when its position ``i`` satisfies ``start <= i < end`` and
    ``(i - start) % step == 0``, mirroring ``list[start:end:step]``.

    Parameters
    ----------
    start : int
        First event to use. If None, start from the beginning
    end : int
        Last event to use (non-inclusive). If None, continue without stopping.
        Does not support negative indexing.
    step : int
        Pass on every Nth event. If None, pass every one.

    Examples
    --------
    >>> source = Stream()
    >>> source.slice(2, 6, 2).sink(print)
    >>> for i in range(5):
    ...     source.emit(i)
    2
    4
    """

    def __init__(self, upstream, start=None, end=None, step=None, **kwargs):
        self.state = 0  # position of the next incoming event
        self.start = start or 0
        self.end = end
        self.step = step or 1
        if any((_ or 0) < 0 for _ in [start, end, step]):
            raise ValueError("Negative indices not supported by slice")
        stream_name = kwargs.pop('stream_name', None)
        Stream.__init__(self, upstream, stream_name=stream_name)
        self._check_end()

    def update(self, x, who=None):
        # Step positions are anchored at ``start`` so that, e.g.,
        # slice(1, None, 2) passes positions 1, 3, 5, ... like list slicing.
        if (self.state >= self.start
                and (self.state - self.start) % self.step == 0):
            self.emit(x)
        self.state += 1
        self._check_end()

    def _check_end(self):
        if self.end and self.state >= self.end:
            # we're done: detach from upstream so no further events arrive
            self.upstream.downstreams.remove(self)


@Stream.register_api()
class partition(Stream):
""" Partition stream into tuples of equal size
Expand Down Expand Up @@ -740,10 +821,17 @@ def update(self, x, who=None):
class sliding_window(Stream):
""" Produce overlapping tuples of size n

Parameters
----------
return_partial : bool
If True, yield tuples as soon as any events come in, each tuple being
smaller or equal to the window size. If False, only start yielding
tuples once a full window has accrued.

Examples
--------
>>> source = Stream()
>>> source.sliding_window(3).sink(print)
>>> source.sliding_window(3, return_partial=False).sink(print)
>>> for i in range(8):
... source.emit(i)
(0, 1, 2)
Expand All @@ -755,14 +843,15 @@ class sliding_window(Stream):
"""
_graphviz_shape = 'diamond'

def __init__(self, upstream, n, **kwargs):
def __init__(self, upstream, n, return_partial=True, **kwargs):
self.n = n
self.buffer = deque(maxlen=n)
self.partial = return_partial
Stream.__init__(self, upstream, **kwargs)

def update(self, x, who=None):
self.buffer.append(x)
if len(self.buffer) == self.n:
if self.partial or len(self.buffer) == self.n:
return self._emit(tuple(self.buffer))
else:
return []
Expand Down
74 changes: 74 additions & 0 deletions streamz/sources.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,15 @@ def write(text):
class Source(Stream):
    """Base class for nodes that originate events.

    A source is created in the stopped state; the ``stopped`` flag is the
    shared shutdown signal that concrete sources' polling loops check.
    """
    _graphviz_shape = 'doubleoctagon'

    def __init__(self, **kwargs):
        # Begin inactive; subclasses flip this flag when they start producing.
        self.stopped = True
        super(Source, self).__init__(**kwargs)

    def stop(self):  # pragma: no cover
        # fallback stop method - for poll functions with while not self.stopped
        if self.stopped:
            return
        self.stopped = True


@Stream.register_api(staticmethod)
class from_textfile(Source):
Expand Down Expand Up @@ -291,6 +300,71 @@ def stop(self):
self.stopped = True


@Stream.register_api(staticmethod)
class from_process(Source):
    """Messages from a running external process

    Each line of the subprocess's STDOUT is emitted into the stream as raw
    ``bytes``, trailing newline included.

    This doesn't work on Windows

    Parameters
    ----------
    cmd : list of str or str
        Command to run: program name, followed by arguments
    open_kwargs : dict
        To pass on to the process open function, see ``subprocess.Popen``.
    with_stderr : bool
        Whether to include the process STDERR in the stream
    start : bool
        Whether to immediately start up the process. Usually you want to
        connect downstream nodes first, and then call ``.start()``.

    Example
    -------
    >>> source = Source.from_process(['ping', 'localhost']) # doctest: +SKIP
    """

    def __init__(self, cmd, open_kwargs=None, with_stderr=False, start=False):
        self.cmd = cmd
        self.open_kwargs = open_kwargs or {}
        self.with_stderr = with_stderr
        # ensure_io_loop=True: the reader coroutine needs a running loop
        super(from_process, self).__init__(ensure_io_loop=True)
        self.stopped = True
        self.process = None
        if start: # pragma: no cover
            self.start()

    @gen.coroutine
    def _start_process(self):
        # should be done in asyncio (py3 only)? Apparently can handle Windows
        # with appropriate config.
        from tornado.process import Subprocess
        from tornado.iostream import StreamClosedError
        import subprocess
        # NOTE(review): when with_stderr is False, stderr goes to a PIPE that
        # is never drained -- a chatty process could block on a full pipe
        # buffer; confirm whether DEVNULL was intended.
        stderr = subprocess.STDOUT if self.with_stderr else subprocess.PIPE
        process = Subprocess(self.cmd, stdout=Subprocess.STREAM,
                             stderr=stderr, **self.open_kwargs)
        # Read one newline-terminated line at a time until stop() is called
        # or the process closes its stdout.
        while not self.stopped:
            try:
                out = yield process.stdout.read_until(b'\n')
            except StreamClosedError:
                # process exited
                break
            yield self._emit(out)
        yield process.stdout.close()
        process.proc.terminate()

    def start(self):
        """Start external process"""
        if self.stopped:
            # schedule the reader coroutine on this source's IOLoop
            self.loop.add_callback(self._start_process)
            self.stopped = False

    def stop(self):
        """Shutdown external process"""
        if not self.stopped:
            # the reader loop observes this flag and terminates the process
            self.stopped = True


@Stream.register_api(staticmethod)
class from_kafka(Source):
""" Accepts messages from Kafka
Expand Down
33 changes: 33 additions & 0 deletions streamz/tests/test_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,14 @@ def test_sliding_window():
source = Stream()
L = source.sliding_window(2).sink_to_list()

for i in range(10):
source.emit(i)

assert L == [(0, ), (0, 1), (1, 2), (2, 3), (3, 4), (4, 5),
(5, 6), (6, 7), (7, 8), (8, 9)]

L = source.sliding_window(2, return_partial=False).sink_to_list()

for i in range(10):
source.emit(i)

Expand Down Expand Up @@ -1099,6 +1107,31 @@ def test_share_common_ioloop(clean): # noqa: F811
assert aa.loop is bb.loop


@pytest.mark.parametrize('data', [
    [[], [0, 1, 2, 3, 4, 5]],
    [[None, None, None], [0, 1, 2, 3, 4, 5]],
    [[1, None, None], [1, 2, 3, 4, 5]],
    [[None, 4, None], [0, 1, 2, 3]],
    [[None, 4, 2], [0, 2]],
    [[3, 1, None], []]

])
def test_slice(data):
    # each case: (slice arguments, values expected downstream for inputs 0..5)
    slice_args, expected = data
    source = Stream()
    collected = source.slice(*slice_args).sink_to_list()
    for value in range(6):
        source.emit(value)
    assert collected == expected


def test_slice_err():
    # negative indices are rejected at construction time
    source = Stream()
    with pytest.raises(ValueError):
        source.slice(end=-1)


def test_start():
flag = []

Expand Down
10 changes: 10 additions & 0 deletions streamz/tests/test_sources.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,3 +81,13 @@ def test_http():

with pytest.raises(requests.exceptions.RequestException):
requests.post('http://localhost:%i/other' % port, data=b'data2')


@gen_test(timeout=60)
def test_process():
    # subprocess prints four lines; each should arrive as raw bytes with '\n'
    cmd = ["python", "-c", "for i in range(4): print(i)"]
    source = Source.from_process(cmd)
    collected = source.sink_to_list()
    source.start()
    expected = [b'0\n', b'1\n', b'2\n', b'3\n']
    yield await_for(lambda: collected == expected, timeout=5)
    source.stop()