Skip to content

Promise-based Streams with pressure control, Error delivery and lots of RxJS-like operators.

License

Notifications You must be signed in to change notification settings

psxcode/promised-streams

Repository files navigation

Promised Streams

Promise-based Streams with pressure control, Error delivery and lots of RxJS-like operators.

codecov Build Status

Install

npm install promised-streams

Pull streams

import { pullFromIterable, pullMap, pullFilter, pullTake } from 'promised-streams'

const data = [0, 1, 2, 3, 4]

const pipedTransforms = pipe(
  pullFilter(x => x % 2 === 0),
  pullMap(x => x * 2),
  pullTake(-1)
)

const pullProducer = pipedTransforms(
  pullFromIterable(data)
)

/* consume PullProducer */
while (true) {
  /* unwrap the value */
  const { value, done } = await pullProducer()

  /* check if done */
  if (done) {
    break
  }

  /* consume the value */
  console.log(value)
}

Push streams

import { pushFromIterable, pushMap, pushFilter, pushTake } from 'promised-streams'

const data = [0, 1, 2, 3, 4]

const composedProducer = compose(
  pushFromIterable(data),
  pushFilter(x => x % 2 === 0),
  pushMap(x => x * 2),
  pushTake(-1)
)

/* subscribe to PushProducer */
await composedProducer(async (result) => {
  /* unwrap the value */
  const { value, done } = await result

  /* check if done */
  if (done) {
    return
  }

  /* consume the value */
  console.log(value)
})

Terminology

Push type streams, where values are eagerly pushed by producer to consumer, as soon as available.
Pull type stream, where values are lazily pulled by consumer from producer, as soon as needed.
Pressure is a special data channel, carrying information about data saturation in the stream.

PushProducer is an active, push type producer, which pushes values to consumer, as soon as they are available.
PushConsumer is a passive, push type consumer, which waits for values to arrive from producer.
Pressure information in such streams is delivered from consumer to producer, in term of high pressure, or data consumption is in progress.

PullConsumer is an active, pull type consumer, which pulls values from producer as needed.
PullProducer is a passive, pull type producer, which provides values to be pulled from.
Pressure information is delivered from producer to consumer, in term of low pressure, or data production is in progress.

Pool.
Pump.

Iterator Protocol

Standard Javascript iterator protocol carries data and end of the iteration indicator

const iterator = getIterator(data)

const chunk = iterator.next() // { value: 42, done: false }

{ value: 42, done: false } is a valid data chunk

const chunk = iterator.next() // { value: undefined, done: true }

{ value: undefined, done: true } is an iteration end chunk

This protocol implements lazy Pull type stream of values, with several limitations

  • No Error information channel
  • Synchronous delivery, so data must be available at the moment of request

Async Iterator Protocol

Adding Javascript Promises to the Iterator Protocol allows to carry additional information

const iterator = getIterator(data)

const chunkPromise = iterator.next() // Promise<{ value, done }>

const chunk = await chunkPromise

Features of Asynchronous Iterator Protocol

  • Lazy data requests, by calling next when needed.
  • Async data delivery in chunks, by await chunkPromise.
  • Async Error delivery by rejected Promises.
  • End of stream indication by { value: undefined, done: true }.

Publish / Subscribe Protocol

This protocol implements eager Push type streams of values

producer.subscribe((chunk) => {
  consumeChunk(chunk)
})
  • No Error delivery
  • No end of stream indication
  • No pressure control, data must be consumed synchronously

With adoption of Node's error first callback style, we can add Error delivery to this protocol

producer.subscribe((error, chunk) => {
  if (error) {
    return reportError(error)
  }
  consumeChunk(chunk)
})

But still no end of stream and pressure respect.

By adding Iterator Protocol chunk objects, we can add end of stream indication.

producer.subscribe((error, { value, done }) => {
  if (error) {
    return reportError(error)
  }
  if (done) {
    return reportDone()
  }
  consumeChunk(value)
})

But still no pressure control.

By introducing Async Iterator Protocol chunk objects, we can encapsulate Error in promise itself.

producer.subscribe(async (chunkPromise) => {
  let chunk

  try {
    chunk = await chunkPromise
  } catch (e) {
    return reportError(e)
  }

  if (chunk.done) {
    return reportDone()
  }

  consumeChunk(chunk.value)
})

Now its easy to add support for pressure control.
Producer must first resolve Promise returned by Subscriber function, and only after that send next value. We could also add await operator for consumeChunk function call...

await consumeChunk(chunk.value)

Thus making producer to wait for chunk to be actually processed, before sending next one.

Interfaces

type PushConsumer <T> = (value: Promise<IteratorResult<T>>) => Promise<void>
PushConsumer is just a function which accepts a value and returns a Promise to have time to consume the chunk. This promise can be rejected to indicate error during data processing, or unsubscribe. Producer will immediately stop data delivery to unsubscribed consumer.

type PushProducer <T> = (consumer: PushConsumer<T>) => Promise<void>
PushProducer is a function accepting PushConsumer. After getting a consumer, producer begins sending data to consumer. PushProducer returns a Promise, which resolves after all data was pushed to consumer. This promise will never be rejected. All error during data production will be forwarded to consumer.

type PullProducer <T> = () => Promise<IteratorResult<T>>
PullProducer is a function which returns a chunk of data, delivered as Promise to IteratorResult. Promise can be rejected by producer to indicate an Error.

type PullConsumer <T> = (producer: PullProducer<T>) => Promise<void>
PullConsumer is a function accepting PullProducer. After getting the producer, consumer begins pulling the data. PullConsumer returns a Promise, which will be resolved after add data was pulled, or will be rejected if producer delivered an Error.

Creation

pullFromIterable

Creates Pull type producer, which will stream data from standard Iterable.

<T> (iterable: Iterable<T>) => PullProducer<T>

