Skip to content
This repository has been archived by the owner on May 31, 2020. It is now read-only.

#91 - Modular structure of celery client #92

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
227 changes: 9 additions & 218 deletions celery.js
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
var url = require('url'),
util = require('util'),
amqp = require('amqp'),
redis = require('redis'),
events = require('events'),
uuid = require('uuid');
var url = require('url'),
util = require('util'),
amqp = require('amqp'),
events = require('events');

var Result = require('./lib/Result');
var Task = require('./lib/Task');
var RedisBroker = require('./lib/redis/RedisBroker');
var RedisBackend = require('./lib/redis/RedisBackend');

var createMessage = require('./protocol').createMessage;

Expand Down Expand Up @@ -66,112 +69,6 @@ function Configuration(options) {
self.ROUTES = self.ROUTES || {};
}

function RedisBroker(conf) {
var self = this;
self.redis = redis.createClient(conf.BROKER_OPTIONS);

self.end = function() {
self.redis.end(true);
};

self.disconnect = function() {
self.redis.quit();
};

self.redis.on('connect', function() {
self.emit('ready');
});

self.redis.on('error', function(err) {
self.emit('error', err);
});

self.redis.on('end', function() {
self.emit('end');
});

self.publish = function(queue, message, options, callback, id) {
var payload = {
body: new Buffer(message).toString('base64'),
headers: {},
'content-type': options.contentType,
'content-encoding': options.contentEncoding,
properties: {
body_encoding: 'base64',
correlation_id: id,
delivery_info: {
exchange: queue,
priority: 0,
routing_key: queue
},
delivery_mode: 2, // No idea what this means
delivery_tag: uuid.v4(),
reply_to: uuid.v4()
}
};
self.redis.lpush(queue, JSON.stringify(payload));
};

return self;
}
util.inherits(RedisBroker, events.EventEmitter);

function RedisBackend(conf) {
var self = this;
self.redis = redis.createClient(conf.RESULT_BACKEND_OPTIONS);

var backend_ex = self.redis.duplicate();

self.redis.on('error', function(err) {
self.emit('error', err);
});

self.redis.on('end', function() {
self.emit('end');
});

self.disconnect = function() {
backend_ex.quit();
self.redis.quit();
};

// store results to emit event when ready
self.results = {};

// results prefix
var key_prefix = 'celery-task-meta-';

self.redis.on('connect', function() {
debug('Backend connected...');
// on redis result..
self.redis.on('pmessage', function(pattern, channel, data) {
backend_ex.expire(channel, conf.TASK_RESULT_EXPIRES / 1000);
var message = JSON.parse(data);
var taskid = channel.slice(key_prefix.length);
if (self.results.hasOwnProperty(taskid)) {
var res = self.results[taskid];
res.result = message;
res.emit('ready', res.result);
delete self.results[taskid];
} else {
// in case of incoming messages where we don't have the result object
self.emit('message', message);
}
});
// subscribe to redis results
self.redis.psubscribe(key_prefix + '*', () => {
self.emit('ready');
});
});

self.get = function(taskid, cb) {
backend_ex.get(key_prefix + taskid, cb);
}

return self;
}
util.inherits(RedisBackend, events.EventEmitter);

