Commit: WIP

overlookmotel committed Sep 21, 2019
1 parent aad6e65 commit 98a4e95

Showing 4 changed files with 276 additions and 2 deletions.
3 changes: 3 additions & 0 deletions .eslintrc.js
@@ -3,5 +3,8 @@
module.exports = {
extends: [
'@overlookmotel/eslint-config'
],
rules: {
'prefer-object-spread': ['off']
}
};
85 changes: 84 additions & 1 deletion README.md
@@ -7,9 +7,92 @@

# Chop a single stream of data into a series of readable streams with rewind

## What's it for?

The use case is:

* You are transferring data from a stream to a service/process which needs it in chunks
* Each chunk can be streamed
* If a chunk fails to transfer, you need to be able to "rewind" to send it again

An example is uploading large files to Google Drive with the "resumable" upload method (the use case this package was created for).

## Usage

This module is under development and not ready for use yet.

### Installation

```sh
npm install stream-chop
```

### Usage

```js
const fs = require('fs');
const ChopStream = require('stream-chop');

const inputStream = fs.createReadStream('file.mov');
const chopStream = new ChopStream();

inputStream.pipe(chopStream);

// Stream bytes 1024 - 2047 (the 2nd KiB) of data to file
const subStream1 = chopStream.chunk(1024, 1024);
const outStream1 = fs.createWriteStream('part1.mov');
subStream1.pipe(outStream1);

outStream1.on('finish', () => {
console.log('Done!');
});
```

### Methods

#### `.chunk( start, length )`

Get a readable stream for the specified chunk of the input stream.

```js
// Get stream for 1st 256 KiB of input stream
const subStream1 = chopStream.chunk(0, 1024 * 256);
```

The chunk is buffered internally, so `.chunk()` can be called again with the same `start` if transferring the chunk fails and it needs to be sent again.

When `.chunk()` is called again, any data buffered before the new start point is discarded, i.e. you cannot stream 2 chunks concurrently; chunks must be streamed one at a time.

Calling `.chunk()` will cause any existing unfinished chunk stream to emit an `error` event and stop streaming.
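
For example, a retry pattern might look like this (a minimal sketch; `sendChunk()` is a hypothetical function which transfers a chunk stream to the remote service and returns a promise):

```js
const CHUNK_SIZE = 1024 * 256;

async function sendWithRetry(start, attempts) {
	for (let i = 0; i < attempts; i++) {
		try {
			// Rewind to the same start point on each attempt -
			// the chunk is still buffered, so this is safe
			await sendChunk(chopStream.chunk(start, CHUNK_SIZE));
			return;
		} catch (err) {
			if (i === attempts - 1) throw err;
		}
	}
}
```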

#### `.clearBuffer( [end] )`

Clear data from internal buffer before byte `end`.

If `end` is undefined, then entire buffer will be cleared.

```js
// Discard any data in buffer
chopStream.clearBuffer();

// Discard first 128 KiB of stream data if it's buffered
chopStream.clearBuffer(1024 * 128);
```

`.chunk()` automatically calls `.clearBuffer()` internally with its `start` value, so e.g. `.chunk(1000, 500)` discards any buffered data before byte 1000.

Once data has been discarded from the buffer, the stream can no longer be "rewound" to stream that data out again.
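
For example (a sketch; assumes the input stream has already flowed past the 1st KiB):

```js
// Stream the 2nd KiB; this internally calls .clearBuffer(1024) first
const subStream = chopStream.chunk(1024, 1024);

// The 1st KiB has been discarded from the buffer, so rewinding to it
// now throws - that data has passed and is no longer buffered
chopStream.chunk(0, 1024);
```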

### Properties

#### `.bufferStart`

Returns byte position of start of buffer. Returns `null` if buffer is empty.

#### `.bufferEnd`

Returns byte position of end of buffer. Returns `null` if buffer is empty.

#### `.bufferLength`

Returns number of bytes in buffer. Returns `0` if buffer is empty.
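
For example (a sketch; values assume 256 KiB of input has been buffered, starting at byte 0):

```js
console.log(chopStream.bufferStart); // 0
console.log(chopStream.bufferEnd); // 262144 (256 KiB)
console.log(chopStream.bufferLength); // 262144

chopStream.clearBuffer(1024 * 128);
console.log(chopStream.bufferStart); // 131072 (128 KiB)
console.log(chopStream.bufferLength); // 131072
```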

## Tests

2 changes: 1 addition & 1 deletion index.js
@@ -5,4 +5,4 @@
'use strict';

// Exports
module.exports = {};
module.exports = require('./lib/');
188 changes: 188 additions & 0 deletions lib/index.js
@@ -0,0 +1,188 @@
/* --------------------
* stream-chop module
* ------------------*/

'use strict';

// Modules
const {Writable, Readable} = require('stream');

// Exports
class ChopStream extends Writable {
constructor(options) {
options = Object.assign({}, options, {decodeStrings: true});
super(options);

// Public state
this.bufferStart = null; // Byte position of start of buffered data (null if buffer empty)
this.bufferLength = 0; // Number of bytes currently buffered

// Internal state
this._position = 0; // Byte position input stream has reached
this._buffers = []; // Buffered chunks of data, in order
this._currentStream = null; // Readable stream for current chunk (null if none)
this._currentPos = null; // Byte position current chunk stream has reached
this._currentEnd = null; // Byte position at which current chunk ends
this._currentFlowing = false; // `true` when current chunk stream is flowing
this._writeCb = null; // Callback to signal completion of last write
}

_write(chunk, encoding, cb) { // NB `encoding` is ignored because `decodeStrings` option is used
// Error if previous write not complete yet
if (this._writeCb != null) {
return cb(new Error('._write() called before last write complete'));
}

// Ignore empty chunks
let chunkLength = chunk.length;
if (chunkLength === 0) return cb(null);

// Add chunk to buffer
if (this.bufferStart == null) this.bufferStart = this._position;
this.bufferLength += chunkLength;

const currentEnd = this._currentEnd;
const newPosition = this._position + chunkLength;
this._position = newPosition;

if (currentEnd != null && newPosition > currentEnd) {
// Incoming data runs beyond end of chunk required by current stream.
// Split it in 2 at the chunk boundary.
const splitAt = chunkLength - (newPosition - currentEnd);
let chunkNext;
[chunk, chunkNext] = bufferSplit(chunk, splitAt);
this._buffers.push(chunk, chunkNext);
} else {
this._buffers.push(chunk);
}

// Pass to current stream
// TODO Write this
this._writeCb = cb;
}

/**
* Get a readable stream for specified chunk of input stream.
* @param {number} start - Byte position chunk starts at
* @param {number} length - Length of chunk in bytes
* @returns {Stream} - Chunk stream
*/
chunk(start, length) {
// Validate input
if (start == null) throw new Error('start must be provided');
if (length == null) throw new Error('length must be provided');
if (!Number.isInteger(start)) throw new Error('start must be an integer');
if (!Number.isInteger(length)) throw new Error('length must be an integer');

// Clear buffer before start point
this._clearBuffer(start);

// Error if chunk start point has already passed and is not buffered
const {bufferStart} = this;
if ((bufferStart == null || start < bufferStart) && start < this._position) {
throw new Error(`Cannot read chunk starting at byte ${start} - it has already passed and is not buffered`);
}

// End existing stream
if (this._currentStream) {
// TODO Write this
}

// Create new output stream
this._currentPos = start;
this._currentEnd = start + length;
this._currentFlowing = false;
this._currentStream = new Readable({
read: (bytes) => this._readChunk(bytes)
});

return this._currentStream;
}

/**
* Called when `._read` called on chunk stream.
* @private
* @param {number} bytes - Number of bytes to read
* @returns {undefined}
*/
_readChunk(bytes) {
// TODO Write this
}

/**
* Clear buffer up to specified byte position.
* @param {number} [end] - Byte position to clear before
* @returns {undefined}
*/
clearBuffer(end) {
// Validate input
if (end != null && !Number.isInteger(end)) throw new Error('end must be an integer if provided');

this._clearBuffer(end);
}

/**
* Clear buffer up to specified byte position.
* Only called internally, so does not validate input.
* @private
* @param {number} [end] - Byte position to clear before
* @returns {undefined}
*/
_clearBuffer(end) {
// Exit if buffer already empty
const {bufferStart, bufferEnd} = this;
if (bufferStart == null) return;

// If no `end` provided or whole buffer is before `end`, clear all buffer
if (end == null || bufferEnd <= end) {
this._buffers.length = 0;
this.bufferStart = null;
this.bufferLength = 0;
return;
}

// Exit if no data buffered before this point
if (end <= bufferStart) return;

// Locate position in buffer to cut from
const buffers = this._buffers;
let numToDiscard = 0,
pos = bufferStart;
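// e.g. (hypothetical values): with two 100-byte buffers starting at byte 0
// and `end` = 150, the 1st buffer (pos = 100, not > end) is discarded whole,
// and the 2nd (pos = 200 > end) is split at offset 100 + 150 - 200 = 50,
// keeping its tail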
for (const buffer of buffers) {
pos += buffer.length;
if (pos > end) {
// Split buffer
buffers[numToDiscard] = buffer.slice(buffer.length + end - pos);
break;
}

numToDiscard++;

if (pos === end) break;
}

if (numToDiscard > 0) buffers.splice(0, numToDiscard);
this.bufferStart = end;
this.bufferLength = bufferEnd - end;
}

/**
* Getter for `bufferEnd`
* @returns {number} - Byte position of end of buffer or null if no buffer
*/
get bufferEnd() {
const {bufferStart} = this;
if (bufferStart == null) return null;
return bufferStart + this.bufferLength;
}
}

module.exports = ChopStream;

/*
* Utility functions
*/

function bufferSplit(buffer, splitAt) {
return [
buffer.slice(0, splitAt),
buffer.slice(splitAt)
];
}