import { pullFromIterable } from 'promised-streams'

const data = [0, 1, 2, 3]
/* create PullProducer */
const producer = pullFromIterable(data)

try {
  /* consume PullProducer */
  while (true) {
    const { value, done } = await producer()

    if (done) {
      break
    }

    console.log(value)
  }
} catch (e) {
  console.error(e)
}

pushFromIterable

Creates Push type producer, which will stream data from standard Iterable.

<T> (iterable: Iterable<T>) => PushProducer<T>

import { pushFromIterable } from 'promised-streams'

const data = [0, 1, 2, 3]
const pushProducer = pushFromIterable(data)

/* subscribe to PushProducer */
await pushProducer(async (result) => {
  try {
    /* unwrap the value */
    const { value, done } = await result

    /* check if done */
    if (done) {
      return
    }

    /* consume the value */
    console.log(value)
  } catch (e) {
    /* catch errors */
    console.error(e)

    /* cancel subscription */
    return Promise.reject()
  }
})

pullFromStream

Creates Pull type producer, which will deliver data from NodeJS stream.

<T> (stream: NodeJS.ReadableStream) => PullProducer<T>

import { pullFromStream } from 'promised-streams'

const readable = createStream()
const producer = pullFromStream(readable)

try {
  /* consume PullProducer */
  while (true) {
    const { value, done } = await producer()

    if (done) {
      break
    }

    console.log(value)
  }
} catch (e) {
  console.error(e)
}

pushFromStream

Creates Push type producer, which will deliver data from NodeJS stream.

<T> (stream: NodeJS.ReadableStream) => PushProducer<T>

import { pushFromStream } from 'promised-streams'

const readable = createStream()
const pushProducer = pushFromStream(readable)

/* subscribe to PushProducer */
await pushProducer(async (result) => {
  try {
    /* unwrap the value */
    const { value, done } = await result

    /* check if done */
    if (done) {
      return
    }

    /* consume the value */
    console.log(value)
  } catch (e) {
    /* catch errors */
    console.error(e)

    /* cancel subscription */
    return Promise.reject()
  }
})

Conversion

pool

Converts Push type producer to Pull type producer.

<T> (options: IPoolOptions) => IPool<T>

type IPool <T> = { push: PushConsumer<T>, pull: PullProducer<T> }

import { pool, pushFromIterable } from 'promised-streams'

const data = [0, 1, 2, 3]
const producer = pushFromIterable(data)

/* create Pool, returning PullProducer and PushConsumer */
const { pull, push } = pool({ highWatermark: 32 })

/* begin pushing to Pool's PushConsumer */
producer(push)

try {
  /* consume Pool's PullProducer */
  while (true) {
    const { value, done } = await pull()

    if (done) {
      break
    }

    console.log(value)
  }
} catch (e) {
  console.error(e)
}

pump

Converts Pull type producer to Push type producer.

<T> (producer: PullProducer<T>) => PushProducer<T>

import { pump } from 'promised-streams'

/* create PullProducer */
const data = [0, 1, 2, 3]
const pullProducer = pullFromIterable(data)

/* create PushProducer from PullProducer */
const pushProducer = pump(pullProducer)

/* subscribe to PushProducer */
await pushProducer(async (result) => {
  try {
    /* unwrap the value */
    const { value, done } = await result

    /* check if done */
    if (done) {
      return
    }

    /* consume the value */
    console.log(value)
  } catch (e) {
    /* catch errors */
    console.error(e)

    /* cancel subscription */
    return Promise.reject()
  }
})

Combination

pullConcat

Creates concatenated Pull producer, which will deliver data from provided producers sequentially. Once first producer is done, stream will switch to the next one. done chunk will be delivered once, when all producers are complete.

<T> (...producers: PullProducers<T>[]) => PullProducer<T>

import { pullConcat, pullFromIterable } from 'promised-streams'

const pp0 = pullFromIterable([0, 1, 2])
const pp1 = pullFromIterable([3, 4, 5])
const pp2 = pullFromIterable([6, 7, 8])

/* create concatenated PullProducer */
const concatenatedProducer = pullConcat(pp0, pp1, pp2)

try {
  /* consume PullProducer */
  while (true) {
    const { value, done } = await concatenatedProducer()

    if (done) {
      break
    }

    console.log(value)
  }
} catch (e) {
  console.error(e)
}

pushConcat

Concatenates all Push producers, creating single Push producer, which delivers the data from each, excluding done. End of stream is delivered once, at the end.

<T> (...producers: PushProducer<T>[]) => PushProducer<T>

import { pushConcat, pushFromIterable } from 'promised-streams'

const pp0 = pushFromIterable([0, 1, 2])
const pp1 = pushFromIterable([3, 4, 5])
const pp2 = pushFromIterable([6, 7, 8])

/* create concatenated PushProducer */
const concatenatedProducer = pushConcat(pp0, pp1, pp2)

/* subscribe to PushProducer */
await concatenatedProducer(async (result) => {
  try {
    /* unwrap the value */
    const { value, done } = await result

    /* check if done */
    if (done) {
      return
    }

    /* consume the value */
    console.log(value)
  } catch (e) {
    /* catch errors */
    console.error(e)

    /* cancel subscription */
    return Promise.reject()
  }
})

pullCombine

Creates combined Pull producer. Each latest chunk from provided producers is combined with others into an array, which is updated and delivered each time any of producers has new value. If one of producers ends, its latest value is remembered, and is delivered with values from other producers. Once all producers are done, the stream completes.

<...> (...producers: PullProducers<...>[]) => PullProducer<[...]>

import { pullCombine, pullFromIterable } from 'promised-streams'

const pp0 = pullFromIterable([0, 1])
const pp1 = pullFromIterable([2, 3])
const pp2 = pullFromIterable([4, 5])

