Skip to content

Commit

Permalink
implementing stream.destroy
Browse files Browse the repository at this point in the history
  • Loading branch information
thlorenz committed Jan 19, 2013
1 parent 2a5c46b commit c51ae8a
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 0 deletions.
13 changes: 13 additions & 0 deletions stream-api.js
Expand Up @@ -9,6 +9,7 @@ function createStreamAPI () {
, paused = true
, controlled = false
, buffer = []
, closed = false
;

stream = new Stream();
Expand All @@ -31,29 +32,41 @@ function createStreamAPI () {
}
};

stream.destroy = function () {
closed = true;
stream.emit('close');
};

// called for each entry
processEntry = function (entry) {
if (closed) return;
return paused ? buffer.push({ type: 'data', data: entry }) : stream.emit('data', entry);
};

// called with all found entries when directory walk finished
done = function (err, entries) {
if (closed) return;

// since we already emitted each entry and all non fatal errors
// all we need to do here is to signal that we are done
stream.emit('end');
};

handleError = function (err) {
if (closed) return;
return paused ? buffer.push({ type: 'warn', data: err }) : stream.emit('warn', err);
};

handleFatalError = function (err) {
if (closed) return;
return paused ? buffer.push({ type: 'error', data: err }) : stream.emit('error', err);
};

// Allow stream to be returned and handlers to be attached and/or stream to be piped before emitting messages
// Otherwise we may loose data/errors that are emitted immediately
process.nextTick(function () {
if (closed) return;

// In case was controlled (paused/resumed) manually, we don't interfer
// see https://github.com/thlorenz/readdirp/commit/ab7ff8561d73fca82c2ce7eb4ce9f7f5caf48b55#commitcomment-1964530
if (controlled) return;
Expand Down
41 changes: 41 additions & 0 deletions test/readdirp-stream.js
Expand Up @@ -169,4 +169,45 @@ test('\napi separately', function (t) {
api.stream.resume();
})
})

t.test('\n# when a stream is destroyed, it emits "closed", but no longer emits "data", "warn" and "error"', function (t) {
t.plan(5)
var api = streamapi()
, destroyed = false
, fatalError = new Error('fatal!')
, nonfatalError = new Error('nonfatal!')
, processedData = 'some data'

api.stream
.on('warn', function (err) {
t.notOk(destroyed, 'emits warning until destroyed');
})
.on('error', function (err) {
t.notOk(destroyed, 'emits errors until destroyed');
})
.on('data', function (data) {
t.notOk(destroyed, 'emits data until destroyed');
})
.on('close', function () {
t.ok(destroyed, 'emits close when stream is destroyed');
})


api.processEntry(processedData);
api.handleError(nonfatalError);
api.handleFatalError(fatalError);

process.nextTick(function () {
destroyed = true
api.stream.destroy()

api.processEntry(processedData);
api.handleError(nonfatalError);
api.handleFatalError(fatalError);

process.nextTick(function () {
t.pass('emits no more data, warn or error events after it was destroyed')
})
})
})
})

0 comments on commit c51ae8a

Please sign in to comment.