Skip to content
This repository has been archived by the owner on Dec 5, 2018. It is now read-only.

Commit

Permalink
Fixes #7 - out.stream does not always reference the original stream
Browse files Browse the repository at this point in the history
  • Loading branch information
patrick-steele-idem committed Oct 18, 2016
1 parent 0d7fa16 commit b60f33c
Show file tree
Hide file tree
Showing 3 changed files with 71 additions and 35 deletions.
58 changes: 30 additions & 28 deletions src/AsyncStream.js
Expand Up @@ -6,9 +6,10 @@ var BufferedWriter = require('./BufferedWriter');

var voidWriter = { write:function(){} };

function State(stream, originalWriter, events) {
this.originalStream = stream;
this.originalWriter = originalWriter;
function State(root, stream, writer, events) {
this.root = root;
this.stream = stream;
this.writer = writer;
this.events = events;

this.remaining = 0;
Expand All @@ -21,26 +22,27 @@ function State(stream, originalWriter, events) {

function AsyncStream(global, writer, state, shouldBuffer) {
var finalGlobal = this.attributes = global || {};
var finalStream;
var originalStream;

if (state) {
finalStream = state.stream;
originalStream = state.stream;
} else {
var events = finalGlobal.events /* deprecated */ = writer && writer.on ? writer : new EventEmitter();

if (!writer) {
writer = new StringWriter(events);
} else if (shouldBuffer) {
finalStream = writer;
writer = new BufferedWriter(writer);
if (writer) {
originalStream = writer;
if (shouldBuffer) {
writer = new BufferedWriter(writer);
}
} else {
writer = originalStream = new StringWriter(events);
}

finalStream = finalStream || writer;
state = new State(this, writer, events);
state = new State(this, originalStream, writer, events);
}

this.global = finalGlobal;
this.stream = finalStream;
this.stream = originalStream;
this._state = state;

this.data = {};
Expand Down Expand Up @@ -75,7 +77,7 @@ var proto = AsyncStream.prototype = {
},

getOutput: function () {
return this._state.originalWriter.toString();
return this._state.writer.toString();
},

beginAsync: function(options) {
Expand Down Expand Up @@ -142,7 +144,7 @@ var proto = AsyncStream.prototype = {
}, timeout);
}

state.originalStream.emit('beginAsync', {
state.events.emit('beginAsync', {
writer: newStream,
parentWriter: this
});
Expand Down Expand Up @@ -183,7 +185,7 @@ var proto = AsyncStream.prototype = {

var remaining;

if (this === state.originalStream) {
if (this === state.root) {
remaining = state.remaining;
state.ended = true;
} else {
Expand All @@ -200,15 +202,15 @@ var proto = AsyncStream.prototype = {
if (!state.lastFired && (state.remaining - state.lastCount === 0)) {
state.lastFired = true;
state.lastCount = 0;
state.originalStream.emit('last');
state.events.emit('last');
}

if (remaining === 0) {
state.finished = true;
if (state.originalWriter.end) {
state.originalWriter.end();
if (state.writer.end) {
state.writer.end();
} else {
state.originalStream.emit('finish');
state.events.emit('finish');
}
}
}
Expand All @@ -217,7 +219,7 @@ var proto = AsyncStream.prototype = {
},

// flushNextOld: function(currentWriter) {
// if (currentWriter === this._originalWriter) {
// if (currentWriter === this._state.writer) {
// var nextStream;
// var nextWriter = currentWriter.next;
//
Expand All @@ -238,7 +240,7 @@ var proto = AsyncStream.prototype = {
//
// // If there is a nextStream,
// // set its writer to currentWriter
// // (which is the originalWriter)
// // (which is the state.writer)
// if(nextStream) {
// nextStream.writer = currentWriter;
// currentWriter.stream = nextStream;
Expand Down Expand Up @@ -276,7 +278,7 @@ var proto = AsyncStream.prototype = {
on: function(event, callback) {
var state = this._state;

if (event === 'finish' && state.originalWriter.finished) {
if (event === 'finish' && state.finished) {
callback();
return this;
}
Expand All @@ -288,7 +290,7 @@ var proto = AsyncStream.prototype = {
once: function(event, callback) {
var state = this._state;

if (event === 'finish' && state.originalWriter.finished) {
if (event === 'finish' && state.finished) {
callback();
return this;
}
Expand Down Expand Up @@ -351,7 +353,7 @@ var proto = AsyncStream.prototype = {
},

pipe: function(stream) {
this._state.originalWriter.pipe(stream);
this._state.stream.pipe(stream);
return this;
},

Expand Down Expand Up @@ -383,9 +385,9 @@ var proto = AsyncStream.prototype = {
var state = this._state;

if (!state.finished) {
var stream = state.originalWriter;
if (stream && stream.flush) {
stream.flush();
var writer = state.writer;
if (writer && writer.flush) {
writer.flush();
}
}
return this;
Expand Down
6 changes: 0 additions & 6 deletions src/BufferedWriter.js
Expand Up @@ -30,12 +30,6 @@ BufferedWriter.prototype = {
this._wrapped.end();
}
},
on: function(event, callback) {
return this._wrapped.on(event, callback);
},
once: function(event, callback) {
return this._wrapped.once(event, callback);
},

clear: function() {
this._buffer = '';
Expand Down
42 changes: 41 additions & 1 deletion test/test.js
Expand Up @@ -436,7 +436,7 @@ describe('async-writer' , function() {

var asyncOut = out.beginAsync({last: true});
var lastFiredCount = 0;

out.on('last', function() {
lastFiredCount++;
});
Expand Down Expand Up @@ -771,4 +771,44 @@ describe('async-writer' , function() {
out.end();
}, 10);
});

it('should support out.stream for accessing the original stream', function(done) {

var through = require('through');
var outStr = '';

var stream = through(
function write(str) {
outStr += str;
}
);

var out = asyncWriter.create(stream);
expect(out.stream).to.equal(stream);

var asyncOut1 = out.beginAsync();
expect(asyncOut1.stream).to.equal(stream);
setTimeout(function() {
expect(asyncOut1.stream).to.equal(stream);
asyncOut1.end();
}, 100);


var asyncOut2 = out.beginAsync();
expect(asyncOut2.stream).to.equal(stream);
setTimeout(function() {
expect(asyncOut2.stream).to.equal(stream);
asyncOut2.end();
}, 50);

out.on('end', function() {
expect(out.stream).to.equal(stream);
expect(asyncOut1.stream).to.equal(stream);
expect(asyncOut2.stream).to.equal(stream);
done();
});

out.end();

});
});

0 comments on commit b60f33c

Please sign in to comment.