Skip to content

Commit

Permalink
Merge pull request #463 from ml31415/flatten
Browse files Browse the repository at this point in the history
Fix #462 flattening of iterators without len
  • Loading branch information
martindurant committed Dec 22, 2022
2 parents b3812a6 + 0edfa40 commit b4f0450
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 9 deletions.
16 changes: 11 additions & 5 deletions streamz/core.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import asyncio
from collections import deque, defaultdict
from datetime import timedelta
from itertools import chain
import functools
import logging
import six
Expand Down Expand Up @@ -1637,15 +1638,20 @@ class flatten(Stream):
"""
def update(self, x, who=None, metadata=None):
L = []
for i, item in enumerate(x):
if i == len(x) - 1:
y = self._emit(item, metadata=metadata)
else:
y = self._emit(item)
items = chain(x)
item = next(items)
for item_next in items:
y = self._emit(item)
item = item_next
if type(y) is list:
L.extend(y)
else:
L.append(y)
y = self._emit(item, metadata=metadata)
if type(y) is list:
L.extend(y)
else:
L.append(y)
return L


Expand Down
10 changes: 6 additions & 4 deletions streamz/tests/test_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -800,13 +800,15 @@ def test_frequencies():
assert L[-1] == {'a': 2, 'b': 1}


def test_flatten():
@pytest.mark.parametrize("iterators",
[[[1, 2, 3], [4, 5], [6, 7, 8]],
[(i for i in range(1, 7)), (i for i in range(7, 9))]])
def test_flatten(iterators):
source = Stream()
L = source.flatten().sink_to_list()

source.emit([1, 2, 3])
source.emit([4, 5])
source.emit([6, 7, 8])
for iterator in iterators:
source.emit(iterator)

assert L == [1, 2, 3, 4, 5, 6, 7, 8]

Expand Down

0 comments on commit b4f0450

Please sign in to comment.