// celery.js
var fs = require('fs'),
ini = require('ini'),
url = require('url'),
amqp = require('amqp'),
http = require('http'),
io = require('socket.io'),
msgpack = require('msgpack'),
sys = require(process.binding('natives').util ? 'util' : 'sys');
// Module-level state assigned by main(): `conf` is the decoded INI
// configuration, `conn` the AMQP connection and `sock` the socket.io
// listener. brokerReady() and resultQueue() read these.
var conf, conn, sock;
/**
 * Handler for the AMQP connection's 'ready' event.
 *
 * Declares the default exchange named in the configuration and then, for
 * every socket.io client that connects, relays task events to the broker:
 *
 *  1. a result queue is set up through resultQueue() so the task's result is
 *     emitted back to the same client on the configured result event;
 *  2. the task is encoded with pack() and published.
 *
 * A task may carry its own `exchange` property. In that case the task is
 * published to that exchange and the exchange name is also used as the
 * routing key; otherwise the default exchange and configured route are used.
 */
function brokerReady() {
    // Every exchange is declared with the same settings. A fresh options
    // object is built per call so declarations never share one instance.
    function declareExchange(name) {
        return conn.exchange(name, {
            type: conf.celery.type,
            durable: true,
            autoDelete: false
        });
    }

    var defaultExchange = declareExchange(conf.celery.exchange);

    sock.sockets.on('connection', function(client) {
        client.on(conf.celery.taskevent, function(message) {
            // The payload comes straight from a remote client. resultQueue()
            // derives the queue name from message.id with a string method,
            // so a null payload or a missing/non-string id would throw a
            // TypeError inside this event handler. Drop such payloads.
            // NOTE(review): consider reporting the rejection to the client.
            if (!message || typeof message !== 'object' ||
                    typeof message.id !== 'string') {
                return;
            }

            // Register for the result before publishing the task.
            resultQueue(conn, message, function(result) {
                client.emit(conf.celery.resultevent, result);
            });

            var targetExchange = defaultExchange;
            var routingKey = conf.celery.route;
            if (message.exchange) {
                targetExchange = declareExchange(message.exchange);
                routingKey = message.exchange;
            }

            targetExchange.publish(routingKey, pack(message), {
                contentType: 'application/x-msgpack',
                contentEncoding: 'binary'
            });
        });

        client.on('disconnect', function() {
            // TODO flush callbacks
        });
    });
}
/**
 * Declares the result queue for a task and forwards its result.
 *
 * The queue name is the task id with every dash removed. When the
 * configuration contains a `resultexpires` entry, it is passed along as the
 * queue's `x-expires` argument. Each delivered message is msgpack-decoded and
 * handed to `callback`; the queue is destroyed right after a delivery.
 *
 * @param conn      AMQP connection on which the queue is declared.
 * @param task      Task message; `task.id` must be a string.
 * @param callback  Invoked with the decoded result payload.
 */
function resultQueue(conn, task, callback) {
    var queueName = task.id.replace(/-/g, '');

    var queueArgs = {};
    if ('resultexpires' in conf.celery) {
        // Parse explicitly as decimal: without a radix, the base parseInt
        // picks depends on the string's prefix (and on the engine version).
        queueArgs['x-expires'] = parseInt(conf.celery.resultexpires, 10);
    }

    conn.queue(queueName, {'arguments': queueArgs}, function(queue) {
        queue.subscribe(function(message, headers, deliveryInfo) {
            callback(msgpack.unpack(new Buffer(message.data)));
            queue.destroy();
        });
    });
}
/**
 * Encodes a message with msgpack and returns the bytes in a newly
 * allocated Buffer.
 *
 * The copy is a workaround for amqp's buffer type check: the object
 * returned by msgpack.pack() is not passed on directly.
 *
 * @param message  Value to encode.
 * @returns A Buffer holding a copy of the encoded bytes.
 */
function pack(message) {
    var encoded = msgpack.pack(message);
    var copy = new Buffer(encoded.length);

    encoded.copy(copy);

    return copy;
}
/**
 * Entry point: loads the configuration and starts the gateway.
 *
 * Reads and decodes the INI file at `argv.config`, starts an HTTP server on
 * the port given by the `socketio.listen` setting, attaches socket.io to it,
 * and opens the AMQP connection described by the `amqp` section. Once the
 * connection reports 'ready', brokerReady() takes over.
 *
 * @param argv  Options object; `argv.config` is the path of the INI file.
 */
function main(argv) {
    conf = ini.decode(fs.readFileSync(argv.config).toString());

    var server = http.createServer(function(req, res) {
        // There is nothing to serve over plain HTTP. Answer 200 and finish
        // the response; without end() the request would hang until timeout.
        res.writeHead(200);
        res.end();
    });
    // The port comes from the config file as text; parse it as decimal.
    server.listen(parseInt(conf.socketio.listen, 10));
    sock = io.listen(server);

    conn = amqp.createConnection(conf.amqp);
    conn.addListener('ready', brokerReady);
}
// Export main() as this module's public entry point.
exports.main = main;