/* create PullProducer */
const combinedProducer = pullCombine(pp0, pp1, pp2)

try {
  /* consume PullProducer */
  while (true) {
    const { value, done } = await combinedProducer()

    if (done) {
      break
    }

    console.log(value)

    /* Values will be delivered in order */
    // [0, undefined, undefined]
    // [0, 2, undefined]
    // [0, 2, 4]
    // [1, 2, 4]
    // [1, 3, 4]
    // [1, 3, 5]
  }
} catch (e) {
  console.error(e)
}

pushCombine

Creates combined Push producer. Each latest chunk from provided producers is combined with others into an array, which is updated and delivered each time any of producers has new value. If one of producers ends, its latest value is remembered, and is delivered with values from other producers. Once all producers are done, the stream completes.

<...> (...producers: PushProducer<...>[]) => PushProducer<[...]>

import { pushCombine, pushFromIterable } from 'promised-streams'

const pp0 = pushFromIterable([0, 1])
const pp1 = pushFromIterable([2, 3])
const pp2 = pushFromIterable([4, 5])

/* create combined PushProducer */
const combinedProducer = pushCombine(pp0, pp1, pp2)

/* subscribe to PushProducer */
await combinedProducer(async (result) => {
  try {
    /* unwrap the value */
    const { value, done } = await result

    /* check if done */
    if (done) {
      return
    }

    /* consume the value */
    console.log(value)

    /* Values will be delivered in order */
    // [0, undefined, undefined]
    // [0, 2, undefined]
    // [0, 2, 4]
    // [1, 2, 4]
    // [1, 3, 4]
    // [1, 3, 5]
  } catch (e) {
    /* catch errors */
    console.error(e)

    /* cancel subscription */
    return Promise.reject()
  }
})

pullMerge

Creates Pull producer, which delivers values from provided producers as soon as available, so the values from all producers are mixed with each other in the resulting stream. Stream ends when all producers are complete.

<...> (...producers: PullProducers<...>[]) => PullProducer<...>

import { pullMerge, pullFromIterable } from 'promised-streams'

const pp0 = pullFromIterable([0, 1])
const pp1 = pullFromIterable([2, 3])
const pp2 = pullFromIterable([4, 5])

/* create merged PullProducer */
const mergedProducer = pullMerge(pp0, pp1, pp2)

try {
  /* consume PullProducer */
  while (true) {
    const { value, done } = await mergedProducer()

    if (done) {
      break
    }

    console.log(value)
  }
} catch (e) {
  console.error(e)
}

pushMerge

Creates Push producer, which delivers values from provided producers as soon as available, so the values from all producers are mixed with each other in the resulting stream. Stream ends when all producers are complete.

<...> (...producers: PushProducer<...>[]) => PushProducer<...>

import { pushMerge, pushFromIterable } from 'promised-streams'

const pp0 = pushFromIterable([0, 1])
const pp1 = pushFromIterable([2, 3])
const pp2 = pushFromIterable([4, 5])

/* create merged PushProducer */
const mergedProducer = pushMerge(pp0, pp1, pp2)

/* subscribe to PushProducer */
await mergedProducer(async (result) => {
  try {
    /* unwrap the value */
    const { value, done } = await result

    /* check if done */
    if (done) {
      return
    }

    /* consume the value */
    console.log(value)
  } catch (e) {
    /* catch errors */
    console.error(e)

    /* cancel subscription */
    return Promise.reject()
  }
})

pullStartWith

Creates Pull producer, which will stream values starting with provided ones.

<T> (...values: T[]) => (producer: PullProducer<T>) => PullProducer<T>

import { pullStartWith, pullFromIterable } from 'promised-streams'

const producer = pullFromIterable([2, 3])
/* create PullProducer */
const startWithProducer = pullStartWith(0, 1)(producer)

try {
  /* consume PullProducer */
  while (true) {
    const { value, done } = await startWithProducer()

    if (done) {
      break
    }

    console.log(value)

    /* Values will be delivered in order */
    // 0
    // 1
    // 2
    // 3
  }
} catch (e) {
  console.error(e)
}

pushStartWith

Creates Push producer, which will stream values starting with provided ones.

<T> (...values: T[]) => (consumer: PushConsumer<T>) => PushConsumer<T>

import { pushStartWith, pushFromIterable } from 'promised-streams'

const producer = pushFromIterable([2, 3])

const startWithProducer = pushStartWith(0, 1)(producer)

/* subscribe to PushProducer */
await startWithProducer(async (result) => {
  try {
    /* unwrap the value */
    const { value, done } = await result

    /* check if done */
    if (done) {
      return
    }

    /* consume the value */
    console.log(value)

    /* Values will be delivered in order */
    // 0
    // 1
    // 2
    // 3
  } catch (e) {
    /* catch errors */
    console.error(e)

    /* cancel subscription */
    return Promise.reject()
  }
})

pullWithLatest

Creates Pull producer, which streams values from mainProducer, combined with latest values from provided producers. Only mainProducer can initiate chunk delivery. Stream ends when mainProducer completes.

<...> (...producers: PullProducer<...>[]) => <T>(mainProducer: PullProducer<T>) => PullProducer<[T, ...]>

import { pullWithLatest } from 'promised-streams'

const pp0 = getPullProducer()
const pp1 = getPullProducer()
const mainProducer = getPullProducer()

/* create PullProducer */
const withLatestProducer = pullWithLatest(pp0, pp1)(mainProducer)

try {
  /* consume PullProducer */
  while (true) {
    const { value, done } = await withLatestProducer()

    if (done) {
      break
    }

    console.log(value)
  }
} catch (e) {
  console.error(e)
}

pushWithLatest

Creates Push producer, which streams values from mainProducer, combined with latest values from provided producers. Only mainProducer can initiate chunk delivery. Stream ends when mainProducer completes.

<...> (...producers: PushProducer<...>[]) => <T> (mainProducer: PushProducer<T>) => PushProducer<T, ...>

import { pushWithLatest } from 'promised-streams'

const pp0 = getPushProducer()
const pp1 = getPushProducer()
const mainProducer = getPushProducer()

const withLatestProducer = pushWithLatest(pp0, pp1)(mainProducer)

/* subscribe to PushProducer */
await withLatestProducer(async (result) => {
  try {
    /* unwrap the value */
    const { value, done } = await result

    /* check if done */
    if (done) {
      return
    }

    /* consume the value */
    console.log(value)
  } catch (e) {
    /* catch errors */
    console.error(e)

    /* cancel subscription */
    return Promise.reject()
  }
})

pullZip

Creates Pull producer, which combines values from provided producers, to be delivered strictly in sync. Stream ends when one of producers completes.

<...> (...producers: PullProducer<...>[]) => PullProducer<[...]>

import { pullZip } from 'promised-streams'

const pp0 = getPullProducer()
const pp1 = getPullProducer()
const pp2 = getPullProducer()

/* create PullProducer */
const zippedProducer = pullZip(pp0, pp1, pp2)

try {
  /* consume PullProducer */
  while (true) {
    const { value, done } = await zippedProducer()

    if (done) {
      break
    }

    console.log(value)
  }
} catch (e) {
  console.error(e)
}

pushZip

Creates Push producer, which combines values from provided producers, to be delivered strictly in sync. Stream ends when one of producers completes.

<...> (...producers: PushProducer<...>[]) => PushProducer<[...]>

import { pushZip } from 'promised-streams'

const pp0 = getPushProducer()
const pp1 = getPushProducer()
const pp2 = getPushProducer()

/* create zip PushProducer */
const zippedProducer = pushZip(pp0, pp1, pp2)

/* subscribe to PushProducer */
await zippedProducer(async (result) => {
  try {
    /* unwrap the value */
    const { value, done } = await result

    /* check if done */
    if (done) {
      return
    }

    /* consume the value */
    console.log(value)
  } catch (e) {
    /* catch errors */
    console.error(e)

    /* cancel subscription */
    return Promise.reject()
  }
})

Filtering

pullFilter

Creates Pull producer, which streams data, filtered by provided predicate function.

<T> (predicate: (arg: T) => Promise<boolean> | boolean) => (producer: PullProducer<T>) => PullProducer<T>

import { pullFilter, pullFromIterable } from 'promised-streams'

const producer = pullFromIterable([0, 1, 2, 3])

const isEven = x => x % 2 === 0

/* create filtered producer */
const filteredProducer = pullFilter(isEven)(producer)

try {
  /* consume PullProducer */
  while (true) {
    const { value, done } = await filteredProducer()

    if (done) {
      break
    }

    console.log(value)

    /* Values will be delivered in order */
    // 0
    // 2
  }
} catch (e) {
  console.error(e)
}

pushFilter

Creates Push producer, which streams data, filtered by provided predicate function.

<T> (predicate: (arg: T) => Promise<boolean> | boolean) => (consumer: PushConsumer<T>) => PushConsumer<T>

import { pushFilter, pushFromIterable } from 'promised-streams'

const producer = pushFromIterable([0, 1, 2, 3])

const isEven = x => x % 2 === 0

/* create filtered producer */
const filteredProducer = compose(
  producer,
  pushFilter(isEven)
)

/* subscribe to PushProducer */
await filteredProducer(async (result) => {
  try {
    /* unwrap the value */
    const { value, done } = await result

    /* check if done */
    if (done) {
      return
    }

    /* consume the value */
    console.log(value)

    /* Values will be delivered in order */
    // 0
    // 2
  } catch (e) {
    /* catch errors */
    console.error(e)

    /* cancel subscription */
    return Promise.reject()
  }
})

pullDistinct

Creates Pull producer, which streams data, filtered by provided isAllowed function.

<T> (isAllowed: (prev: T, next: T) => Promise<boolean> | boolean) => (producer: PullProducer<T>) => PullProducer<T>

import { pullDistinct, pullFromIterable } from 'promised-streams'

const producer = pullFromIterable([0, 0, 1, 2, 2])

const isAllowed = (prev, next) => prev !== next

/* create filtered PullProducer */
const filteredProducer = pullDistinct(isAllowed)(producer)

try {
  /* consume PullProducer */
  while (true) {
    const { value, done } = await filteredProducer()

    if (done) {
      break
    }

    console.log(value)

    /* Values will be delivered in order */
    // 0
    // 1
    // 2
  }
} catch (e) {
  console.error(e)
}

pushDistinct

Creates Push producer, which streams data, filtered by provided isAllowed function.

<T> (isAllowed: (prev: T, next: T) => Promise<boolean> | boolean) => (consumer: PushConsumer<T>) => PushConsumer<T>

import { pushDistinct, pushFromIterable } from 'promised-streams'

const producer = pushFromIterable([0, 0, 1, 2, 2])

const isAllowed = (prev, next) => prev !== next

/* create filtered producer */
const filteredProducer = compose(
  producer,
  pushDistinct(isAllowed)
)

/* subscribe to PushProducer */
await filteredProducer(async (result) => {
  try {
    /* unwrap the value */
    const { value, done } = await result

    /* check if done */
    if (done) {
      return
    }

    /* consume the value */
    console.log(value)

    /* Values will be delivered in order */
    // 0
    // 1
    // 2
  } catch (e) {
    /* catch errors */
    console.error(e)

    /* cancel subscription */
    return Promise.reject()
  }
})

pullDistinctUntilChanged

Creates Pull producer, passing only chunks which are different than previous one. <T> (producer: PullProducer<T>) => PullProducer<T>

import { pullDistinctUntilChanged, pullFromIterable } from 'promised-streams'

const producer = pullFromIterable([0, 0, 1, 2, 2])

