Skip to content

Commit

Permalink
Add code and unit test to handle: CONNECTION_FORCED - broker forced c…
Browse files Browse the repository at this point in the history
…onnection closure with reason 'shutdown'
  • Loading branch information
rudijs committed May 27, 2015
1 parent b22ff6a commit 766f77b
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 3 deletions.
11 changes: 11 additions & 0 deletions lib/rpc-publisher-factory.js
Expand Up @@ -75,6 +75,17 @@ var rpcPublisherProto = {
this.publisherDomain.add(this.currentConnection);
}

// https://github.com/rudijs/amqp.node-rpc-factory/pull/4
// https://github.com/squaremo/amqp.node/issues/110
// https://github.com/squaremo/amqp.node/issues/141
if (!this.standalone) {
conn.on('error', function (err) {
this.logError('Publisher: Unexpected amqplib error handled by event:' + err.stack);
this.connection = null;
this.publisherDomain.remove(this.currentConnection);
}.bind(this));
}

return conn.createChannel()
.then(function createChannelSucces(ch) {

Expand Down
68 changes: 65 additions & 3 deletions test/rpc-publisher-factory.spec.js
Expand Up @@ -45,11 +45,11 @@ describe('RPC Client', function () {
done();
});

it('should handle emitted AMQP connection errors', function(done) {
it('should handle emitted AMQP connection errors', function (done) {

var spy = sinon.spy();

sinon.stub(domain, 'create', function() {
sinon.stub(domain, 'create', function () {
return new EventEmitter();
});

Expand Down Expand Up @@ -300,6 +300,8 @@ describe('RPC Client', function () {
};
},

on: sinon.spy(),

close: sinon.spy()
};

Expand All @@ -315,7 +317,7 @@ describe('RPC Client', function () {
debugLevel: 2
});

publisher.standalone.should.be.false;
//publisher.standalone.should.be.true;

var spy = sinon.spy(publisher, 'logError');

Expand All @@ -340,4 +342,64 @@ describe('RPC Client', function () {

});

it('should handle emitted connection error CONNECTION_FORCED - broker forced connection closure with reason shutdown', function (done) {

var channelStub = {
assertQueue: function () {
return {
then: function (callback) {
callback({queue: uniqueAmqpPrivateReplyQueue});
return assertQueueThenStub;
}
};
},

consume: function (queue, callback, options) {
queue.should.equal(uniqueAmqpPrivateReplyQueue);
(typeof callback).should.equal('function');
options.noAck.should.be.true;
},

sendToQueue: function (replyTo, content, options) {
replyTo.should.equal('node_rpc_queue');
content.toString().should.equal(messageSent);
options.replyTo.should.equal(uniqueAmqpPrivateReplyQueue);
},

close: sinon.spy()

};

var createChannelStub = {
createChannel: function () {
return {
then: function (createChannelSuccess) {
return createChannelSuccess(channelStub);
}
};
},

on: function (event, cb) {
cb(new Error('CONNECTION_FORCED - broker forced connection closure with reason \'shutdown\''));
}
};

var connectStub = {
then: function (getConnectionSuccess) {
return getConnectionSuccess(createChannelStub);
}
};

sinon.stub(amqp, 'connect').returns(connectStub);

var publisher = rpcPublisherFactory.create({
debugLevel: 2
});

publisher.publish(messageSent);

done();

});

});

0 comments on commit 766f77b

Please sign in to comment.