Skip to content

Commit

Permalink
Merge pull request #4 from jcrugzz/simplify-deferreds
Browse files Browse the repository at this point in the history
Simplify deferreds
  • Loading branch information
jcrugzz committed Jun 8, 2015
2 parents dbb6dee + e9db410 commit 002315e
Show file tree
Hide file tree
Showing 4 changed files with 112 additions and 141 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -29,3 +29,5 @@ build/Release
# https://www.npmjs.org/doc/misc/npm-faq.html#should-i-check-my-node_modules-folder-into-git-
node_modules

#IDEs
.idea
2 changes: 1 addition & 1 deletion index.js
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ Rabbit.prototype._onChannel = function (err, ch) {
};

Rabbit.prototype.socket = function (type, options) {
var Socket = sockets[type];
var Socket = sockets[type.toLowerCase()];
if (!Socket) {
var error = new Error('Invalid socket type');
return process.nextTick(this.emit.bind(this, 'error', error));
Expand Down
243 changes: 106 additions & 137 deletions socket.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,10 @@ var util = require('util');
var debug = require('diagnostics')('rabbit-rr:socket');
var uuid = require('uuid');

var noop = function () {};

var TYPES = ['req', 'rep', 'REQ', 'REP'];

var SOCKETS = TYPES.reduce(function (acc, type) {
acc[type] = type.toLowerCase() === 'req'
? ReqSocket
: RepSocket;

return acc;
}, {});

module.exports = SOCKETS;
module.exports = {
req: ReqSocket,
rep: RepSocket
};

util.inherits(Socket, EE);

Expand All @@ -25,61 +16,30 @@ function Socket (rabbit, options) {
this.rabbit = rabbit;
this.options = options || {};

// Only try and defer connect because send makes things way complicated right
// now
this.channel = undefined;
// Operations called potentially before a channel exists
this.operations = this.methods.reduce(function(acc, method) {
acc[method] = [];
return acc;
}, {});
//
// So we can probably still grab the methods off the prototype even af
//

this.ready = false;
this._deferredConnections = [];

//
// Remark: we always defer send if we are a REQ socket
// because other async things ALWAYS need to happen when a
// REQ socket is constructed.
//
if (~this.methods.indexOf('send')) this._deferMethod('send');
if(!this.rabbit.ready) {
this._deferMethod('connect');
this.rabbit.once('ready', this._setup.bind(this));
} else {
this._setChannel(this.rabbit.channel);
this._setup(this.rabbit.channel);
}
}

Socket.prototype._deferMethod = function (method) {
var self = this;
this[method] = function () {
self._operation(method, arguments);
return self;
}
};

Socket.prototype._operation = function (method, args) {
if (this.channel && this.ready && method === 'connect') {
return this[method].apply(this, args);
}
this.operations[method].push({ method: method, args: args });
};

Socket.prototype._setup = function (channel) {
this._setChannel(channel);
this._runDeferred('connect');
this.emit('ready');
this.ready = true;
};
this.emit('ready');
if (!this._deferredConnections.length) return;

Socket.prototype._runDeferred = function (method) {
delete this[method];
var op;
while((op = this.operations[method].shift())) {
debug('running deferred operation %s with args %j', op.method, op.args);
this[op.method].apply(this, op.args)
//
// Connect to call queues that were deferred
//
for (var i = 0; i < this._deferredConnections.length; i++){
var queueName = this._deferredConnections[i].queueName;
return void this.connect(queueName);
}
};

Expand All @@ -91,14 +51,9 @@ Socket.prototype._setChannel = function (channel) {
this.channel.prefetch(this.options.prefetch);
}

this.channel.on('error', this.emit.bind(this, 'error'));
//
// TODO: Do we have to cleanup anything?
//
this.channel.on('error', this.emit.bind(this, 'error')); // TODO: Do we have to cleanup anything?

this.channel.on('close', this.emit.bind(this, 'close'));
//
// These aren't strictly necessary but could be interesting to look at maybe?
//
this.channel.on('drain', this.emit.bind(this, 'drain'));
this.channel.on('readable', this.emit.bind(this, 'readable'));
};
Expand Down Expand Up @@ -137,18 +92,23 @@ function RepSocket() {
this.consumers = {};
}

RepSocket.prototype.connect = function (source, callback) {
var self = this;
RepSocket.prototype.connect = function (source) {
if (!this.ready) {
this._deferredConnections.push({ queueName: source });
return this;
}

if (this.consumers[source]) {
return process.nextTick(callback || noop);
process.nextTick(callback.bind(this, source));
return this;
}

var self = this;
this.channel.assertQueue(source, { durable: this.options.persistent }, function (err, ok) {
if (err) return callback ? callback(err) : self.emit('error', err);
if (err) return void self.emit('error', err);
self.channel.consume(source, self._consume.bind(self), { noAck: false });
self.consumers[source] = ok.consumerTag;
self.emit('connect', source);
callback && callback();
});

return this;
Expand All @@ -166,33 +126,25 @@ RepSocket.prototype._consume = function (msg) {
this.emit('message', payload, reply);

function reply(err, data) {
debug('rep socket reply being executed %j', arguments);
debug('rep socket reply being executed %j', msg);
if (err) {
//
// Remark: This is something weird TBH as it shouldn't happen
// But we should keep with callback convention and send errors back in some way
// jsut in case.
//
self.emit('application error', err);
data = { error: true, message: err.message };
}

var replyTo = msg.properties.replyTo;
//
// Remark: Replies are never persistent because the queue is ephemeral and dies
// with the socket. Note, maybe we can make a different topology than this
// but this seems reasonable and is simple
//

// Remark: Replies are never persistent because the queue is ephemeral and dies with the socket.
var options = {
deliveryMode: true,
expiration: self.options.expiration,
correlationId: id
};

var res = self.channel.sendToQueue(replyTo, self.pack(data), options);
//
// Remark: We ack the message after its processed to let rabbit know whats up
//

// Ack the message after its processed to let rabbit know whats up
// Note that doing this step last means messages which kill the receiver will be trapped in the queue
self.channel.ack(msg);

return res;
Expand All @@ -207,71 +159,89 @@ function ReqSocket () {
this.methods = ['connect', 'send'];
Socket.apply(this, arguments);

this.replyTo = undefined;
this.rx = 0;
this.queues = [];
this.callbacks = {};
this._replyTo = undefined;
this._rx = 0;
this._queues = [];
this._callbacks = {};

this._deferredSends = [];
this._isConnected = false;
this._responseQueueIsConnected = false;

if (this.ready && this.channel)
this._setupConsumer();
this._setupResponseQueue();
else
this.once('ready', this._setupConsumer.bind(this));
this.once('ready', this._setupResponseQueue.bind(this));
}

ReqSocket.prototype._setupConsumer = function () {
ReqSocket.prototype._canSend = function () {
return this._isConnected && this._responseQueueIsConnected;
};

ReqSocket.prototype._trySendDeferredMessages = function () {
if (!this._canSend()) return;

for (var i = 0; i < this._deferredSends.length; i++) {
var message = this._deferredSends[i];
this._sendNow(message.message, message.id);
}

this._deferredSends = [];
};

ReqSocket.prototype._setupResponseQueue = function () {
var self = this;
//
// Remark: We create an ephemeral reply queue here to receive messages on
//
this.channel.assertQueue('', {exclusive: true, autoDelete: true }, function (err, ok) {
if (err) { return self.emit('error', err); }
self.replyTo = ok.queue;
self._canMaybeSend();
// Messages sent by the responder will be delivered on this queue
this.channel.assertQueue('', { exclusive: true, autoDelete: true }, function (err, ok) {
if (err) { return void self.emit('error', err); }
self._replyTo = ok.queue;
self.channel.consume(ok.queue, self._consume.bind(self), { noAck: false, exclusive: true });
self._responseQueueIsConnected = true;
self._trySendDeferredMessages();
});
};

ReqSocket.prototype._canMaybeSend = function () {
debug('canMaybeSend called');
if (!this.replyTo || !this.queues.length) return;
this._runDeferred('send');
};

ReqSocket.prototype._consume = function (msg) {
if (msg === null) return debug('Req socket msg received is null');
debug('Req socket received reply over ephemeral queue %s %j', this.replyTo, msg);
this.reply(msg);
debug('Req socket received reply over ephemeral queue %s %j', this._replyTo, msg);
this._handleReceipt(msg);
this.channel.ack(msg);
};

ReqSocket.prototype.reply = function (msg) {
ReqSocket.prototype._handleReceipt = function (msg) {
var id = msg.properties.correlationId;
var fn = this.callbacks[id];
var fn = this._callbacks[id];

// This means that we can fire and forget messages. Is this an intentional feature, or should
// this indicate that something went wrong?
if (!fn) return debug('missing callback for %s', id);

var message = this.parse(msg.content);
//
// Remark: since we can't really error should we just respond with the message?
//

delete this._callbacks[id];

if (message.error && message.message) {
var error = new Error(message.message);
return fn(error);
}
fn(null, message);
delete this.callbacks[id];
};

ReqSocket.prototype.connect = function (destination, callback) {
ReqSocket.prototype.connect = function (destination) {
if (!this.ready) {
this._deferredConnections.push({ queueName: destination });
return this;
}
var self = this;

this.channel.assertQueue(destination,
{ durable: this.options.persistent }, function (err, ok) {
if (err) return callback ? callback(err) : self.emit('error', err);
self.queues.push(ok.queue);
self._canMaybeSend();
self.emit('connect');
callback && callback();
});
if (err) return void self.emit('error', err);
self._queues.push(ok.queue);
self._isConnected = true;
self._trySendDeferredMessages();
self.emit('connect');
});

return this;
};
Expand All @@ -280,39 +250,38 @@ ReqSocket.prototype.id = function () {
return uuid();
};

//
// Remark: Im assuming object serialization here but in the future this could
// be more flexible
//
ReqSocket.prototype.send = function (message, callback) {
//
// Remark: Simple round-robin without array mutation
// if we have more than one queue to send to
//
if (this.rx >= this.queues.length) this.rx = 0;
var queue = this.queues[this.rx++];
ReqSocket.prototype._roundRobin = function () {
if (this._rx >= this._queues.length) this._rx = 0;
return this._queues[this._rx++];
};

//
// Remark: maybe queue messages to send here?
// In practice ill see if the current defer logic breaks and
// add that here if necessary
//
ReqSocket.prototype._sendNow = function (message, id) {
var queue = this._roundRobin();
if (!queue) return debug('No queue on send with message %j', message);
var id = callback.id = this.id();
this.callbacks[id] = callback;

debug('req socket sending message %j to queue %s with replyTo %s ', message, queue, this.replyTo);
debug('req socket sending message %j to queue %s with replyTo %s ', message, queue, this._replyTo);
var options = {
replyTo: this.replyTo,
replyTo: this._replyTo,
deliveryMode: true,
correlationId: id,
expiration: this.options.expiration,
persistent: this.options.persistent
};

this.channel.sendToQueue(queue, this.pack(message), options);
};

ReqSocket.prototype.send = function (message, callback) {
var id = callback.id = this.id();
this._callbacks[id] = callback;

if (!this._canSend()) {
this._deferredSends.push({ message: message, id: id });
return this;
}

this._sendNow(message, id);

return this;
};


Loading

0 comments on commit 002315e

Please sign in to comment.