Skip to content

Commit

Permalink
feat(Client): remove receipt callback on createReceiver
Browse files Browse the repository at this point in the history
This greatly simplifies the internals for receiver link creation,
aligning it more closely with sender link creation (so we can begin
to share code between them). It also makes the API more explicit by
requiring the user to understand that the receiver link is indeed
an EventEmitter and should be used that way.
  • Loading branch information
mbroadst committed Jul 31, 2015
1 parent 41523db commit 88e95b2
Show file tree
Hide file tree
Showing 5 changed files with 33 additions and 65 deletions.
55 changes: 6 additions & 49 deletions lib/amqp_client.js
Expand Up @@ -77,7 +77,6 @@ function AMQPClient(policy) {
this._attaching = {};
this._senderAttaching = {};
this._attached = {};
this._onReceipt = {};
this._pendingSends = {};
this._unsettledSends = {};

Expand Down Expand Up @@ -253,6 +252,7 @@ AMQPClient.prototype.createSender = function(address, options) {
return;
}

// return deferred attach promise if already attaching
if (self._senderAttaching[linkName]) {
var deferredAttach = function(err, link) {
if (!!err) {
Expand Down Expand Up @@ -334,35 +334,18 @@ AMQPClient.prototype.createSender = function(address, options) {
* @param options.filter Filter used in connecting to the source. See AMQP spec for details, and your server's documentation
* for possible values. node-amqp-encoder'd maps will be translated, and simple maps will be converted
* to AMQP Fields type as defined in the spec.
* @param {function} cb Callback to invoke on every receipt. Called with (error, message).
*
* @return {Promise}
*/
AMQPClient.prototype.createReceiver = function(address, options, cb) {
AMQPClient.prototype.createReceiver = function(address, options) {
if (!this._connection) { throw new Error('Must connect before receiving'); }

var self = this;
if (cb === undefined) {
if (typeof address === 'function') {
cb = address;
address = this._defaultQueue;
options = undefined;
} else if (typeof address !== 'string') {
cb = options;
options = address;
address = this._defaultQueue;
} else {
if (typeof options !== 'object') {
cb = options;
options = undefined;
}
}
}
address = address || this._defaultQueue;
options = options || {};

var self = this;
var linkName = address + '_RX';
if (this._attaching[linkName]) {
self._onReceipt[linkName].push(cb);

return new Promise(function(resolve, reject) {
var attachingListener = function(link) {
if (link.name === linkName) {
Expand Down Expand Up @@ -391,7 +374,7 @@ AMQPClient.prototype.createReceiver = function(address, options, cb) {
}

if (options.policy) {
u.deepMerge(linkPolicy, options.policy);
linkPolicy = u.deepMerge(linkPolicy, options.policy);
}
}

Expand All @@ -400,14 +383,9 @@ AMQPClient.prototype.createReceiver = function(address, options, cb) {
return resolve(self._attached[linkName]);

// otherwise create the link, and set initial state
if (self._onReceipt[linkName] === undefined) self._onReceipt[linkName] = [];
if (self._attached[linkName] === undefined) self._attached[linkName] = null;
if (self._attaching[linkName] === undefined) self._attaching[linkName] = false;

if (!!cb) {
self._onReceipt[linkName].push(cb);
}

var attach = function() {
self._attaching[linkName] = true;

Expand All @@ -419,30 +397,11 @@ AMQPClient.prototype.createReceiver = function(address, options, cb) {
self._attached[linkName] = l;

l.on(Link.ErrorReceived, function(err) {
var cbs = self._onReceipt[linkName];
if (cbs && cbs.length > 0) {
cbs.forEach(function (cb) {
cb(err);
});
}

// @todo: most likely never called, research a way to properly
// resolve or reject this promise.
reject(err);
});

l.on(Link.MessageReceived, function(message) {
var payload = message.body[0] || message.body;
message.body = l.policy.decoder ? l.policy.decoder(payload) : payload;
debug('received from (' + address + '): ' + message.body);
var cbs = self._onReceipt[linkName];
if (cbs && cbs.length > 0) {
cbs.forEach(function (cb) {
cb(null, message);
});
}
});

l.on(Link.Detached, function(details) {
debug('link detached: ' + (details ? details.error : 'No details'));
self._attached[linkName] = undefined;
Expand Down Expand Up @@ -513,7 +472,6 @@ AMQPClient.prototype._clearConnectionState = function(saveReconnectDetails) {

if (!saveReconnectDetails) {
this._pendingSends = {};
this._onReceipt = {};
this._reattach = {};
this._reconnect = null;
}
Expand All @@ -532,7 +490,6 @@ AMQPClient.prototype._preventReconnect = function() {
this._reconnect = null;
this._reattach = {};
this._pendingSends = {};
this._onReceipt = {};
};

AMQPClient.prototype._shouldReconnect = function() {
Expand Down
9 changes: 8 additions & 1 deletion lib/receiver_link.js
Expand Up @@ -107,7 +107,14 @@ ReceiverLink.prototype._messageReceived = function(transferFrame) {
this.accept(transferFrame.message);
}

this.emit(Link.MessageReceived, transferFrame.message);
// optionally decode message based on policy
var message = transferFrame.message,
payload = message.body[0] || message.body;

message.body = this.policy.decoder ? this.policy.decoder(payload) : payload;
debug('received from (' + this.name + '): ' + message.body);

this.emit(Link.MessageReceived, message);
this._checkCredit();
};

Expand Down
8 changes: 5 additions & 3 deletions test/integration/qpid/client.test.js
Expand Up @@ -73,7 +73,7 @@ describe('Client', function() {

it('should be able to create multiple receivers for same link', function(done) {
var receviedCount = 0;
var messageHandler = function(err, message) {
var messageHandler = function(message) {
expect(message.body).to.equal('TESTMESSAGE');
receviedCount++;
if (receviedCount === 2) done();
Expand All @@ -82,12 +82,14 @@ describe('Client', function() {
test.client.connect(config.address)
.then(function() {
return Promise.all([
test.client.createReceiver(config.defaultLink, null, messageHandler),
test.client.createReceiver(config.defaultLink, null, messageHandler),
test.client.createReceiver(config.defaultLink),
test.client.createReceiver(config.defaultLink),
test.client.createSender(config.defaultLink)
]);
})
.spread(function(receiver1, receiver2, senderLink) {
receiver1.on('message', messageHandler);
receiver2.on('message', messageHandler);
return senderLink.send('TESTMESSAGE');
});
});
Expand Down
8 changes: 5 additions & 3 deletions test/integration/servicebus/queue.test.js
Expand Up @@ -75,7 +75,7 @@ describe('ServiceBus', function() {
it('should be able to create multiple receivers for same link', function(done) {
var receviedCount = 0;
var messageHandler = function(err, message) {
var messageHandler = function(message) {
expect(message.body).to.equal('TESTMESSAGE');
receviedCount++;
if (receviedCount === 2) done();
Expand All @@ -84,12 +84,14 @@ describe('ServiceBus', function() {
test.client.connect(config.address)
.then(function() {
return Promise.all([
test.client.createReceiver(config.defaultLink, null, messageHandler),
test.client.createReceiver(config.defaultLink, null, messageHandler),
test.client.createReceiver(config.defaultLink),
test.client.createReceiver(config.defaultLink),
test.client.createSender(config.defaultLink)
]);
})
.spread(function(receiver1, receiver2, senderLink) {
receiver1.on('message', messageHandler);
receiver2.on('message', messageHandler);
return senderLink.send('TESTMESSAGE');
});
});
Expand Down
18 changes: 9 additions & 9 deletions test/unit/test_amqpclient.js
Expand Up @@ -187,16 +187,16 @@ describe('AMQPClient', function() {
return client.connect(mock_uri)
.then(function() {
// create but don't wait so we can simulate an attaching link
client.createReceiver(queue, function(err, payload, annotations) {})
client.createReceiver(queue)
.then(function(link) {
originalLink = link;
});
})
.then(function() {
return Promise.all([
client.createReceiver(queue, function(err, payload, annotations) {}),
client.createReceiver(queue, function(err, payload, annotations) {}),
client.createReceiver(queue, function(err, payload, annotations) {})
client.createReceiver(queue),
client.createReceiver(queue),
client.createReceiver(queue)
]);
})
.spread(function(link1, link2, link3) {
Expand Down Expand Up @@ -244,7 +244,7 @@ describe('AMQPClient', function() {

return client.connect(mock_uri)
.then(function() {
return client.createReceiver(queue, function(err, payload, annotations) {});
return client.createReceiver(queue);
})
.then(function() {
expect(c._created).to.eql(1);
Expand Down Expand Up @@ -298,8 +298,8 @@ describe('AMQPClient', function() {
return client.connect(mock_uri)
.then(function() {
return Promise.all([
client.createReceiver('queue1', function(err, payload, annotations) {}),
client.createReceiver('queue2', function(err, payload, annotations) {})
client.createReceiver('queue1'),
client.createReceiver('queue2')
]);
})
.then(function() {
Expand Down Expand Up @@ -354,7 +354,7 @@ describe('AMQPClient', function() {

return client.connect(mock_uri)
.then(function() {
return client.createReceiver(queue, function() {});
return client.createReceiver(queue);
})
.then(function() {
process.nextTick(function() {
Expand Down Expand Up @@ -409,7 +409,7 @@ describe('AMQPClient', function() {

return client.connect(mock_uri)
.then(function() {
return client.createReceiver(queue, function() {});
return client.createReceiver(queue);
})
.then(function() {
process.nextTick(function() {
Expand Down

0 comments on commit 88e95b2

Please sign in to comment.