Skip to content

Commit

Permalink
clean up _onConnected
Browse files Browse the repository at this point in the history
  • Loading branch information
Joseph Junker committed Jun 8, 2015
1 parent a3d8359 commit 1f3b0d7
Showing 1 changed file with 14 additions and 15 deletions.
29 changes: 14 additions & 15 deletions socket.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,27 +22,13 @@ function Socket (rabbit, options) {
this.ready = false;
this.deferredConnection = false;

this._deferredSends = [];

if(!this.rabbit.ready) {
this.rabbit.once('ready', this._setup.bind(this));
} else {
this._setup(this.rabbit.channel);
}
}

Socket.prototype._onConnected = function () {
this.isConnected = true;

if (!this._sendNow) return;
// TODO _sendNow only exists on Req, this needs to move
for (var i = 0; i < this._deferredSends.length; i++) {
var message = this._deferredSends[i];
this._sendNow(message.message, message.id);
}

this._deferredSends = [];
};

Socket.prototype._setup = function (channel) {
this._setChannel(channel);
Expand Down Expand Up @@ -175,12 +161,25 @@ function ReqSocket () {
this.queues = [];
this.callbacks = {};

this._deferredSends = [];

if (this.ready && this.channel)
this._setupConsumer();
else
this.once('ready', this._setupConsumer.bind(this));
}

ReqSocket.prototype._sendDeferredMessages = function () {
this.isConnected = true;

for (var i = 0; i < this._deferredSends.length; i++) {
var message = this._deferredSends[i];
this._sendNow(message.message, message.id);
}

this._deferredSends = [];
};

ReqSocket.prototype._setupConsumer = function () {
var self = this;
//
Expand Down Expand Up @@ -230,7 +229,7 @@ ReqSocket.prototype.connect = function (destination) {
{ durable: this.options.persistent }, function (err, ok) {
if (err) return void self.emit('error', err);
self.queues.push(ok.queue);
self._onConnected();
self._sendDeferredMessages();
self.emit('connect');
});

Expand Down

0 comments on commit 1f3b0d7

Please sign in to comment.