diff --git a/package.json b/package.json index 2740ae2..b40ab70 100644 --- a/package.json +++ b/package.json @@ -1,7 +1,7 @@ { "name": "split-array-stream", "version": "2.0.0", - "description": "Safely push each item of an array to a stream", + "description": "Split an array's contents into multiple data events", "main": "./build/src/index.js", "types": "./build/src/index.d.ts", "files": [ @@ -29,13 +29,9 @@ "devDependencies": { "@types/mocha": "^5.0.0", "@types/node": "^9.6.1", - "@types/through2": "^2.0.33", "gts": "^0.5.4", "mocha": "^5.0.5", "through2": "^2.0.3", "typescript": "~2.8.1" - }, - "dependencies": { - "is-stream-ended": "^0.1.4" } } diff --git a/readme.md b/readme.md index ab49919..6c06c8e 100644 --- a/readme.md +++ b/readme.md @@ -1,107 +1,152 @@ # split-array-stream -> Safely push each item of an array to a stream. +> Split an array's contents into multiple data events ```sh $ npm install --save split-array-stream ``` ```js -const split = require('split-array-stream'); -const through = require('through2'); - -const array = [ - { id: 1, user: 'Dave' }, - { id: 2, user: 'Stephen' } -]; - -const stream = through.obj(); - -stream.on('data', (item) => { - // { id: 1, user: 'Dave' } - // ...later... - // { id: 2, user: 'Stephen' } -}); - -split(array, stream).then((streamEnded) => { - if (!streamEnded) { - stream.push(null); - stream.end(); - } -}).catch(console.error); +const SplitArrayStream = require('split-array-stream').SplitArrayStream + +getReadableStreamThatEmitsArrays() + .pipe(new SplitArrayStream()) + .on('data', (item) => { + // { id: 1, user: 'Dave' } + // ...later... + // { id: 2, user: 'Stephen' } + }) ``` -Before pushing an item to the stream, `split-array-stream` checks that the stream hasn't been ended. This avoids those "push() after EOF" errors. - -### Use case +### Use Case Say you're getting many items from an upstream API. Multiple requests might be required to page through all of the results. You want to push the results to the stream as they come in, and only get more results if the user hasn't ended the stream. ```js -function getAllUsers() { - var stream = through.obj(); +const SplitArrayStream = require('split-array-stream').SplitArrayStream - var requestOptions = { - method: 'get', - url: 'http://api/users', - }; +// Holds a pagination token the API expects. +let nextPageToken - request(requestOptions, onResponse); +const getUsersFromApi = async () => { + const requestOptions = { + method: 'GET', + url: 'http://localhost:8000/users', + } - function onResponse(err, response) { - split(response.users, stream).then((streamEnded) => { - if (streamEnded) { - return; - } + if (nextPageToken) { + requestOptions.pageToken = nextPageToken + } - if (response.nextPageToken) { - requestOptions.pageToken = response.nextPageToken; - request(requestOptions, onResponse); - return; - } + try { + const response = await request(requestOptions) + // response = { + // "users": [ + // "callmehiphop", + // "stephenplusplus" + // ], + // "nextPageToken": "--key-used-for-pagination--" + // } + } catch (e) { + // Error? Return a rejected promise. + return Promise.reject(e); + } + + const users = response.users - stream.push(null); - stream.end(); - }); + nextPageToken = response.nextPageToken - }); + if (!nextPageToken) { + // When the API doesn't return a `nextPageToken`, all of the results have + // been received. + // + // Signal the end of the stream by resolving with an array with a "null" + // value inside. + // + // split-array-stream won't make any further calls to this function after + // null is received. + users.push(null) + } - return stream; + return Promise.resolve(users) } -getAllUsers() +new SplitArrayStream(getUsersFromApi) .on('data', function (user) { - // An item from the `response.users` API response + // First event: + // user = "callmehiphop" + // + // Second event: + // user = "stephenplusplus" }) .on('end', function () { - // All users received - }); + // All items from the array have been received + }) ``` +Alternatively, you could find that turning the above behavior into a stream is cleaner. -### split(array, stream, callback) - -#### array - -- Type: `Array` -- Required - -The source array. Each item will be pushed to the provided stream. - -#### stream +```js +const Readable = require('stream').Readable +const SplitArrayStream = require('split-array-stream').SplitArrayStream + +const getUsersFromApiAsStream = () => { + // Holds a pagination token the API expects. + let nextPageToken + + return new Readable({ + objectMode: true, + read: async function() { + if (nextPageToken) { + requestOptions.pageToken = nextPageToken + } -- Type: `Stream` -- Required + const response = await request(requestOptions) + // response = { + // "users": [ + // "callmehiphop", + // "stephenplusplus" + // ], + // "nextPageToken": "--key-used-for-pagination--" + // } + + // This pushes the array as a data event that `split-array-stream` + // receives. + stream.push(response.users) + + nextPageToken = response.nextPageToken + + if (!nextPageToken) { + // The readable stream is over. We have all of the results from the API. + // + // To end the stream, push `null`. + this.push(null) + } + }, + }) +} -The destination stream to receive the items of the array. +getUsersFromApiAsStream() + .pipe(new SplitArrayStream()) + .on('data', function (user) { + // First event: + // user = "callmehiphop" + // + // Second event: + // user = "stephenplusplus" + }) + .on('end', function () { + // All items from the array have been received + }) +```` -#### callback(streamEnded) +### split([getArrayFn]) -- Type: `Function` -- Required +#### getArrayFn -Callback function executed after all items of the array have been iterated. +- Type: `Array` | `Function` +- Optional -##### callback.streamEnded +If left undefined, split-array-stream expects to receive events as part of a pipeline, as shown in the first example above. -- Type: `Boolean` +If an array, each item will be emitted as `data` events to the next stream in the pipeline. -Lets you know if the stream has been ended while items were being pushed. +If a function, it is expected to return a Promise that resolves with an array. This function will be called each time the destination stream is ready to accept more data. **If there are no more arrays to give us, send `null`.** You may also add a `null` item into any array to signal the end of the stream. diff --git a/src/index.ts b/src/index.ts index 9eca1c0..e2550e4 100644 --- a/src/index.ts +++ b/src/index.ts @@ -1,28 +1,135 @@ -import * as ended from 'is-stream-ended'; import {Transform} from 'stream'; +// tslint:disable-next-line no-any +export type ArrayItem = any; + /** * Push an array of items into a Transform stream. - * @param array The array you want to push to the stream. - * @param stream The Transform stream into which array items are pushed. + * @param {array|function} [getArrayFn] If an array is provided, the elements in + * the array will be individually written to a stream that is connected to + * the returned `split-array-stream` stream. If a function is provided, it + * will be called every time a destination stream is ready for more data. */ -export async function split(array: Array<{}>, stream: Transform) { - return new Promise((resolve, reject) => { - const arr = [].slice.call(array); - function loopyloop() { - // Ensure the stream wasn't closed by the consumer. - const isEnded = ended(stream); - // Ensure al items from the array haven't been pushed. - const cont = !isEnded && arr.length > 0; - if (cont) { - stream.push(arr.shift()); - // For large arrays, use setImmediate to ensure other microtasks - // and I/O operations have a chance to execute. - setImmediate(loopyloop); +export class SplitArrayStream extends Transform { + private _ended: boolean; + private _getArrayFn?: Function; + private _queuedArray: ArrayItem[]; + + constructor(getArrayFn?: Function|ArrayItem[]) { + super({objectMode: true}); + + this._ended = false; + this._queuedArray = []; + + if (Array.isArray(getArrayFn)) { + // When the input is an array, the user just wants it split up and emitted + // as data events. + const array = [].slice.call(getArrayFn).concat([null]); + this.getArrayFn = () => Promise.resolve(array); + this._read = this._readFromArrayFn.bind(this); + } else if (typeof getArrayFn === 'function') { + // When the input is a function, the user wants that function called each + // time the destination stream is ready for more data. + this._getArrayFn = getArrayFn; + this._read = this._readFromArrayFn.bind(this); + } + } + + // tslint:disable-next-line no-any + end(...args: any[]) { + this._ended = true; + return super.end(...args); + } + + _transform(array: ArrayItem[], enc: string, next: Function) { + this._queue(array); + this._flushQueue(); + next(); + } + + _flush(callback: Function) { + const MAX_FORCED_FLUSH_ATTEMPTS = 3; + let numForceFlushAttempts = 0; + + const flush = (callback: Function) => { + const consumerStreamReady = this._flushQueue(); + + if (this._queuedArray.length === 0) { + callback(); + return; + } + + // More results exist. + if (consumerStreamReady) { + setImmediate(flush, callback); } else { - resolve(isEnded); + // The stream isn't going to ask for more data by itself anymore, since + // we're in the _flush() handler. + + if (numForceFlushAttempts < MAX_FORCED_FLUSH_ATTEMPTS) { + // We can try a few times to drain the queued array items, but + // probably shouldn't overdo it. + numForceFlushAttempts++; + setImmediate(flush, callback); + } else { + // Ok, just let the data drop. + callback(); + } + } + }; + + flush(callback); + } + + // Gets mapped to `_read` when appropriate (see constructor). + private _readFromArrayFn() { + const read = async () => { + try { + let array = await this._getArrayFn!(); + + if (!Array.isArray(array)) { + array = [array]; + } + + this._queue(array); + + const consumerStreamReady = this._flushQueue(); + + if (consumerStreamReady) { + read(); + } + } catch (e) { + // If the user rejects the `getArrayFn` function's returned Promise, we + // consider it an error to destroy the stream with. + this.destroy(e); + } + }; + + read(); + } + + private _flushQueue() { + return this._emitArray(this._queuedArray); + } + + private _queue(array: ArrayItem[]) { + this._queuedArray = this._queuedArray.concat(array); + } + + private _emitArray(array: ArrayItem[]) { + let consumerStreamReady = true; + let numItemsEmitted = 0; + + for (const arrayItem of array) { + if (this._ended || !consumerStreamReady) { + break; } + numItemsEmitted++; + consumerStreamReady = this.push(arrayItem); } - loopyloop(); - }); -} + + this._queuedArray = array.slice(numItemsEmitted); + + return consumerStreamReady && !this._ended; + } +} \ No newline at end of file diff --git a/test/test.ts b/test/test.ts index ccfdd42..f4c5b67 100644 --- a/test/test.ts +++ b/test/test.ts @@ -1,6 +1,7 @@ import * as assert from 'assert'; -import * as through from 'through2'; -import {split} from '../src'; +import {Readable} from 'stream'; + +import {SplitArrayStream} from '../src'; describe('split-array-stream', () => { const array = [ @@ -8,36 +9,187 @@ describe('split-array-stream', () => { {id: 4, user: 'Stephen'} ]; - it('should work', async () => { - let numDataEvents = 0; - const stream = through.obj(); - stream.on('data', () => numDataEvents++); - const streamEnded = await split(array, stream); - assert.strictEqual(streamEnded, false); - assert.strictEqual(numDataEvents, array.length); + describe('stream mode', () => { + let arrayStream: Readable; + + beforeEach(() => { + arrayStream = new Readable({ + objectMode: true, + read() { + this.push(array); + this.push(null); + }, + }); + }); + + it('should work', done => { + let numDataEvents = 0; + + arrayStream.pipe(new SplitArrayStream()) + .on('error', done) + .on('data', () => numDataEvents++) + .on('end', () => { + assert.strictEqual(numDataEvents, array.length); + done(); + }); + }); + + it('should not push more results after end', done => { + const expectedNumDataEvents = 2; + let numDataEvents = 0; + + const sas = new SplitArrayStream(); + + arrayStream.pipe(sas) + .on('error', done) + .on('data', + () => { + numDataEvents++; + if (numDataEvents === expectedNumDataEvents) { + sas.end(); + } + if (numDataEvents > expectedNumDataEvents) { + done(new Error('Should not have received this event.')); + } + }) + .on('end', () => { + assert.strictEqual(numDataEvents, expectedNumDataEvents); + done(); + }); + }); + + it('should not modify original array', done => { + const expectedArray = [].slice.call(array); + + arrayStream.pipe(new SplitArrayStream()) + .on('error', done) + .on('data', () => {}) + .on('end', () => { + assert.deepStrictEqual(array, expectedArray); + done(); + }); + }); }); - it('should not push more results after end', async () => { - const stream = through.obj(); - const expectedNumDataEvents = 2; - let numDataEvents = 0; - stream.on('data', d => { - numDataEvents++; - if (numDataEvents === expectedNumDataEvents) { - stream.end(); - } - if (numDataEvents > expectedNumDataEvents) { - throw new Error('Should not have received this event.'); + describe('function mode', () => { + let numTimesCalled: number; + + const getArrayFn = () => { + numTimesCalled++; + + if (numTimesCalled === 1) { + return Promise.resolve(array); + } else { + return Promise.resolve(null); } + }; + + beforeEach(() => { + numTimesCalled = 0; + }); + + it('should work', done => { + let numDataEvents = 0; + + new SplitArrayStream(getArrayFn) + .on('error', done) + .on('data', () => numDataEvents++) + .on('end', () => { + assert.strictEqual(numDataEvents, array.length); + done(); + }); + }); + + it('should destroy the stream if the promise is rejected', done => { + const error = new Error('Error.'); + + new SplitArrayStream(() => Promise.reject(error)) + .on('data', () => {}) + .on('error', err => { + assert.strictEqual(err, error); + done(); + }); + }); + + it('should not push more results after end', done => { + const expectedNumDataEvents = 2; + let numDataEvents = 0; + + const sas = + new SplitArrayStream(getArrayFn) + .on('error', done) + .on('data', + () => { + numDataEvents++; + if (numDataEvents === expectedNumDataEvents) { + sas.end(); + } + if (numDataEvents > expectedNumDataEvents) { + done(new Error('Should not have received this event.')); + } + }) + .on('end', () => { + assert.strictEqual(numDataEvents, expectedNumDataEvents); + done(); + }); + }); + + it('should not modify original array', done => { + const expectedArray = [].slice.call(array); + + new SplitArrayStream(getArrayFn).on('data', () => {}).on('end', () => { + assert.deepStrictEqual(array, expectedArray); + done(); + }); }); - const ended = await split(array, stream); - assert.strictEqual(ended, true); - assert.strictEqual(numDataEvents, expectedNumDataEvents); }); - it('should not modify original array', async () => { - const expectedArray = [].slice.call(array); - const ended = await split(array, through.obj()); - assert.deepEqual(array, expectedArray); + describe('array mode', () => { + it('should work', done => { + let numDataEvents = 0; + + new SplitArrayStream(array) + .on('error', done) + .on('data', () => numDataEvents++) + .on('end', () => { + assert.strictEqual(numDataEvents, array.length); + done(); + }); + }); + + it('should not push more results after end', done => { + const expectedNumDataEvents = 2; + let numDataEvents = 0; + + const sas = + new SplitArrayStream(array) + .on('error', done) + .on('data', + () => { + numDataEvents++; + if (numDataEvents === expectedNumDataEvents) { + sas.end(); + } + if (numDataEvents > expectedNumDataEvents) { + done(new Error('Should not have received this event.')); + } + }) + .on('end', () => { + assert.strictEqual(numDataEvents, expectedNumDataEvents); + done(); + }); + }); + + it('should not modify original array', done => { + const expectedArray = [].slice.call(array); + + new SplitArrayStream(array) + .on('error', done) + .on('data', () => {}) + .on('end', () => { + assert.deepStrictEqual(array, expectedArray); + done(); + }); + }); }); });