From 98211e42902d57250dd28d17468a922a53d9144a Mon Sep 17 00:00:00 2001 From: "Christopher J. Wright" Date: Mon, 6 May 2019 21:08:18 -0400 Subject: [PATCH 1/6] FIX: connect doesn't break zip now --- streamz/core.py | 59 ++++++++++++++++++++++++++++++++------ streamz/tests/test_core.py | 54 ++++++++++++++++++++++++++++++++++ 2 files changed, 105 insertions(+), 8 deletions(-) diff --git a/streamz/core.py b/streamz/core.py index 65b02066..d01fc050 100644 --- a/streamz/core.py +++ b/streamz/core.py @@ -181,6 +181,24 @@ def _inform_asynchronous(self, asynchronous): if downstream: downstream._inform_asynchronous(asynchronous) + def _add_upstream(self, upstream): + if self.upstreams == [None]: + self.upstreams = [upstream] + else: + self.upstreams.append(upstream) + + def _add_downstream(self, downstream): + self.downstreams.add(downstream) + + def _remove_downstream(self, downstream): + self.downstreams.remove(downstream) + + def _remove_upstream(self, upstream): + if len(self.upstreams) == 1: + self.upstreams = [None] + else: + self.upstreams.pop(self.upstreams.index(upstream)) + @classmethod def register_api(cls, modifier=identity): """ Add callable to Stream API @@ -349,12 +367,8 @@ def connect(self, downstream): downstream: Stream The downstream stream to connect to ''' - self.downstreams.add(downstream) - - if downstream.upstreams == [None]: - downstream.upstreams = [self] - else: - downstream.upstreams.append(self) + self._add_downstream(downstream) + downstream._add_upstream(self) def disconnect(self, downstream): ''' Disconnect this stream to a downstream element. @@ -364,9 +378,9 @@ def disconnect(self, downstream): downstream: Stream The downstream stream to disconnect from ''' - self.downstreams.remove(downstream) + self._remove_downstream(downstream) - downstream.upstreams.remove(self) + downstream._remove_upstream(self) @property def upstream(self): @@ -1013,6 +1027,14 @@ def __init__(self, *upstreams, **kwargs): Stream.__init__(self, upstreams=upstreams2, **kwargs) + def _add_upstream(self, upstream): + self.buffers[upstream] = deque() + super()._add_upstream(upstream) + + def _remove_upstream(self, upstream): + self.buffers.pop(upstream) + super()._remove_upstream(upstream) + def pack_literals(self, tup): """ Fill buffers for literals whenever we empty them """ inp = list(tup)[::-1] @@ -1077,6 +1099,27 @@ def __init__(self, *upstreams, **kwargs): self.emit_on = upstreams Stream.__init__(self, upstreams=upstreams, **kwargs) + def _add_upstream(self, upstream): + self.last.append(None) + self.missing.update([upstream]) + if self.emit_on != self.upstreams: + super()._add_upstream(upstream) + else: + super()._add_upstream(upstream) + self.emit_on = self.upstreams + + def _remove_upstream(self, upstream): + if self.emit_on == upstream: + raise RuntimeError("Can't remove the emit on upstream, consider" + "adding an emit on first") + self.last.pop(self.upstreams.index(upstream)) + self.missing.remove(upstream) + if self.emit_on == self.upstreams: + super()._remove_upstream(upstream) + self.emit_on = self.upstreams + else: + super()._remove_upstream(upstream) + def update(self, x, who=None): if self.missing and who in self.missing: self.missing.remove(who) diff --git a/streamz/tests/test_core.py b/streamz/tests/test_core.py index 1b07823b..061f1ffb 100644 --- a/streamz/tests/test_core.py +++ b/streamz/tests/test_core.py @@ -1207,5 +1207,59 @@ def start(self): assert flag == [True] +def test_connect_zip(): + a = Stream() + b = Stream() + c = Stream() + x = a.zip(b) + l = x.sink_to_list() + c.connect(x) + a.emit(1) + b.emit(1) + assert not l + c.emit(1) + assert l == [(1, 1, 1)] + + +def test_disconnect_zip(): + a = Stream() + b = Stream() + c = Stream() + x = a.zip(b, c) + l = x.sink_to_list() + b.disconnect(x) + a.emit(1) + b.emit(1) + assert not l + c.emit(1) + assert l == [(1, 1)] + + +def test_connect_combine_latest(): + a = Stream() + b = Stream() + c = Stream() + x = a.combine_latest(b, emit_on=a) + l = x.sink_to_list() + c.connect(x) + b.emit(1) + c.emit(1) + a.emit(1) + assert l == [(1, 1, 1)] + + +def test_connect_discombine_latest(): + a = Stream() + b = Stream() + c = Stream() + x = a.combine_latest(b, c, emit_on=a) + l = x.sink_to_list() + c.disconnect(x) + b.emit(1) + c.emit(1) + a.emit(1) + assert l == [(1, 1)] + + if sys.version_info >= (3, 5): from streamz.tests.py3_test_core import * # noqa From 33b05774702b9c7153ffbfcd571e43d7ca30abe8 Mon Sep 17 00:00:00 2001 From: "Christopher J. Wright" Date: Tue, 7 May 2019 08:29:16 -0400 Subject: [PATCH 2/6] flake8 --- streamz/tests/test_core.py | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/streamz/tests/test_core.py b/streamz/tests/test_core.py index 061f1ffb..e0c80f5c 100644 --- a/streamz/tests/test_core.py +++ b/streamz/tests/test_core.py @@ -1212,13 +1212,13 @@ def test_connect_zip(): b = Stream() c = Stream() x = a.zip(b) - l = x.sink_to_list() + L = x.sink_to_list() c.connect(x) a.emit(1) b.emit(1) assert not l c.emit(1) - assert l == [(1, 1, 1)] + assert L == [(1, 1, 1)] def test_disconnect_zip(): @@ -1226,13 +1226,13 @@ def test_disconnect_zip(): b = Stream() c = Stream() x = a.zip(b, c) - l = x.sink_to_list() + L = x.sink_to_list() b.disconnect(x) a.emit(1) b.emit(1) assert not l c.emit(1) - assert l == [(1, 1)] + assert L == [(1, 1)] def test_connect_combine_latest(): @@ -1240,12 +1240,12 @@ def test_connect_combine_latest(): b = Stream() c = Stream() x = a.combine_latest(b, emit_on=a) - l = x.sink_to_list() + L = x.sink_to_list() c.connect(x) b.emit(1) c.emit(1) a.emit(1) - assert l == [(1, 1, 1)] + assert L == [(1, 1, 1)] def test_connect_discombine_latest(): @@ -1253,12 +1253,12 @@ def test_connect_discombine_latest(): b = Stream() c = Stream() x = a.combine_latest(b, c, emit_on=a) - l = x.sink_to_list() + L = x.sink_to_list() c.disconnect(x) b.emit(1) c.emit(1) a.emit(1) - assert l == [(1, 1)] + assert L == [(1, 1)] if sys.version_info >= (3, 5): From 5a462b19244782ae9a4ca00e65e5515c961351da Mon Sep 17 00:00:00 2001 From: "Christopher J. Wright" Date: Tue, 7 May 2019 09:05:45 -0400 Subject: [PATCH 3/6] fix errors from pep8 fix --- streamz/tests/test_core.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/streamz/tests/test_core.py b/streamz/tests/test_core.py index e0c80f5c..f08e1417 100644 --- a/streamz/tests/test_core.py +++ b/streamz/tests/test_core.py @@ -1216,7 +1216,7 @@ def test_connect_zip(): c.connect(x) a.emit(1) b.emit(1) - assert not l + assert not L c.emit(1) assert L == [(1, 1, 1)] @@ -1230,7 +1230,7 @@ def test_disconnect_zip(): b.disconnect(x) a.emit(1) b.emit(1) - assert not l + assert not L c.emit(1) assert L == [(1, 1)] From aca1eac81397354a1662e2d77c68ebcd40ec57b4 Mon Sep 17 00:00:00 2001 From: "Christopher J. Wright" Date: Tue, 7 May 2019 09:30:28 -0400 Subject: [PATCH 4/6] py2k support --- streamz/core.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/streamz/core.py b/streamz/core.py index d01fc050..1cd87c4b 100644 --- a/streamz/core.py +++ b/streamz/core.py @@ -1029,11 +1029,11 @@ def __init__(self, *upstreams, **kwargs): def _add_upstream(self, upstream): self.buffers[upstream] = deque() - super()._add_upstream(upstream) + super(zip, self)._add_upstream(upstream) def _remove_upstream(self, upstream): self.buffers.pop(upstream) - super()._remove_upstream(upstream) + super(zip, self)._remove_upstream(upstream) def pack_literals(self, tup): """ Fill buffers for literals whenever we empty them """ @@ -1103,9 +1103,9 @@ def _add_upstream(self, upstream): self.last.append(None) self.missing.update([upstream]) if self.emit_on != self.upstreams: - super()._add_upstream(upstream) + super(combine_latest, self)._add_upstream(upstream) else: - super()._add_upstream(upstream) + super(combine_latest, self)._add_upstream(upstream) self.emit_on = self.upstreams def _remove_upstream(self, upstream): @@ -1115,10 +1115,10 @@ def _remove_upstream(self, upstream): self.last.pop(self.upstreams.index(upstream)) self.missing.remove(upstream) if self.emit_on == self.upstreams: - super()._remove_upstream(upstream) + super(combine_latest, self)._remove_upstream(upstream) self.emit_on = self.upstreams else: - super()._remove_upstream(upstream) + super(combine_latest, self)._remove_upstream(upstream) def update(self, x, who=None): if self.missing and who in self.missing: From 17bd225774bf988a65c93c446dad6e128840da69 Mon Sep 17 00:00:00 2001 From: "Christopher J. Wright" Date: Tue, 7 May 2019 11:06:51 -0400 Subject: [PATCH 5/6] add docs and fix buffered nodes --- streamz/core.py | 38 +++++++++++++++++++++++++------------- 1 file changed, 25 insertions(+), 13 deletions(-) diff --git a/streamz/core.py b/streamz/core.py index 1cd87c4b..8c6c74c9 100644 --- a/streamz/core.py +++ b/streamz/core.py @@ -182,22 +182,28 @@ def _inform_asynchronous(self, asynchronous): downstream._inform_asynchronous(asynchronous) def _add_upstream(self, upstream): + """Add upstream to current upstreams, this method is overridden for + classes which handle stream specific buffers/caches""" if self.upstreams == [None]: - self.upstreams = [upstream] + self.upstreams[0] = upstream else: self.upstreams.append(upstream) def _add_downstream(self, downstream): + """Add downstream to current downstreams""" self.downstreams.add(downstream) def _remove_downstream(self, downstream): + """Remove downstream from current downstreams""" self.downstreams.remove(downstream) def _remove_upstream(self, upstream): + """Remove upstream from current upstreams, this method is overridden for + classes which handle stream specific buffers/caches""" if len(self.upstreams) == 1: - self.upstreams = [None] + self.upstreams[0] = [None] else: - self.upstreams.pop(self.upstreams.index(upstream)) + self.upstreams.remove(upstream) @classmethod def register_api(cls, modifier=identity): @@ -1028,10 +1034,12 @@ def __init__(self, *upstreams, **kwargs): Stream.__init__(self, upstreams=upstreams2, **kwargs) def _add_upstream(self, upstream): + # Override method to handle setup of buffer for new stream self.buffers[upstream] = deque() super(zip, self)._add_upstream(upstream) def _remove_upstream(self, upstream): + # Override method to handle removal of buffer for stream self.buffers.pop(upstream) super(zip, self)._remove_upstream(upstream) @@ -1086,6 +1094,7 @@ class combine_latest(Stream): def __init__(self, *upstreams, **kwargs): emit_on = kwargs.pop('emit_on', None) + self._initial_emit_on = emit_on self.last = [None for _ in upstreams] self.missing = set(upstreams) @@ -1100,25 +1109,28 @@ def __init__(self, *upstreams, **kwargs): Stream.__init__(self, upstreams=upstreams, **kwargs) def _add_upstream(self, upstream): + # Override method to handle setup of last and missing for new stream self.last.append(None) self.missing.update([upstream]) - if self.emit_on != self.upstreams: - super(combine_latest, self)._add_upstream(upstream) - else: - super(combine_latest, self)._add_upstream(upstream) + super(combine_latest, self)._add_upstream(upstream) + if self._initial_emit_on is None: self.emit_on = self.upstreams def _remove_upstream(self, upstream): + # Override method to handle removal of last and missing for stream if self.emit_on == upstream: - raise RuntimeError("Can't remove the emit on upstream, consider" - "adding an emit on first") + raise RuntimeError("Can't remove the ``emit_on`` stream since that" + "would cause no data to be emitted. " + "Consider adding an ``emit_on`` first by " + "running ``node.emit_on=(upstream,)`` to add " + "a new ``emit_on`` or running " + "``node.emit_on=tuple(node.upstreams)`` to " + "emit on all incoming data") self.last.pop(self.upstreams.index(upstream)) self.missing.remove(upstream) - if self.emit_on == self.upstreams: - super(combine_latest, self)._remove_upstream(upstream) + super(combine_latest, self)._remove_upstream(upstream) + if self._initial_emit_on is None: self.emit_on = self.upstreams - else: - super(combine_latest, self)._remove_upstream(upstream) def update(self, x, who=None): if self.missing and who in self.missing: From b68c43096ac24c361a818eab316a5b742ffcbc06 Mon Sep 17 00:00:00 2001 From: "Christopher J. Wright" Date: Tue, 7 May 2019 12:22:12 -0400 Subject: [PATCH 6/6] use _remove_downstream for slice --- streamz/core.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/streamz/core.py b/streamz/core.py index 8c6c74c9..fb055501 100644 --- a/streamz/core.py +++ b/streamz/core.py @@ -809,7 +809,8 @@ def update(self, x, who=None): def _check_end(self): if self.end and self.state >= self.end: # we're done - self.upstream.downstreams.remove(self) + for upstream in self.upstreams: + upstream._remove_downstream(self) @Stream.register_api()