Commit

Initial release
overlookmotel committed Apr 27, 2018
1 parent 30b24aa commit f8070ac
Showing 3 changed files with 477 additions and 4 deletions.
117 changes: 117 additions & 0 deletions README.md
[![Greenkeeper badge](https://badges.greenkeeper.io/overlookmotel/stream-gen.svg)](https://greenkeeper.io/)
[![Coverage Status](https://img.shields.io/coveralls/overlookmotel/stream-gen/master.svg)](https://coveralls.io/r/overlookmotel/stream-gen)

## Purpose

Streams can be hard to test.

You want to test transform streams etc. with large batches of data to ensure they're robust, but you don't want to include large test files in the repo.

This module provides:

* Readable stream which streams out data of whatever size you need
* Writable stream which receives data and checks it's what it should be

This is achieved entirely in memory (no file streams etc) but with minimal memory use.

The streams are created from a generator function you provide, which can produce streams of whatever content and size you require.

## Usage

### Installation

```
npm install stream-gen
```

### Classes

#### GeneratorReadStream( gen [, options] )

Readable stream that gets its content from a generator.

The generator must `yield` a byte value (0-255) each time it is called. The content of the stream is made up of these bytes.

```js
const { GeneratorReadStream } = require('stream-gen');

// Data generator - creates 1KB of data
function* gen() {
	for (let i = 0; i < 1024; i++) {
		yield i % 256;
	}
}

const producer = new GeneratorReadStream( gen );

producer.pipe( destination );
```

#### GeneratorWriteStream( gen [, options], callback )

Writable stream that receives content and compares it to the output of a generator. It calls `callback` with the result of the comparison.

The generator must be of the same type as for `GeneratorReadStream`.

```js
const { GeneratorWriteStream } = require('stream-gen');

const checker = new GeneratorWriteStream( gen, function(err) {
	if (err) return console.log('Stream differed');
	console.log('Stream as expected');
} );

source.pipe( checker );
```

### Putting it together

The two parts work together to test that data has been transferred without being altered.

Let's say you are testing a lossless compression component.

```
Input -> Compress -> Decompress -> Output
```

Input and output should be the same.

You can test this with:

```js
const { GeneratorReadStream, GeneratorWriteStream } = require('stream-gen');
const zlib = require('zlib');

// Data generator - creates 100MB of data
function* gen() {
	for (let i = 0; i < 100 * 1024 * 1024; i++) {
		yield i % 256;
	}
}

// Create producer and checker
const producer = new GeneratorReadStream( gen );
const checker = new GeneratorWriteStream( gen, function(err) {
	if (err) throw err;
	console.log('It works!');
} );

// Create compressor and decompressor
const compressor = zlib.createDeflate();
const decompressor = zlib.createInflate();

// Pipe them all together
producer.pipe(compressor).pipe(decompressor).pipe(checker);
```

The callback on `checker` tells us whether the data is the same after compression and decompression as it was at the start.

There we go! We've tested zlib on 100MB of data, using hardly any memory and no disk access.

### Options

#### GeneratorReadStream

Options `highWaterMark` and `encoding` are passed to Node's `Readable` stream constructor.

The `maxSize` option determines the largest chunk size the stream will emit.
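
For example, a minimal sketch reusing the `gen` function from above (the option values here are illustrative, not defaults):

```js
const { GeneratorReadStream } = require('stream-gen');

// Emit chunks of at most 4 KB, with a 16 KB high water mark
const producer = new GeneratorReadStream( gen, {
	highWaterMark: 16 * 1024,
	maxSize: 4 * 1024
} );
```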

#### GeneratorWriteStream

Options `highWaterMark` and `emitClose` are passed to Node's `Writable` stream constructor.
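
For example (again, illustrative values only):

```js
const { GeneratorWriteStream } = require('stream-gen');

// Check incoming data against the same generator, with a larger buffer
const checker = new GeneratorWriteStream( gen, { highWaterMark: 64 * 1024 }, function(err) {
	if (err) return console.log('Stream differed:', err.message);
	console.log('Stream as expected');
} );
```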

## Tests

Use `npm test` to run the tests. Use `npm run cover` to check coverage.
220 changes: 219 additions & 1 deletion lib/index.js

'use strict';

// Modules
const {Readable, Writable} = require('stream'),
	{inherits} = require('util');

// Symbols
const ITERATOR = Symbol(),
	OPTIONS = Symbol(),
	CALLBACK = Symbol(),
	READING = Symbol(),
	BYTES_READ = Symbol(),
	ERRORED = Symbol();

// Exports
module.exports = {
	GeneratorReadStream,
	GeneratorWriteStream
};

/**
 * `GeneratorReadStream` class
 * Readable stream created from a generator
 * @param {Function} gen - Generator function
 * @param {Object} [options] - Options
 * @param {number} [options.highWaterMark] - Passed to Readable constructor
 * @param {string} [options.encoding] - Passed to Readable constructor
 * @param {number} [options.maxSize] - Maximum size of chunks to emit
 */
function GeneratorReadStream(gen, options) {
	// Call super constructor
	options = Object.assign({}, options);
	const streamOptions = {};
	for (let opt of ['highWaterMark', 'encoding']) {
		if (options[opt] != null) streamOptions[opt] = options[opt];
	}

	Readable.call(this, streamOptions);

	// Save options
	this[OPTIONS] = options;

	// Create iterator from generator
	this[ITERATOR] = gen(options);

	// Init
	this[READING] = false;
}

inherits(GeneratorReadStream, Readable);

GeneratorReadStream.prototype._read = function(size) {
	// If already reading, exit
	// This shouldn't happen according to Node.js streams docs, but it seems to
	// happen sometimes with Node 6 and 8.
	if (this[READING]) return;
	this[READING] = true;

	// Work out size of chunk to emit
	const {maxSize} = this[OPTIONS];
	if (maxSize && maxSize < size) size = maxSize;

	// Iterate through iterator
	const iterator = this[ITERATOR];
	let wantsMore = true,
		ended = false;
	while (wantsMore && !ended) {
		// Fill buffer from generator
		let buffer;
		({buffer, ended} = bufferFromIterator(iterator, size));

		if (buffer.length > 0) wantsMore = this.push(buffer);
	}

	if (ended) this.push(null);

	this[READING] = false;
};

GeneratorReadStream.prototype._destroy = function(err, cb) { // jshint ignore:line
	// Clean up
	delete this[OPTIONS];
	delete this[ITERATOR];
	delete this[READING];

	cb();
};

if (!GeneratorReadStream.prototype.destroy) {
	GeneratorReadStream.prototype.destroy = function(err) {
		this._destroy(err, noop);
	};
}

/*
 * `GeneratorWriteStream` class
 * Writable stream that compares its input to the output of a generator
 */
// Check if `._final()` supported (introduced in Node v8.0.0)
const shimFinal = new Writable()._writableState.finalCalled == null;

function GeneratorWriteStream(gen, options, cb) {
	// Conform options
	const streamOptions = {};
	if (typeof options == 'function') {
		cb = options;
		options = {};
	} else {
		options = Object.assign({}, options);
		for (let opt of ['highWaterMark', 'emitClose']) {
			if (options[opt] != null) streamOptions[opt] = options[opt];
		}
	}

	// Call super constructor
	Writable.call(this, streamOptions);

	// Ensure `._final()` called when finishing
	if (shimFinal) this.on('prefinish', () => this._final(noop));

	// Save callback
	this[CALLBACK] = cb;

	// Create iterator from generator
	this[ITERATOR] = gen(options);

	// Init
	this[ERRORED] = false;
	this[BYTES_READ] = 0;
}

inherits(GeneratorWriteStream, Writable);

GeneratorWriteStream.prototype._write = function(chunk, encoding, cb) { // jshint ignore:line
	if (chunk.length == 0) return cb();

	// If already errored, exit
	if (this[ERRORED]) return cb();

	// Get buffer from iterator
	const {buffer} = bufferFromIterator(this[ITERATOR], chunk.length);

	this[BYTES_READ] += buffer.length;

	// Check chunk is as expected
	if (buffer.length != chunk.length) {
		// Stream continued after expected end
		mismatch(this, `Too much data - expected ${this[BYTES_READ]}, got at least ${this[BYTES_READ] + chunk.length - buffer.length}`);
	} else if (Buffer.compare(buffer, chunk) != 0) {
		// Stream contains wrong data
		mismatch(this, `Wrong data - differed in byte range ${this[BYTES_READ] - buffer.length}-${this[BYTES_READ] - 1}`);
	}

	cb();
};

GeneratorWriteStream.prototype._final = function(cb) { // jshint ignore:line
	if (this[ERRORED]) return cb();

	// Stream is finished - check generator ended
	const {done} = this[ITERATOR].next();
	if (done) {
		// Stream and generator ended simultaneously - callback without error
		this[CALLBACK]();
	} else {
		// Stream ended prematurely
		mismatch(this, `Not enough data - ended prematurely at ${this[BYTES_READ]}`);
	}

	cb();
};

GeneratorWriteStream.prototype._destroy = function(err, cb) { // jshint ignore:line
	// Clean up
	delete this[CALLBACK];
	delete this[ITERATOR];
	delete this[ERRORED];
	delete this[BYTES_READ];

	cb();
};

if (!GeneratorWriteStream.prototype.destroy) {
	GeneratorWriteStream.prototype.destroy = function(err) {
		this._destroy(err, noop);
	};
}

function mismatch(stream, message) {
	stream[ERRORED] = true;
	stream[CALLBACK](new Error(message));
}

/**
 * `bufferFromIterator`
 * Get values from iterator and fill into buffer
 * @param {Iterator} iterator - Iterator to iterate over
 * @param {number} size - Number of bytes to read
 * @returns {Object}
 * @returns {Buffer} .buffer - Buffer
 * @returns {boolean} .ended - `true` if iterator ended, `false` if not
 */
function bufferFromIterator(iterator, size) {
	let buffer = Buffer.allocUnsafe(size),
		ended = false;

	for (let i = 0; i < size; i++) {
		const {value, done} = iterator.next();

		if (done) {
			// Iterator finished - truncate buffer
			buffer = buffer.slice(0, i);
			ended = true;
			break;
		}

		buffer[i] = value;
	}

	return {buffer, ended};
}

function noop() {}
