Skip to content

Commit

Permalink
Switch FlattenStream to ES6 class
Browse files Browse the repository at this point in the history
  • Loading branch information
rlidwka committed Nov 29, 2021
1 parent abe35a4 commit 506bd5e
Show file tree
Hide file tree
Showing 2 changed files with 219 additions and 222 deletions.
219 changes: 108 additions & 111 deletions lib/flatten-stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,160 +15,157 @@

const Denque = require('denque');
const stream = require('stream');
const util = require('util');

const STATE_IDLE = 0; // no data in queue
const STATE_WRITE = 1; // writing a string to the output
const STATE_FLOWING = 2; // piping one of input streams to the output
const STATE_PAUSED = 3; // output stream does not accept more data


function FlattenStream(options) {
if (!(this instanceof FlattenStream)) return new FlattenStream(options);
class FlattenStream extends stream.Duplex {
constructor(options) {
super(Object.assign({}, options, {
writableObjectMode: true,
readableObjectMode: false,
allowHalfOpen: false
}));

this.queue = new Denque();
this.state = STATE_IDLE;
this.top_chunk_stream = null;
this.top_chunk_callback = null;
this.top_chunk_read_fn = null;
this.stream_ended = false;
}

this.queue = new Denque();
this.state = STATE_IDLE;
this.top_chunk_stream = null;
this.top_chunk_callback = null;
this.top_chunk_read_fn = null;
this.stream_ended = false;

stream.Duplex.call(this, Object.assign({}, options, {
writableObjectMode: true,
readableObjectMode: false,
allowHalfOpen: false
}));
}
// Recursive function to add data to internal queue
//
_add_data(data, fn) {
if (Array.isArray(data)) {
// Flatten any arrays, callback is called when last element is processed
data.forEach((el, idx) => this._add_data(el, (idx === data.length - 1 ? fn : null)));
return;
}

this.queue.push([ data, fn ]);
}

util.inherits(FlattenStream, stream.Duplex);

_write(data, encoding, callback) {
this._add_data(data, callback);

// Recursive function to add data to internal queue
//
function add_data(data, fn) {
if (Array.isArray(data)) {
// Flatten any arrays, callback is called when last element is processed
data.forEach((el, idx) => add_data.call(this, el, (idx === data.length - 1 ? fn : null)));
return;
if (this.state === STATE_IDLE) this._read();
}

this.queue.push([ data, fn ]);
}

destroy() {
if (this.stream_ended) return;

FlattenStream.prototype._write = function (data, encoding, callback) {
add_data.call(this, data, callback);
this.stream_ended = true;
this.push(null);

if (this.state === STATE_IDLE) this._read();
};
if (this.top_chunk_stream && typeof this.top_chunk_stream.destroy === 'function') {
this.top_chunk_stream.destroy();
}

while (!this.queue.isEmpty()) {
let data = this.queue.shift()[0];

FlattenStream.prototype.destroy = function () {
if (this.stream_ended) return;
if (data && typeof data.destroy === 'function') data.destroy();
}
}

this.stream_ended = true;
this.push(null);

if (this.top_chunk_stream && typeof this.top_chunk_stream.destroy === 'function') {
this.top_chunk_stream.destroy();
}
_read() {
for (;;) {
if (this.state === STATE_WRITE) {
if (this.top_chunk_callback) {
this.top_chunk_callback();
}

while (!this.queue.isEmpty()) {
let data = this.queue.shift()[0];
this.state = STATE_IDLE;
this.top_chunk_callback = null;
} else if (this.state === STATE_FLOWING) {
this.top_chunk_read_fn();
return;
} else if (this.state === STATE_PAUSED) {
this.state = STATE_FLOWING;
this.top_chunk_read_fn();
return;
}

if (data && typeof data.destroy === 'function') data.destroy();
}
};
if (this.queue.isEmpty()) break;

let [ data, callback ] = this.queue.shift();

FlattenStream.prototype._read = function () {
for (;;) {
if (this.state === STATE_WRITE) {
if (this.top_chunk_callback) {
this.top_chunk_callback();
}
if (data && typeof data.on === 'function') {
// looks like data is a stream
this.state = STATE_FLOWING;
this.top_chunk_stream = data;
this.top_chunk_callback = callback;

this.state = STATE_IDLE;
this.top_chunk_callback = null;
} else if (this.state === STATE_FLOWING) {
this.top_chunk_read_fn();
return;
} else if (this.state === STATE_PAUSED) {
this.state = STATE_FLOWING;
this.top_chunk_read_fn();
return;
}
this.top_chunk_read_fn = () => {
if (this.state !== STATE_FLOWING) return;

if (this.queue.isEmpty()) break;
for (;;) {
let chunk = data.read();

let [ data, callback ] = this.queue.shift();
if (chunk === null) {
// no more data is available yet
break;
}

if (data && typeof data.on === 'function') {
// looks like data is a stream
this.state = STATE_FLOWING;
this.top_chunk_stream = data;
this.top_chunk_callback = callback;
if (this.stream_ended) break;
if (!this.push(String(chunk) + '\r\n')) {
this.state = STATE_PAUSED;
break;
}
}
};

this.top_chunk_read_fn = () => {
if (this.state !== STATE_FLOWING) return;
data.on('readable', this.top_chunk_read_fn);
this.top_chunk_read_fn();

for (;;) {
let chunk = data.read();
stream.finished(data, err => {
data.removeListener('readable', this.top_chunk_read_fn);

if (chunk === null) {
// no more data is available yet
break;
if (err) {
this.destroy();
return;
}

if (this.stream_ended) break;
if (!this.push(String(chunk) + '\r\n')) {
this.state = STATE_PAUSED;
break;
if (this.top_chunk_callback) {
this.top_chunk_callback();
}
}
};

data.on('readable', this.top_chunk_read_fn);
this.top_chunk_read_fn();
this.state = STATE_IDLE;
this.top_chunk_stream = null;
this.top_chunk_callback = null;
this.top_chunk_read_fn = null;
this._read();
});
break;

stream.finished(data, err => {
data.removeListener('readable', this.top_chunk_read_fn);
} else {
// regular data chunk (null, string, buffer)
this.state = STATE_WRITE;
this.top_chunk_callback = callback;

if (err) {
if (data === null) {
// signal to end this stream
this.destroy();
return;
}
break;

if (this.top_chunk_callback) {
this.top_chunk_callback();
} else {
/* eslint-disable no-lonely-if */
// string (or mistakenly pushed numbers and such)
if (!this.push(String(data) + '\r\n')) break;
}

this.state = STATE_IDLE;
this.top_chunk_stream = null;
this.top_chunk_callback = null;
this.top_chunk_read_fn = null;
this._read();
});
break;

} else {
// regular data chunk (null, string, buffer)
this.state = STATE_WRITE;
this.top_chunk_callback = callback;

if (data === null) {
// signal to end this stream
this.destroy();
break;

} else {
/* eslint-disable no-lonely-if */
// string (or mistakenly pushed numbers and such)
if (!this.push(String(data) + '\r\n')) break;
}
}
}
};
}


module.exports = FlattenStream;
module.exports = (...args) => new FlattenStream(...args);
Loading

0 comments on commit 506bd5e

Please sign in to comment.