Skip to content

Commit

Permalink
standardize
Browse files Browse the repository at this point in the history
  • Loading branch information
jessetane committed Feb 2, 2017
1 parent b902446 commit 1627245
Show file tree
Hide file tree
Showing 15 changed files with 635 additions and 633 deletions.
87 changes: 44 additions & 43 deletions example/index.js
Original file line number Diff line number Diff line change
@@ -1,65 +1,66 @@
var queue = require('../');
var queue = require('../')

var q = queue();
var results = [];
var q = queue()
var results = []

// add jobs using the familiar Array API

q.push(function(cb) {
results.push('two');
cb();
});
q.push(function (cb) {
results.push('two')
cb()
})

q.push(
function(cb) {
results.push('four');
cb();
function (cb) {
results.push('four')
cb()
},
function(cb) {
results.push('five');
cb();
function (cb) {
results.push('five')
cb()
}
);
)

q.unshift(function(cb) {
results.push('one');
cb();
});
q.unshift(function (cb) {
results.push('one')
cb()
})

q.splice(2, 0, function(cb) {
results.push('three');
cb();
});
q.splice(2, 0, function (cb) {
results.push('three')
cb()
})

// use the timeout feature to deal with jobs that
// use the timeout feature to deal with jobs that
// take too long or forget to execute a callback

q.timeout = 100;
q.timeout = 100

q.on('timeout', function(next, job) {
console.log('job timed out:', job.toString().replace(/\n/g, ''));
next();
});
q.on('timeout', function (next, job) {
console.log('job timed out:', job.toString().replace(/\n/g, ''))
next()
})

q.push(function(cb) {
setTimeout(function() {
console.log('slow job finished');
cb();
}, 200);
});
q.push(function (cb) {
setTimeout(function () {
console.log('slow job finished')
cb()
}, 200)
})

q.push(function(cb) {
console.log('forgot to execute callback');
});
q.push(function (cb) {
console.log('forgot to execute callback')
})

// get notified when jobs complete

q.on('success', function(result, job) {
console.log('job finished processing:', job.toString().replace(/\n/g, ''));
});
q.on('success', function (result, job) {
console.log('job finished processing:', job.toString().replace(/\n/g, ''))
})

// begin processing, get notified on end / failure

q.start(function(err) {
console.log('all done:', results);
});
q.start(function (err) {
if (err) throw err
console.log('all done:', results)
})
193 changes: 97 additions & 96 deletions index.js
Original file line number Diff line number Diff line change
@@ -1,172 +1,173 @@
var inherits = require('inherits');
var EventEmitter = require('events').EventEmitter;

module.exports = Queue;

function Queue(options) {
if (!(this instanceof Queue))
return new Queue(options);

EventEmitter.call(this);
options = options || {};
this.concurrency = options.concurrency || Infinity;
this.timeout = options.timeout || 0;
this.autostart = options.autostart || false;
this.pending = 0;
this.session = 0;
this.running = false;
this.jobs = [];
this.timers = {};
var inherits = require('inherits')
var EventEmitter = require('events').EventEmitter

module.exports = Queue

function Queue (options) {
if (!(this instanceof Queue)) {
return new Queue(options)
}

EventEmitter.call(this)
options = options || {}
this.concurrency = options.concurrency || Infinity
this.timeout = options.timeout || 0
this.autostart = options.autostart || false
this.pending = 0
this.session = 0
this.running = false
this.jobs = []
this.timers = {}
}
inherits(Queue, EventEmitter);
inherits(Queue, EventEmitter)

var arrayMethods = [
'pop',
'shift',
'indexOf',
'lastIndexOf'
];
]

arrayMethods.forEach(function(method) {
Queue.prototype[method] = function() {
return Array.prototype[method].apply(this.jobs, arguments);
};
});
arrayMethods.forEach(function (method) {
Queue.prototype[method] = function () {
return Array.prototype[method].apply(this.jobs, arguments)
}
})

Queue.prototype.slice = function(begin, end) {
this.jobs = this.jobs.slice(begin, end);
return this;
};
Queue.prototype.slice = function (begin, end) {
this.jobs = this.jobs.slice(begin, end)
return this
}

Queue.prototype.reverse = function() {
this.jobs.reverse();
return this;
};
Queue.prototype.reverse = function () {
this.jobs.reverse()
return this
}

var arrayAddMethods = [
'push',
'unshift',
'splice'
];
]

