diff --git a/README.md b/README.md
index b9410ed..25daac0 100644
--- a/README.md
+++ b/README.md
@@ -11,8 +11,125 @@
 [![Greenkeeper badge](https://badges.greenkeeper.io/overlookmotel/stream-gen.svg)](https://greenkeeper.io/)
 [![Coverage Status](https://img.shields.io/coveralls/overlookmotel/stream-gen/master.svg)](https://coveralls.io/r/overlookmotel/stream-gen)
 
+## Purpose
+
+Streams can be hard to test.
+
+You want to test transform streams etc. with large batches of data to ensure they're robust, but you don't want to include big test files in the repo.
+
+This module provides:
+
+* Readable stream which streams out data of whatever size you need
+* Writable stream which receives data and checks it's what it should be
+
+This is achieved entirely in memory (no file streams etc.) but with minimal memory use.
+
+The streams are created from a generator function you provide, which can produce streams of whatever content and size you require.
+
 ## Usage
 
+### Installation
+
+```
+npm install stream-gen
+```
+
+### Classes
+
+#### GeneratorReadStream( gen [, options] )
+
+Readable stream that gets its content from a generator.
+
+The generator must `yield` a byte value (0-255) each time it is called. The content of the stream is made up of these bytes.
+
+```js
+const { GeneratorReadStream } = require('stream-gen');
+
+// Data generator - creates 1KB of data
+function* gen() {
+	for (let i = 0; i < 1024; i++) {
+		yield i % 256;
+	}
+}
+
+const producer = new GeneratorReadStream( gen );
+
+producer.pipe( destination );
+```
+
+#### GeneratorWriteStream( gen [, options], callback )
+
+Writable stream that receives content and compares it to the output of a generator. It calls `callback` with the result of the comparison.
+
+The generator must be of the same type as for `GeneratorReadStream`.
+
+```js
+const { GeneratorWriteStream } = require('stream-gen');
+
+const checker = new GeneratorWriteStream( gen, function(err) {
+	if (err) return console.log('Stream differed');
+	console.log('Stream as expected');
+} );
+
+source.pipe( checker );
+```
+
+### Putting it together
+
+The two parts work together to test that a transfer of data has completed without altering the data.
+
+Let's say you are testing a lossless compression component.
+
+```
+Input -> Compress -> Decompress -> Output
+```
+
+Input and output should be the same.
+
+You can test this with:
+
+```js
+const { GeneratorReadStream, GeneratorWriteStream } = require('stream-gen');
+const zlib = require('zlib');
+
+// Data generator - creates 100MB of data
+function* gen() {
+	for (let i = 0; i < 100 * 1024 * 1024; i++) {
+		yield i % 256;
+	}
+}
+
+// Create producer and checker
+const producer = new GeneratorReadStream( gen );
+const checker = new GeneratorWriteStream( gen, function(err) {
+	if (err) throw err;
+	console.log('It works!');
+} );
+
+// Create compressor and decompressor
+const compressor = zlib.createDeflate();
+const decompressor = zlib.createInflate();
+
+// Pipe them all together
+producer.pipe(compressor).pipe(decompressor).pipe(checker);
+```
+
+The callback on `checker` tells us whether the data after compression and decompression is the same as it started off as.
+
+There we go! We tested zlib on 100MB of data, using hardly any memory and no disk access.
+
+### Options
+
+#### GeneratorReadStream
+
+Options `highWaterMark` and `encoding` are passed to Node's `Readable` stream constructor.
+
+The `maxSize` option determines the largest chunk size the stream will emit.
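+
+For example, a minimal sketch capping chunks at 16KB (the figure is illustrative, reusing the `gen` generator from above):
+
+```js
+// Emit chunks of at most 16KB
+const producer = new GeneratorReadStream( gen, { maxSize: 16 * 1024 } );
+```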
+
+#### GeneratorWriteStream
+
+Options `highWaterMark` and `emitClose` are passed to Node's `Writable` stream constructor.
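+
+For example, a sketch passing stream options alongside the callback (the value shown is just illustrative):
+
+```js
+const checker = new GeneratorWriteStream( gen, { highWaterMark: 64 * 1024 }, function(err) {
+	if (err) throw err;
+} );
+```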
+
 ## Tests
 
 Use `npm test` to run the tests. Use `npm run cover` to check coverage.
diff --git a/lib/index.js b/lib/index.js
index e510b85..2ea042f 100644
--- a/lib/index.js
+++ b/lib/index.js
@@ -4,6 +4,224 @@
 'use strict';
 
+// Modules
+const {Readable, Writable} = require('stream'),
+	{inherits} = require('util');
+
+// Symbols
+const ITERATOR = Symbol(),
+	OPTIONS = Symbol(),
+	CALLBACK = Symbol(),
+	READING = Symbol(),
+	BYTES_READ = Symbol(),
+	ERRORED = Symbol();
+
 // Exports
-module.exports = function() {
+module.exports = {
+	GeneratorReadStream,
+	GeneratorWriteStream
 };
+
+/**
+ * `GeneratorReadStream` class
+ * Readable stream created from a generator
+ * @param {Function} gen - Generator function
+ * @param {Object} [options] - Options
+ * @param {number} [options.highWaterMark] - Passed to Readable constructor
+ * @param {string} [options.encoding] - Passed to Readable constructor
+ * @param {number} [options.maxSize] - Maximum size of chunks to emit
+ */
+function GeneratorReadStream(gen, options) {
+	// Call super constructor
+	options = Object.assign({}, options);
+	const streamOptions = {};
+	for (let opt of ['highWaterMark', 'encoding']) {
+		if (options[opt] != null) streamOptions[opt] = options[opt];
+	}
+
+	Readable.call(this, streamOptions);
+
+	// Save options
+	this[OPTIONS] = options;
+
+	// Create iterator from generator
+	this[ITERATOR] = gen(options);
+
+	// Init
+	this[READING] = false;
+}
+
+inherits(GeneratorReadStream, Readable);
+
+GeneratorReadStream.prototype._read = function(size) {
+	// If already reading, exit
+	// This shouldn't happen according to Node.js streams docs, but it seems to
+	// sometimes with Node 6 and 8.
+	if (this[READING]) return;
+	this[READING] = true;
+
+	// Work out size of chunk to emit
+	const {maxSize} = this[OPTIONS];
+	if (maxSize && maxSize < size) size = maxSize;
+
+	// Iterate through iterator
+	const iterator = this[ITERATOR];
+	let wantsMore = true,
+		ended = false;
+	while (wantsMore && !ended) {
+		// Fill buffer from generator
+		let buffer;
+		({buffer, ended} = bufferFromIterator(iterator, size));
+
+		if (buffer.length > 0) wantsMore = this.push(buffer);
+	}
+
+	if (ended) this.push(null);
+
+	this[READING] = false;
+};
+
+GeneratorReadStream.prototype._destroy = function(err, cb) { // jshint ignore:line
+	// Clean up
+	delete this[OPTIONS];
+	delete this[ITERATOR];
+	delete this[READING];
+
+	cb();
+};
+
+if (!GeneratorReadStream.prototype.destroy) {
+	GeneratorReadStream.prototype.destroy = function(err) {
+		this._destroy(err, noop);
+	};
+}
+
+/*
+ * `GeneratorWriteStream` class
+ * Writable stream that compares to result of a generator
+ */
+// Check if `._final()` supported (introduced in Node v8.0.0)
+const shimFinal = new Writable()._writableState.finalCalled == null;
+
+function GeneratorWriteStream(gen, options, cb) {
+	// Conform options
+	const streamOptions = {};
+	if (typeof options == 'function') {
+		cb = options;
+		options = {};
+	} else {
+		options = Object.assign({}, options);
+		for (let opt of ['highWaterMark', 'emitClose']) {
+			if (options[opt] != null) streamOptions[opt] = options[opt];
+		}
+	}
+
+	// Call super constructor
+	Writable.call(this, streamOptions);
+
+	// Ensure `._final()` called when finishing
+	if (shimFinal) this.on('prefinish', () => this._final(noop));
+
+	// Save callback
+	this[CALLBACK] = cb;
+
+	// Create iterator from generator
+	this[ITERATOR] = gen(options);
+
+	// Init
+	this[ERRORED] = false;
+	this[BYTES_READ] = 0;
+}
+
+inherits(GeneratorWriteStream, Writable);
+
+GeneratorWriteStream.prototype._write = function(chunk, encoding, cb) { // jshint ignore:line
+	if (chunk.length == 0) return cb();
+
+	// If already errored, exit
+	if (this[ERRORED]) return cb();
+
+	// Get buffer from iterator
+	const {buffer} = bufferFromIterator(this[ITERATOR], chunk.length);
+
+	this[BYTES_READ] += buffer.length;
+
+	// Check it is as expected
+	if (buffer.length != chunk.length) {
+		// Stream continued after expected end
+		mismatch(this, `Too much data - expected ${this[BYTES_READ]}, got at least ${this[BYTES_READ] + chunk.length - buffer.length}`);
+	} else if (Buffer.compare(buffer, chunk) != 0) {
+		// Stream contains wrong data
+		mismatch(this, `Wrong data - differed in byte range ${this[BYTES_READ] - buffer.length}-${this[BYTES_READ] - 1}`);
+	}
+
+	cb();
+};
+
+GeneratorWriteStream.prototype._final = function(cb) { // jshint ignore:line
+	if (this[ERRORED]) return cb();
+
+	// Stream is finished - check generator ended
+	const {done} = this[ITERATOR].next();
+	if (done) {
+		// Stream and generator ended simultaneously - callback without error
+		this[CALLBACK]();
+	} else {
+		// Stream ended prematurely
+		mismatch(this, `Not enough data - ended prematurely at ${this[BYTES_READ]}`);
+	}
+
+	cb();
+};
+
+GeneratorWriteStream.prototype._destroy = function(err, cb) { // jshint ignore:line
+	// Clean up
+	delete this[CALLBACK];
+	delete this[ITERATOR];
+	delete this[ERRORED];
+	delete this[BYTES_READ];
+
+	cb();
+};
+
+if (!GeneratorWriteStream.prototype.destroy) {
+	GeneratorWriteStream.prototype.destroy = function(err) {
+		this._destroy(err, noop);
+	};
+}
+
+function mismatch(stream, message) {
+	stream[ERRORED] = true;
+	stream[CALLBACK](new Error(message));
+}
+
+/**
+ * `bufferFromIterator`
+ * Get values from iterator and fill into buffer
+ * @param {Iterator} iterator - Iterator to iterate over
+ * @param {number} size - Number of bytes to read
+ * @returns {Object}
+ * @returns {Buffer} .buffer - Buffer
+ * @returns {boolean} .ended - `true` if iterator ended, `false` if not
+ */
+function bufferFromIterator(iterator, size) {
+	let buffer = Buffer.allocUnsafe(size),
+		ended = false;
+
+	for (let i = 0; i < size; i++) {
+		const {value, done} = iterator.next();
+
+		if (done) {
+			// Iterator finished - truncate buffer
+			buffer = buffer.slice(0, i);
+			ended = true;
+			break;
+		}
+
+		buffer[i] = value;
+	}
+
+	return {buffer, ended};
+}
+
+function noop() {}
diff --git a/test/all.test.js b/test/all.test.js
index 9bc0fe6..bc9e13f 100644
--- a/test/all.test.js
+++ b/test/all.test.js
@@ -8,6 +8,8 @@
 // Modules
 const chai = require('chai'),
 	{expect} = chai,
+	{Readable, Writable, PassThrough} = require('stream'),
+	zlib = require('zlib'),
 	streamGen = require('../lib/');
 
 // Init
@@ -18,8 +20,144 @@ chai.config.includeStack = true;
 /* jshint expr: true */
 /* global describe, it */
 
-describe('Tests', function() {
-	it.skip('all', function() {
-		expect(streamGen).to.be.ok;
+const { GeneratorReadStream, GeneratorWriteStream } = streamGen;
+
+const LENGTH = 26;
+function* gen() {
+	for (let i = 65; i < 65 + LENGTH; i++) {
+		yield i;
+	}
+}
+
+describe('GeneratorReadStream', function() {
+	it('is subclass of Readable Stream', function() {
+		const s = new GeneratorReadStream(gen);
+		expect(s).to.be.instanceof(Readable);
+	});
+
+	it('produces expected data', function() {
+		const s = new GeneratorReadStream(gen);
+
+		let res = s.read();
+		expect(res.toString()).to.equal('ABCDEFGHIJKLMNOPQRSTUVWXYZ');
+		res = s.read();
+		expect(res).to.be.null;
+	});
+});
+
+describe('GeneratorWriteStream', function() {
+	it('is subclass of Writable Stream', function() {
+		const s = new GeneratorWriteStream(gen);
+		expect(s).to.be.instanceof(Writable);
+	});
+
+	it('calls back with no error if streamed data correct', function(cb) {
+		const s = new GeneratorWriteStream(gen, cb);
+
+		s.write('ABCDEFGHIJKLMNOPQRSTUVWXYZ');
+		s.end();
+	});
+
+	it('calls back with error if streamed data too short', function(cb) {
+		const str = 'ABCDEFGHIJKLMNOPQRSTUVWXY';
+
+		const s = new GeneratorWriteStream(gen, err => {
+			expect(err).to.be.instanceof(Error);
+			expect(err.message).to.equal(`Not enough data - ended prematurely at ${str.length}`);
+			cb();
+		});
+
+		s.write(str);
+		s.end();
+	});
+
+	it('calls back with error if streamed data too long', function(cb) {
+		const str = 'ABCDEFGHIJKLMNOPQRSTUVWXYZ-';
+
+		const s = new GeneratorWriteStream(gen, err => {
+			expect(err).to.be.instanceof(Error);
+			expect(err.message).to.equal(`Too much data - expected ${LENGTH}, got at least ${str.length}`);
+			cb();
+		});
+
+		s.write(str);
+		s.end();
+	});
+
+	it('calls back with error if streamed data different', function(cb) {
+		const str = 'ABCDEFGHIJKL-NOPQRSTUVWXYZ';
+
+		const s = new GeneratorWriteStream(gen, err => {
+			expect(err).to.be.instanceof(Error);
+			expect(err.message).to.match(/^Wrong data - differed in byte range \d+-\d+$/);
+			cb();
+		});
+
+		s.write(str);
+		s.end();
 	});
 });
+
+describe('piped', function() {
+	describe('with short data', function() {
+		runPipeTests(gen);
+	});
+
+	describe('with 1KB data', function() {
+		function* gen() {
+			for (let i = 0; i < 1024; i++) {
+				yield i % 256;
+			}
+		}
+
+		runPipeTests(gen);
+	});
+
+	describe('with 1MB data', function() {
+		function* gen() {
+			for (let i = 0; i < 1024 * 1024; i++) {
+				yield i % 256;
+			}
+		}
+
+		runPipeTests(gen);
+	});
+
+	describe('with 10MB data', function() {
+		function* gen() {
+			for (let i = 0; i < 10 * 1024 * 1024; i++) {
+				yield i % 256;
+			}
+		}
+
+		runPipeTests(gen);
+	});
+});
+
+function runPipeTests(gen) {
+	it('calls back with no error when piped directly', function(cb) {
+		const producer = new GeneratorReadStream(gen),
+			checker = new GeneratorWriteStream(gen, cb);
+
+		producer.pipe(checker);
+	});
+
+	it('calls back with no error when piped via passthrough', function(cb) {
+		const producer = new GeneratorReadStream(gen),
+			checker = new GeneratorWriteStream(gen, cb);
+
+		const passthrough = new PassThrough();
+
+		producer.pipe(passthrough).pipe(checker);
+	});
+
+	it('calls back with no error when piped via zlib', function(cb) {
+		const producer = new GeneratorReadStream(gen),
+			checker = new GeneratorWriteStream(gen, cb);
+
+		const deflate = zlib.createDeflate(),
+			inflate = zlib.createInflate();
+
+		producer.pipe(deflate).pipe(inflate).pipe(checker);
+	});
+}