/* create filtered PullProducer */
const filteredProducer = pullDistinctUntilChanged(producer)

try {
  /* consume PullProducer */
  while (true) {
    const { value, done } = await filteredProducer()

    if (done) {
      break
    }

    console.log(value)

    /* Values will be delivered in order */
    // 0
    // 1
    // 2
  }
} catch (e) {
  console.error(e)
}

pushDistinctUntilChanged

Creates Push producer, passing only chunks which are different than previous one.

<T> (consumer: PushConsumer<T>) => PushConsumer<T>

import { pushDistinctUntilChanged, pushFromIterable } from 'promised-streams'

const producer = pushFromIterable([0, 0, 1, 2, 2])
/* create filtered producer */
const filteredProducer = compose(
  producer,
  pushDistinctUntilChanged
)

/* subscribe to PushProducer */
await filteredProducer(async (result) => {
  try {
    /* unwrap the value */
    const { value, done } = await result

    /* check if done */
    if (done) {
      return
    }

    /* consume the value */
    console.log(value)

    /* Values will be delivered in order */
    // 0
    // 1
    // 2
  } catch (e) {
    /* catch errors */
    console.error(e)

    /* cancel subscription */
    return Promise.reject()
  }
})

pullUnique

Creates Pull producer, passing only chunks which are unique to whole previous sequence.

<T> (producer: PullProducer<T>) => PullProducer<T>

import { pullUnique, pullFromIterable } from 'promised-streams'

const producer = pullFromIterable([0, 1, 2, 0, 1, 2])

/* create filtered PullProducer */
const filteredProducer = pullUnique(producer)

try {
  /* consume PullProducer */
  while (true) {
    const { value, done } = await filteredProducer()

    if (done) {
      break
    }

    console.log(value)

    /* Values will be delivered in order */
    // 0
    // 1
    // 2
  }
} catch (e) {
  console.error(e)
}

pushUnique

Creates Push producer, passing only chunks which are unique to whole previous sequence.

<T> (consumer: PushConsumer<T>) => PushConsumer<T>

import { pushUnique, pushFromIterable } from 'promised-streams'

const producer = pushFromIterable([0, 1, 2, 0, 1, 2])

/* create filtered producer */
const filteredProducer = compose(
  producer,
  pushUnique
)

/* subscribe to PushProducer */
await filteredProducer(async (result) => {
  try {
    /* unwrap the value */
    const { value, done } = await result

    /* check if done */
    if (done) {
      return
    }

    /* consume the value */
    console.log(value)

    /* Values will be delivered in order */
    // 0
    // 1
    // 2
  } catch (e) {
    /* catch errors */
    console.error(e)

    /* cancel subscription */
    return Promise.reject()
  }
})

pushDebounce

Creates Push producer, debouncing the sequence of chunks by WaitFn function.

(wait: WaitFn) => <T> (consumer: PushConsumer<T>): PushConsumer<T>

type WaitFn = () => Promise<void>

import { pushDebounce, pushFromIterable } from 'promised-streams'

const producer = pushFromIterable([0, 1, 2, 3])

const waitFn = () => new Promise((resolve) => setTimeout(resolve, 1000))

/* create debounced producer */
const debouncedProducer = compose(
  producer,
  pushDebounce(waitFn)
)

/* subscribe to PushProducer */
await debouncedProducer(async (result) => {
  try {
    /* unwrap the value */
    const { value, done } = await result

    /* check if done */
    if (done) {
      return
    }

    /* consume the value */
    console.log(value)

    /* Values will be delivered in order */
    // 3
  } catch (e) {
    /* catch errors */
    console.error(e)

    /* cancel subscription */
    return Promise.reject()
  }
})

pushDebounceTime

Creates Push producer, debouncing the sequence of chunks by time interval provided.

(ms: number) => <T> (consumer: PushConsumer<T>) => PushConsumer<T>

import { pushDebounceTime, pushFromIterable } from 'promised-streams'

const producer = pushFromIterable([0, 1, 2, 3])

/* create debounced producer */
const debouncedProducer = compose(
  producer,
  pushDebounceTime(1000)
)

/* subscribe to PushProducer */
await debouncedProducer(async (result) => {
  try {
    /* unwrap the value */
    const { value, done } = await result

    /* check if done */
    if (done) {
      return
    }

    /* consume the value */
    console.log(value)

    /* Values will be delivered in order */
    // 3
  } catch (e) {
    /* catch errors */
    console.error(e)

    /* cancel subscription */
    return Promise.reject()
  }
})

pushThrottle

Creates Push producer, throttling the sequence of chunks by WaitFn function.

(wait: WaitFn) => <T> (consumer: PushConsumer<T>) => PushConsumer<T>

type WaitFn = () => Promise<void>

import { pushThrottle, pushFromIterable } from 'promised-streams'

const producer = pushFromIterable([0, 1, 2, 3])

const waitFn = () => new Promise((resolve) => setTimeout(resolve, 100))

/* create throttled producer */
const throttledProducer = compose(
  producer,
  pushThrottle(waitFn)
)

/* subscribe to PushProducer */
await throttledProducer(async (result) => {
  try {
    /* unwrap the value */
    const { value, done } = await result

    /* check if done */
    if (done) {
      return
    }

    /* consume the value */
    console.log(value)
  } catch (e) {
    /* catch errors */
    console.error(e)

    /* cancel subscription */
    return Promise.reject()
  }
})

pushThrottleTime

Creates Push producer, throttling the sequence of chunks by time interval provided.

(ms: number) => <T> (consumer: PushConsumer<T>) => PushConsumer<T>

import { pushThrottleTime, pushFromIterable } from 'promised-streams'

const producer = pushFromIterable([0, 1, 2, 3])

const throttledProducer = compose(
  producer,
  pushThrottleTime(100)
)

