From f81489bf23fab1273f5fe3242a6cdee8471bfc84 Mon Sep 17 00:00:00 2001 From: "Christopher J. Wright" Date: Wed, 1 May 2019 15:57:08 -0400 Subject: [PATCH 1/6] allow non hashable data to be used in unique --- streamz/core.py | 36 ++++++++++++++++++++++++++++++------ streamz/tests/test_core.py | 28 ++++++++++++++++++++++++++++ 2 files changed, 58 insertions(+), 6 deletions(-) diff --git a/streamz/core.py b/streamz/core.py index e22eec70..84fc0bf6 100644 --- a/streamz/core.py +++ b/streamz/core.py @@ -1,6 +1,6 @@ from __future__ import absolute_import, division, print_function -from collections import deque +from collections import deque, Hashable from datetime import timedelta import functools import logging @@ -1035,6 +1035,15 @@ class unique(Stream): parameter. For example setting ``history=1`` avoids sending through elements when one is repeated right after the other. + Parameters + ---------- + history : int or None, optional + number of stored unique values to check against + key : function, optional + Function which returns a representation of the incoming data. + For example ``key=lambda x: x['a']`` could be used to allow only + pieces of data with unique ``'a'`` values to pass through. + Examples -------- >>> source = Stream() @@ -1047,18 +1056,33 @@ class unique(Stream): 3 """ def __init__(self, upstream, history=None, key=identity, **kwargs): - self.seen = dict() + self.seen = None self.key = key - if history: - from zict import LRU - self.seen = LRU(history, self.seen) + self.history = history Stream.__init__(self, upstream, **kwargs) def update(self, x, who=None): y = self.key(x) + # If this is the first piece of data make the cache + if self.seen is None: + if isinstance(y, Hashable): + self.seen = dict() + if self.history: + # if it is hashable use LRU cache + if isinstance(y, Hashable): + from zict import LRU + self.seen = LRU(self.history, self.seen) + # if not hashable use deque (since it doesn't need a hash) + else: + self.seen = deque(maxlen=self.history) + if y not in self.seen: - self.seen[y] = 1 + # LRU and deque have slightly different syntax + if isinstance(self.seen, deque): + self.seen.append(y) + else: + self.seen[y] = 1 return self._emit(x) diff --git a/streamz/tests/test_core.py b/streamz/tests/test_core.py index 3845de60..4f4fae70 100644 --- a/streamz/tests/test_core.py +++ b/streamz/tests/test_core.py @@ -522,6 +522,34 @@ def test_unique_history(): assert L == [1, 2, 3, 1] +def test_unique_history_dict(): + source = Stream() + s = source.unique(history=2) + L = s.sink_to_list() + + a = {'hi': 'world'} + b = {'hi': 'bar'} + c = {'foo': 'bar'} + + source.emit(a) + source.emit(b) + source.emit(a) + source.emit(b) + source.emit(a) + source.emit(b) + + assert L == [a, b] + + source.emit(c) + source.emit(b) + + assert L == [a, b, c] + + source.emit(a) + + assert L == [a, b, c, a] + + def test_union(): a = Stream() b = Stream() From 23019913915e1ed84828bcca44be223aedc4e12d Mon Sep 17 00:00:00 2001 From: "Christopher J. Wright" Date: Tue, 7 May 2019 12:43:01 -0400 Subject: [PATCH 2/6] use hashable kwarg --- streamz/core.py | 31 +++++++++++++++---------------- streamz/tests/test_core.py | 2 +- 2 files changed, 16 insertions(+), 17 deletions(-) diff --git a/streamz/core.py b/streamz/core.py index 84fc0bf6..885feab2 100644 --- a/streamz/core.py +++ b/streamz/core.py @@ -1043,6 +1043,11 @@ class unique(Stream): Function which returns a representation of the incoming data. For example ``key=lambda x: x['a']`` could be used to allow only pieces of data with unique ``'a'`` values to pass through. + hashable : bool, optional + If True then data is assumed to be hashable, else it is not. This is + used for determining how to cache the history, if hashable then + either dicts or LRU caches are used, otherwise a deque is used. + Defaults to True. Examples -------- @@ -1055,30 +1060,24 @@ class unique(Stream): 1 3 """ - def __init__(self, upstream, history=None, key=identity, **kwargs): - self.seen = None + def __init__(self, upstream, history=None, key=identity, hashable=True, + **kwargs): self.key = key self.history = history + if hashable: + self.seen = dict() + if self.history: + from zict import LRU + self.seen = LRU(self.history, self.seen) + else: + self.seen = deque(maxlen=history) Stream.__init__(self, upstream, **kwargs) def update(self, x, who=None): y = self.key(x) - # If this is the first piece of data make the cache - if self.seen is None: - if isinstance(y, Hashable): - self.seen = dict() - if self.history: - # if it is hashable use LRU cache - if isinstance(y, Hashable): - from zict import LRU - self.seen = LRU(self.history, self.seen) - # if not hashable use deque (since it doesn't need a hash) - else: - self.seen = deque(maxlen=self.history) - if y not in self.seen: - # LRU and deque have slightly different syntax + # LRU/dict and deque have slightly different syntax if isinstance(self.seen, deque): self.seen.append(y) else: diff --git a/streamz/tests/test_core.py b/streamz/tests/test_core.py index 4f4fae70..b7229d3b 100644 --- a/streamz/tests/test_core.py +++ b/streamz/tests/test_core.py @@ -524,7 +524,7 @@ def test_unique_history(): def test_unique_history_dict(): source = Stream() - s = source.unique(history=2) + s = source.unique(history=2, hashable=False) L = s.sink_to_list() a = {'hi': 'world'} From 1f425f38f0abf568d34160610912579b828afd78 Mon Sep 17 00:00:00 2001 From: "Christopher J. Wright" Date: Tue, 7 May 2019 13:07:10 -0400 Subject: [PATCH 3/6] remove unused import --- streamz/core.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/streamz/core.py b/streamz/core.py index 885feab2..36a12b7e 100644 --- a/streamz/core.py +++ b/streamz/core.py @@ -1,6 +1,6 @@ from __future__ import absolute_import, division, print_function -from collections import deque, Hashable +from collections import deque from datetime import timedelta import functools import logging From cb328f28b2bcb208d9ef464fb2499490476010bb Mon Sep 17 00:00:00 2001 From: "Christopher J. Wright" Date: Thu, 30 May 2019 12:39:46 -0400 Subject: [PATCH 4/6] add history test and change history to maxsize --- streamz/core.py | 12 ++++++------ streamz/tests/test_core.py | 19 ++++++++++++++++--- 2 files changed, 22 insertions(+), 9 deletions(-) diff --git a/streamz/core.py b/streamz/core.py index 36a12b7e..b31e4025 100644 --- a/streamz/core.py +++ b/streamz/core.py @@ -1060,17 +1060,17 @@ class unique(Stream): 1 3 """ - def __init__(self, upstream, history=None, key=identity, hashable=True, + def __init__(self, upstream, maxsize=None, key=identity, hashable=True, **kwargs): self.key = key - self.history = history + self.maxsize = maxsize if hashable: self.seen = dict() - if self.history: + if self.maxsize: from zict import LRU - self.seen = LRU(self.history, self.seen) + self.seen = LRU(self.maxsize, self.seen) else: - self.seen = deque(maxlen=history) + self.seen = deque(maxlen=maxsize) Stream.__init__(self, upstream, **kwargs) @@ -1079,7 +1079,7 @@ def update(self, x, who=None): if y not in self.seen: # LRU/dict and deque have slightly different syntax if isinstance(self.seen, deque): - self.seen.append(y) + self.seen.appendleft(y) else: self.seen[y] = 1 return self._emit(x) diff --git a/streamz/tests/test_core.py b/streamz/tests/test_core.py index b7229d3b..c9d412fe 100644 --- a/streamz/tests/test_core.py +++ b/streamz/tests/test_core.py @@ -487,7 +487,7 @@ def test_unique(): def test_unique_key(): source = Stream() - L = source.unique(key=lambda x: x % 2, history=1).sink_to_list() + L = source.unique(key=lambda x: x % 2, maxsize=1).sink_to_list() source.emit(1) source.emit(2) @@ -500,31 +500,44 @@ def test_unique_key(): def test_unique_history(): source = Stream() - s = source.unique(history=2) + s = source.unique(maxsize=2) + s2 = source.unique(maxsize=2, hashable=False) L = s.sink_to_list() + L2 = s2.sink_to_list() source.emit(1) source.emit(2) source.emit(1) + source.emit(1) + source.emit(1) source.emit(2) source.emit(1) source.emit(2) assert L == [1, 2] + assert L == L2 source.emit(3) source.emit(2) assert L == [1, 2, 3] + assert L == L2 source.emit(1) assert L == [1, 2, 3, 1] + assert L == L2 + + source.emit(3) + source.emit(2) + source.emit(3) + + assert L == L2 def test_unique_history_dict(): source = Stream() - s = source.unique(history=2, hashable=False) + s = source.unique(maxsize=2, hashable=False) L = s.sink_to_list() a = {'hi': 'world'} From 8c5679f868a9496993980c6f4d5e558edfffdfcd Mon Sep 17 00:00:00 2001 From: "Christopher J. Wright" Date: Fri, 31 May 2019 10:19:34 -0400 Subject: [PATCH 5/6] handle unique maxsize properly --- streamz/core.py | 21 ++++++++++++++------- streamz/tests/test_core.py | 8 +++++--- 2 files changed, 19 insertions(+), 10 deletions(-) diff --git a/streamz/core.py b/streamz/core.py index b31e4025..305f62d4 100644 --- a/streamz/core.py +++ b/streamz/core.py @@ -1070,19 +1070,26 @@ def __init__(self, upstream, maxsize=None, key=identity, hashable=True, from zict import LRU self.seen = LRU(self.maxsize, self.seen) else: - self.seen = deque(maxlen=maxsize) + self.seen = [] Stream.__init__(self, upstream, **kwargs) def update(self, x, who=None): y = self.key(x) - if y not in self.seen: - # LRU/dict and deque have slightly different syntax - if isinstance(self.seen, deque): - self.seen.appendleft(y) - else: + emit = True + if isinstance(self.seen, list): + if y in self.seen: + self.seen.remove(y) + emit = False + self.seen.insert(0, y) + if self.maxsize: + del self.seen[self.maxsize:] + if emit: + return self._emit(x) + else: + if self.seen.get(y, '~~not_seen~~') == '~~not_seen~~': self.seen[y] = 1 - return self._emit(x) + return self._emit(x) @Stream.register_api() diff --git a/streamz/tests/test_core.py b/streamz/tests/test_core.py index c9d412fe..34c69af6 100644 --- a/streamz/tests/test_core.py +++ b/streamz/tests/test_core.py @@ -508,8 +508,6 @@ def test_unique_history(): source.emit(1) source.emit(2) source.emit(1) - source.emit(1) - source.emit(1) source.emit(2) source.emit(1) source.emit(2) @@ -528,10 +526,14 @@ def test_unique_history(): assert L == [1, 2, 3, 1] assert L == L2 - source.emit(3) + # update 2 position source.emit(2) + # knock out 1 source.emit(3) + # update 2 position + source.emit(2) + assert L == [1, 2, 3, 1, 3] assert L == L2 From ba3c16435a8059d4d2b83cc7e8eeeff3dafe2285 Mon Sep 17 00:00:00 2001 From: "Christopher J. Wright" Date: Fri, 31 May 2019 16:26:54 -0400 Subject: [PATCH 6/6] py3k only --- .travis.yml | 1 - setup.py | 1 + 2 files changed, 1 insertion(+), 1 deletion(-) diff --git a/.travis.yml b/.travis.yml index 37b56e6a..9dd069c7 100644 --- a/.travis.yml +++ b/.travis.yml @@ -7,7 +7,6 @@ language: python matrix: include: - - python: 2.7 - python: 3.6 env: diff --git a/setup.py b/setup.py index 7bf32f88..dd1a6c42 100755 --- a/setup.py +++ b/setup.py @@ -17,6 +17,7 @@ license='BSD', keywords='streams', packages=packages + tests, + python_requires='>=3.5', long_description=(open('README.rst').read() if exists('README.rst') else ''), install_requires=list(open('requirements.txt').read().strip().split('\n')),