diff --git a/docs/source/api.rst b/docs/source/api.rst
index fc9d9f6c..60e3fa88 100644
--- a/docs/source/api.rst
+++ b/docs/source/api.rst
@@ -25,6 +25,7 @@ Stream
rate_limit
scatter
sink
+ slice
sliding_window
starmap
timed_window
@@ -40,6 +41,7 @@ Sources
.. autosummary::
filenames
from_kafka
+ from_process
from_textfile
from_socket
diff --git a/examples/fib_asyncio.py b/examples/fib_asyncio.py
index 091837b2..376e46f8 100644
--- a/examples/fib_asyncio.py
+++ b/examples/fib_asyncio.py
@@ -24,4 +24,5 @@ def run_asyncio_loop():
finally:
loop.close()
+
run_asyncio_loop()
diff --git a/streamz/collection.py b/streamz/collection.py
index 9298d7d3..4990d93b 100644
--- a/streamz/collection.py
+++ b/streamz/collection.py
@@ -229,7 +229,8 @@ def _repr_html_(self):
return "
%s - elements like\n%s" % (type(self).__name__, body)
def _ipython_display_(self, **kwargs):
- return self.stream.latest().rate_limit(0.5).gather()._ipython_display_(**kwargs)
+ return self.stream.latest().rate_limit(
+ 0.5).gather()._ipython_display_(**kwargs)
def emit(self, x):
self.verify(x)
diff --git a/streamz/core.py b/streamz/core.py
index e22eec70..8fdabe9b 100644
--- a/streamz/core.py
+++ b/streamz/core.py
@@ -652,13 +652,19 @@ class accumulate(Stream):
This performs running or cumulative reductions, applying the function
to the previous total and the new element. The function should take
two arguments, the previous accumulated state and the next element and
- it should return a new accumulated state.
+ it should return a new accumulated state,
+ - ``state = func(previous_state, new_value)`` (returns_state=False)
+ - ``state, result = func(previous_state, new_value)`` (returns_state=True)
+
+ where the returned state is passed to the next invocation. The state or the
+ result, respectively, is emitted downstream in the two cases.
Parameters
----------
func: callable
start: object
- Initial value. Defaults to the first submitted element
+ Initial value, passed as the value of ``previous_state`` on the first
+ invocation. Defaults to the first submitted element
returns_state: boolean
If true then func should return both the state and the value to emit
If false then both values are the same, and func returns one value
@@ -667,14 +673,41 @@ class accumulate(Stream):
Examples
--------
+ A running total, producing triangular numbers
+
>>> source = Stream()
>>> source.accumulate(lambda acc, x: acc + x).sink(print)
>>> for i in range(5):
... source.emit(i)
+ 0
1
3
6
10
+
+ A count of the number of events (including the current one)
+
+ >>> source = Stream()
+ >>> source.accumulate(lambda acc, x: acc + 1, start=0).sink(print)
+ >>> for _ in range(5):
+ ... source.emit(0)
+ 1
+ 2
+ 3
+ 4
+ 5
+
+ Like the builtin "enumerate".
+
+ >>> source = Stream()
+ >>> source.accumulate(lambda acc, x: ((acc[0] + 1, x), (acc[0], x)),
+ ... start=(0, 0), returns_state=True
+ ... ).sink(print)
+ >>> for i in range(3):
+ ... source.emit(0)
+ (0, 0)
+ (1, 0)
+ (2, 0)
"""
_graphviz_shape = 'box'
@@ -706,6 +739,54 @@ def update(self, x, who=None):
return self._emit(result)
+@Stream.register_api()
+class slice(Stream):
+    """
+    Get only some events in a stream by position. Works like list[] syntax.
+
+    Parameters
+    ----------
+    start : int
+        First event to use. If None, start from the beginning.
+    end : int
+        Last event to use (non-inclusive). If None, continue without stopping.
+        Does not support negative indexing.
+    step : int
+        Pass on every Nth event, counted from ``start``. If None, pass every
+        one.
+
+    Raises
+    ------
+    ValueError
+        If any of ``start``, ``end`` or ``step`` is negative.
+
+    Examples
+    --------
+    >>> source = Stream()
+    >>> source.slice(2, 6, 2).sink(print)
+    >>> for i in range(5):
+    ...     source.emit(i)
+    2
+    4
+    """
+
+    def __init__(self, upstream, start=None, end=None, step=None, **kwargs):
+        if any((_ or 0) < 0 for _ in [start, end, step]):
+            raise ValueError("Negative indices not supported by slice")
+        self.state = 0  # position of the next incoming event
+        self.star = start or 0
+        self.end = end
+        self.step = step or 1
+        stream_name = kwargs.pop('stream_name', None)
+        Stream.__init__(self, upstream, stream_name=stream_name)
+        # an ``end`` of 0 means nothing is ever passed on: detach immediately
+        self._check_end()
+
+    def update(self, x, who=None):
+        result = []
+        offset = self.state - self.star
+        # the step is counted from ``start`` (like list[start::step]), not
+        # from the first event of the stream
+        if offset >= 0 and offset % self.step == 0:
+            result = self._emit(x)
+        self.state += 1
+        self._check_end()
+        # hand the downstream results back, as the other nodes do
+        return result
+
+    def _check_end(self):
+        # compare with None explicitly, so that ``end=0`` is not mistaken for
+        # "no end"
+        if self.end is not None and self.state >= self.end:
+            # we're done: stop receiving events from upstream
+            self.upstream.downstreams.remove(self)
+
+
@Stream.register_api()
class partition(Stream):
""" Partition stream into tuples of equal size
@@ -740,10 +821,17 @@ def update(self, x, who=None):
class sliding_window(Stream):
""" Produce overlapping tuples of size n
+ Parameters
+ ----------
+ return_partial : bool
+ If True, yield tuples as soon as any events come in, each tuple being
+ smaller or equal to the window size. If False, only start yielding
+ tuples once a full window has accrued.
+
Examples
--------
>>> source = Stream()
- >>> source.sliding_window(3).sink(print)
+ >>> source.sliding_window(3, return_partial=False).sink(print)
>>> for i in range(8):
... source.emit(i)
(0, 1, 2)
@@ -755,14 +843,15 @@ class sliding_window(Stream):
"""
_graphviz_shape = 'diamond'
- def __init__(self, upstream, n, **kwargs):
+ def __init__(self, upstream, n, return_partial=True, **kwargs):
self.n = n
self.buffer = deque(maxlen=n)
+ self.partial = return_partial
Stream.__init__(self, upstream, **kwargs)
def update(self, x, who=None):
self.buffer.append(x)
- if len(self.buffer) == self.n:
+ if self.partial or len(self.buffer) == self.n:
return self._emit(tuple(self.buffer))
else:
return []
diff --git a/streamz/sources.py b/streamz/sources.py
index 458f4933..88efeff9 100644
--- a/streamz/sources.py
+++ b/streamz/sources.py
@@ -35,6 +35,15 @@ def write(text):
class Source(Stream):
_graphviz_shape = 'doubleoctagon'
+    def __init__(self, **kwargs):
+        # a source starts out stopped; all keyword arguments are handed to the
+        # parent class initialiser
+        self.stopped = True
+        super(Source, self).__init__(**kwargs)
+
+    def stop(self):  # pragma: no cover
+        """Mark the source as stopped by setting ``self.stopped`` to True."""
+        # fallback stop method - for poll functions with while not self.stopped
+        if not self.stopped:
+            self.stopped = True
+
@Stream.register_api(staticmethod)
class from_textfile(Source):
@@ -291,6 +300,71 @@ def stop(self):
self.stopped = True
+@Stream.register_api(staticmethod)
+class from_process(Source):
+    """Messages from a running external process
+
+    This doesn't work on Windows
+
+    Parameters
+    ----------
+    cmd : list of str or str
+        Command to run: program name, followed by arguments
+    open_kwargs : dict
+        Passed on to the process open function, see ``subprocess.Popen``.
+    with_stderr : bool
+        Whether to include the process STDERR in the stream
+    start : bool
+        Whether to immediately startup the process. Usually you want to connect
+        downstream nodes first, and then call ``.start()``.
+
+    Example
+    -------
+    >>> source = Source.from_process(['ping', 'localhost'])  # doctest: +SKIP
+    """
+
+    def __init__(self, cmd, open_kwargs=None, with_stderr=False, start=False):
+        self.cmd = cmd
+        self.open_kwargs = open_kwargs or {}
+        self.with_stderr = with_stderr
+        super(from_process, self).__init__(ensure_io_loop=True)
+        self.stopped = True
+        # handle of the launched subprocess; None until it has been started
+        self.process = None
+        if start:  # pragma: no cover
+            self.start()
+
+    @gen.coroutine
+    def _start_process(self):
+        """Launch the command and emit its output line by line."""
+        # should be done in asyncio (py3 only)? Apparently can handle Windows
+        # with appropriate config.
+        from tornado.process import Subprocess
+        from tornado.iostream import StreamClosedError
+        import subprocess
+        stderr = subprocess.STDOUT if self.with_stderr else subprocess.PIPE
+        process = Subprocess(self.cmd, stdout=Subprocess.STREAM,
+                             stderr=stderr, **self.open_kwargs)
+        self.process = process
+        try:
+            while not self.stopped:
+                try:
+                    out = yield process.stdout.read_until(b'\n')
+                except StreamClosedError:
+                    # process exited
+                    break
+                yield self._emit(out)
+        finally:
+            # always release the pipe and the child, also when a downstream
+            # node raises while handling a line; close() is synchronous, so
+            # there is nothing to wait for
+            process.stdout.close()
+            process.proc.terminate()
+
+    def start(self):
+        """Start external process"""
+        if self.stopped:
+            # clear the flag before scheduling, so the callback cannot find
+            # ``stopped`` still set if the loop runs it before we return
+            self.stopped = False
+            self.loop.add_callback(self._start_process)
+
+    def stop(self):
+        """Shutdown external process
+
+        Only sets the ``stopped`` flag; the reading coroutine terminates the
+        process the next time it checks the flag, i.e. after the pending line
+        read has completed.
+        """
+        if not self.stopped:
+            self.stopped = True
+
+
@Stream.register_api(staticmethod)
class from_kafka(Source):
""" Accepts messages from Kafka
diff --git a/streamz/tests/test_core.py b/streamz/tests/test_core.py
index 3845de60..ef8d05d6 100644
--- a/streamz/tests/test_core.py
+++ b/streamz/tests/test_core.py
@@ -148,6 +148,14 @@ def test_sliding_window():
source = Stream()
L = source.sliding_window(2).sink_to_list()
+ for i in range(10):
+ source.emit(i)
+
+ assert L == [(0, ), (0, 1), (1, 2), (2, 3), (3, 4), (4, 5),
+ (5, 6), (6, 7), (7, 8), (8, 9)]
+
+ L = source.sliding_window(2, return_partial=False).sink_to_list()
+
for i in range(10):
source.emit(i)
@@ -1099,6 +1107,31 @@ def test_share_common_ioloop(clean): # noqa: F811
assert aa.loop is bb.loop
+@pytest.mark.parametrize('data', [
+    [[], [0, 1, 2, 3, 4, 5]],
+    [[None, None, None], [0, 1, 2, 3, 4, 5]],
+    [[1, None, None], [1, 2, 3, 4, 5]],
+    [[None, 4, None], [0, 1, 2, 3]],
+    [[None, 4, 2], [0, 2]],
+    [[3, 1, None], []],
+])
+def test_slice(data):
+    # each case is (arguments for slice, events expected out of 0..5)
+    slice_args, expected = data
+    source = Stream()
+    sliced = source.slice(*slice_args)
+    collected = sliced.sink_to_list()
+    for value in range(6):
+        source.emit(value)
+    assert collected == expected
+
+
+def test_slice_err():
+    # a negative index must be rejected when the node is created
+    source = Stream()
+    with pytest.raises(ValueError):
+        source.slice(end=-1)
+
+
def test_start():
flag = []
diff --git a/streamz/tests/test_sources.py b/streamz/tests/test_sources.py
index f51dc38a..c099a69f 100644
--- a/streamz/tests/test_sources.py
+++ b/streamz/tests/test_sources.py
@@ -81,3 +81,13 @@ def test_http():
with pytest.raises(requests.exceptions.RequestException):
requests.post('http://localhost:%i/other' % port, data=b'data2')
+
+
+@gen_test(timeout=60)
+def test_process():
+    import sys
+    # run the interpreter that is executing the tests: a bare "python" may be
+    # missing from PATH or refer to a different installation
+    cmd = [sys.executable, "-c", "for i in range(4): print(i)"]
+    s = Source.from_process(cmd)
+    out = s.sink_to_list()
+    s.start()
+    yield await_for(lambda: out == [b'0\n', b'1\n', b'2\n', b'3\n'], timeout=5)
+    s.stop()