/* subscribe to PushProducer */
await throttledProducer(async (result) => {
  try {
    /* unwrap the value */
    const { value, done } = await result

    /* check if done */
    if (done) {
      return
    }

    /* consume the value */
    console.log(value)
  } catch (e) {
    /* catch errors */
    console.error(e)

    /* cancel subscription */
    return Promise.reject()
  }
})

pullSkip

Creates Pull provider, which skips certain number of chunks in the beginning of sequence. If negative skip value was provided, the chunks will be skipped from the end of sequence.

(numSkip: number) => <T> (producer: PullProducer<T>) => PullProducer<T>

import { pullSkip, pullFromIterable } from 'promised-streams'

const producer = pullFromIterable([0, 1, 2, 3])

/* skip first 2 items */
const filteredProducer = pullSkip(2)(producer)

try {
  /* consume PullProducer */
  while (true) {
    const { value, done } = await filteredProducer()

    if (done) {
      break
    }

    console.log(value)

    /* Values will be delivered in order */
    // 2
    // 3
  }
} catch (e) {
  console.error(e)
}

If negative skip value was provided, the chunks will be skipped from the end of sequence.

import { pullSkip, pullFromIterable } from 'promised-streams'

const producer = pullFromIterable([0, 1, 2, 3])

/* skip last 2 items */
const filteredProducer = pullSkip(-2)(producer)

try {
  /* consume PullProducer */
  while (true) {
    const { value, done } = await filteredProducer()

    if (done) {
      break
    }

    console.log(value)

    /* Values will be delivered in order */
    // 0
    // 1
  }
} catch (e) {
  console.error(e)
}

pushSkip

Creates Push provider, which skips certain number of chunks in the beginning of sequence. If negative skip value was provided, the chunks will be skipped from the end of sequence.

(numSkip: number) => <T> (consumer: PushConsumer<T>) => PushConsumer<T>

import { pushSkip, pushFromIterable } from 'promised-streams'

const producer = pushFromIterable([0, 1, 2, 3])

/* skip first 2 items */
const skippedProducer = compose(
  producer,
  pushSkip(2)
)

/* subscribe to PushProducer */
await skippedProducer(async (result) => {
  try {
    /* unwrap the value */
    const { value, done } = await result

    /* check if done */
    if (done) {
      return
    }

    /* consume the value */
    console.log(value)

    /* Values will be delivered in order */
    // 2
    // 3
  } catch (e) {
    /* catch errors */
    console.error(e)

    /* cancel subscription */
    return Promise.reject()
  }
})

If negative skip value was provided, the chunks will be skipped from the end of sequence.

import { pushSkip, pushFromIterable } from 'promised-streams'

const producer = pushFromIterable([0, 1, 2, 3])

/* skip last 2 items */
const skippedProducer = compose(
  producer,
  pushSkip(-2)
)

/* subscribe to PushProducer */
await skippedProducer(async (result) => {
  try {
    /* unwrap the value */
    const { value, done } = await result

    /* check if done */
    if (done) {
      return
    }

    /* consume the value */
    console.log(value)

    /* Values will be delivered in order */
    // 0
    // 1
  } catch (e) {
    /* catch errors */
    console.error(e)

    /* cancel subscription */
    return Promise.reject()
  }
})

pullTake

Creates Pull provider, which takes only certain number of chunks in the beginning of sequence. If negative take value was provided, the chunks will be taken from the end of sequence.

(numTake: number) => <T> (producer: PullProducer<T>) => PullProducer<T>

import { pullTake, pullFromIterable } from 'promised-streams'

const producer = pullFromIterable([0, 1, 2, 3])

/* take first 2 items */
const filteredProducer = pullTake(2)(producer)

try {
  /* consume PullProducer */
  while (true) {
    const { value, done } = await filteredProducer()

    if (done) {
      break
    }

    console.log(value)

    /* Values will be delivered in order */
    // 0
    // 1
  }
} catch (e) {
  console.error(e)
}

If negative take value was provided, the chunks will be taken from the end of sequence.

import { pullTake, pullFromIterable } from 'promised-streams'

const producer = pullFromIterable([0, 1, 2, 3])

/* take last 2 items */
const filteredProducer = pullTake(-2)(producer)

try {
  /* consume PullProducer */
  while (true) {
    const { value, done } = await filteredProducer()

    if (done) {
      break
    }

    console.log(value)

    /* Values will be delivered in order */
    // 2
    // 3
  }
} catch (e) {
  console.error(e)
}

pushTake

Creates Push provider, which skips certain number of chunks in the beginning of sequence. If negative skip value was provided, the chunks will be skipped from the end of sequence.

(numTake: number) => <T> (consumer: PushConsumer<T>) => PushConsumer<T>

import { pushTake, pushFromIterable } from 'promised-streams'

const producer = pushFromIterable([0, 1, 2, 3])

/* take first 2 items */
const takeProducer = compose(
  producer,
  pushTake(2)
)

/* subscribe to PushProducer */
await pushProducer(async (result) => {
  try {
    /* unwrap the value */
    const { value, done } = await result

    /* check if done */
    if (done) {
      return
    }

    /* consume the value */
    console.log(value)

    /* Values will be delivered in order */
    // 0
    // 1
  } catch (e) {
    /* catch errors */
    console.error(e)

    /* cancel subscription */
    return Promise.reject()
  }
})

If negative take value was provided, the chunks will be taken from the end of sequence.

import { pushTake, pushFromIterable } from 'promised-streams'

const producer = pushFromIterable([0, 1, 2, 3])

/* take last 2 items */
const takeProducer = compose(
  producer,
  pushTake(-2)
)

