Skip to content

Conversation

stephenplusplus
Copy link
Owner

⚠️ Breaking Changes!

This is a breaking change that will allow three types of uses, all with the new feature of backpressure automatically being applied.

Use 1, a source stream that emits arrays:

readStreamThatEmitsArrays
  .pipe(new SplitArrayStream())
  .on('data', singleItem => {}) // A single item is emitted
  .on('end', () => {}) // All items emitted

Use 2, a static array that needs to be split:

new SplitArrayStream([1, 2, 3])
  .on('data', singleItem => {}) // First `1`, Next `2`, Finally `3`
  .on('end', () => {}) // All items emitted

Use 3, a function that resolves with an array:

let nextPageToken

const getArrayFn = async () => {
  const requestOptions = { method: 'GET', uri: '...' }

  if (nextPageToken) {
    requestOptions.pageToken = nextPageToken
  }

  const response = await request(requestOptions)
  const buckets = response.buckets

  nextPageToken = response.nextPageToken

  if (!nextPageToken) {
    // `null` signals that we have no more values left to give
    buckets.push(null)
  }

  return Promise.resolve(buckets)
}

new SplitArrayStream(getArrayFn)
  .on('data', singleItem => {}) // First `1`, Next `2`, Finally `3`
  .on('end', () => {}) // All items emitted

cc @callmehiphop

src/index.ts Outdated
const array = [].slice.call(arrayValue);

while (!this._ended && consumerStreamReady && array.length > 0) {
consumerStreamReady = this.push(array.shift());

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I found in my benchmarking that calling shift() on a large array can be very slow. I opted for a for ... of with a conditional break statement instead.

@callmehiphop
Copy link

Just a general thought, it might be nice to offer a similar convenience to native streams where you can either pass in a getArrayFn type of method or implement a private one if you wanted to extend the class.

class MyThingy extends SplitArrayStream {
  _getArray() {
    // ... do stuff
  }
}

// or
const myThingy = new SplitArrayStream({
  getArray: () => { /* do stuff */ }
});

@stephenplusplus
Copy link
Owner Author

@callmehiphop there's a lot of new material for review in the last commit. The reason for all of the new complexity was:

  • I didn't want to push a single value after the destination stream returned false from a push().
  • I didn't want to call _read() internally ourselves
  • In the _transform implementation, I didn't want to call _transform() ourselves, which means I had to implement a _flush() handler to make sure data wouldn't be dropped

I tried to solve these with a queue technique, that will push as many results as it can until the destination stream is exhausted. Then when the stream is ready for more, we push the new results to the end of the queue, and start to flush as many as we can.

Please let me know if you see anything busted about this, but as with all things streams, it might take 6 days, 10 drinks, and a few calming nature hikes to fully figure out how this all works... but, I think it does work :)

@stephenplusplus
Copy link
Owner Author

Yo @callmehiphop, whatcha think?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants