Skip to content

Commit

Permalink
fix(sender-link): missing Object.keys around unsettled dispatch
Browse files Browse the repository at this point in the history
We were checking the length of the unsettled sends as if it were
an Array, when it's really a map. This resulted in unsettled sends
never being properly rejected in the event of a connection error.
  • Loading branch information
mbroadst committed Oct 25, 2016
1 parent e5bac49 commit b0580b9
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 2 deletions.
2 changes: 1 addition & 1 deletion lib/sender_link.js
Expand Up @@ -226,7 +226,7 @@ SenderLink.prototype._dispatchPendingSends = function(err) {
}
}

if (!!err && this._unsettledSends && this._unsettledSends.length) {
if (!!err && this._unsettledSends && Object.keys(this._unsettledSends).length) {
var self = this;
Object.keys(this._unsettledSends).forEach(function(id) {
var deferredSender = self._unsettledSends[id];
Expand Down
1 change: 1 addition & 0 deletions package.json
Expand Up @@ -21,6 +21,7 @@
"benchmark": "^2.1.1",
"chai": "^3.5.0",
"chai-string": "^1.2.0",
"chai-as-promised": "^6.0.0",
"conventional-changelog-cli": "^1.2.0",
"istanbul": "^0.4.4",
"jsdoc-to-markdown": "^1.3.7",
Expand Down
43 changes: 42 additions & 1 deletion test/unit/client.test.js
Expand Up @@ -2,7 +2,8 @@

var _ = require('lodash'),
amqp = require('../../lib'),
expect = require('chai').expect,
chai = require('chai'),
expect = chai.expect,
Builder = require('buffer-builder'),
AMQPClient = require('../../lib').Client,
MockServer = require('./mocks').Server,
Expand All @@ -19,6 +20,8 @@ var _ = require('lodash'),

test = require('./test-fixture');

chai.use(require('chai-as-promised'));

var TestPolicy = new Policy({
connect: { options: { containerId: 'test' } },
reconnect: { retries: 0, forever: false }
Expand Down Expand Up @@ -503,6 +506,44 @@ describe('Client', function() {
}, 200);
});
});

it('should reject unsettled sends on connection error', function() {
test.server.setResponseSequence([
constants.amqpVersion,
new frames.OpenFrame(test.client.policy.connect.options),
new frames.BeginFrame({
remoteChannel: 1, nextOutgoingId: 0,
incomingWindow: 2147483647, outgoingWindow: 2147483647,
handleMax: 4294967295
}),
[
function (prev) {
var rxAttach = frames.readFrame(prev[prev.length - 1]);
return new frames.AttachFrame({
name: rxAttach.name, handle: 1,
role: constants.linkRole.receiver,
source: {}, target: {},
initialDeliveryCount: 0
});
},

{ delay: 1000 },

// force detach from remote server, and force close of the connection
new frames.CloseFrame({
error: new AMQPError({ condition: ErrorCondition.ConnectionForced, description: 'test' })
})
]
]);

return test.client.connect(test.server.address())
.then(function() { return test.client.createSender('testing'); })
.then(function(sender) {
sender.linkCredit = 1;
var sendPromise = sender.send('testing');
expect(sendPromise).to.eventually.be.rejectedWith(errors.ProtocolError);
});
});
});

describe('#reconnect()', function() {
Expand Down

0 comments on commit b0580b9

Please sign in to comment.