Skip to content

n1ru4l/push-pull-async-iterable-iterator

main
Switch branches/tags

Name already in use

A tag already exists with the provided branch name. Many Git commands accept both tag and branch names, so creating this branch may cause unexpected behavior. Are you sure you want to create this branch?
Code

Latest commit

 

Git stats

Files

Permalink
Failed to load latest commit information.
Type
Name
Latest commit message
Commit time
 
 
 
 
 
 
src
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 

@n1ru4l/push-pull-async-iterable-iterator

TypeScript npm version npm downloads Dependents Build Status

Create an AsyncIterableIterator from anything (on any modern platform) while handling back-pressure!

yarn install -E @n1ru4l/push-pull-async-iterable-iterator

Standalone Usage

import { makePushPullAsyncIterableIterator } from "@n1ru4l/push-pull-async-iterable-iterator";

const {
  pushValue,
  asyncIterableIterator
} = makePushPullAsyncIterableIterator();
pushValue(1);
pushValue(2);
pushValue(3);

// prints 1, 2, 3
for await (const value of asyncIterableIterator) {
  console.log(value);
}

Check if something is an AsyncIterable

import { isAsyncIterable } from "@n1ru4l/push-pull-async-iterable-iterator";

if (isAsyncIterable(something)) {
  for await (const value of something) {
    console.log(value);
  }
}

Note: On Safari iOS Symbol.asyncIterator is not available, therefore all async iterators used must be build using AsyncGenerators. If a AsyncIterable that is NO AsyncGenerator is passed to isAsyncIterable on the Safari iOS environment, it will return the value false.

Wrap a Sink

import { makeAsyncIterableIteratorFromSink } from "@n1ru4l/push-pull-async-iterable-iterator";
// let's use some GraphQL client :)
import { createClient } from "graphql-ws/lib/use/ws";

const client = createClient({
  url: "ws://localhost:3000/graphql"
});

const asyncIterableIterator = makeAsyncIterableIteratorFromSink(sink => {
  const dispose = client.subscribe(
    {
      query: "{ hello }"
    },
    {
      next: sink.next,
      error: sink.error,
      complete: sink.complete
    }
  );
  return () => dispose();
});

for await (const value of asyncIterableIterator) {
  console.log(value);
}

Apply an AsyncIterableIterator to a sink

import Observable from "zen-observable";
import {
  makePushPullAsyncIterableIterator,
  applyAsyncIterableIteratorToSink
} from "@n1ru4l/push-pull-async-iterable-iterator";

const { asyncIterableIterator } = makePushPullAsyncIterableIterator();

const observable = new Observable(sink => {
  const dispose = applyAsyncIterableIteratorToSink(asyncIterableIterator, sink);
  // dispose will be called when the observable subscription got destroyed
  // the dispose call will ensure that the async iterator is completed.
  return () => dispose();
});

const subscription = observable.subscribe({
  next: console.log,
  complete: () => console.log("done."),
  error: () => console.log("error.")
});

const interval = setInterval(() => {
  iterator.push("hi");
}, 1000);

setTimeout(() => {
  subscription.unsubscribe();
  clearInterval(interval);
}, 5000);

Put it all together

import { Observable, RequestParameters, Variables } from "relay-runtime";
import { createClient } from "graphql-ws/lib/use/ws";
import {
  makeAsyncIterableFromSink,
  applyAsyncIterableIteratorToSink
} from "@n1ru4l/push-pull-async-iterable-iterator";
import { createApplyLiveQueryPatch } from "@n1ru4l/graphql-live-query-patch";

const client = createClient({
  url: "ws://localhost:3000/graphql"
});

export const execute = (request: RequestParameters, variables: Variables) => {
  if (!request.text) {
    throw new Error("Missing document.");
  }
  const query = request.text;

  return Observable.create<GraphQLResponse>(sink => {
    // Create our asyncIterator from a Sink
    const executionResultIterator = makeAsyncIterableFromSink(wsSink => {
      const dispose = client.subscribe({ query }, wsSink);
      return () => dispose();
    });

    const applyLiveQueryPatch = createApplyLiveQueryPatch();

    // apply some middleware to our asyncIterator
    const compositeIterator = applyLiveQueryPatch(executionResultIterator);

    // Apply our async iterable to the relay sink
    // unfortunately relay cannot consume an async iterable right now.
    const dispose = applyAsyncIterableIteratorToSink(compositeIterator, sink);
    // dispose will be called by relay when the observable is disposed
    // the dispose call will ensure that the async iterator is completed.
    return () => dispose();
  });
};

Operators

This package also ships a few utilities that make your life easier!

map

Map a source

import { map } from "@n1ru4l/push-pull-async-iterable-iterator";

async function* source() {
  yield 1;
  yield 2;
  yield 3;
}

const square = map((value: number): number => value * value);

for await (const value of square(source())) {
  console.log(value);
}
// logs 1, 4, 9

filter

Filter a source

import { filter } from "@n1ru4l/push-pull-async-iterable-iterator";

async function* source() {
  yield 1;
  yield 2;
  yield 3;
}

const biggerThan1 = filter((value: number): number => value > 1);

for await (const value of biggerThan1(source())) {
  console.log(value);
}
// logs 2, 3

Other helpers

withHandlers

Attach a return and throw handler to a source.

import { withReturn } from "@n1ru4l/push-pull-async-iterable-iterator";

async function* source() {
  yield 1;
  yield 2;
  yield 3;
}

const sourceInstance = source();

const newSourceWithHandlers = withHandlers(
  sourceInstance,
  () => sourceInstance.return(),
  err => sourceInstance.throw(err)
);

for await (const value of stream) {
  // ...
}

About

Create an AsyncIterableIterator from anything while handling back-pressure!

Resources

License

Stars

Watchers

Forks

Packages

No packages published

Contributors 4

  •  
  •  
  •  
  •