/* subscribe to PushProducer */
await pushProducer(async (result) => {
  try {
    /* unwrap the value */
    const { value, done } = await result

    /* check if done */
    if (done) {
      return
    }

    /* consume the value */
    console.log(value)

    /* Values will be delivered in order */
    // 2
    // 3
  } catch (e) {
    /* catch errors */
    console.error(e)

    /* cancel subscription */
    return Promise.reject()
  }
})

Transformation

pullMap

Creates Pull producer, which streams chunks transformed by xf function.

<T, R> (xf: (arg: T) => Promise<R> | R) => (producer: PullProducer<T>) => PullProducer<R>

import { pullMap, pullFromIterable } from 'promised-streams'

const producer = pullFromIterable([0, 1, 2, 3])

/* create PullProducer */
const transformProducer = pullMap(x => x * 2)(producer)

try {
  /* consume PullProducer */
  while (true) {
    const { value, done } = await transformProducer()

    if (done) {
      break
    }

    console.log(value)

    /* Values will be delivered in order */
    // 0
    // 2
    // 4
    // 6
  }
} catch (e) {
  console.error(e)
}

pushMap

Creates Push producer, which streams chunks transformed by xf function.

<T, R> (xf: (arg: T) => Promise<R> | R) => (consumer: PushConsumer<R>) => PushConsumer<T>

import { pushMap, pushFromIterable } from 'promised-streams'

const producer = pushFromIterable([0, 1, 2, 3])

/* create PushProducer */
const transformProducer = compose(
  producer,
  pushMap(x => x * 2)
)

/* subscribe to PushProducer */
await transformProducer(async (result) => {
  try {
    /* unwrap the value */
    const { value, done } = await result

    /* check if done */
    if (done) {
      return
    }

    /* consume the value */
    console.log(value)

    /* Values will be delivered in order */
    // 0
    // 2
    // 4
    // 6
  } catch (e) {
    /* catch errors */
    console.error(e)

    /* cancel subscription */
    return Promise.reject()
  }
})

pullReduce

Creates Pull producer, which will transform chunks by provided reducer function. The reducer will be invoked first time with no values provided, to get the initial state. The resulting stream will deliver exactly one chunk, at the end of sequence, with all values transformed through reducer, and the final state returned.

<S, T> (reducer: (state?: S, value?: T) => Promise<S> | S) => (producer: PullProducer<T>) => PullProducer<S>

import { pullReduce, pullFromIterable } from 'promised-streams'

const producer = pullFromIterable([0, 1, 2])

const reducer = (acc, value) => acc !== undefined ? acc + value : 0

/* create PullProducer */
const reducedProducer = pullReduce(reducer)(producer)

try {
  /* consume PullProducer */
  while (true) {
    const { value, done } = await reducedProducer()

    if (done) {
      break
    }

    console.log(value)

    /* Values will be delivered in order */
    // 3
  }
} catch (e) {
  console.error(e)
}

pushReduce

Creates Push producer, which will transform chunks by provided reducer function. The reducer will be invoked first time with no values provided, to get the initial state. The resulting stream will deliver exactly one chunk, at the end of sequence, with all values transformed through reducer, and the final state returned.

<S, T> (reducer: (state?: S, value?: T) => Promise<S> | S) => (consumer: PushConsumer<S>) => PushConsumer<T>

import { pushReduce, pushFromIterable } from 'promised-streams'

const producer = pushFromIterable([0, 1, 2])

const reducer = (acc, value) => acc !== undefined ? acc + value : 0

const reducedProducer = compose(
  producer,
  pushReduce(reducer)
)

/* subscribe to PushProducer */
await reducedProducer(async (result) => {
  try {
    /* unwrap the value */
    const { value, done } = await result

    /* check if done */
    if (done) {
      return
    }

    /* consume the value */
    console.log(value)

    /* Values will be delivered in order */
    // 3
  } catch (e) {
    /* catch errors */
    console.error(e)

    /* cancel subscription */
    return Promise.reject()
  }
})

pullScan

Creates Pull producer, which streams chunks transformed by reducer function. The reducer will be invoked first time with no values provided, to get the initial state. The resulting stream will deliver state on every new chunk passed to the reducer.

<S, T> (reducer: (state?: S, value?: T) => Promise<S> | S) => (producer: PullProducer<T>) => PullProducer<S>

import { pullScan, pullFromIterable } from 'promised-streams'

const producer = pullFromIterable([0, 1, 2, 3])

const reducer = (acc, value) => acc !== undefined ? acc + value : 0

/* create PullProducer */
const reducedProducer = pullScan(reducer)(producer)

try {
  /* consume PullProducer */
  while (true) {
    const { value, done } = await reducedProducer()

    if (done) {
      break
    }

    console.log(value)

    /* Values will be delivered in order */
    // 0
    // 1
    // 3
    // 6
  }
} catch (e) {
  console.error(e)
}

pushScan

Creates Push producer, which streams chunks transformed by reducer function. The reducer will be invoked first time with no values provided, to get the initial state. The resulting stream will deliver state on every new chunk passed to the reducer.

<S, T> (reducer: (state?: S, value?: T) => Promise<S> | S) => (consumer: PushConsumer<S>) => PushConsumer<T>

import { pushScan, pushFromIterable } from 'promised-streams'

const producer = pushFromIterable([0, 1, 2, 3])

const reducer = (acc, value) => acc !== undefined ? acc + value : 0

const reducedProducer = compose(
  producer,
  pushScan(reducer)
)

/* subscribe to PushProducer */
await reducedProducer(async (result) => {
  try {
    /* unwrap the value */
    const { value, done } = await result

    /* check if done */
    if (done) {
      return
    }

    /* consume the value */
    console.log(value)

    /* Values will be delivered in order */
    // 0
    // 1
    // 3
    // 6
  } catch (e) {
    /* catch errors */
    console.error(e)

    /* cancel subscription */
    return Promise.reject()
  }
})

pullHoFlatten

Creates Pull producer, which consumes stream of producers, and provides the stream of values from these producers, subscribing to them sequentially.

