From fc13a4d50fbeb3248514d5098d3709c271a2f74d Mon Sep 17 00:00:00 2001 From: Stephen Date: Wed, 10 Jul 2019 17:41:36 -0400 Subject: [PATCH 1/2] feat!: implement flow control --- package.json | 6 +- readme.md | 185 +++++++++++++++++++++++++++++++-------------------- src/index.ts | 102 +++++++++++++++++++++------- test/test.ts | 185 +++++++++++++++++++++++++++++++++++++++++++-------- 4 files changed, 351 insertions(+), 127 deletions(-) diff --git a/package.json b/package.json index 2740ae2..b40ab70 100644 --- a/package.json +++ b/package.json @@ -1,7 +1,7 @@ { "name": "split-array-stream", "version": "2.0.0", - "description": "Safely push each item of an array to a stream", + "description": "Split an array's contents into multiple data events", "main": "./build/src/index.js", "types": "./build/src/index.d.ts", "files": [ @@ -29,13 +29,9 @@ "devDependencies": { "@types/mocha": "^5.0.0", "@types/node": "^9.6.1", - "@types/through2": "^2.0.33", "gts": "^0.5.4", "mocha": "^5.0.5", "through2": "^2.0.3", "typescript": "~2.8.1" - }, - "dependencies": { - "is-stream-ended": "^0.1.4" } } diff --git a/readme.md b/readme.md index ab49919..64e03a8 100644 --- a/readme.md +++ b/readme.md @@ -1,107 +1,148 @@ # split-array-stream -> Safely push each item of an array to a stream. +> Split an array's contents into multiple data events ```sh $ npm install --save split-array-stream ``` ```js -const split = require('split-array-stream'); -const through = require('through2'); - -const array = [ - { id: 1, user: 'Dave' }, - { id: 2, user: 'Stephen' } -]; - -const stream = through.obj(); - -stream.on('data', (item) => { - // { id: 1, user: 'Dave' } - // ...later... - // { id: 2, user: 'Stephen' } -}); - -split(array, stream).then((streamEnded) => { - if (!streamEnded) { - stream.push(null); - stream.end(); - } -}).catch(console.error); +const SplitArrayStream = require('split-array-stream').SplitArrayStream + +getReadableStreamThatEmitsArrays() + .pipe(new SplitArrayStream()) + .on('data', (item) => { + // { id: 1, user: 'Dave' } + // ...later... + // { id: 2, user: 'Stephen' } + }) ``` -Before pushing an item to the stream, `split-array-stream` checks that the stream hasn't been ended. This avoids those "push() after EOF" errors. - ### Use case Say you're getting many items from an upstream API. Multiple requests might be required to page through all of the results. You want to push the results to the stream as they come in, and only get more results if the user hasn't ended the stream. ```js -function getAllUsers() { - var stream = through.obj(); +const SplitArrayStream = require('split-array-stream').SplitArrayStream - var requestOptions = { - method: 'get', - url: 'http://api/users', - }; - - request(requestOptions, onResponse); - - function onResponse(err, response) { - split(response.users, stream).then((streamEnded) => { - if (streamEnded) { - return; - } +// Holds a pagination token the API expects. +let nextPageToken - if (response.nextPageToken) { - requestOptions.pageToken = response.nextPageToken; - request(requestOptions, onResponse); - return; - } +const getUsersFromApi = async () => { + const requestOptions = { + method: 'GET', + url: 'http://localhost:8000/users', + } - stream.push(null); - stream.end(); - }); + if (nextPageToken) { + requestOptions.pageToken = nextPageToken + } - }); + const response = await request(requestOptions) + // response = { + // "users": [ + // "callmehiphop", + // "stephenplusplus" + // ], + // "nextPageToken": "--key-used-for-pagination--" + // } + + const users = response.users + + nextPageToken = response.nextPageToken + + if (!nextPageToken) { + // When the API doesn't return a `nextPageToken`, all of the results have + // been received. + // + // Signal the end of the stream by resolving with an array with a "null" + // value inside. + // + // split-array-stream won't make any further calls to this function after + // null is received. + users.push(null) + } - return stream; + return Promise.resolve(users) } -getAllUsers() +new SplitArrayStream(getUsersFromApi) .on('data', function (user) { - // An item from the `response.users` API response + // First event: + // user = "callmehiphop" + // + // Second event: + // user = "stephenplusplus" }) .on('end', function () { - // All users received - }); + // All items from the array have been received + }) ``` +Alternatively, you could find that turning the above behavior into a stream is cleaner. -### split(array, stream, callback) - -#### array - -- Type: `Array` -- Required - -The source array. Each item will be pushed to the provided stream. +```js +const Readable = require('stream').Readable +const SplitArrayStream = require('split-array-stream').SplitArrayStream + +const getUsersFromApiAsStream = () => { + // Holds a pagination token the API expects. + let nextPageToken + + return new Readable({ + objectMode: true, + read: async function() { + if (nextPageToken) { + requestOptions.pageToken = nextPageToken + } -#### stream + const response = await request(requestOptions) + // response = { + // "users": [ + // "callmehiphop", + // "stephenplusplus" + // ], + // "nextPageToken": "--key-used-for-pagination--" + // } + + // This pushes the array as a data event that `split-array-stream` + // receives. + stream.push(response.users) + + nextPageToken = response.nextPageToken + + if (!nextPageToken) { + // The readable stream is over. We have all of the results from the API. + // + // To end the stream, push `null`. + this.push(null) + } + }, + }) +} -- Type: `Stream` -- Required +getUsersFromApiAsStream() + .pipe(new SplitArrayStream()) + .on('data', function (user) { + // First event: + // user = "callmehiphop" + // + // Second event: + // user = "stephenplusplus" + }) + .on('end', function () { + // All items from the array have been received + }) +```` -The destination stream to receive the items of the array. -#### callback(streamEnded) +### split([getArrayFn]) -- Type: `Function` -- Required +#### getArrayFn -Callback function executed after all items of the array have been iterated. +- Type: `Array` | `Function` +- Optional -##### callback.streamEnded +If left undefined, split-array-stream expects to receive events as part of a pipeline, as shown in the first example above. -- Type: `Boolean` +If an array, each item will be emitted as `data` events to the next stream in the pipeline. -Lets you know if the stream has been ended while items were being pushed. +If a function, it is expected to return a Promise that resolves with an array. This function will be called each time the destination stream is ready to accept more data. **If there are no more arrays to give us, send `null`.** You may also add a `null` item into any array to signal the end of the stream. diff --git a/src/index.ts b/src/index.ts index 9eca1c0..b4685ae 100644 --- a/src/index.ts +++ b/src/index.ts @@ -1,28 +1,84 @@ -import * as ended from 'is-stream-ended'; import {Transform} from 'stream'; +// tslint:disable-next-line no-any +export type ArrayItem = any; + /** * Push an array of items into a Transform stream. - * @param array The array you want to push to the stream. - * @param stream The Transform stream into which array items are pushed. + * @param getArrayFn The array you want to push to the stream. */ -export async function split(array: Array<{}>, stream: Transform) { - return new Promise((resolve, reject) => { - const arr = [].slice.call(array); - function loopyloop() { - // Ensure the stream wasn't closed by the consumer. - const isEnded = ended(stream); - // Ensure al items from the array haven't been pushed. - const cont = !isEnded && arr.length > 0; - if (cont) { - stream.push(arr.shift()); - // For large arrays, use setImmediate to ensure other microtasks - // and I/O operations have a chance to execute. - setImmediate(loopyloop); - } else { - resolve(isEnded); - } - } - loopyloop(); - }); -} +export class SplitArrayStream extends Transform { + private _ended: boolean; + private _getArrayFn?: Function; + + constructor(getArrayFn?: Function|ArrayItem[]) { + super({objectMode: true}); + + this._ended = false; + + // When the input is an array, the user just wants it split up and emitted + // as data events. + if (Array.isArray(getArrayFn)) { + const array = [].slice.call(getArrayFn); + array.push(null); + this._getArrayFn = () => Promise.resolve(array); + } + + // When the input is a function, the user wants that function called each + // time the destination stream is ready for more data. + if (typeof getArrayFn === 'function') { + this._getArrayFn = getArrayFn; + } + + if (typeof getArrayFn !== 'undefined') { + this._read = this._readFromFn.bind(this); + } + } + + // tslint:disable-next-line no-any + end(...args: any[]) { + this._ended = true; + return super.end(...args); + } + + async _readFromFn() { + let consumerStreamReady = true; + + let arrayValue = await this._getArrayFn!(); + + if (!Array.isArray(arrayValue)) { + arrayValue = [arrayValue]; + } + + const array = [].slice.call(arrayValue); + + while (!this._ended && consumerStreamReady && array.length > 0) { + consumerStreamReady = this.push(array.shift()); + } + + if (!this._ended && consumerStreamReady && array.length > 0) { + setImmediate(() => this._read(0)); + } + } + + _transform(array: ArrayItem[], enc: string, next: Function) { + let consumerStreamReady = true; + + array = [].slice.call(array); + + while (!this._ended && consumerStreamReady && array.length > 0) { + consumerStreamReady = this.push(array.shift()); + } + + if (this._ended) { + next(); + return; + } + + if (consumerStreamReady && array.length === 0) { + next(); + } else { + setImmediate(() => this._transform(array, enc, next)); + } + } +} \ No newline at end of file diff --git a/test/test.ts b/test/test.ts index ccfdd42..f900dca 100644 --- a/test/test.ts +++ b/test/test.ts @@ -1,6 +1,7 @@ import * as assert from 'assert'; -import * as through from 'through2'; -import {split} from '../src'; +import {Readable} from 'stream'; + +import {SplitArrayStream} from '../src'; describe('split-array-stream', () => { const array = [ @@ -8,36 +9,166 @@ describe('split-array-stream', () => { {id: 4, user: 'Stephen'} ]; - it('should work', async () => { - let numDataEvents = 0; - const stream = through.obj(); - stream.on('data', () => numDataEvents++); - const streamEnded = await split(array, stream); - assert.strictEqual(streamEnded, false); - assert.strictEqual(numDataEvents, array.length); + describe('stream mode', () => { + let arrayStream: Readable; + + beforeEach(() => { + arrayStream = new Readable({ + objectMode: true, + read() { + this.push(array); + this.push(null); + }, + }); + }); + + it('should work', done => { + let numDataEvents = 0; + + arrayStream.pipe(new SplitArrayStream()) + .on('data', () => numDataEvents++) + .on('end', () => { + assert.strictEqual(numDataEvents, array.length); + done(); + }); + }); + + it('should not push more results after end', done => { + const expectedNumDataEvents = 2; + let numDataEvents = 0; + + const sas = new SplitArrayStream(); + + arrayStream.pipe(sas) + .on('data', + () => { + numDataEvents++; + if (numDataEvents === expectedNumDataEvents) { + sas.end(); + } + if (numDataEvents > expectedNumDataEvents) { + throw new Error('Should not have received this event.'); + } + }) + .on('end', () => { + assert.strictEqual(numDataEvents, expectedNumDataEvents); + done(); + }); + }); + + it('should not modify original array', done => { + const expectedArray = [].slice.call(array); + + arrayStream.pipe(new SplitArrayStream()) + .on('data', () => {}) + .on('end', () => { + assert.deepStrictEqual(array, expectedArray); + done(); + }); + }); }); - it('should not push more results after end', async () => { - const stream = through.obj(); - const expectedNumDataEvents = 2; - let numDataEvents = 0; - stream.on('data', d => { - numDataEvents++; - if (numDataEvents === expectedNumDataEvents) { - stream.end(); - } - if (numDataEvents > expectedNumDataEvents) { - throw new Error('Should not have received this event.'); + describe('function mode', () => { + let numTimesCalled: number; + + const getArrayFn = () => { + numTimesCalled++; + + if (numTimesCalled === 1) { + return Promise.resolve(array); + } else { + return Promise.resolve(null); } + }; + + beforeEach(() => { + numTimesCalled = 0; + }); + + it('should work', done => { + let numDataEvents = 0; + + new SplitArrayStream(getArrayFn) + .on('data', () => numDataEvents++) + .on('end', () => { + assert.strictEqual(numDataEvents, array.length); + done(); + }); + }); + + it('should not push more results after end', done => { + const expectedNumDataEvents = 2; + let numDataEvents = 0; + + const sas = + new SplitArrayStream(getArrayFn) + .on('data', + () => { + numDataEvents++; + if (numDataEvents === expectedNumDataEvents) { + sas.end(); + } + if (numDataEvents > expectedNumDataEvents) { + throw new Error('Should not have received this event.'); + } + }) + .on('end', () => { + assert.strictEqual(numDataEvents, expectedNumDataEvents); + done(); + }); + }); + + it('should not modify original array', done => { + const expectedArray = [].slice.call(array); + + new SplitArrayStream(getArrayFn).on('data', () => {}).on('end', () => { + assert.deepStrictEqual(array, expectedArray); + done(); + }); }); - const ended = await split(array, stream); - assert.strictEqual(ended, true); - assert.strictEqual(numDataEvents, expectedNumDataEvents); }); - it('should not modify original array', async () => { - const expectedArray = [].slice.call(array); - const ended = await split(array, through.obj()); - assert.deepEqual(array, expectedArray); + describe('array mode', () => { + it('should work', done => { + let numDataEvents = 0; + + new SplitArrayStream(array) + .on('data', () => numDataEvents++) + .on('end', () => { + assert.strictEqual(numDataEvents, array.length); + done(); + }); + }); + + it('should not push more results after end', done => { + const expectedNumDataEvents = 2; + let numDataEvents = 0; + + const sas = + new SplitArrayStream(array) + .on('data', + () => { + numDataEvents++; + if (numDataEvents === expectedNumDataEvents) { + sas.end(); + } + if (numDataEvents > expectedNumDataEvents) { + throw new Error('Should not have received this event.'); + } + }) + .on('end', () => { + assert.strictEqual(numDataEvents, expectedNumDataEvents); + done(); + }); + }); + + it('should not modify original array', done => { + const expectedArray = [].slice.call(array); + + new SplitArrayStream(array).on('data', () => {}).on('end', () => { + assert.deepStrictEqual(array, expectedArray); + done(); + }); + }); }); }); From d921995fb4ea187122bf83e202cea7e1d1ff67ed Mon Sep 17 00:00:00 2001 From: Stephen Date: Tue, 16 Jul 2019 09:18:03 -0400 Subject: [PATCH 2/2] Handle backpressure... better --- readme.md | 24 +++++---- src/index.ts | 135 +++++++++++++++++++++++++++++++++++---------------- test/test.ts | 35 ++++++++++--- 3 files changed, 135 insertions(+), 59 deletions(-) diff --git a/readme.md b/readme.md index 64e03a8..6c06c8e 100644 --- a/readme.md +++ b/readme.md @@ -16,7 +16,7 @@ getReadableStreamThatEmitsArrays() }) ``` -### Use case +### Use Case Say you're getting many items from an upstream API. Multiple requests might be required to page through all of the results. You want to push the results to the stream as they come in, and only get more results if the user hasn't ended the stream. @@ -36,14 +36,19 @@ const getUsersFromApi = async () => { requestOptions.pageToken = nextPageToken } - const response = await request(requestOptions) - // response = { - // "users": [ - // "callmehiphop", - // "stephenplusplus" - // ], - // "nextPageToken": "--key-used-for-pagination--" - // } + try { + const response = await request(requestOptions) + // response = { + // "users": [ + // "callmehiphop", + // "stephenplusplus" + // ], + // "nextPageToken": "--key-used-for-pagination--" + // } + } catch (e) { + // Error? Return a rejected promise. + return Promise.reject(e); + } const users = response.users @@ -133,7 +138,6 @@ getUsersFromApiAsStream() }) ```` - ### split([getArrayFn]) #### getArrayFn diff --git a/src/index.ts b/src/index.ts index b4685ae..e2550e4 100644 --- a/src/index.ts +++ b/src/index.ts @@ -5,33 +5,33 @@ export type ArrayItem = any; /** * Push an array of items into a Transform stream. - * @param getArrayFn The array you want to push to the stream. + * @param {array|function} [getArrayFn] If an array is provided, the elements in + * the array will be individually written to a stream that is connected to + * the returned `split-array-stream` stream. If a function is provided, it + * will be called every time a destination stream is ready for more data. */ export class SplitArrayStream extends Transform { private _ended: boolean; private _getArrayFn?: Function; + private _queuedArray: ArrayItem[]; constructor(getArrayFn?: Function|ArrayItem[]) { super({objectMode: true}); this._ended = false; + this._queuedArray = []; - // When the input is an array, the user just wants it split up and emitted - // as data events. if (Array.isArray(getArrayFn)) { - const array = [].slice.call(getArrayFn); - array.push(null); - this._getArrayFn = () => Promise.resolve(array); - } - - // When the input is a function, the user wants that function called each - // time the destination stream is ready for more data. - if (typeof getArrayFn === 'function') { + // When the input is an array, the user just wants it split up and emitted + // as data events. + const array = [].slice.call(getArrayFn).concat([null]); + this.getArrayFn = () => Promise.resolve(array); + this._read = this._readFromArrayFn.bind(this); + } else if (typeof getArrayFn === 'function') { + // When the input is a function, the user wants that function called each + // time the destination stream is ready for more data. this._getArrayFn = getArrayFn; - } - - if (typeof getArrayFn !== 'undefined') { - this._read = this._readFromFn.bind(this); + this._read = this._readFromArrayFn.bind(this); } } @@ -41,44 +41,95 @@ export class SplitArrayStream extends Transform { return super.end(...args); } - async _readFromFn() { - let consumerStreamReady = true; + _transform(array: ArrayItem[], enc: string, next: Function) { + this._queue(array); + this._flushQueue(); + next(); + } - let arrayValue = await this._getArrayFn!(); + _flush(callback: Function) { + const MAX_FORCED_FLUSH_ATTEMPTS = 3; + let numForceFlushAttempts = 0; + + const flush = (callback: Function) => { + const consumerStreamReady = this._flushQueue(); + + if (this._queuedArray.length === 0) { + callback(); + return; + } + + // More results exist. + if (consumerStreamReady) { + setImmediate(flush, callback); + } else { + // The stream isn't going to ask for more data by itself anymore, since + // we're in the _flush() handler. + + if (numForceFlushAttempts < MAX_FORCED_FLUSH_ATTEMPTS) { + // We can try a few times to drain the queued array items, but + // probably shouldn't overdo it. + numForceFlushAttempts++; + setImmediate(flush, callback); + } else { + // Ok, just let the data drop. + callback(); + } + } + }; + + flush(callback); + } - if (!Array.isArray(arrayValue)) { - arrayValue = [arrayValue]; - } + // Gets mapped to `_read` when appropriate (see constructor). + private _readFromArrayFn() { + const read = async () => { + try { + let array = await this._getArrayFn!(); - const array = [].slice.call(arrayValue); + if (!Array.isArray(array)) { + array = [array]; + } - while (!this._ended && consumerStreamReady && array.length > 0) { - consumerStreamReady = this.push(array.shift()); - } + this._queue(array); - if (!this._ended && consumerStreamReady && array.length > 0) { - setImmediate(() => this._read(0)); - } + const consumerStreamReady = this._flushQueue(); + + if (consumerStreamReady) { + read(); + } + } catch (e) { + // If the user rejects the `getArrayFn` function's returned Promise, we + // consider it an error to destroy the stream with. + this.destroy(e); + } + }; + + read(); } - _transform(array: ArrayItem[], enc: string, next: Function) { - let consumerStreamReady = true; + private _flushQueue() { + return this._emitArray(this._queuedArray); + } - array = [].slice.call(array); + private _queue(array: ArrayItem[]) { + this._queuedArray = this._queuedArray.concat(array); + } - while (!this._ended && consumerStreamReady && array.length > 0) { - consumerStreamReady = this.push(array.shift()); + private _emitArray(array: ArrayItem[]) { + let consumerStreamReady = true; + let numItemsEmitted = 0; + + for (const arrayItem of array) { + if (this._ended || !consumerStreamReady) { + break; + } + numItemsEmitted++; + consumerStreamReady = this.push(arrayItem); } - if (this._ended) { - next(); - return; - } + this._queuedArray = array.slice(numItemsEmitted); - if (consumerStreamReady && array.length === 0) { - next(); - } else { - setImmediate(() => this._transform(array, enc, next)); - } + return consumerStreamReady && !this._ended; } } \ No newline at end of file diff --git a/test/test.ts b/test/test.ts index f900dca..f4c5b67 100644 --- a/test/test.ts +++ b/test/test.ts @@ -26,6 +26,7 @@ describe('split-array-stream', () => { let numDataEvents = 0; arrayStream.pipe(new SplitArrayStream()) + .on('error', done) .on('data', () => numDataEvents++) .on('end', () => { assert.strictEqual(numDataEvents, array.length); @@ -40,6 +41,7 @@ describe('split-array-stream', () => { const sas = new SplitArrayStream(); arrayStream.pipe(sas) + .on('error', done) .on('data', () => { numDataEvents++; @@ -47,7 +49,7 @@ describe('split-array-stream', () => { sas.end(); } if (numDataEvents > expectedNumDataEvents) { - throw new Error('Should not have received this event.'); + done(new Error('Should not have received this event.')); } }) .on('end', () => { @@ -60,6 +62,7 @@ describe('split-array-stream', () => { const expectedArray = [].slice.call(array); arrayStream.pipe(new SplitArrayStream()) + .on('error', done) .on('data', () => {}) .on('end', () => { assert.deepStrictEqual(array, expectedArray); @@ -89,6 +92,7 @@ describe('split-array-stream', () => { let numDataEvents = 0; new SplitArrayStream(getArrayFn) + .on('error', done) .on('data', () => numDataEvents++) .on('end', () => { assert.strictEqual(numDataEvents, array.length); @@ -96,12 +100,24 @@ describe('split-array-stream', () => { }); }); + it('should destroy the stream if the promise is rejected', done => { + const error = new Error('Error.'); + + new SplitArrayStream(() => Promise.reject(error)) + .on('data', () => {}) + .on('error', err => { + assert.strictEqual(err, error); + done(); + }); + }); + it('should not push more results after end', done => { const expectedNumDataEvents = 2; let numDataEvents = 0; const sas = new SplitArrayStream(getArrayFn) + .on('error', done) .on('data', () => { numDataEvents++; @@ -109,7 +125,7 @@ describe('split-array-stream', () => { sas.end(); } if (numDataEvents > expectedNumDataEvents) { - throw new Error('Should not have received this event.'); + done(new Error('Should not have received this event.')); } }) .on('end', () => { @@ -133,6 +149,7 @@ describe('split-array-stream', () => { let numDataEvents = 0; new SplitArrayStream(array) + .on('error', done) .on('data', () => numDataEvents++) .on('end', () => { assert.strictEqual(numDataEvents, array.length); @@ -146,6 +163,7 @@ describe('split-array-stream', () => { const sas = new SplitArrayStream(array) + .on('error', done) .on('data', () => { numDataEvents++; @@ -153,7 +171,7 @@ describe('split-array-stream', () => { sas.end(); } if (numDataEvents > expectedNumDataEvents) { - throw new Error('Should not have received this event.'); + done(new Error('Should not have received this event.')); } }) .on('end', () => { @@ -165,10 +183,13 @@ describe('split-array-stream', () => { it('should not modify original array', done => { const expectedArray = [].slice.call(array); - new SplitArrayStream(array).on('data', () => {}).on('end', () => { - assert.deepStrictEqual(array, expectedArray); - done(); - }); + new SplitArrayStream(array) + .on('error', done) + .on('data', () => {}) + .on('end', () => { + assert.deepStrictEqual(array, expectedArray); + done(); + }); }); }); });