Async Iterators for changefeeds #370

Open
mxstbr opened this issue Mar 10, 2018 · 7 comments

Comments

@mxstbr

mxstbr commented Mar 10, 2018

Async Iterators are a new JavaScript feature introduced in ES2018. They allow one to asynchronously iterate over data—which is the perfect use case for changefeeds! They'll be rapidly gaining popularity now that they're a standard feature and will soon be one of the main ways to do async data in JavaScript.

Async Iterators have 4 keys, one of which is the asyncIterator symbol:

// Note: iterall is a neat tiny package that helps write backwards-compatible async iterators, see https://github.com/leebyron/iterall for more information. We could also inline this symbol + polyfill instead, which is 1 line of code
import { $$asyncIterator } from 'iterall';

const delay = ms => new Promise(res => setTimeout(res, ms));

let n = 1;
// This will async count from 1 to 10, one number every 100ms
const counterAsyncIter = {
  // Method shorthand instead of arrow functions, so `this` is the iterator object
  next() {
    if (n <= 10) {
      return delay(100).then(() => ({ value: n++, done: false }));
    }
    return this.return();
  },
  return() {
    return Promise.resolve({ value: undefined, done: true });
  },
  throw(err) {
    return Promise.reject(err);
  },
  [$$asyncIterator]() {
    return this;
  },
};

for await (const number of counterAsyncIter) {
  console.log(number);
}
console.log('Done counting!')
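
For comparison, here is a rough sketch of the same counter written as an ES2018 async generator, which builds the four keys for you. The `countAsync` and `main` names are made up for this example, and it uses the native `Symbol.asyncIterator` support implicitly rather than iterall:

```javascript
const delay = ms => new Promise(res => setTimeout(res, ms));

// Async generator: yields every integer from `from` to `to`,
// waiting `intervalMs` milliseconds before each one.
async function* countAsync(from, to, intervalMs) {
  for (let n = from; n <= to; n++) {
    await delay(intervalMs);
    yield n;
  }
}

// `for await` needs an async context, so wrap the loop in a function.
async function main() {
  for await (const number of countAsync(1, 10, 100)) {
    console.log(number);
  }
  console.log('Done counting!');
}

main().catch(console.error);
```

The hand-written object above is still the more useful reference here, since a cursor would need to implement the keys itself rather than be a generator.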

I built the callback-to-async-iterator package because I needed to transform changefeeds into async iterators, which I do like this right now:

import asyncify from 'callback-to-async-iterator';

const listenToNewMessages = (callback) => {
  return db.table('messages')
    .changes({ includeInitial: false })
    .run()
    .then(cursor => {
      cursor.eachAsync(value => {
        callback(value);
      })
    })
}

asyncify(listenToNewMessages);
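
For context, a bridge of this kind roughly has to keep two queues: one for values that arrive before anybody asked for them, and one for `next()` calls that arrive before any value exists. The following is only a simplified sketch of that idea, not the actual source of callback-to-async-iterator; `callbackToAsyncIterator`, `pushQueue` and `pullQueue` are hypothetical names:

```javascript
// Simplified sketch: turn a "call me back on every value" listener
// into an async iterator by queueing on both sides.
function callbackToAsyncIterator(listen) {
  const pushQueue = []; // values that arrived before a next() call
  const pullQueue = []; // next() calls still waiting for a value
  let done = false;

  listen(value => {
    if (done) return;
    if (pullQueue.length > 0) {
      // A consumer is already waiting: hand the value over directly.
      pullQueue.shift()({ value, done: false });
    } else {
      pushQueue.push(value);
    }
  });

  return {
    next() {
      if (done) return Promise.resolve({ value: undefined, done: true });
      if (pushQueue.length > 0) {
        return Promise.resolve({ value: pushQueue.shift(), done: false });
      }
      return new Promise(resolve => pullQueue.push(resolve));
    },
    return() {
      done = true;
      pushQueue.length = 0;
      // Release any consumers that are still waiting.
      while (pullQueue.length > 0) {
        pullQueue.shift()({ value: undefined, done: true });
      }
      return Promise.resolve({ value: undefined, done: true });
    },
    throw(err) {
      done = true;
      return Promise.reject(err);
    },
    [Symbol.asyncIterator]() {
      return this;
    },
  };
}
```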

While that works, it requires buffering on both ends and is a bit finicky. Instead, I propose that the Cursor object in rethinkdbdash exposes the above keys and can be used as an async iterator automatically! We could then use changefeeds like this:

const messages = await db.table('messages')
  .changes({ includeInitial: false })
  .run();

for await (const message of messages) {
  console.log(message);
}

Look how neat that is! 😍

The Cursor already specifies a .next() and .close() method, which is already 90% of the work! The only real discussion point is changing cursor.next() to return an object with the shape of { value: whatever, done: false } rather than the value directly, which would probably require a major version release of this package.

The remaining work is to write a return method that aliases .close and resolves with an object of the shape { value: undefined, done: true }, then add the other two keys (throw and the asyncIterator symbol), and that's it!
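
To make that concrete, here is a minimal sketch of the wrapping, not the driver's real code. `makeFakeCursor` is a made-up stand-in so the example runs without a database, and it assumes that `next()` resolves with the raw value and that `close()` returns a promise; `cursorToAsyncIterator` is a hypothetical name too:

```javascript
// Hypothetical stand-in for a driver cursor (assumption: next() resolves
// with the raw row, close() returns a promise).
function makeFakeCursor(rows) {
  let index = 0;
  let closed = false;
  return {
    next() {
      if (closed || index >= rows.length) {
        return Promise.reject(new Error('No more rows'));
      }
      return Promise.resolve(rows[index++]);
    },
    close() {
      closed = true;
      return Promise.resolve();
    },
    isClosed() {
      return closed;
    },
  };
}

// Expose the four async iterator keys on top of next() and close().
function cursorToAsyncIterator(cursor) {
  return {
    // Wrap the raw value in the { value, done } shape.
    next() {
      return cursor.next().then(value => ({ value, done: false }));
    },
    // Alias for close(): called when a `for await` loop exits early.
    return() {
      return cursor.close().then(() => ({ value: undefined, done: true }));
    },
    throw(err) {
      return cursor.close().then(() => Promise.reject(err));
    },
    [Symbol.asyncIterator]() {
      return this;
    },
  };
}
```

Breaking out of a `for await` loop calls `return()`, so the underlying cursor gets closed without the consumer having to remember to do it.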

I'd be happy to submit a PR implementing this!

@mxstbr
Author

mxstbr commented Mar 10, 2018

In case you don't want to break existing users the Cursor could also expose the async iterator under a key, something like this:

const messages = await db.table('messages')
  .changes({ includeInitial: false })
  .run();

// .next() still resolves with the value directly, so no breaking change for existing users!
console.log(await messages.next()) // => "{ id: 'asdf-123' }"

// The async iterator is exposed at messages.iterator instead and resolves with the shape async iterators require
console.log(await messages.iterator.next()) // => "{ value: { id: 'asdf-124' }, done: false }"
for await (const message of messages.iterator) {
  console.log(message);
}

@sagivf
Contributor

sagivf commented Mar 11, 2018

Maybe I'm missing something, but doesn't this suffice:

async function newMessage (err, message) {
   console.log(message)
}

const cursor = await db.table('messages')
  .changes({ includeInitial: false })
  .run()

cursor.each(newMessage)

@mxstbr
Author

mxstbr commented Mar 11, 2018

Of course, you can get the same thing done today too. This doesn't add any new behavior—the problem is other libraries expecting async iterators as input. For us, that's GraphQL subscriptions which require passing async iterators:

export const resolvers = {
  Subscription: {
    messageAdded: {
      subscribe: () => {/* need to return an async iterator! */},
    },
  },
}
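
As a rough illustration of what such a resolver could look like once a changefeed is an async iterator: the sketch below uses a made-up `fakeMessageFeed` generator in place of the real cursor, and it assumes the common convention that each yielded payload is keyed by the subscription field name.

```javascript
// Hypothetical stand-in for a changefeed that is already an async iterator.
async function* fakeMessageFeed() {
  yield { id: 'asdf-123' };
  yield { id: 'asdf-124' };
}

const resolvers = {
  Subscription: {
    messageAdded: {
      // An async generator function returns an async iterator when called.
      subscribe: async function* () {
        for await (const message of fakeMessageFeed()) {
          // Assumption: payload is keyed by the field name.
          yield { messageAdded: message };
        }
      },
    },
  },
};
```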

That's why I built the callback-to-async-iterator module, to be able to use RethinkDB changefeeds with GraphQL subscriptions. Then I looked under the hood and realized that 95% of the work was already done to make async iterators a first-class citizen, so I figured it'd be worth suggesting!

@sagivf
Contributor

sagivf commented Mar 11, 2018

Great point! This could be very interesting to work on.
I'm not familiar with this project's code and currently not free, but maybe soon.

@aleclarson
Contributor

@mxstbr Your best bet is to submit a PR and wait. But judging from the number of unresolved issues and open pull requests, this project is not being actively maintained.

@mxstbr
Author

mxstbr commented Mar 11, 2018

Yeah I'll probably just whip up a quick PR, I just wanted to give @neumino a chance to tell me not to before I go do all that work. 😊

> this project is not being actively maintained

As far as I know @neumino is "actively" maintaining this. I just think there isn't a ton to change anymore, given that the pace of change in the main driver has slowed down too. He's very responsive on Twitter to any questions.

mxstbr added a commit to mxstbr/rethinkdbdash that referenced this issue Mar 12, 2018
@mxstbr
Author

mxstbr commented Mar 12, 2018

See PR #371. I made it a method and called it cursor.asyncIterator(), comments welcome!
