Skip to content

Commit

Permalink
[fix] fix a bug and add more debugging
Browse files Browse the repository at this point in the history
  • Loading branch information
jcrugzz committed Jun 8, 2015
1 parent 8a30949 commit 11173c0
Showing 1 changed file with 22 additions and 7 deletions.
29 changes: 22 additions & 7 deletions socket.js
Original file line number Diff line number Diff line change
Expand Up @@ -75,17 +75,19 @@ Socket.prototype._deferMethod = function (method) {
};

Socket.prototype._operation = function (method, args) {
debug('deferring operation %s %j', method, args);
if (this.channel && this.ready && method === 'connect') {
return this[method].apply(this, args);
}
this.operations[method].push({ method: method, args: args });
};

Socket.prototype._setup = function (channel) {
debug('%s socket setup', this.type);
this._setChannel(channel);
this._runDeferred('connect');
this.emit('ready');
this.ready = true;
this.emit('ready');
};

Socket.prototype._runDeferred = function (method) {
Expand Down Expand Up @@ -145,7 +147,6 @@ util.inherits(RepSocket, Socket);
function RepSocket() {
this.type = 'REP';
this.methods = ['connect'];
Socket.apply(this, arguments);

this.consumers = {};

Expand All @@ -160,6 +161,9 @@ function RepSocket() {
this.connect(sources[s]);
}
}.bind(this));

Socket.apply(this, arguments);

}

RepSocket.prototype.connect = function (source, callback) {
Expand Down Expand Up @@ -230,7 +234,6 @@ util.inherits(ReqSocket, Socket);
function ReqSocket () {
this.type = 'REQ';
this.methods = ['connect', 'send'];
Socket.apply(this, arguments);

this.replyTo = undefined;
this.rx = 0;
Expand All @@ -241,17 +244,25 @@ function ReqSocket () {
this.dests = [];
this.callbacks = {};

if (this.ready && this.channel)
if (this.ready && this.channel) {
debug('%s channel ready, setting up consumer', this.type);
this._setupConsumer();
else
}
else {
debug('%s channel waiting on ready', this.type);
this.on('ready', this._onReady.bind(this));
}

Socket.apply(this, arguments);

}

//
// When we are reconnecting we need to reconnect to our queues on the new
// channel
//
ReqSocket.prototype._onReady = function () {
debug('req socket on ready')
if (!this.reconnecting) return this._setupConsumer();

this._setupConsumer();
Expand All @@ -272,10 +283,12 @@ ReqSocket.prototype._onReady = function () {

ReqSocket.prototype._setupConsumer = function () {
var self = this;
debug('setup consumer req');
//
// Remark: We create an ephemeral reply queue here to receive messages on
//
this.channel.assertQueue('', {exclusive: true, autoDelete: true }, function (err, ok) {
debug('reply queue asserted %s %j', err, ok);
if (err) { return self.emit('error', err); }
self.replyTo = ok.queue;
self._canMaybeSend();
Expand All @@ -291,7 +304,7 @@ ReqSocket.prototype._canMaybeSend = function () {

ReqSocket.prototype._consume = function (msg) {
if (msg === null) return debug('Req socket msg received is null');
debug('Req socket received reply over ephemeral queue %s %j', this.replyTo, msg);
debug('Req socket received reply over ephemeral queue %s', this.replyTo);
this.reply(msg);
this.channel.ack(msg);
};
Expand All @@ -315,9 +328,11 @@ ReqSocket.prototype.reply = function (msg) {

ReqSocket.prototype.connect = function (destination, callback) {
var self = this;
this.dest.push(destination);
debug('req socket connect called');
this.dests.push(destination);
this.channel.assertQueue(destination,
{ durable: this.options.persistent }, function (err, ok) {
debug('req socket send queue connected');
if (err) return callback ? callback(err) : self.emit('error', err);
self.queues.push(ok.queue);
self._canMaybeSend();
Expand Down

0 comments on commit 11173c0

Please sign in to comment.