Permalink
Browse files

Add pi.fromAsync(callable)

  • Loading branch information...
1 parent b44fcbf commit faa5d45496bf08c7a3b2f2e69e3ff16ff58c5ebc @mixu committed Oct 28, 2015
Showing with 58 additions and 1 deletion.
  1. +45 −0 index.js
  2. +13 −1 readme.md
View
@@ -124,6 +124,51 @@ exports.toArray = function(fn) {
return stream;
};
+exports.fromAsync = function(callable) {
+ var called = false;
+ var returned = false;
+ var eof = false;
+ var arr;
+ var stream;
+
+ function read() {
+ var item;
+ if (!called) {
+ callable(function(err, results) {
+ returned = true;
+ if (err) {
+ stream.emit('error', err);
+ eof = true;
+ stream.push(null);
+ return;
+ }
+ arr = Array.isArray(results) ? results : [results];
+ read();
+ });
+ called = true;
+ return;
+ }
+ if (!returned) {
+ return;
+ }
+
+ if (arr.length > 0) {
+ do {
+ item = arr.shift();
+ } while(typeof item !== 'undefined' && stream.push(item))
+ }
+ if (arr.length === 0 && !eof) {
+ // pushing null signals EOF
+ eof = true;
+ stream.push(null);
+ }
+ }
+
+ stream = exports.readable.obj(read);
+
+ return stream;
+}
+
// Constructing streams
exports.thru = exports.through = through;
View
@@ -5,7 +5,7 @@ Like underscore for Node streams (streams2 and up).
Functions for iterating over object mode streams:
- [Iteration functions](#iteration-functions): [`forEach`](#foreach), [`map`](#map), [`reduce`](#reduce), [`filter`](#filter), [`mapKey`](#mapkey)
-- [Input and output](#input-and-output): [`fromArray`](#fromarray), [`toArray`](#toarray)
+- [Input and output](#input-and-output): [`fromArray`](#fromarray), [`toArray`](#toarray), [`fromAsync`](#fromasync)
- [Constructing streams](#constructing-streams): [`through` / `thru`](#thru--through), [`writable`](#writable), [`readable`](#readable), [`duplex`](#duplex), [`combine`](#combine), [`devnull`](#devnull), [`cap`](#cap), [`clone`](#clone)
- [Control flow](#control-flow): [`fork`](#fork), [`match`](#match), [`merge`](#merge), [`forkMerge`](#forkmerge), [`matchMerge`](#matchmerge), [`parallel`](#parallel)
- [Constructing pipelines from individual elements](#constructing-pipelines-from-individual-elements): [`pipe`](#pipe), [`head`](#head), [`tail`](#tail), [`pipeline`](#pipeline)
@@ -146,6 +146,18 @@ Returns a writable stream which buffers the input it receives into an array. Whe
You can also pass an instance of an array instead of a callback. The array's contents will be updated with the elements from the stream when the writable stream emits `finish`.
+### fromAsync
+
+```js
+pi.fromAsync(fn)
+```
+
+Returns a readable stream given an async function. (since `v1.2.0`)
+
+The async function should accept one argument, `onDone`, which is a `function(err, results)`. The function is called once - the first time someone reads from the stream. It should return either a single result, or an array of results.
+
+The stream will emit one item for each item in the result (the single result, or each array item individually), and then emit end.
+
## Constructing streams
These functions make creating readable, writable and transform streams a bit less boilerplatey.

0 comments on commit faa5d45

Please sign in to comment.