From f93f2a7954d346f8cc7de8efea02d2483ab9a5b4 Mon Sep 17 00:00:00 2001 From: Matt Broadstone Date: Tue, 28 Jul 2015 09:30:55 -0400 Subject: [PATCH] fix(disposition): only send disposition to relevant links by role Previously we ignored the role of the incoming disposition frame, and forwarded these frames to the _dispositionRecieved method of all links we knew about. This patch corrects this behavior, and includes an integration test verifying that frames are forwarded to the appropriate link types. --- lib/session.js | 14 ++++++--- test/integration/qpid/disposition.test.js | 35 ++++++++++++++++++++--- 2 files changed, 41 insertions(+), 8 deletions(-) diff --git a/lib/session.js b/lib/session.js index d914e76..62d6fce 100644 --- a/lib/session.js +++ b/lib/session.js @@ -119,9 +119,11 @@ function Session(conn) { this._allocatedHandles = {}; this._linksByName = {}; this._linksByRemoteHandle = {}; - this._senderLinks = []; this._deliveryTag = 1; + this._senderLinks = []; + this._receiverLinks = []; + var self = this; var stateMachine = { 'UNMAPPED': { @@ -216,6 +218,7 @@ Session.prototype.attachLink = function(linkPolicy) { this._senderLinks.push(link); } else { link = new ReceiverLink(this, policy.options.handle, policy); + this._receiverLinks.push(link); } this._allocatedHandles[policy.options.handle] = link; @@ -414,9 +417,12 @@ Session.prototype._handleDisposition = function(frame) { state: frame.state }; - _.values(this._linksByName).forEach(function(link) { - link._dispositionReceived(disposition); - }); + var dispositionHandler = function(l) { l._dispositionReceived(disposition); }; + if (frame.role === constants.linkRole.sender) { + this._receiverLinks.forEach(dispositionHandler); + } else { + this._senderLinks.forEach(dispositionHandler); + } this.emit(Session.DispositionReceived, disposition); }; diff --git a/test/integration/qpid/disposition.test.js b/test/integration/qpid/disposition.test.js index 4a3940c..845ffc3 100644 --- a/test/integration/qpid/disposition.test.js +++ b/test/integration/qpid/disposition.test.js @@ -1,10 +1,14 @@ 'use strict'; -var AMQPClient = require('../../..').Client, +var Promise = require('bluebird'), + AMQPClient = require('../../..').Client, + BrokerAgent = require('qmf2'), + + Session = require('../../../lib/session'), + c = require('../../../').Constants, - Promise = require('bluebird'), + config = require('./config'), - expect = require('chai').expect, - BrokerAgent = require('qmf2'); + expect = require('chai').expect; var test = {}; describe('QPID', function() { @@ -101,5 +105,28 @@ describe('Disposition', function() { }); }); + it('should forward disposition frames by link role', function(done) { + var queueName = 'test.disposition.queue'; + var called = { receiver: false, sender: false }; + return test.client.connect(config.address) + .then(function() { + return Promise.all([ + test.client.createReceiver(queueName), + test.client.createSender(queueName) + ]); + }) + .spread(function(receiver, sender) { + receiver._dispositionReceived = function(d) { called.receiver = true; }; + sender._dispositionReceived = function(d) { called.sender = true; }; + + test.client._session.on(Session.DispositionReceived, function(d) { + expect(called).to.eql({ receiver: false, sender: true }); + done(); + }); + + return sender.send('test message'); + }); + }); + }); });