Skip to content

Commit

Permalink
fix(link-creation): remove support for tracking duplicate links
Browse files Browse the repository at this point in the history
Per our discussion in the gitter channel, support for tracking
attempts to create duplicate links has been fully removed. This
greatly reduces the complexity of link creation, allowing us to
remove Session._linksByName, the need for an object hash, as well
as a swath of logic to track links by name/id/options.
  • Loading branch information
mbroadst committed Aug 3, 2015
1 parent 1b50b59 commit ad602d2
Show file tree
Hide file tree
Showing 7 changed files with 14 additions and 153 deletions.
31 changes: 4 additions & 27 deletions lib/amqp_client.js
Expand Up @@ -204,30 +204,19 @@ AMQPClient.prototype.createSender = function(address, options) {
}
}, this.policy.senderLink);

// store link id
linkPolicy.id = u.linkHash(address, options);

var self = this;
return new Promise(function(resolve, reject) {
var attach = function() {
var link = self._session._senderLinks[linkPolicy.id];
if (link && link.state() === 'attached') {
return resolve(link);
}

if (!link) {
link = self._session.createLink(linkPolicy);
}

var attachPromise = function(_err, _link) {
if (!!_err) return reject(_err);
return resolve(_link);
};

var link = self._session.createLink(linkPolicy);
link._onAttach.push(attachPromise);
};

self._reattach[linkPolicy.id] = attach;
self._reattach[linkName] = attach;
attach();
});
};
Expand Down Expand Up @@ -276,31 +265,19 @@ AMQPClient.prototype.createReceiver = function(address, options) {
}
}

// store link id
linkPolicy.id = u.linkHash(address, options);

var self = this;
return new Promise(function(resolve, reject) {
var attach = function() {

var link = self._session._receiverLinks[linkPolicy.id];
if (link && link.state() === 'attached') {
return resolve(link);
}

if (!link) {
link = self._session.createLink(linkPolicy);
}

var attachPromise = function(_err, _link) {
if (!!_err) return reject(_err);
return resolve(_link);
};

var link = self._session.createLink(linkPolicy);
link._onAttach.push(attachPromise);
};

self._reattach[linkPolicy.id] = attach;
self._reattach[linkName] = attach;
attach();
});
};
Expand Down
1 change: 0 additions & 1 deletion lib/link.js
Expand Up @@ -164,7 +164,6 @@ Link.prototype._detached = function(frame) {
this.remoteHandle = undefined;
}

delete this.session._linksByName[this.policy.options.name];
delete this.session._allocatedHandles[this.policy.options.handle];

