Skip to content

Commit

Permalink
Fixes early end at high water mark.
Browse files Browse the repository at this point in the history
  • Loading branch information
jxson committed Feb 3, 2016
1 parent c9bdf61 commit 77ed55f
Showing 1 changed file with 28 additions and 29 deletions.
57 changes: 28 additions & 29 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,20 @@ function Powerwalk(options) {

prr(powerwalk, 'options', options)
prr(powerwalk, 'depth', 0, { writable: true })
prr(powerwalk, '_q', [])
prr(powerwalk, '_walked', [])
prr(powerwalk, '_queue', {})

powerwalk.on('drain', function() {
debug('drain');

// Temporary workaround for races with the current queue.
var length = Object.keys(powerwalk._queue).length;
if (length === 0 && !powerwalk.isPaused()) {
debug('empty queue');
powerwalk.end();
}

});

powerwalk.on('path', push(powerwalk._walked))
}
Expand Down Expand Up @@ -111,7 +123,7 @@ Powerwalk.prototype._transform = function (buffer, enc, callback) {
// }
// // end first write setup

powerwalk.queue(pathname)
powerwalk.enqueue(pathname)

fs.lstat(pathname, function(err, stats) {
if (err) {
Expand Down Expand Up @@ -199,56 +211,43 @@ Powerwalk.prototype.walked = function(pathname) {
return contains(this._walked, pathname)
}

Powerwalk.prototype.queue = function(pathname) {
this._q.push(pathname)
Powerwalk.prototype.enqueue = function(pathname) {
this._queue[pathname] = pathname;
}

Powerwalk.prototype.dequeue = function(pathname, type, callback) {
var powerwalk = this
var start = powerwalk._q.indexOf(pathname)
var deleteCount = 1
var removed = powerwalk._q.splice(start, deleteCount)[0]
debug('dequeue: %s', pathname);

if (! removed) {
var err = new Error('Can not dequeue items that have not been queued.')
powerwalk.emit('error', err)
return
}
var powerwalk = this

if (! powerwalk.walked(pathname)) {
powerwalk.emit('path', pathname)
powerwalk.emit(type, pathname)
}

var pushed = true;
if (type === powerwalk.options.emit) {
debug('%s: %s', type, pathname)
callback(null, pathname)
} else {
callback()
pushed = powerwalk.push(pathname)
}

if (powerwalk._q.length === 0) {
powerwalk.end()
}
debug('paused: %s', powerwalk.isPaused());
debug('pushed: %s', pushed);

delete powerwalk._queue[pathname]

debug('===========');

return removed
callback();
}

Powerwalk.prototype._flush = function(callback) {
debug('_flush')

var powerwalk = this

// Experimental: This might be a bad idea since data events are queued and the
// read stream might not be hooked up til later.
// if (powerwalk.listeners('data').length === 0) {
// powerwalk.on('data', noop)
// }

callback()
}



function contains(array, item) {
return array.indexOf(item) !== -1
}
Expand Down

0 comments on commit 77ed55f

Please sign in to comment.