Skip to content

Commit

Permalink
fix(disposition): only send disposition to relevant links by role
Browse files Browse the repository at this point in the history
Previously we ignored the role of the incoming disposition frame,
and forwarded these frames to the _dispositionRecieved method of
all links we knew about. This patch corrects this behavior, and
includes an integration test verifying that frames are forwarded
to the appropriate link types.
  • Loading branch information
mbroadst committed Jul 29, 2015
1 parent ac7d88e commit f93f2a7
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 8 deletions.
14 changes: 10 additions & 4 deletions lib/session.js
Expand Up @@ -119,9 +119,11 @@ function Session(conn) {
this._allocatedHandles = {};
this._linksByName = {};
this._linksByRemoteHandle = {};
this._senderLinks = [];
this._deliveryTag = 1;

this._senderLinks = [];
this._receiverLinks = [];

var self = this;
var stateMachine = {
'UNMAPPED': {
Expand Down Expand Up @@ -216,6 +218,7 @@ Session.prototype.attachLink = function(linkPolicy) {
this._senderLinks.push(link);
} else {
link = new ReceiverLink(this, policy.options.handle, policy);
this._receiverLinks.push(link);
}

this._allocatedHandles[policy.options.handle] = link;
Expand Down Expand Up @@ -414,9 +417,12 @@ Session.prototype._handleDisposition = function(frame) {
state: frame.state
};

_.values(this._linksByName).forEach(function(link) {
link._dispositionReceived(disposition);
});
var dispositionHandler = function(l) { l._dispositionReceived(disposition); };
if (frame.role === constants.linkRole.sender) {
this._receiverLinks.forEach(dispositionHandler);
} else {
this._senderLinks.forEach(dispositionHandler);
}

this.emit(Session.DispositionReceived, disposition);
};
Expand Down
35 changes: 31 additions & 4 deletions test/integration/qpid/disposition.test.js
@@ -1,10 +1,14 @@
'use strict';
var AMQPClient = require('../../..').Client,
var Promise = require('bluebird'),
AMQPClient = require('../../..').Client,
BrokerAgent = require('qmf2'),

Session = require('../../../lib/session'),

c = require('../../../').Constants,
Promise = require('bluebird'),

config = require('./config'),
expect = require('chai').expect,
BrokerAgent = require('qmf2');
expect = require('chai').expect;

var test = {};
describe('QPID', function() {
Expand Down Expand Up @@ -101,5 +105,28 @@ describe('Disposition', function() {
});
});

it('should forward disposition frames by link role', function(done) {
var queueName = 'test.disposition.queue';
var called = { receiver: false, sender: false };
return test.client.connect(config.address)
.then(function() {
return Promise.all([
test.client.createReceiver(queueName),
test.client.createSender(queueName)
]);
})
.spread(function(receiver, sender) {
receiver._dispositionReceived = function(d) { called.receiver = true; };
sender._dispositionReceived = function(d) { called.sender = true; };

test.client._session.on(Session.DispositionReceived, function(d) {
expect(called).to.eql({ receiver: false, sender: true });
done();
});

return sender.send('test message');
});
});

});
});

0 comments on commit f93f2a7

Please sign in to comment.