function Client(conf) {
var self = this;
self.ready = false;
Expand Down Expand Up @@ -260,112 +157,6 @@ Client.prototype.call = function(name /*[args], [kwargs], [options], [callback]*
return result;
};

function Task(client, name, options, exchange) {
var self = this;

self.client = client;
self.name = name;
self.options = options || {};

var route = self.client.conf.ROUTES[name],
queue = route && route.queue;

self.publish = function (args, kwargs, options, callback) {
var id = options.id || uuid.v4();

var result = new Result(id, self.client);

if (client.conf.backend_type === 'redis') {
client.backend.results[result.taskid] = result;
}

queue = options.queue || self.options.queue || queue || self.client.conf.DEFAULT_QUEUE;
var msg = createMessage(self.name, args, kwargs, options, id);
var pubOptions = {
'contentType': 'application/json',
'contentEncoding': 'utf-8'
};

if (exchange) {
exchange.publish(queue, msg, pubOptions, callback);
} else {
self.client.broker.publish(queue, msg, pubOptions, callback);
}

return result;
};
}

Task.prototype.call = function(args, kwargs, options, callback) {
var self = this;

args = args || [];
kwargs = kwargs || {};
options = options || self.options || {};

if (!self.client.ready) {
self.client.emit('error', 'Client is not ready');
}
else {
return self.publish(args, kwargs, options, callback);
}
};

function Result(taskid, client) {
var self = this;

events.EventEmitter.call(self);
self.taskid = taskid;
self.client = client;
self.result = null;

if (self.client.conf.backend_type === 'amqp' && !self.client.conf.IGNORE_RESULT) {
debug('Subscribing to result queue...');
self.client.backend.queue(
self.taskid.replace(/-/g, ''), {
"arguments": {
'x-expires': self.client.conf.TASK_RESULT_EXPIRES
},
'durable': self.client.conf.TASK_RESULT_DURABLE,
'closeChannelOnUnsubscribe': true
},

function (q) {
q.bind(self.client.conf.RESULT_EXCHANGE, '#');
var ctag;
q.subscribe(function (message) {
if (message.contentType === 'application/x-python-serialize') {
console.error('Celery should be configured with json serializer');
process.exit(1);
}
self.result = message;
q.unsubscribe(ctag);
debug('Emiting ready event...');
self.emit('ready', message);
debug('Emiting task status event...');
self.emit(message.status.toLowerCase(), message);
}).addCallback(function(ok) { ctag = ok.consumerTag; });
});
}
}

util.inherits(Result, events.EventEmitter);

Result.prototype.get = function(callback) {
var self = this;
if (callback && self.result === null) {
self.client.backend.get(self.taskid, function(err, reply) {
self.result = JSON.parse(reply);
callback(self.result);
});
} else {
if (callback) {
callback(self.result);
}
return self.result;
}
};

exports.createClient = function(config, callback) {
return new Client(config, callback);
};
Expand Down
61 changes: 61 additions & 0 deletions lib/Result.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
var events = require('events');
var util = require('util');
var debug = process.env.NODE_CELERY_DEBUG === '1' ? console.info : function() {};


function Result(taskid, client) {
var self = this;

events.EventEmitter.call(self);
self.taskid = taskid;
self.client = client;
self.result = null;

if (self.client.conf.backend_type === 'amqp' && !self.client.conf.IGNORE_RESULT) {
debug('Subscribing to result queue...');
self.client.backend.queue(
self.taskid.replace(/-/g, ''), {
"arguments": {
'x-expires': self.client.conf.TASK_RESULT_EXPIRES
},
'durable': self.client.conf.TASK_RESULT_DURABLE,
'closeChannelOnUnsubscribe': true
},

function (q) {
q.bind(self.client.conf.RESULT_EXCHANGE, '#');
var ctag;
q.subscribe(function (message) {
if (message.contentType === 'application/x-python-serialize') {
console.error('Celery should be configured with json serializer');
process.exit(1);
}
self.result = message;
q.unsubscribe(ctag);
debug('Emiting ready event...');
self.emit('ready', message);
debug('Emiting task status event...');
self.emit(message.status.toLowerCase(), message);
}).addCallback(function(ok) { ctag = ok.consumerTag; });
});
}
}

util.inherits(Result, events.EventEmitter);

Result.prototype.get = function(callback) {
var self = this;
if (callback && self.result === null) {
self.client.backend.get(self.taskid, function(err, reply) {
self.result = JSON.parse(reply);
callback(self.result);
});
} else {
if (callback) {
callback(self.result);
}
return self.result;
}
};

module.exports = Result;
56 changes: 56 additions & 0 deletions lib/Task.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
var Result = require('./Result');
var uuid = require('uuid');
var createMessage = require('../protocol').createMessage;

function Task(client, name, options, exchange) {
var self = this;

self.client = client;
self.name = name;
self.options = options || {};

var route = self.client.conf.ROUTES[name],
queue = route && route.queue;

self.publish = function (args, kwargs, options, callback) {
var id = options.id || uuid.v4();

var result = new Result(id, self.client);

if (client.conf.backend_type === 'redis') {
client.backend.results[result.taskid] = result;
}

queue = options.queue || self.options.queue || queue || self.client.conf.DEFAULT_QUEUE;
var msg = createMessage(self.name, args, kwargs, options, id);
var pubOptions = {
'contentType': 'application/json',
'contentEncoding': 'utf-8'
};

if (exchange) {
exchange.publish(queue, msg, pubOptions, callback);
} else {
self.client.broker.publish(queue, msg, pubOptions, callback);
}

return result;
};
}

Task.prototype.call = function(args, kwargs, options, callback) {
var self = this;

args = args || [];
kwargs = kwargs || {};
options = options || self.options || {};

if (!self.client.ready) {
self.client.emit('error', 'Client is not ready');
}
else {
return self.publish(args, kwargs, options, callback);
}
};

module.exports = Task;
Loading