this.emit(Link.Detached, { closed: frame.closed, error: frame.error });
Expand Down
21 changes: 8 additions & 13 deletions lib/session.js
Expand Up @@ -122,9 +122,6 @@ function Session(conn) {
this._senderLinks = {};
this._receiverLinks = {};

this._linksByName = {};
this._linksByName[constants.linkRole.sender] = {};
this._linksByName[constants.linkRole.receiver] = {};
this._linksByRemoteHandle = {};


Expand Down Expand Up @@ -213,25 +210,21 @@ Session.prototype.createLink = function(linkPolicy) {
var link;
if (policy.options.role === constants.linkRole.sender) {
link = new SenderLink(this, policy.options.handle, policy);
this._senderLinks[policy.id] = link;
this._senderLinks[policy.options.name] = link;
} else {
link = new ReceiverLink(this, policy.options.handle, policy);
this._receiverLinks[policy.id] = link;
this._receiverLinks[policy.options.name] = link;
}

this._allocatedHandles[policy.options.handle] = link;

// NOTE: we need to store it in the opposite role because that's how we will
// receive the corresponding attach frame from the broker
this._linksByName[!policy.options.role][policy.options.name] = link;

var self = this;
link.on(Link.Detached, function(details) {
debug('detached(' + link.name + '): ' + (details ? details.error : 'No details'));
if (policy.options.role === constants.linkRole.sender) {
delete self._senderLinks[policy.id];
delete self._senderLinks[policy.options.name];
} else {
delete self._receiverLinks[policy.id];
delete self._receiverLinks[policy.options.name];
}
});

Expand Down Expand Up @@ -311,8 +304,10 @@ Session.prototype._processFrame = function(frame) {
}
this._unmap();
} else if (frame instanceof AttachFrame) {
if (frame.name && this._linksByName[frame.role][frame.name]) {
this._linksByName[frame.role][frame.name]._attachReceived(frame);
var links =
(frame.role === constants.linkRole.sender) ? this._receiverLinks : this._senderLinks;
if (frame.name && links[frame.name]) {
links[frame.name]._attachReceived(frame);
} else {
// @todo Proper error reporting. Should we shut down session?
console.warn('received Attach for unknown link(' + frame.name + '): ' + JSON.stringify(frame));
Expand Down
12 changes: 0 additions & 12 deletions lib/utilities.js
Expand Up @@ -6,7 +6,6 @@ var _ = require('lodash'),
constants = require('./constants'),
errors = require('./errors'),

objectHash = require('object-hash'),
uuid = require('uuid');


Expand Down Expand Up @@ -284,14 +283,3 @@ module.exports.dispositionRange = function(message) {
module.exports.linkName = function(address, options) {
return ((!!options && !!options.name) ? options.name : address + '_' + uuid.v4());
};

/**
* Generates a hash for a given link address and options
*
* @param address link address
* @param options link creation options
* @return String
*/
module.exports.linkHash = function(address, options) {
return objectHash({ address: address, options: options });
};
5 changes: 2 additions & 3 deletions package.json
Expand Up @@ -23,9 +23,8 @@
"istanbul": "^0.3.6",
"jshint": "^2.6.0",
"mocha": "^2.1.0",
"object-hash": "^0.8.0",
"stream-buffers": "2.1.0",
"qmf2": "^0.0.5"
"qmf2": "^0.0.5",
"stream-buffers": "2.1.0"
},
"scripts": {},
"author": {
Expand Down
13 changes: 0 additions & 13 deletions test/integration/qpid/client.test.js
Expand Up @@ -58,19 +58,6 @@ describe('Client', function() {
});
});

it('should return the same link when one sender link is attaching', function() {
return test.client.connect(config.address)
.then(function() {
return Promise.all([
test.client.createSender(config.defaultLink),
test.client.createSender(config.defaultLink)
]);
})
.spread(function(first, second) {
expect(first).to.eql(second);
});
});

it('should be able to create multiple receivers for same link', function(done) {
var receviedCount = 0;
var messageHandler = function(message) {
Expand Down
84 changes: 0 additions & 84 deletions test/unit/test_amqpclient.js
Expand Up @@ -138,90 +138,6 @@ describe('AMQPClient', function() {
expect(function() { client.createReceiver(); }).to.throw(Error);
});

function DelayedAttachMockSession(conn) {
Mock.Session.call(this);
}
util.inherits(DelayedAttachMockSession, Mock.Session);

DelayedAttachMockSession.prototype.attachLink = function(policy) {
var link = this._mockLinks[policy.options.name];
expect(link).to.exist;
link._created++;
link._clearState();

// emulate some delayed link attachment
var self = this;
setTimeout(function() {
self.emit('attachLink-called', self, policy, link);
}, 100);

return link;
};

it('should return cached receiver links upon multiple createReceive calls', function() {
var c = new Mock.Connection();
var s = new DelayedAttachMockSession(c);
var l = new Mock.ReceiverLink(s, {
name: 'queue_RX',
isSender: false,
capacity: 100
});

s._addMockLink(l);
var client = new Mock.Client(c, s);
var queue = 'queue';
var called = { open: 0, begin: 0, attachLink: 0 };
c.on('open-called', function(_c, _addr, _sasl) {
expect(_addr).to.eql(u.parseAddress(mock_uri));
expect(_sasl).to.not.exist;

called.open++;
_c.emit(Connection.Connected, _c);
});

s.on('begin-called', function(_s, _policy) {
called.begin++;

_s.mapped = true;
_s.emit(Session.Mapped, _s);
});

s.on('attachLink-called', function(_s, _policy, _l) {
called.attachLink++;
expect(_policy.options.source).to.eql({ address: queue });
expect(_policy.options.role).to.eql(constants.linkRole.receiver);

process.nextTick(function() {
_l.simulateAttaching();
});
});

var originalLink;
return client.connect(mock_uri)
.then(function() {
// create but don't wait so we can simulate an attaching link
client.createReceiver(queue, { name: 'queue_RX' })
.then(function(link) {
originalLink = link;
});
})
.then(function() {
return Promise.all([
client.createReceiver(queue, { name: 'queue_RX' }),
client.createReceiver(queue, { name: 'queue_RX' }),
client.createReceiver(queue, { name: 'queue_RX' })
]);
})
.spread(function(link1, link2, link3) {
expect(originalLink).to.exist;
expect(link1).to.eql(originalLink);
expect(link1).to.eql(link2);
expect(link1).to.eql(link3);
expect(link2).to.eql(link3);
});
});


it('should create connection, session, and link on receive with full address', function() {
var c = new Mock.Connection();
var s = new Mock.Session(c);
Expand Down

0 comments on commit ad602d2

Please sign in to comment.