diff --git a/.eslintrc.js b/.eslintrc.js
index 73adf75..47dfacb 100644
--- a/.eslintrc.js
+++ b/.eslintrc.js
@@ -3,5 +3,8 @@
 module.exports = {
 	extends: [
 		'@overlookmotel/eslint-config'
-	]
+	],
+	rules: {
+		'prefer-object-spread': ['off']
+	}
 };
diff --git a/README.md b/README.md
index 97137fd..0e09a63 100644
--- a/README.md
+++ b/README.md
@@ -7,9 +7,92 @@
 # Chop a single stream of data into a series of readable streams with rewind
 
+## What's it for
+
+The use case is:
+
+* You are transferring data from a stream to a service/process which needs it in chunks
+* Each chunk can be streamed
+* If a chunk fails to transfer, you need to be able to "rewind" to send it again
+
+An example is uploading to Google Drive with the "resumable" method for large files (the use case this package was created for).
+
 ## Usage
 
-This module is under development and not ready for use yet.
+### Installation
+
+```
+npm install stream-chop
+```
+
+### Usage
+
+```js
+const fs = require('fs');
+const ChopStream = require('stream-chop');
+
+const inputStream = fs.createReadStream('file.mov');
+const chopStream = new ChopStream();
+
+inputStream.pipe(chopStream);
+
+// Stream bytes 1024-2047 of the data to a file
+const subStream1 = chopStream.chunk(1024, 1024);
+const outStream1 = fs.createWriteStream('part1.mov');
+subStream1.pipe(outStream1);
+
+outStream1.on('finish', () => {
+	console.log('Done!');
+});
+```
+
+### Methods
+
+#### `.chunk( start, length )`
+
+Get a stream for a specified chunk of the input stream.
+
+```js
+// Get stream for first 256 KiB of input stream
+const subStream1 = chopStream.chunk(0, 1024 * 256);
+```
+
+The chunk is buffered internally, so `.chunk()` can be called again with the same `start` if transferring the chunk fails and it needs to be sent again.
+
+When `.chunk()` is called again, any data buffered before the start point is discarded, i.e. you cannot stream two chunks concurrently. Chunks must be streamed one at a time.
+
+Calling `.chunk()` will cause any existing unfinished chunk stream to emit an `error` event and stop streaming.
+
+#### `.clearBuffer( [end] )`
+
+Clear data from the internal buffer before byte `end`.
+
+If `end` is undefined, the entire buffer is cleared.
+
+```js
+// Discard any data in buffer
+chopStream.clearBuffer();
+
+// Discard first 128 KiB of stream data if it's buffered
+chopStream.clearBuffer(1024 * 128);
+```
+
+`.chunk( start, length )` automatically calls `.clearBuffer( start )` internally.
+
+Once data has been discarded from the buffer, the stream can no longer be "rewound" to stream that data again.
+
+### Properties
+
+#### `.bufferStart`
+
+Returns byte position of start of buffer. Returns `null` if buffer is empty.
+
+#### `.bufferEnd`
+
+Returns byte position of end of buffer. Returns `null` if buffer is empty.
+
+#### `.bufferLength`
+
+Returns number of bytes in buffer. Returns `0` if buffer is empty.
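+
+### Example: retrying a failed chunk
+
+A minimal sketch of the "rewind" use case described above. `sendChunk()` is a hypothetical function which streams a chunk to a remote service and returns a promise:
+
+```js
+async function sendWithRetry(chopStream, start, length, maxAttempts = 3) {
+	for (let attempt = 1; attempt <= maxAttempts; attempt++) {
+		try {
+			// `.chunk()` can be called again with the same `start` because
+			// the chunk remains buffered until the buffer is cleared
+			await sendChunk(chopStream.chunk(start, length));
+
+			// Chunk transferred - data up to its end is no longer needed
+			chopStream.clearBuffer(start + length);
+			return;
+		} catch (err) {
+			// Transfer failed - loop round and "rewind"
+		}
+	}
+	throw new Error(`Chunk starting at byte ${start} failed after ${maxAttempts} attempts`);
+}
+```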
 
 ## Tests
diff --git a/index.js b/index.js
index 128b1c2..d7c3d85 100644
--- a/index.js
+++ b/index.js
@@ -5,4 +5,4 @@
 'use strict';
 
 // Exports
-module.exports = {};
+module.exports = require('./lib/');
diff --git a/lib/index.js b/lib/index.js
new file mode 100644
index 0000000..8d3327e
--- /dev/null
+++ b/lib/index.js
@@ -0,0 +1,188 @@
+/* --------------------
+ * stream-chop module
+ * ------------------*/
+
+'use strict';
+
+// Modules
+const {Writable, Readable} = require('stream');
+
+// Exports
+class ChopStream extends Writable {
+	constructor(options) {
+		options = Object.assign({}, options, {decodeStrings: true});
+		super(options);
+
+		this.bufferStart = null;
+		this.bufferLength = 0;
+		this._position = 0;
+		this._buffers = [];
+		this._currentStream = null;
+		this._currentPos = null;
+		this._currentEnd = null;
+		this._currentFlowing = false;
+		this._writeCb = null;
+	}
+
+	_write(chunk, encoding, cb) { // NB `encoding` is ignored because `decodeStrings` option is used
+		// Error if previous write not complete yet
+		if (this._writeCb != null) {
+			return cb(new Error('._write() called before last write complete'));
+		}
+
+		// Ignore empty chunks
+		const chunkLength = chunk.length;
+		if (chunkLength === 0) return cb(null);
+
+		// Add chunk to buffer
+		if (this.bufferStart == null) this.bufferStart = this._position;
+		this.bufferLength += chunkLength;
+
+		const currentEnd = this._currentEnd;
+		const newPosition = this._position + chunkLength;
+		this._position = newPosition;
+
+		if (currentEnd != null && newPosition > currentEnd) {
+			// Current chunk extends beyond end of chunk required by current stream.
+			// Split into 2 chunks at the current stream's end point.
+			const splitAt = chunkLength - (newPosition - currentEnd);
+			let chunkNext;
+			[chunk, chunkNext] = bufferSplit(chunk, splitAt);
+			this._buffers.push(chunk, chunkNext);
+		} else {
+			this._buffers.push(chunk);
+		}
+
+		// Pass to current stream
+		// TODO Write this
+		this._writeCb = cb;
+	}
+
+	/**
+	 * Get a readable stream for specified chunk of input stream.
+	 * @param {number} start - Byte position to start chunk
+	 * @param {number} length - Length of chunk in bytes
+	 * @returns {Stream} - Chunk stream
+	 */
+	chunk(start, length) {
+		// Validate input
+		if (start == null) throw new Error('start must be provided');
+		if (length == null) throw new Error('length must be provided');
+		if (!Number.isInteger(start)) throw new Error('start must be an integer');
+		if (!Number.isInteger(length)) throw new Error('length must be an integer');
+
+		// Clear buffer before start point
+		this._clearBuffer(start);
+
+		// Error if trying to read chunk which is not in future and not buffered
+		const {bufferStart} = this;
+		if ((bufferStart == null || start < bufferStart) && start < this._position) {
+			throw new Error(`Cannot read chunk starting at byte ${start} - it has already passed and is not buffered`);
+		}
+
+		// End existing stream
+		if (this._currentStream) {
+			// TODO Write this
+		}
+
+		// Create new output stream
+		this._currentPos = start;
+		this._currentEnd = start + length;
+		this._currentFlowing = false;
+		this._currentStream = new Readable({
+			read: (bytes) => this._readChunk(bytes)
+		});
+
+		return this._currentStream;
+	}
+
+	/**
+	 * Called when `._read()` is called on chunk stream.
+	 * @private
+	 * @param {number} bytes - Number of bytes to read
+	 * @returns {undefined}
+	 */
+	_readChunk(bytes) {
+		// TODO Write this
+	}
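+
+	/*
+	 * Worked example of the bookkeeping above (illustrative only - it assumes
+	 * the TODOs in `._write()` and `._readChunk()` are completed, so writes
+	 * complete and data flows out):
+	 *
+	 *   const chopStream = new ChopStream();
+	 *   chopStream.write(Buffer.alloc(200));
+	 *   // bufferStart = 0, bufferLength = 200, _position = 200
+	 *
+	 *   chopStream.chunk(100, 50);
+	 *   // ._clearBuffer(100) runs first: bufferStart = 100, bufferLength = 100
+	 *   // then _currentPos = 100, _currentEnd = 150
+	 */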
+
+	/**
+	 * Clear buffer up to specified byte position.
+	 * @param {number} [end] - Byte position to clear before
+	 * @returns {undefined}
+	 */
+	clearBuffer(end) {
+		// Validate input
+		if (end != null && !Number.isInteger(end)) throw new Error('end must be an integer if provided');
+
+		this._clearBuffer(end);
+	}
+
+	/**
+	 * Clear buffer up to specified byte position.
+	 * Only called internally, so does not validate input.
+	 * @private
+	 * @param {number} [end] - Byte position to clear before
+	 * @returns {undefined}
+	 */
+	_clearBuffer(end) {
+		// Exit if buffer already empty
+		const {bufferStart, bufferEnd} = this;
+		if (bufferStart == null) return;
+
+		// If no `end` provided or whole buffer is before `end`, clear whole buffer
+		if (end == null || bufferEnd <= end) {
+			this._buffers.length = 0;
+			this.bufferStart = null;
+			this.bufferLength = 0;
+			return;
+		}
+
+		// Exit if no data buffered before this point
+		if (end <= bufferStart) return;
+
+		// Locate position in buffer to cut from
+		const buffers = this._buffers;
+		let numToDiscard = 0,
+			pos = bufferStart;
+		for (const buffer of buffers) {
+			pos += buffer.length;
+			if (pos > end) {
+				// `end` falls inside this buffer - trim discarded bytes from its start
+				buffers[numToDiscard] = buffer.slice(buffer.length + end - pos);
+				break;
+			}
+
+			numToDiscard++;
+
+			if (pos === end) break;
+		}
+
+		if (numToDiscard > 0) buffers.splice(0, numToDiscard);
+		this.bufferStart = end;
+		this.bufferLength = bufferEnd - end;
+	}
+
+	/**
+	 * Getter for `bufferEnd`.
+	 * @returns {?number} - Byte position of end of buffer, or `null` if buffer is empty
+	 */
+	get bufferEnd() {
+		const {bufferStart} = this;
+		if (bufferStart == null) return null;
+		return bufferStart + this.bufferLength;
+	}
+}
+
+module.exports = ChopStream;
+
+/*
+ * Utility functions
+ */
+
+/**
+ * Split a buffer in two at `splitAt`.
+ * @param {Buffer} buffer - Buffer to split
+ * @param {number} splitAt - Byte index to split at
+ * @returns {Array<Buffer>} - The two halves
+ */
+function bufferSplit(buffer, splitAt) {
+	return [
+		buffer.slice(0, splitAt),
+		buffer.slice(splitAt)
+	];
+}
\ No newline at end of file