arrayAddMethods.forEach(function(method) {
Queue.prototype[method] = function() {
var methodResult = Array.prototype[method].apply(this.jobs, arguments);
arrayAddMethods.forEach(function (method) {
Queue.prototype[method] = function () {
var methodResult = Array.prototype[method].apply(this.jobs, arguments)
if (this.autostart) {
this.start();
this.start()
}
return methodResult;
return methodResult
}
})

Object.defineProperty(Queue.prototype, 'length', { get: function() {
return this.pending + this.jobs.length;
}});
Object.defineProperty(Queue.prototype, 'length', { get: function () {
return this.pending + this.jobs.length
}})

Queue.prototype.start = function(cb) {
Queue.prototype.start = function (cb) {
if (cb) {
callOnErrorOrEnd.call(this, cb);
callOnErrorOrEnd.call(this, cb)
}

this.running = true;
this.running = true

if (this.pending === this.concurrency) {
return;
return
}

if (this.jobs.length === 0) {
if (this.pending === 0) {
done.call(this);
done.call(this)
}
return;
return
}

var self = this;
var job = this.jobs.shift();
var once = true;
var session = this.session;
var timeoutId = null;
var didTimeout = false;
var self = this
var job = this.jobs.shift()
var once = true
var session = this.session
var timeoutId = null
var didTimeout = false

function next(err, result) {
function next (err, result) {
if (once && self.session === session) {
once = false;
self.pending--;
once = false
self.pending--
if (timeoutId !== null) {
delete self.timers[timeoutId]
clearTimeout(timeoutId);
clearTimeout(timeoutId)
}

if (err) {
self.emit('error', err, job);
self.emit('error', err, job)
} else if (didTimeout === false) {
self.emit('success', result, job);
self.emit('success', result, job)
}

if (self.session === session) {
if (self.pending === 0 && self.jobs.length === 0) {
done.call(self);
done.call(self)
} else if (self.running) {
self.start();
self.start()
}
}
}
}

if (this.timeout) {
timeoutId = setTimeout(function() {
didTimeout = true;
timeoutId = setTimeout(function () {
didTimeout = true
if (self.listeners('timeout').length > 0) {
self.emit('timeout', next, job);
self.emit('timeout', next, job)
} else {
next();
next()
}
}, this.timeout);
}, this.timeout)
this.timers[timeoutId] = timeoutId
}

this.pending++;
job(next);
this.pending++
job(next)

if (this.jobs.length > 0) {
this.start();
this.start()
}
};
}

Queue.prototype.stop = function() {
this.running = false;
};
Queue.prototype.stop = function () {
this.running = false
}

Queue.prototype.end = function(err) {
clearTimers.call(this);
this.jobs.length = 0;
this.pending = 0;
done.call(this, err);
};
Queue.prototype.end = function (err) {
clearTimers.call(this)
this.jobs.length = 0
this.pending = 0
done.call(this, err)
}

function clearTimers () {
for (var key in this.timers) {
var timeoutId = this.timers[key]
delete this.timers[key]
clearTimeout(timeoutId);
clearTimeout(timeoutId)
}
}

function callOnErrorOrEnd(cb) {
var self = this;
this.on('error', onerror);
this.on('end', onend);
function callOnErrorOrEnd (cb) {
var self = this
this.on('error', onerror)
this.on('end', onend)

function onerror(err) { self.end(err); }
function onend(err) {
self.removeListener('error', onerror);
self.removeListener('end', onend);
cb(err);
function onerror (err) { self.end(err) }
function onend (err) {
self.removeListener('error', onerror)
self.removeListener('end', onend)
cb(err)
}
}

function done(err) {
this.session++;
this.running = false;
this.emit('end', err);
function done (err) {
this.session++
this.running = false
this.emit('end', err)
}

0 comments on commit 1627245

Please sign in to comment.