Skip to content

Commit

Permalink
stream: move legacy to lib/internal dir
Browse files Browse the repository at this point in the history
Improve readability of lib/stream.js by moving the legacy abstract
Stream into lib/internal/streams/legacy.js.

PR-URL: nodejs/node#8197
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
  • Loading branch information
yorkie authored and mcollina committed Feb 1, 2017
1 parent 9db8973 commit 1b30df1
Show file tree
Hide file tree
Showing 3 changed files with 97 additions and 96 deletions.
93 changes: 93 additions & 0 deletions lib/internal/streams/legacy.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
'use strict';

const EE = require('events');
const util = require('util');

function Stream() {
EE.call(this);
}
util.inherits(Stream, EE);

Stream.prototype.pipe = function(dest, options) {
var source = this;

function ondata(chunk) {
if (dest.writable) {
if (false === dest.write(chunk) && source.pause) {
source.pause();
}
}
}

source.on('data', ondata);

function ondrain() {
if (source.readable && source.resume) {
source.resume();
}
}

dest.on('drain', ondrain);

// If the 'end' option is not supplied, dest.end() will be called when
// source gets the 'end' or 'close' events. Only dest.end() once.
if (!dest._isStdio && (!options || options.end !== false)) {
source.on('end', onend);
source.on('close', onclose);
}

var didOnEnd = false;
function onend() {
if (didOnEnd) return;
didOnEnd = true;

dest.end();
}


function onclose() {
if (didOnEnd) return;
didOnEnd = true;

if (typeof dest.destroy === 'function') dest.destroy();
}

// don't leave dangling pipes when there are errors.
function onerror(er) {
cleanup();
if (EE.listenerCount(this, 'error') === 0) {
throw er; // Unhandled stream error in pipe.
}
}

source.on('error', onerror);
dest.on('error', onerror);

// remove all the event listeners that were added.
function cleanup() {
source.removeListener('data', ondata);
dest.removeListener('drain', ondrain);

source.removeListener('end', onend);
source.removeListener('close', onclose);

source.removeListener('error', onerror);
dest.removeListener('error', onerror);

source.removeListener('end', cleanup);
source.removeListener('close', cleanup);

dest.removeListener('close', cleanup);
}

source.on('end', cleanup);
source.on('close', cleanup);

dest.on('close', cleanup);
dest.emit('pipe', source);

// Allow for unix-like usage: A.pipe(B).pipe(C)
return dest;
};

module.exports = Stream;
99 changes: 3 additions & 96 deletions lib/stream.js
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
'use strict';

module.exports = Stream;
// Note: export Stream before Readable/Writable/Duplex/...
// to avoid a cross-reference(require) issues
const Stream = module.exports = require('internal/streams/legacy');

const EE = require('events');
const util = require('util');

util.inherits(Stream, EE);
Stream.Readable = require('_stream_readable');
Stream.Writable = require('_stream_writable');
Stream.Duplex = require('_stream_duplex');
Expand All @@ -14,94 +12,3 @@ Stream.PassThrough = require('_stream_passthrough');

// Backwards-compat with node 0.4.x
Stream.Stream = Stream;


// old-style streams. Note that the pipe method (the only relevant
// part of this class) is overridden in the Readable class.

function Stream() {
EE.call(this);
}

Stream.prototype.pipe = function(dest, options) {
var source = this;

function ondata(chunk) {
if (dest.writable) {
if (false === dest.write(chunk) && source.pause) {
source.pause();
}
}
}

source.on('data', ondata);

function ondrain() {
if (source.readable && source.resume) {
source.resume();
}
}

dest.on('drain', ondrain);

// If the 'end' option is not supplied, dest.end() will be called when
// source gets the 'end' or 'close' events. Only dest.end() once.
if (!dest._isStdio && (!options || options.end !== false)) {
source.on('end', onend);
source.on('close', onclose);
}

var didOnEnd = false;
function onend() {
if (didOnEnd) return;
didOnEnd = true;

dest.end();
}


function onclose() {
if (didOnEnd) return;
didOnEnd = true;

if (typeof dest.destroy === 'function') dest.destroy();
}

// don't leave dangling pipes when there are errors.
function onerror(er) {
cleanup();
if (EE.listenerCount(this, 'error') === 0) {
throw er; // Unhandled stream error in pipe.
}
}

source.on('error', onerror);
dest.on('error', onerror);

// remove all the event listeners that were added.
function cleanup() {
source.removeListener('data', ondata);
dest.removeListener('drain', ondrain);

source.removeListener('end', onend);
source.removeListener('close', onclose);

source.removeListener('error', onerror);
dest.removeListener('error', onerror);

source.removeListener('end', cleanup);
source.removeListener('close', cleanup);

dest.removeListener('close', cleanup);
}

source.on('end', cleanup);
source.on('close', cleanup);

dest.on('close', cleanup);

dest.emit('pipe', source);

// Allow for unix-like usage: A.pipe(B).pipe(C)
return dest;
};
1 change: 1 addition & 0 deletions node.gyp
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@
'lib/internal/v8_prof_processor.js',
'lib/internal/streams/lazy_transform.js',
'lib/internal/streams/BufferList.js',
'lib/internal/streams/legacy.js',
'deps/v8/tools/splaytree.js',
'deps/v8/tools/codemap.js',
'deps/v8/tools/consarray.js',
Expand Down

0 comments on commit 1b30df1

Please sign in to comment.