Skip to content

Commit

Permalink
does not depend on kue anymore. failover mechanism (has to be improved)
Browse files Browse the repository at this point in the history
  • Loading branch information
pgte committed Oct 18, 2011
1 parent bd79b4d commit ce43709
Show file tree
Hide file tree
Showing 3 changed files with 154 additions and 21 deletions.
161 changes: 145 additions & 16 deletions index.js
@@ -1,31 +1,160 @@
var kue = require('kue')
, redis = require('redis');
var redis = require('redis')
, EventEmitter = require('events').EventEmitter;

module.exports = function(config) {
var jobs;
var connections = []
, active = 0
, ending = false
, that;

config = config || {};

kue.redis.createClient = function() {

config.popTimeout || (config.popTimeout = 5);

function createClient() {
var client = redis.createClient(config.port || 6379, config.host || '127.0.0.1');
if (config.password) { client.auth(config.password); }
return client;
};

jobs = kue.createQueue();
}

function inactiveQueueName(type) {
return 'q:jobs:' + type + ':inactive';
}

function activeQueueName(type) {
return 'q:jobs:' + type + ':active';
}

function push(type, job, done) {
jobs.create(type, job).save(done);
var client = createClient();

if (! done) {
done = function(err) {
if (err) {
console.error(err); return that.emit('error', err);
}
}
}

function callback() {
if (client && client.connected) {
client.end();
}
done.apply(module, arguments);
}


function error(err) {
if (err) {
callback(err);
}
}


client.incr('jobs:next_id', function(err, nextId) {
if (err) { return error(err); }

var jobId = 'jobs:' + nextId;

client.set(jobId, JSON.stringify(job), function(err) {
if (err) { return error(err); }
client.lpush(inactiveQueueName(type), nextId, function(err) {
if (err) { return error(err); }
client.end();
callback();
});
});
});

};

function pop(type, callback) {
jobs.process(type, function(job, done) {
callback(job.data, done);
function connect(type, callback) {
var connection = connections[type];
try {
if (! connection || ! connection.connected) {
connections[type] = connection = createClient();
}
} catch(err) {
return callback(err);
}
callback(null, connection);
}

function disconnect(type) {
var connection = connections[type];
if (connection && connection.connected) {
connection.end();
}
}

function listen(type, callback) {
connect(type, function(err, client) {
if (err) { return that.emit('error', err); }
var activeName = activeQueueName(type) ;
client.brpoplpush(inactiveQueueName(type), activeName, config.popTimeout, function(err, jobId) {

if (err) { return that.emit('error', err); }

if (! jobId) { return callback(null, null); }

client.get('jobs:' + jobId, function(err, jobStr) {
if (err) { return that.emit(err); }

callback(JSON.parse(jobStr), function(cb) {
client.lrem(activeName, 1, jobId, function(err) {
if (err) { return that.emit(err); }
cb();
});
});
});
});
});
}

return {
push: push
, pop: pop


function end(cb) {
ending = true;
if (cb) {
that.on('done', function doneCB() {
if (active === 0) {
that.removeListener('done', doneCB);
cb();
}
});
}
}

function pop(type, callback) {
if (! ending) {
active += 1;
listen(type, function(job, done) {
if (job) {
callback(job, function() {
done(function() {
active -= 1;
that.emit('done');
pop(type, callback);
});
});
} else {
active -= 1;
pop(type, callback);
}
});
} else {
that.emit('done');
disconnect(type);
}
}

that = {
end: end
, push: push
, pop: pop
};

that.__proto__ = EventEmitter.prototype;

return that;
};
5 changes: 2 additions & 3 deletions package.json
@@ -1,7 +1,7 @@
{ "name" : "banzai-redis"
, "description" : "Redis queueing plugin module for Banzai"
, "tags" : ["ETL", "pipeline", "document", "state machine", "redis", "queue", "banzai"]
, "version" : "0.1.1"
, "version" : "0.2.0"
, "author" : "Pedro Teixeira <pedro.teixeira@gmail.com>"
, "repository" :
{ "type" : "git"
Expand All @@ -12,8 +12,7 @@
, "engines" : ["node >= 0.4.10"]
, "main" : "./index"
, "dependencies": {
"kue": "0.3.x"
, "redis": "0.6.0"
"redis": "0.6.x"
}
, "scripts": { "test": "expresso test.js" }
}
9 changes: 7 additions & 2 deletions test.js
@@ -1,8 +1,13 @@
var queue = require('./')()
var queue = require('./')({popTimeout:1})
, assert = require('assert');

queue.on('error', function(err) {
throw err;
});

setTimeout(function() {
process.exit();
queue.end(function() {
});
}, 5000)

exports.push_then_pop = function(done) {
Expand Down

0 comments on commit ce43709

Please sign in to comment.