diff --git a/socket.js b/socket.js index 6ea2bc9..8ac663d 100644 --- a/socket.js +++ b/socket.js @@ -75,6 +75,7 @@ Socket.prototype._deferMethod = function (method) { }; Socket.prototype._operation = function (method, args) { + debug('deferring operation %s %j', method, args); if (this.channel && this.ready && method === 'connect') { return this[method].apply(this, args); } @@ -82,10 +83,11 @@ Socket.prototype._operation = function (method, args) { }; Socket.prototype._setup = function (channel) { + debug('%s socket setup', this.type); this._setChannel(channel); this._runDeferred('connect'); - this.emit('ready'); this.ready = true; + this.emit('ready'); }; Socket.prototype._runDeferred = function (method) { @@ -145,7 +147,6 @@ util.inherits(RepSocket, Socket); function RepSocket() { this.type = 'REP'; this.methods = ['connect']; - Socket.apply(this, arguments); this.consumers = {}; @@ -160,6 +161,9 @@ function RepSocket() { this.connect(sources[s]); } }.bind(this)); + + Socket.apply(this, arguments); + } RepSocket.prototype.connect = function (source, callback) { @@ -230,7 +234,6 @@ util.inherits(ReqSocket, Socket); function ReqSocket () { this.type = 'REQ'; this.methods = ['connect', 'send']; - Socket.apply(this, arguments); this.replyTo = undefined; this.rx = 0; @@ -241,10 +244,17 @@ function ReqSocket () { this.dests = []; this.callbacks = {}; - if (this.ready && this.channel) + if (this.ready && this.channel) { + debug('%s channel ready, setting up consumer', this.type); this._setupConsumer(); - else + } + else { + debug('%s channel waiting on ready', this.type); this.on('ready', this._onReady.bind(this)); + } + + Socket.apply(this, arguments); + } // @@ -252,6 +262,7 @@ function ReqSocket () { // channel // ReqSocket.prototype._onReady = function () { + debug('req socket on ready') if (!this.reconnecting) return this._setupConsumer(); this._setupConsumer(); @@ -272,10 +283,12 @@ ReqSocket.prototype._onReady = function () { ReqSocket.prototype._setupConsumer = function () { var self = this; + debug('setup consumer req'); // // Remark: We create an ephemeral reply queue here to receive messages on // this.channel.assertQueue('', {exclusive: true, autoDelete: true }, function (err, ok) { + debug('reply queue asserted %s %j', err, ok); if (err) { return self.emit('error', err); } self.replyTo = ok.queue; self._canMaybeSend(); @@ -291,7 +304,7 @@ ReqSocket.prototype._canMaybeSend = function () { ReqSocket.prototype._consume = function (msg) { if (msg === null) return debug('Req socket msg received is null'); - debug('Req socket received reply over ephemeral queue %s %j', this.replyTo, msg); + debug('Req socket received reply over ephemeral queue %s', this.replyTo); this.reply(msg); this.channel.ack(msg); }; @@ -315,9 +328,11 @@ ReqSocket.prototype.reply = function (msg) { ReqSocket.prototype.connect = function (destination, callback) { var self = this; - this.dest.push(destination); + debug('req socket connect called'); + this.dests.push(destination); this.channel.assertQueue(destination, { durable: this.options.persistent }, function (err, ok) { + debug('req socket send queue connected'); if (err) return callback ? callback(err) : self.emit('error', err); self.queues.push(ok.queue); self._canMaybeSend();