diff --git a/.travis.yml b/.travis.yml index 37b56e6a..9dd069c7 100644 --- a/.travis.yml +++ b/.travis.yml @@ -7,7 +7,6 @@ language: python matrix: include: - - python: 2.7 - python: 3.6 env: diff --git a/setup.py b/setup.py index 7bf32f88..dd1a6c42 100755 --- a/setup.py +++ b/setup.py @@ -17,6 +17,7 @@ license='BSD', keywords='streams', packages=packages + tests, + python_requires='>=3.5', long_description=(open('README.rst').read() if exists('README.rst') else ''), install_requires=list(open('requirements.txt').read().strip().split('\n')), diff --git a/streamz/core.py b/streamz/core.py index e22eec70..305f62d4 100644 --- a/streamz/core.py +++ b/streamz/core.py @@ -1035,6 +1035,20 @@ class unique(Stream): parameter. For example setting ``history=1`` avoids sending through elements when one is repeated right after the other. + Parameters + ---------- + maxsize : int or None, optional + number of stored unique values to check against + key : function, optional + Function which returns a representation of the incoming data. + For example ``key=lambda x: x['a']`` could be used to allow only + pieces of data with unique ``'a'`` values to pass through. + hashable : bool, optional + If True then data is assumed to be hashable, else it is not. This is + used for determining how to cache the history, if hashable then + either dicts or LRU caches are used, otherwise a list is used. + Defaults to True. 
+ Examples -------- >>> source = Stream() @@ -1046,20 +1060,36 @@ class unique(Stream): 1 3 """ - def __init__(self, upstream, history=None, key=identity, **kwargs): - self.seen = dict() + def __init__(self, upstream, maxsize=None, key=identity, hashable=True, + **kwargs): self.key = key - if history: - from zict import LRU - self.seen = LRU(history, self.seen) + self.maxsize = maxsize + if hashable: + self.seen = dict() + if self.maxsize: + from zict import LRU + self.seen = LRU(self.maxsize, self.seen) + else: + self.seen = [] Stream.__init__(self, upstream, **kwargs) def update(self, x, who=None): y = self.key(x) - if y not in self.seen: - self.seen[y] = 1 - return self._emit(x) + emit = True + if isinstance(self.seen, list): + if y in self.seen: + self.seen.remove(y) + emit = False + self.seen.insert(0, y) + if self.maxsize: + del self.seen[self.maxsize:] + if emit: + return self._emit(x) + else: + if self.seen.get(y, '~~not_seen~~') == '~~not_seen~~': + self.seen[y] = 1 + return self._emit(x) @Stream.register_api() diff --git a/streamz/tests/test_core.py b/streamz/tests/test_core.py index 3845de60..34c69af6 100644 --- a/streamz/tests/test_core.py +++ b/streamz/tests/test_core.py @@ -487,7 +487,7 @@ def test_unique(): def test_unique_key(): source = Stream() - L = source.unique(key=lambda x: x % 2, history=1).sink_to_list() + L = source.unique(key=lambda x: x % 2, maxsize=1).sink_to_list() source.emit(1) source.emit(2) @@ -500,8 +500,10 @@ def test_unique_key(): def test_unique_history(): source = Stream() - s = source.unique(history=2) + s = source.unique(maxsize=2) + s2 = source.unique(maxsize=2, hashable=False) L = s.sink_to_list() + L2 = s2.sink_to_list() source.emit(1) source.emit(2) @@ -511,15 +513,56 @@ def test_unique_history(): source.emit(2) assert L == [1, 2] + assert L == L2 source.emit(3) source.emit(2) assert L == [1, 2, 3] + assert L == L2 source.emit(1) assert L == [1, 2, 3, 1] + assert L == L2 + + # update 2 position + source.emit(2) + # knock 
out 1 + source.emit(3) + # update 2 position + source.emit(2) + + assert L == [1, 2, 3, 1, 3] + assert L == L2 + + +def test_unique_history_dict(): + source = Stream() + s = source.unique(maxsize=2, hashable=False) + L = s.sink_to_list() + + a = {'hi': 'world'} + b = {'hi': 'bar'} + c = {'foo': 'bar'} + + source.emit(a) + source.emit(b) + source.emit(a) + source.emit(b) + source.emit(a) + source.emit(b) + + assert L == [a, b] + + source.emit(c) + source.emit(b) + + assert L == [a, b, c] + + source.emit(a) + + assert L == [a, b, c, a] def test_union():