Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 1 addition & 5 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "split-array-stream",
"version": "2.0.0",
"description": "Safely push each item of an array to a stream",
"description": "Split an array's contents into multiple data events",
"main": "./build/src/index.js",
"types": "./build/src/index.d.ts",
"files": [
Expand Down Expand Up @@ -29,13 +29,9 @@
"devDependencies": {
"@types/mocha": "^5.0.0",
"@types/node": "^9.6.1",
"@types/through2": "^2.0.33",
"gts": "^0.5.4",
"mocha": "^5.0.5",
"through2": "^2.0.3",
"typescript": "~2.8.1"
},
"dependencies": {
"is-stream-ended": "^0.1.4"
}
}
189 changes: 117 additions & 72 deletions readme.md
Original file line number Diff line number Diff line change
@@ -1,107 +1,152 @@
# split-array-stream
> Safely push each item of an array to a stream.
> Split an array's contents into multiple data events

```sh
$ npm install --save split-array-stream
```
```js
const split = require('split-array-stream');
const through = require('through2');

const array = [
{ id: 1, user: 'Dave' },
{ id: 2, user: 'Stephen' }
];

const stream = through.obj();

stream.on('data', (item) => {
// { id: 1, user: 'Dave' }
// ...later...
// { id: 2, user: 'Stephen' }
});

split(array, stream).then((streamEnded) => {
if (!streamEnded) {
stream.push(null);
stream.end();
}
}).catch(console.error);
const SplitArrayStream = require('split-array-stream').SplitArrayStream

getReadableStreamThatEmitsArrays()
.pipe(new SplitArrayStream())
.on('data', (item) => {
// { id: 1, user: 'Dave' }
// ...later...
// { id: 2, user: 'Stephen' }
})
```

Before pushing an item to the stream, `split-array-stream` checks that the stream hasn't been ended. This avoids those "push() after EOF" errors.

### Use case
### Use Case

Say you're getting many items from an upstream API. Multiple requests might be required to page through all of the results. You want to push the results to the stream as they come in, and only get more results if the user hasn't ended the stream.

```js
function getAllUsers() {
var stream = through.obj();
const SplitArrayStream = require('split-array-stream').SplitArrayStream

var requestOptions = {
method: 'get',
url: 'http://api/users',
};
// Holds a pagination token the API expects.
let nextPageToken

request(requestOptions, onResponse);
const getUsersFromApi = async () => {
const requestOptions = {
method: 'GET',
url: 'http://localhost:8000/users',
}

function onResponse(err, response) {
split(response.users, stream).then((streamEnded) => {
if (streamEnded) {
return;
}
if (nextPageToken) {
requestOptions.pageToken = nextPageToken
}

if (response.nextPageToken) {
requestOptions.pageToken = response.nextPageToken;
request(requestOptions, onResponse);
return;
}
try {
const response = await request(requestOptions)
// response = {
// "users": [
// "callmehiphop",
// "stephenplusplus"
// ],
// "nextPageToken": "--key-used-for-pagination--"
// }
} catch (e) {
// Error? Return a rejected promise.
return Promise.reject(e);
}

const users = response.users

stream.push(null);
stream.end();
});
nextPageToken = response.nextPageToken

});
if (!nextPageToken) {
// When the API doesn't return a `nextPageToken`, all of the results have
// been received.
//
// Signal the end of the stream by resolving with an array with a "null"
// value inside.
//
// split-array-stream won't make any further calls to this function after
// null is received.
users.push(null)
}

return stream;
return Promise.resolve(users)
}

getAllUsers()
new SplitArrayStream(getUsersFromApi)
.on('data', function (user) {
// An item from the `response.users` API response
// First event:
// user = "callmehiphop"
//
// Second event:
// user = "stephenplusplus"
})
.on('end', function () {
// All users received
});
// All items from the array have been received
})
```

Alternatively, you could find that turning the above behavior into a stream is cleaner.

### split(array, stream, callback)

#### array

- Type: `Array`
- Required

The source array. Each item will be pushed to the provided stream.

#### stream
```js
const Readable = require('stream').Readable
const SplitArrayStream = require('split-array-stream').SplitArrayStream

const getUsersFromApiAsStream = () => {
// Holds a pagination token the API expects.
let nextPageToken

return new Readable({
objectMode: true,
read: async function() {
if (nextPageToken) {
requestOptions.pageToken = nextPageToken
}

- Type: `Stream`
- Required
const response = await request(requestOptions)
// response = {
// "users": [
// "callmehiphop",
// "stephenplusplus"
// ],
// "nextPageToken": "--key-used-for-pagination--"
// }

// This pushes the array as a data event that `split-array-stream`
// receives.
stream.push(response.users)

nextPageToken = response.nextPageToken

if (!nextPageToken) {
// The readable stream is over. We have all of the results from the API.
//
// To end the stream, push `null`.
this.push(null)
}
},
})
}

The destination stream to receive the items of the array.
getUsersFromApiAsStream()
.pipe(new SplitArrayStream())
.on('data', function (user) {
// First event:
// user = "callmehiphop"
//
// Second event:
// user = "stephenplusplus"
})
.on('end', function () {
// All items from the array have been received
})
````

#### callback(streamEnded)
### split([getArrayFn])

- Type: `Function`
- Required
#### getArrayFn

Callback function executed after all items of the array have been iterated.
- Type: `Array` | `Function`
- Optional

##### callback.streamEnded
If left undefined, split-array-stream expects to receive events as part of a pipeline, as shown in the first example above.

- Type: `Boolean`
If an array, each item will be emitted as `data` events to the next stream in the pipeline.

Lets you know if the stream has been ended while items were being pushed.
If a function, it is expected to return a Promise that resolves with an array. This function will be called each time the destination stream is ready to accept more data. **If there are no more arrays to give us, send `null`.** You may also add a `null` item into any array to signal the end of the stream.
Loading