<T> (producer: PullProducer<PullProducer<T>>): PullProducer<T>

import { pullHoFlatten, pullFromIterable } from 'promised-streams'

/* Get the Higher Order producer somehow */
const producer = pullFromIterable([
  pullFromIterable([0, 1]),
  pullFromIterable([2, 3])
])

const flattendedProducer = pullHoFlatten(producer)

try {
  /* consume PullProducer */
  while (true) {
    const { value, done } = await flattendedProducer()

    if (done) {
      break
    }

    console.log(value)

    /* Values will be delivered in order */
    // 0
    // 1
    // 2
    // 3
  }
} catch (e) {
  console.error(e)
}

pushHoFlatten

Creates Push producer, which consumes stream of producers, and provides the stream of values from these producers, subscribing to them sequentially.

<T> (consumer: PushConsumer<T>) => PushConsumer<PushProducer<T>>

import { pushHoFlatten, pushFromIterable } from 'promised-streams'

/* Get the Higher Order producer somehow */
const producer = pushFromIterable([
  pushFromIterable([0, 1]),
  pushFromIterable([2, 3])
])

const flattendedProducer = compose(
  producer,
  pushHoFlatten
)

/* subscribe to PushProducer */
await flattenedProducer(async (result) => {
  try {
    /* unwrap the value */
    const { value, done } = await result

    /* check if done */
    if (done) {
      return
    }

    /* consume the value */
    console.log(value)

    /* Values will be delivered in order */
    // 0
    // 1
    // 2
    // 3
  } catch (e) {
    /* catch errors */
    console.error(e)

    /* cancel subscription */
    return Promise.reject()
  }
})

pullFlatMap

Creates Pull producer, which transforms stream of values to stream of producers through xf, and provides the stream of values from these producers, subscribing to them sequentially.

<T, R> (xf: (arg: T) => Promise<PushProducer> | PushProducer) => (consumer: PushConsumer): PushConsumer

import { pullFlatMap, pullFromIterable } from 'promised-streams'

const producer = pullFromIterable([0, 1, 2])

const mapAndFlatten = pullFlatMap(
  /* map to another pullProducer, creating Higher Order producer, and Flatten */
  (value) => pullFromIterable([value, value])
)

const flattenedProducer = mapAndFlatten(producer)

try {
  /* consume PullProducer */
  while (true) {
    const { value, done } = await flattenedProducer()

    if (done) {
      break
    }

    console.log(value)

    /* Values will be delivered in order */
    // 0
    // 0
    // 1
    // 1
    // 2
    // 2
  }
} catch (e) {
  console.error(e)
}

pushFlatMap

Creates Push producer, which transforms stream of values to stream of producers through xf, and provides the stream of values from these producers, subscribing to them sequentially.

<T, R> (xf: (arg: T) => Promise<PushProducer> | PushProducer) => (consumer: PushConsumer): PushConsumer

import { pushFlatMap, pushFromIterable } from 'promised-streams'

const producer = pushFromIterable([0, 1, 2])

const mapAndFlatten = pushFlatMap(
  /* map to another pushProducer, creating Higher Order producer, and Flatten */
  (value) => pushFromIterable([value, value])
)

const flattenedProducer = compose(
  producer,
  mapAndFlatten
)

/* subscribe to PushProducer */
await flattenedProducer(async (result) => {
  try {
    /* unwrap the value */
    const { value, done } = await result

    /* check if done */
    if (done) {
      return
    }

    /* consume the value */
    console.log(value)

    /* Values will be delivered in order */
    // 0
    // 0
    // 1
    // 1
    // 2
    // 2
  } catch (e) {
    /* catch errors */
    console.error(e)

    /* cancel subscription */
    return Promise.reject()
  }
})

Side Effects

pullDo

Creates Pull producer, which passes incoming values to doFunction, waiting for promise if neccessary, ignoring the exceptions, then continues unchanged value to the stream.

<T> (doFunction: (arg: T) => Promise | void) => (producer: PullProducer): PullProducer

import { pullDo, pullFromIterable } from 'promised-streams'

const producer = pullFromIterable([0, 1, 2, 3])

const sideEffectProducer = pullDo(
  /* waits until fetch promise resolves */
  (value) => fetch(`http://hostname:3000?value=${value}`)
)(producer)

try {
  /* consume PullProducer */
  while (true) {
    const { value, done } = await sideEffectProducer()

    if (done) {
      break
    }

    console.log(value)

    /* Values will be delivered in order */
    // 0
    // 1
    // 2
    // 3
  }
} catch (e) {
  console.error(e)
}

pushDo

Creates Push producer, which passes incoming values to doFunction, waiting for promise if neccessary, ignoring the exceptions, then continues unchanged value to the stream.

(doFunction: (result: T) => Promise | void) => (consumer: PushConsumer): PushConsumer

import { pushDo, pushFromIterable } from 'promised-streams'

const producer = pushFromIterable([0, 1, 2, 3])

const sideEffects = pushDo(
  /* waits until fetch promise resolves */
  (value) => fetch(`http://hostname:3000?value=${value}`)
)

const sideEffectProducer = compose(
  producer,
  sideEffects
)

/* subscribe to PushProducer */
await sideEffectProducer(async (result) => {
  try {
    /* unwrap the value */
    const { value, done } = await result

    /* check if done */
    if (done) {
      return
    }

    /* consume the value */
    console.log(value)

    /* Values will be delivered in order */
    // 0
    // 1
    // 2
    // 3
  } catch (e) {
    /* catch errors */
    console.error(e)

    /* cancel subscription */
    return Promise.reject()
  }
})

About

Promise-based Streams with pressure control, Error delivery and lots of RxJS-like operators.

Topics

Resources

License

Stars

Watchers

Forks

Packages

No packages published

Languages