Collect does not allow awaitable sinks #468

Open
arunaruljothi opened this issue Apr 4, 2023 · 0 comments
arunaruljothi commented Apr 4, 2023

The collect class does not support awaitable sinks: a coroutine returned by a sink downstream of collect is not awaited when flush() is called.

Small example:

import asyncio

import streamz


async def sink_to_something(x):
    print(x)
    return await asyncio.sleep(1)

source = streamz.Source()
collector = source.collect()
collector.sink(sink_to_something)

for i in range(10):
    source.emit(i)
    # flush() pushes the collected tuple to the async sink
    collector.flush()

Changing the flush method of the collect class from:

@Stream.register_api()
class collect(Stream):
    ...
    def flush(self, _=None):
        out = tuple(self.cache)
        metadata = list(self.metadata_cache)
        self._emit(out, metadata)
        ...

To:

@Stream.register_api()
class collect(Stream):
    ...
    def flush(self, _=None):
        out = tuple(self.cache)
        metadata = list(self.metadata_cache)
        # change self._emit to self.emit (self.emit waits for awaitable results from downstream)
        self.emit(out, metadata=metadata)
        ...

Fixed the problem, but I'm not sure whether this change has any drawbacks.
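To illustrate why an async sink can silently do nothing, here is a minimal sketch in plain asyncio that does not use streamz at all. It assumes that the failing path amounts to calling the sink and dropping the coroutine it returns, while the working path awaits it. The names flush_discarding, flush_awaiting, and demo are hypothetical and exist only for this illustration; they are not part of the streamz API.

```python
import asyncio
import inspect

received = []  # batches the sink has actually processed


async def sink_to_something(batch):
    # The body only runs once the coroutine object is awaited.
    received.append(batch)
    await asyncio.sleep(0)


def flush_discarding(sink, batch):
    # Hypothetical stand-in for a flush that ignores what the sink returns.
    result = sink(batch)
    if inspect.iscoroutine(result):
        # Closed explicitly only to avoid a "never awaited" warning.
        result.close()


async def flush_awaiting(sink, batch):
    # Hypothetical stand-in for a flush that waits on awaitable results.
    result = sink(batch)
    if inspect.isawaitable(result):
        await result


def demo():
    received.clear()
    flush_discarding(sink_to_something, (0, 1, 2))
    after_discard = list(received)  # sink body never ran
    asyncio.run(flush_awaiting(sink_to_something, (0, 1, 2)))
    after_await = list(received)  # sink body ran once
    return after_discard, after_await
```

Calling demo() returns the sink's recorded batches after each kind of flush, so the two paths can be compared directly. Whether awaiting inside flush has side effects elsewhere in streamz is not something this sketch can answer.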
