Here's an example:
from time import sleep

from streamz import Stream


class Counter:
    def __init__(self):
        self.n = 0

    def __call__(self, x):
        self.n += 1
        return x

    def __str__(self):
        return f"Counter({self.n})"


cin = Counter()
cout = Counter()

src = Stream.from_textfile("1000_lines.txt")
out = Stream()
out.map(cin).rate_limit(1).sink(cout)
src.connect(out)

src.start()
sleep(1)
print(cin, cout)
assert cin.n < 1000  # AssertionError
I expected this to behave the same way as src.map(...).rate_limit(...).sink(...), but yield _emit() in the source doesn't block. If you step into rate_limit.update, you'll see that it gets called many times while earlier calls are still waiting, so the data piles up as running, blocked coroutines.
You can write out = Stream().map(cin) instead, and then it works fine. But that isn't a solution if you also want to emit something from out ahead of the first map (so that cin counts it).
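To make the accumulation described above concrete, here is a toy model in plain asyncio. It is not streamz code and makes no claim about how streamz schedules its coroutines internally. SlowNode, demo, and await_downstream are hypothetical names invented for this sketch. It only shows the general difference between a source that waits for a slow downstream and one that does not.

```python
import asyncio


class SlowNode:
    """Hypothetical stand-in for a rate-limited node (not a streamz class)."""

    def __init__(self, interval):
        self.interval = interval
        self.started = 0  # number of update() calls that have been entered
        self._lock = asyncio.Lock()

    async def update(self, x):
        self.started += 1
        # Only one item passes per interval; later callers block on the lock.
        async with self._lock:
            await asyncio.sleep(self.interval)


async def demo(await_downstream, n=100, interval=0.05, duration=0.12):
    """Return how many update() calls were entered within `duration` seconds."""
    node = SlowNode(interval)
    pending = []

    async def source():
        for i in range(n):
            if await_downstream:
                # Backpressure: the source waits for the slow node each time.
                await node.update(i)
            else:
                # No backpressure: schedule the call and move straight on.
                pending.append(asyncio.ensure_future(node.update(i)))
                await asyncio.sleep(0)  # let the scheduled call start

    src = asyncio.ensure_future(source())
    await asyncio.sleep(duration)
    started = node.started

    # Clean up whatever is still running or blocked.
    src.cancel()
    for task in pending:
        task.cancel()
    await asyncio.gather(src, *pending, return_exceptions=True)
    return started


if __name__ == "__main__":
    print("awaited:", asyncio.run(demo(await_downstream=True)))
    print("not awaited:", asyncio.run(demo(await_downstream=False)))
```

With await_downstream=True only a handful of calls are entered before the deadline. With await_downstream=False all n calls are entered almost immediately and then sit blocked on the lock, which matches the symptom of cin reaching 1000 long before the rate-limited sink catches up.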