Ziplast operator #14

Closed
andersea opened this issue Apr 4, 2018 · 5 comments

@andersea

andersea commented Apr 4, 2018

Just thought I'd share some ideas for streams that would be useful to me.

A ziplast stream. It would zip a number of async sequences, but yield a tuple every time an item is received on any of the streams, with each element being the last item received on the corresponding stream. If no item has been received on a stream yet, the value at the corresponding tuple position could be None. I very often need to coordinate two sequences that yield values at different intervals, for example one every two seconds and another every ten seconds.

Merge can kind of achieve this if you are able to tell the elements apart, for example by tagging them somehow. If each source yields a different type, it is doable, but if every item is a dict, for example, it becomes hard to tell where each item came from unless you wrap it with some metadata.
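The tagging workaround described above can be sketched with plain asyncio, independently of aiostream: each item is wrapped as an `(index, item)` pair before the sources are merged, so the consumer always knows which stream an item came from, even when every item is a dict. `tagged_merge` is a hypothetical helper written for illustration, not part of aiostream's API.

```python
import asyncio
from typing import Any, AsyncIterator, Tuple


async def tagged_merge(*sources: AsyncIterator[Any]) -> AsyncIterator[Tuple[int, Any]]:
    """Merge async iterators, yielding (source_index, item) pairs as items arrive."""
    queue: asyncio.Queue = asyncio.Queue()
    done = object()  # sentinel: "this source is exhausted (or failed)"

    async def pump(index: int, source: AsyncIterator[Any]) -> None:
        # Forward every item of one source into the shared queue, tagged with its index.
        try:
            async for item in source:
                await queue.put((index, item))
        finally:
            await queue.put((index, done))

    tasks = [asyncio.create_task(pump(i, s)) for i, s in enumerate(sources)]
    remaining = len(tasks)
    try:
        while remaining:
            index, item = await queue.get()
            if item is done:
                remaining -= 1
            else:
                yield index, item
        # All pumps have finished; re-raise the first source error, if any.
        await asyncio.gather(*tasks)
    finally:
        # Stop any pump still running if the consumer exits early.
        for task in tasks:
            task.cancel()
```

Because every pair carries its source index, the consumer can dispatch on `index` instead of inspecting the item itself. This sketch uses an unbounded queue, so a fast source can run ahead of a slow consumer.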

Also a ratelimit stream: it yields at most one value every n seconds and discards the rest.
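A minimal sketch of the requested rate limiter as an async generator, assuming "at most every n seconds" means the first item of each window passes and later ones are dropped (a leading-edge throttle). `ratelimit` and its injectable `clock` parameter are hypothetical names; the clock is only there so the behaviour can be checked without sleeping.

```python
import time
from typing import AsyncIterator, Callable, Optional, TypeVar

T = TypeVar("T")


async def ratelimit(
    source: AsyncIterator[T],
    interval: float,
    clock: Callable[[], float] = time.monotonic,
) -> AsyncIterator[T]:
    """Yield at most one item per `interval` seconds; drop the items in between."""
    last: Optional[float] = None  # time at which the last item was let through
    async for item in source:
        now = clock()
        if last is None or now - last >= interval:
            last = now
            yield item
        # Otherwise the item arrived too soon after the previous one and is discarded.
```

A different reading of the request would keep the most recent item and emit it when the window closes; that variant needs a timer task rather than a simple timestamp check.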

@vxgmichel
Owner

> Just thought I'd share some ideas for streams that would be useful to me.

Great, thanks!

> A ziplast stream

That would be the equivalent of ReactiveX combineLatest, right?

Here's a possible implementation:

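from aiostream import operator, stream

@operator(pipable=True)
def ziplatest(*sources):
    n = len(sources)

    # Add source index to the items
    # (i=i binds the current index; a bare `i` would be looked up late,
    # giving every lambda the last index)
    new_sources = [
        stream.map(source, lambda x, i=i: {i: x})
        for i, source in enumerate(sources)]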

    # Merge the sources
    merged = stream.merge.raw(*new_sources)

    # Accumulate the current state in a dict
    accumulated = stream.accumulate.raw(
        merged, lambda x, e: {**x, **e})

    # Convert the state dict to a tuple
    return stream.map.raw(
        accumulated, lambda x: tuple(map(x.get, range(n))))

And an example:

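import asyncio

from aiostream import pipe, stream

async def main():
    # xs counts every 2 seconds, ys every 5 seconds
    xs = stream.count(interval=2) | pipe.delay(2)
    ys = stream.count(interval=5) | pipe.delay(5)
    zs = xs | ziplatest.pipe(ys)
    ps = zs | pipe.print()
    await ps  # count() is infinite, so this runs until interrupted

asyncio.run(main())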

With the corresponding output:

(0, None)
(1, None)
(1, 0)
(2, 0)
(3, 0)
(3, 1)
(4, 1)
(5, 1)
(6, 1)
(6, 2)
(7, 2)
[...]
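The printed output follows from the three steps in `ziplatest`: tag each item with its source index, accumulate the tags into a dict of latest values, and convert the dict to a tuple. The sketch below replays those steps synchronously with `itertools.accumulate` on an assumed timeline (xs yields k at t = 2 + 2k, ys yields k at t = 5 + 5k, read off the `count`/`delay` arguments) and prints the same eleven tuples as the output above. `replay` is a hypothetical helper; it models the pipeline's logic, not aiostream's scheduling.

```python
from itertools import accumulate


def replay(events, n):
    """Run (source_index, value) events through ziplatest's tag/accumulate/tuple steps."""
    tagged = ({index: value} for index, value in events)          # tag with the source index
    states = accumulate(tagged, lambda state, e: {**state, **e})  # latest value per source
    return [tuple(map(state.get, range(n))) for state in states]  # dict -> tuple, None if missing


# Assumed timeline, as (time, source_index, value) triples.
xs = [(2 + 2 * k, 0, k) for k in range(8)]
ys = [(5 + 5 * k, 1, k) for k in range(3)]
# Sort by time; on the t = 10 tie, put ys first to match the printed output.
timeline = sorted(xs + ys, key=lambda e: (e[0], -e[1]))
print(replay([(src, val) for _, src, val in timeline], n=2))
```

The t = 10 tie is the only place where the ordering is ambiguous: the printed output shows `(3, 1)` before `(4, 1)`, so the replay puts ys first there.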

I guess extra keyword arguments could also be provided, either to disable producing incomplete items or to define a custom default value.
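The two keyword options suggested above can be sketched without aiostream as a plain asyncio async generator: `partial=False` suppresses tuples until every source has produced at least one item, and `default` replaces `None` in positions that have not produced yet. The function and parameter names here are illustrative assumptions, not a library signature.

```python
import asyncio
from typing import Any, AsyncIterator, Dict, Tuple


async def ziplatest(
    *sources: AsyncIterator[Any],
    partial: bool = True,
    default: Any = None,
) -> AsyncIterator[Tuple[Any, ...]]:
    """Yield a tuple of each source's latest item whenever any source produces one."""
    n = len(sources)
    latest: Dict[int, Any] = {}  # source index -> most recent item
    queue: asyncio.Queue = asyncio.Queue()
    done = object()  # sentinel: "this source is exhausted (or failed)"

    async def pump(index, source):
        # Forward one source's items into the shared queue, tagged with its index.
        try:
            async for item in source:
                await queue.put((index, item))
        finally:
            await queue.put((index, done))

    tasks = [asyncio.create_task(pump(i, s)) for i, s in enumerate(sources)]
    remaining = n
    try:
        while remaining:
            index, item = await queue.get()
            if item is done:
                remaining -= 1
                continue
            latest[index] = item
            # With partial=False, wait until every source has produced at least once.
            if partial or len(latest) == n:
                yield tuple(latest.get(i, default) for i in range(n))
        await asyncio.gather(*tasks)  # re-raise a source's error, if any
    finally:
        for task in tasks:
            task.cancel()
```

With `partial=False` the first tuple is emitted only when the slowest source produces its first item; with the example streams above, that would drop `(0, None)` and `(1, None)`.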

What do you think?

@andersea
Author

andersea commented Apr 5, 2018

Awesome! That is exactly what I need.

@vxgmichel
Owner

Great! I don't really have the time to write the docs and tests for this feature at the moment, so feel free to submit a PR if you're interested. Otherwise, I'll take care of it later.

@andersea
Author

andersea commented Apr 6, 2018

Np, thanks for the effort. I really like this library, and async generator functions are a pretty nice new Python feature. They really clean up a lot of my previous async context manager code in the project I am currently working on. You get the feeling that async/await is still a fairly young feature in Python, and there are definitely a few pitfalls to dodge here and there.

@vxgmichel vxgmichel changed the title from "Ideas for streams. Ziplast and ratelimit" to "Ziplast operator" on Aug 8, 2018
@vxgmichel
Owner

Hi @andersea,

I just added the ziplatest operator (see PR #22); it will become available with version 0.3.1.

Thanks for the suggestion :)
