
Async iteration #1

mstade opened this issue Mar 31, 2017 · 7 comments

mstade commented Mar 31, 2017

For what it's worth, async iteration is coming and I've basically resigned myself to the idea that this is now how streams in JavaScript will look. Oh well.

pemrouz commented Apr 1, 2017

I must admit, after writing so much functional programming, async/await has definitely made writing synchronous-like code quite enjoyable and extremely simple for some cases. The following is part of a test that spins up a distributed service, waits till it stabilises, spins up clients, waits till they are all connected, runs the stress test, waits for the results, and kills all the nodes before moving on to the next configuration.

async function bench() {
  for (const msize of messages)
    for (const csize of clusters)
      for (const lsize of clients) {
        const details = { msize, csize, lsize, records, connections }
            , cluster = await createCluster(details)
            , clients = await createClients(details)
            , results = await test(cluster, clients)

        save(details, cluster, clients, stamps)
        values(clients).map(d => d.kill())
        values(cluster).map(d => d.kill())
      }

  log(str(stamps))
}

for await will probably further increase the number of cases for which sync-like code might be better. However, I think there is still utility in being able to map/filter/reduce over events, just as typical FP often makes code more readable than using if, for, etc., particularly when dealing with multiple streams or callbacks. Currently it's a bit of a pain when you're trying to resolve a promise but need to wait on a callback. Hence a key motivation was to provide a unifying abstraction between promises, observables and callbacks.

The initial code I wrote for await stable(cluster) looked something like the below:

// debounce, by, log and checksums are defined elsewhere in the surrounding test code
const stable = peers => new Promise(resolve => {
  const start = process.hrtime()
  resolve = debounce(1000)(resolve) // only resolve once emissions have settled
  peers.map(peer => peer.on('message', ({ checksum }) => {
    if (checksum != checksums[peers.length]) return log('cluster unstable'.red, checksums[peers.length], '/', checksum)
    peer.stable = process.hrtime(start)
    log('cluster peer stable'.yellow, checksum)
    if (peers.every(by('stable'))) {
      log('cluster stable'.green, '(', peers.length, ')')
      resolve(peers)
      peers.map(peer => peer.removeAllListeners())
    }
  }))
})

It waits till all peers have the same checksum (a hash of the peers). This is much more readable in the form below, imho. Also notice the return value, which is a stream we manipulate; since we are only interested in the first (and only) time it emits (i.e. when the cluster converges), it can be awaited, unlike ES6 Observables.

async function stable(peers) {
  const start = process.hrtime()
      , combined = emitterify()
      , changes = combined.on('change')

  // combine checksum changes from each peer into one stream
  peers
    .map(peer => peer.on('checksum', ({ checksum }) => combined.emit('change', { peer, checksum })))

  // log if a peer has yet to discover all other peers
  changes
    .filter(({ checksum }) => checksum != checksums[peers.length])
    .map(({ checksum }) => log('cluster unstable'.red, checksums[peers.length], '/', checksum))

  // log and record the time it took each peer to discover all other peers
  changes
    .filter(({ checksum }) => checksum === checksums[peers.length])
    .filter(({ peer }) => (peer.stable = process.hrtime(start)))
    .map(({ checksum }) => log('cluster peer stable'.yellow, checksum))

  // resolve when all peers have discovered all other peers
  return changes
    .filter(d => peers.every(by('stable')))
    .map(d => log('cluster stable'.green, '(', peers.length, ')'))
    .map(d => peers)
}

This library should also play nice with for await by implementing [Symbol.asyncIterator]():

for await (const { peer, checksum } of cluster.on('change'))
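
For illustration, here is a minimal sketch of how an emitter-backed stream could satisfy the async iteration protocol. The adapter shape and the buffering strategy are assumptions, not this library's implementation:

// Sketch only: adapt a Node-style emitter to the async iteration protocol.
// Values emitted before the consumer pulls are buffered; pending pulls are
// resolved as soon as the next value arrives.
function asyncIterable(emitter, event) {
  const buffer = [] // values waiting for a pull
  const pulls = []  // pulls waiting for a value
  emitter.on(event, value =>
    pulls.length ? pulls.shift()({ value, done: false }) : buffer.push(value))

  return {
    [Symbol.asyncIterator]() {
      return {
        next: () => buffer.length
          ? Promise.resolve({ value: buffer.shift(), done: false })
          : new Promise(resolve => pulls.push(resolve))
      }
    }
  }
}

// for await (const { peer, checksum } of asyncIterable(cluster, 'change')) ...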

Do you think there will be no need for this library, or something like it, once for await becomes commonplace?

mstade commented Apr 1, 2017

My point wasn't so much that your library is useless, but that it's probably useless to try to come up with a unified abstraction for async or sync streams, because that problem is solved by the AsyncIterator and Iterator protocols respectively.

There is probably value in having functions that operate on iterables, however, like so:

let numbers = numbersFromSomewhere()
let squared = await map(numbers, n => n * n)
let even = await filter(squared, n => n % 2 === 0)

Then you can compose, partially apply, thread, or do any of the other nice functional thingies you may want to do, which you can't really do with a dot-style fluent API.
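
For concreteness, a minimal eager sketch of such helpers, matching the usage above (hypothetical functions, not this library's API). Note that each one drains its input to completion before resolving; that trade-off comes up in the comments below:

// Hypothetical eager helpers: consume an (async) iterable to completion
// and resolve with a plain array of results.
async function map(iterable, fn) {
  const out = []
  for await (const value of iterable) out.push(fn(value))
  return out
}

async function filter(iterable, predicate) {
  const out = []
  for await (const value of iterable)
    if (predicate(value)) out.push(value)
  return out
}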

mstade commented Apr 1, 2017

But I must digress

As a side note, I don't much care for the for await syntax, or the async/await syntax at all. If the point is to make async code look sync, just assume everything is sync and let me explicitly decide when I want things to be async instead.

This is what we have now:

async function downloadThings() {
  let first = await download(firstThing)
  let second = await download(secondThing)
  let third = download(thirdThing) // not awaited, so this stays a promise

  return { first, second, third }
}

/* We also don't even have top-level await (at least not yet) so
 * in order to use the above I have to do this nonsense:
 */
downloadThings().then(async ({ first, second, third }) => {
  console.log(first, second, await third)
})

But really, it should just be:

function downloadThings() {
  let first = download(firstThing) // Implicitly awaited
  let second = download(secondThing) // Implicitly awaited
  let third = async download(thirdThing) // Returns promise

  return { first, second, third }
}

let { first, second, await third } = downloadThings() // Implicitly awaited
console.log(first, second, third)

Sure, it'd probably be difficult for engines to optimize, but at least they are compilers – I'm not.


Not to mention the inconsistency in having for await instead of await for. Try writing function async() {} and see how well that works out.

pemrouz commented Apr 1, 2017

Agreed, I think there's room for improvement in some of the newer APIs too. In contrast to Observables, I see AsyncIterator more as an interface that this library could (and hopefully will!) implement.

In terms of the example below, the main problem I see with this API is that the second line will wait till completion until any of the values in the third line are processed:

let numbers = numbersFromSomewhere()
let squared = await map(numbers, n => n * n)
let even = await filter(squared, n => n % 2 === 0)

Whereas ideally you want a way to manage concurrency across many streams. A small core with just map/filter/reduce imho allows you to set up a declarative pipeline, and aims to strike a balance between, e.g., RxJS with its many built-in operators on one hand, and no operators at all on the other.

mstade commented Apr 2, 2017

In terms of the example below, the main problem I see with this API is that the second line will wait till completion until any of the values in the third line are processed:

I suppose you meant the other way around. Anyway, this is the reason why the Clojure folk came up with transducers: decoupling the transformation from the collection.
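
Roughly, the idea in JavaScript (a sketch using hypothetical mapping/filtering/compose helpers, not any particular library's API): a transducer wraps a reducing function, so the transformation never mentions the collection it runs against.

// Transducers in miniature: each transformation wraps a reducer,
// so it is independent of the collection (or stream) being reduced.
const mapping = fn => reducer => (acc, value) => reducer(acc, fn(value))
const filtering = predicate => reducer =>
  (acc, value) => predicate(value) ? reducer(acc, value) : acc
const compose = (...fns) => x => fns.reduceRight((acc, fn) => fn(acc), x)

// square everything, then keep the even results
const squareEvens = compose(
  mapping(n => n * n),
  filtering(n => n % 2 === 0)
)

// the same transformation applies to any reducible source
const result = [1, 2, 3, 4].reduce(
  squareEvens((acc, value) => (acc.push(value), acc)),
  []
)
// result => [4, 16]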

pemrouz commented Apr 4, 2017

Yeah, sorry, I wasn't very clear: the third line will wait till the stream on the second line completes (which might be infinite).
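
One way around that with plain async iterables is to make the helpers lazy async generators, so each value flows through the whole pipeline as soon as it arrives. Again a sketch, reusing the hypothetical map/filter names from earlier:

// Lazy variants: nothing is consumed until the result is iterated,
// and each value passes through the pipeline as soon as it arrives.
async function* map(iterable, fn) {
  for await (const value of iterable) yield fn(value)
}

async function* filter(iterable, predicate) {
  for await (const value of iterable)
    if (predicate(value)) yield value
}

// works even if the source never completes (inside an async function):
const squared = map(numbersFromSomewhere(), n => n * n)
for await (const n of filter(squared, n => n % 2 === 0))
  console.log(n)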

Re: transducers: you could compose an algorithmic transformation and pass it to .reduce.

pemrouz commented Nov 12, 2017

This is now done by implementing Symbol.asyncIterator, but more importantly it helped clear up a few things conceptually. With CSP, data production, the medium, and consumption are decoupled. The medium, or channel, is essentially the main primitive here. (With FRP (Observables), the producer and the medium are not so decoupled by default, which is one thing I disliked: producers generate values without co-operating with the consumer, e.g. by propagating backpressure.)

So now you can pull from a channel like so:

// blocks until new value is put on the channel
threads.consumer = async chan => {
  for await (const value of chan)
    if (results.push(value) == 10) break
}

and a producer can respond to pull signals, which is another event stream on the channel itself:

// puts new value on the channel as consumer pulls
threads.producer = async (chan, i = 0) => {
  for await (const d of chan.on('pull'))
    chan.next(++i)
}

which means producers/consumers can now co-operate:

const chan = o.on('foo') // o is an emitterify instance
threads.producer(chan)
await threads.consumer(chan) // resolves once the consumer has taken 10 values
expect(results).to.be.eql([1,2,3,4,5,6,7,8,9,10])

@mstade - I'd be interested to hear your thoughts on this :)
