Observable should implement Symbol.asyncIterator #111

Open
benlesh opened this issue Sep 17, 2016 · 12 comments

Comments

@benlesh

benlesh commented Sep 17, 2016

For interop with the other proposed async type (async iterables), it would be great if we implemented Symbol.asyncIterator.

Risks

  • Implementing this would mean introducing an unbounded buffer, and consumers of the returned asyncIterator would have to consume it responsibly.
  • Possible confusion about the nature of observables when they can be consumed with for await loops
  • Would not be the most efficient way to consume an Observable, given it would allocate a Promise and an IteratorResult for each value.

Upsides

  • Consuming observables with for await loops would be pretty rad, IMO.
  • Async interop... <3. Promises, observables and asyncIterables all getting along.
  • Might help people leverage observable APIs in a more imperative way, should they become more common, e.g. if DOM elements started providing events as observables.

Related: tc39/proposal-async-iteration#47

I suspect this is out of scope for similar reasons to that issue... but I want to put it on the table for discussion. It could land in RxJS ~6 or something ;)

EDIT: added bit about DOM events and imperative consumption

@benlesh
Author

benlesh commented Sep 17, 2016

cc @domenic

@benlesh
Author

benlesh commented Sep 17, 2016

implementing this would mean introducing an unbounded buffer

To clarify this for other readers: since Observables push values as soon as they are produced, but consumers of AsyncIterators have to call next() for each async value, all arriving values would have to land in a queue unless they were already requested.
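A minimal sketch of what that queue looks like, assuming an observable whose subscribe(observer) accepts { next, error, complete } and returns an object with unsubscribe(). toAsyncIterator is a hypothetical name, not something from the proposal or RxJS.

```javascript
// Hypothetical adapter (not from any spec or library). It subscribes when the
// first value is requested and pushes every value into an unbounded buffer
// until the consumer pulls it.
async function* toAsyncIterator(observable) {
  const buffer = [];   // arrived but not yet pulled; nothing bounds its size
  let wake = null;     // resolves the pending wait when something arrives
  let done = false;
  let failure = null;  // { error } if the observable errored

  const subscription = observable.subscribe({
    next(value) {
      buffer.push(value);
      if (wake) wake();
    },
    error(err) {
      failure = { error: err };
      done = true;
      if (wake) wake();
    },
    complete() {
      done = true;
      if (wake) wake();
    },
  });

  try {
    while (true) {
      if (buffer.length > 0) {
        yield buffer.shift(); // hand out the oldest buffered value
      } else if (failure) {
        throw failure.error;
      } else if (done) {
        return;
      } else {
        // Nothing buffered: wait for the next push, error, or completion
        await new Promise((resolve) => { wake = resolve; });
        wake = null;
      }
    }
  } finally {
    // Runs on completion, on error, and when a for await loop exits early
    subscription.unsubscribe();
  }
}
```

Since it is an async generator, for await (const value of toAsyncIterator(source)) works directly, and a break in that loop runs the finally block, which unsubscribes.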

@benjamingr

I think it's entirely fine that we have an unbounded buffer here; there is no choice anyway, and we need this interop :)

Good idea overall.

@benlesh
Author

benlesh commented Sep 17, 2016

Imagining a world where DOM events were provided as Observables, I think making them consumable as asyncIterables as well might have value to some developers.

For example, button clicks making some ajax request?

for await (const e of myButton.clicks) {
  const data = await fetch(someUrl);
  doAThing(data);
}

However mouse movements would be a little "allocatey"... So this would be yuck...

async function* mousePositions() {
  for await (const e of document.mouseMoves) {
    yield { x: e.clientX, y: e.clientY };
  }
}

I'm not sure what you'd do if you also had an await inside the for await loop. Ideally, I guess, you'd drop the mouseMoves you don't care about?? This is severely edge-casey, and perhaps off-topic; I'm just thinking about how people will use it.

@benlesh
Author

benlesh commented Sep 17, 2016

I'm happy to dogfood this a little in RxJS too... at least in a branch

@zenparsing
Member

The challenge lies in deciding what the default strategy should be. Sometimes you might want to buffer and sometimes you might want to drop.

@benlesh
Author

benlesh commented Sep 17, 2016

The challenge lies in deciding what the default strategy should be. Sometimes you might want to buffer and sometimes you might want to drop.

Exactly. Perhaps the future spec around this could decide on the default behavior. There's a short but confusing list of possible strategies:

  • Buffer every value and emit them one at a time ... a...b...c...d
  • Buffer every value and emit them in chunks... [[a,b]... [c,d]]
  • Drop values that arrive while not being consumed... a.......c......
  • Just give the most recently arrived value immediately (arguably better for a plain iterable, I guess)
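Purely as an illustration of how the first three strategies differ, here is a sketch of a hypothetical toAsyncIterable(observable, strategy) helper. It assumes subscribe(observer) returns an object with unsubscribe(), and it leaves out chunking and the "most recent value immediately" variant.

```javascript
// Hypothetical helper: turns an observable into an async iterable using one of
// three strategies. Each for await loop gets its own subscription.
//   "buffer": keep every value and hand them out one at a time
//   "latest": keep only the most recent value that hasn't been read yet
//   "drop":   discard values that arrive while no next() call is pending
function toAsyncIterable(observable, strategy = "buffer") {
  if (!["buffer", "latest", "drop"].includes(strategy)) {
    throw new TypeError(`Unknown strategy: ${strategy}`);
  }
  return {
    [Symbol.asyncIterator]() {
      const queue = [];
      const waiters = []; // { resolve, reject } of pending next() calls
      let finished = false;
      let failure = null;

      // Pair queued values with waiters; once finished and drained, end the rest
      function flush() {
        while (waiters.length > 0 && queue.length > 0) {
          waiters.shift().resolve({ value: queue.shift(), done: false });
        }
        while (finished && queue.length === 0 && waiters.length > 0) {
          const waiter = waiters.shift();
          if (failure) {
            waiter.reject(failure.error);
            failure = null; // report the error once, then behave as done
          } else {
            waiter.resolve({ value: undefined, done: true });
          }
        }
      }

      const subscription = observable.subscribe({
        next(value) {
          if (finished) return;
          if (strategy === "drop" && waiters.length === 0) return; // nobody is pulling
          if (strategy === "latest") queue.length = 0; // forget older unread values
          queue.push(value);
          flush();
        },
        error(err) {
          if (finished) return;
          finished = true;
          failure = { error: err };
          flush();
        },
        complete() {
          finished = true;
          flush();
        },
      });

      return {
        next() {
          return new Promise((resolve, reject) => {
            waiters.push({ resolve, reject });
            flush();
          });
        },
        // Called by for await on early exit: stop listening, discard the queue
        return() {
          finished = true;
          failure = null;
          queue.length = 0;
          subscription.unsubscribe();
          flush();
          return Promise.resolve({ value: undefined, done: true });
        },
      };
    },
  };
}
```

With a slow consumer that pulls once after a and b have arrived, while c arrives as it waits, "buffer" yields a, b, c, "latest" yields b, c, and "drop" yields only c.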

@Jamesernator

Jamesernator commented Oct 18, 2016

It's really difficult to pick a default; the problem is that even the same stream might need to be consumed in different ways.

E.g., with a hypothetical example:

async function drawLine(start, mouseMoves) {
    // Has to be buffered as we shouldn't be dropping line segments,
    // so even if they're subtly late it doesn't matter
    let previous = start
    for await (const move of mouseMoves) {
        drawSegment(previous, move) // hypothetical primitive that draws one segment
        previous = move
    }
}

async function drawFancyAnimatedMouse(mouseMoves) {
    // Should be unbuffered, we only really care about the most recent
    // mouseMove since the previous animation finished as we don't really
    // want lots of trailing animations
    for await (const move of mouseMoves) {
        animateMouse(move.location)
    }
}

async function main() {
    for await (const mousedown of mouseDowns) {
        const start = mousedown // assumption: the line starts at the mousedown
        const mouseMoves = getMouseMoves().takeUntil(mouseUp)
        await Promise.all([
            drawLine(start, mouseMoves),
            drawFancyAnimatedMouse(mouseMoves)
        ])
    }
}

That said, I feel the most useful default would be buffered. Because people might use Observables the way they use ReadableStreams, they need to be able to consume them without loss (e.g. reading a blob of data from a socket).

People could instead add their own operators to create other kinds of iterators, e.g. to fix the example:

async function drawLine(start, mouseMoves) {
    // Has to be buffered as we shouldn't be dropping line segments,
    // so even if they're subtly late it doesn't matter
    let previous = start
    for await (const move of mouseMoves) {
        drawSegment(previous, move) // hypothetical primitive that draws one segment
        previous = move
    }
}

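// Expects the source observable as this: mouseMoves::latest() uses the
// proposed bind operator, and latest.call(mouseMoves) is the standard equivalent
async function* latest() {
    let done = false
    let hasNewValue = false // a value arrived that hasn't been yielded yet
    let latestValue
    let completionValue
    let wake = () => {}
    const subscription = this.subscribe({
        next(val) {
            // Set the latest value to whatever was last seen
            latestValue = val
            hasNewValue = true
            wake()
        },
        complete(val) {
            completionValue = val
            done = true
            wake() // wake the loop on completion too, so it can finish
        }
    })

    try {
        while (true) {
            if (!hasNewValue && !done) {
                // Don't proceed until the value has actually changed
                await new Promise((resolve) => { wake = resolve })
            }
            if (hasNewValue) {
                hasNewValue = false // This prevents yielding duplicates
                yield latestValue
            } else {
                return completionValue
            }
        }
    } finally {
        // Unsubscribe on completion or when the consumer breaks out early
        subscription.unsubscribe()
    }
}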

async function drawFancyAnimatedMouse(mouseMoves) {
    // Should be unbuffered, we only really care about this once per animation
    // frame, so wait for each animation frame before rerendering
    for await (const move of latest.call(mouseMoves)) {
        animateMouse(move.location) // Might be expensive so just wait
        await animationFrame() // Might be more mouseMoves between
                               // animation frames
                               // either way extra mouseMoves get dropped
    }
}

async function main() {
    for await (const mousedown of mouseDowns) {
        const start = mousedown // assumption: the line starts at the mousedown
        const mouseMoves = getMouseMoves(...).takeUntil(mouseUp)
        await Promise.all([
            drawLine(start, mouseMoves),
            drawFancyAnimatedMouse(mouseMoves)
        ])
    }
}

@ckknight

Perhaps this could be better handled by having something like a .toAsyncIterable() method that optionally takes one of the strategies @Blesh mentioned.

@fabiancook

fabiancook commented Nov 23, 2019

I have been working on this a bunch, and I believe that this is completely doable.

Here is an example of a complete observable like implementation that is by default using async iterables: https://github.com/opennetwork/iterable/blob/next/src/core/transient-source.ts

It uses a weak map "linked list" as a queue for each iterator that is created. Each iterator holds a pointer that tells it where in the list it should start reading from, which can be seen here: https://github.com/opennetwork/iterable/blob/next/src/core/transient-source.ts#L185

When the consumer of the iterator has read the entire list, the iterator goes into listening mode and queues itself to listen for the next value by creating a deferred promise, which can be seen here: https://github.com/opennetwork/iterable/blob/next/src/core/transient-source.ts#L204

It then repeats this process until the source has been marked as complete, at which point it forgets its pointer, which also ensures that all future calls resolve with done set to true.

A consumer of an iterator can mark itself as done by invoking return(); this is what happens when break is used inside a for await loop.
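The break-to-return() behaviour is part of the async iteration protocol itself, so it can be shown without any observable. countdown below is a made-up example iterable; an observable-backed iterator would unsubscribe inside its return().

```javascript
// A made-up async iterable that counts down and records calls to return().
function countdown(from, log) {
  let n = from;
  return {
    [Symbol.asyncIterator]() {
      return {
        next() {
          if (n <= 0) return Promise.resolve({ value: undefined, done: true });
          return Promise.resolve({ value: n--, done: false });
        },
        // for await calls this when the loop exits early (break, return, throw).
        // An observable-backed iterator would unsubscribe here.
        return() {
          log.push("return called");
          n = 0; // later next() calls report done
          return Promise.resolve({ value: undefined, done: true });
        },
      };
    },
  };
}

async function demo() {
  const log = [];
  for await (const n of countdown(5, log)) {
    log.push(n);
    if (n === 3) break; // triggers return() on the iterator
  }
  return log;
}
```

Here demo() resolves to [5, 4, 3, "return called"]. If the loop runs to completion instead, return() is not called, because the iterator has already reported done.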

Because the queue is a weakly linked list, if nothing references the iterator or its pointer, the queue is garbage collected and doesn't grow forever. If someone is holding onto a reference to an iterator, they're still listening, so the growing queue is on them.

So the solution is a little bit of both async styles IMO, push & pull together.

Because each iterator instance keeps track of its own pointer within its scope, how values are retrieved depends on how that iterator consumes them.

If there are never any consuming iterators, the source only stores a value until the next value arrives (because the old pointer is no longer referenced).

This example shows that a single consumer is all an observable needs to become iterable: the source fans out the values and does all of the above, while the original observable is used as normal. https://gist.github.com/fabiancook/dbb86ddbef437c6f49910438ec34d883

TLDR points:

  • A weak linked list is used to hold available values; the weak association is done through pointer objects
  • Each iterator holds its placement pointer object within its own scope; when an iterator reference is lost, its pointer reference is lost too
  • If there are no iterators, old values are forgotten as new values come in
  • If no iterators need new values, no pull sources are invoked
  • Iterators that start with the same pointer will always get the same values in the same order, guaranteed.
  • Works with for await using observe
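A minimal sketch of the scheme in the TLDR, written from the description above rather than taken from the linked repository. push, close, iterate and createTransientSource are hypothetical names, and pull sources are left out. Collection of old entries is up to the engine, so only the ordering and fan-out behaviour can be observed directly.

```javascript
// Sketch of the weak linked-list idea (not the opennetwork/iterable code).
// History lives in a WeakMap keyed by "pointer" objects: each entry maps a
// pointer to { value, next }, where next is the pointer for the following value.
function createTransientSource() {
  const entries = new WeakMap(); // pointer -> { value, next }
  let tail = {};                 // pointer the next pushed value will be stored under
  let closed = false;
  let listeners = [];            // wake-up callbacks of iterators that caught up

  function wakeAll() {
    const toWake = listeners;
    listeners = [];
    for (const resolve of toWake) resolve();
  }

  return {
    push(value) {
      if (closed) return;
      const next = {};
      entries.set(tail, { value, next });
      tail = next; // the source itself keeps no reference to older pointers
      wakeAll();
    },
    close() {
      closed = true;
      wakeAll();
    },
    // The starting pointer is captured here, so iterators that start with the
    // same pointer see exactly the same values in the same order.
    iterate() {
      let pointer = tail;
      return (async function* () {
        while (true) {
          const entry = entries.get(pointer);
          if (entry) {
            // Advance; once no iterator holds the old pointer, its entry can be collected
            pointer = entry.next;
            yield entry.value;
          } else if (closed) {
            return; // caught up with a closed source: done from now on
          } else {
            // Caught up: listen until the next push or close
            await new Promise((resolve) => listeners.push(resolve));
          }
        }
      })();
    },
  };
}
```

Feeding it from an observable would take a single subscription, for example observable.subscribe({ next: (v) => source.push(v), complete: () => source.close() }), which is roughly the single-consumer fan-out the gist describes.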

There is a little bit of fluff in my implementation because I want to support things like using a source that we can pull from instead of only having a producer pushing to an observable. If we have a source and a producer pushing at the same time, the pushed values take precedence as they can be seen immediately as the next values in the list.


Happy to answer any questions on this

@runarberg

I did a toy implementation of Symbol.asyncIterator using the proposed simplified API. It follows the same strategy as forEach.

benlesh/tc39-observable-proposal#1

In my opinion, it is the sensible default. If you want something different, you can always map the source, e.g.:

for await (const chunk of chunks(source)) {
  // ...
}
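One possible shape for the chunks helper used above, implementing the "buffer every value and emit them in chunks" strategy from earlier in the thread. It is a hypothetical sketch, assuming subscribe(observer) returns an object with unsubscribe(); it is not part of the simplified API or the linked toy implementation.

```javascript
// Hypothetical helper: each step yields every value that arrived since the
// previous pull as one array, so a slow consumer gets fewer, larger steps.
async function* chunks(observable) {
  let pending = [];   // values that arrived since the last chunk was handed out
  let wake = null;    // resolves the pending wait when something arrives
  let done = false;
  let failure = null; // { error } if the observable errored

  const subscription = observable.subscribe({
    next(value) {
      pending.push(value);
      if (wake) wake();
    },
    error(err) {
      failure = { error: err };
      done = true;
      if (wake) wake();
    },
    complete() {
      done = true;
      if (wake) wake();
    },
  });

  try {
    while (true) {
      if (pending.length > 0) {
        const chunk = pending;
        pending = [];
        yield chunk; // never an empty array
      } else if (failure) {
        throw failure.error;
      } else if (done) {
        return;
      } else {
        await new Promise((resolve) => { wake = resolve; });
        wake = null;
      }
    }
  } finally {
    // Runs on completion, on error, and when the consumer breaks out early
    subscription.unsubscribe();
  }
}
```

If the consumer is waiting when 3 and 4 arrive back to back, its next step receives [3, 4] as a single array.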

@loreanvictor

Sorry for the newbie question; I'm not well-versed in the async iterator protocol. How would the break keyword work here, though? With an async generator you simply stop pulling from it, but with an Observable at the core, wouldn't you need to clean up your subscription to it as well?
