From 88e95b2a0183510b8224d99753f21e0900c0d43b Mon Sep 17 00:00:00 2001 From: Matt Broadstone Date: Fri, 31 Jul 2015 08:46:39 -0400 Subject: [PATCH] feat(Client): remove receipt callback on createReceiver This greatly simplifies the internals for receiver link creation, aligning it more closely with sender link creation (so we can begin to share code between them). It also makes the API more explicit by requiring the user to understand that the receiver link is indeed an EventEmitter and should be used that way. --- lib/amqp_client.js | 55 +++-------------------- lib/receiver_link.js | 9 +++- test/integration/qpid/client.test.js | 8 ++-- test/integration/servicebus/queue.test.js | 8 ++-- test/unit/test_amqpclient.js | 18 ++++---- 5 files changed, 33 insertions(+), 65 deletions(-) diff --git a/lib/amqp_client.js b/lib/amqp_client.js index e612e12..f67a989 100644 --- a/lib/amqp_client.js +++ b/lib/amqp_client.js @@ -77,7 +77,6 @@ function AMQPClient(policy) { this._attaching = {}; this._senderAttaching = {}; this._attached = {}; - this._onReceipt = {}; this._pendingSends = {}; this._unsettledSends = {}; @@ -253,6 +252,7 @@ AMQPClient.prototype.createSender = function(address, options) { return; } + // return deferred attach promise if already attaching if (self._senderAttaching[linkName]) { var deferredAttach = function(err, link) { if (!!err) { @@ -334,35 +334,18 @@ AMQPClient.prototype.createSender = function(address, options) { * @param options.filter Filter used in connecting to the source. See AMQP spec for details, and your server's documentation * for possible values. node-amqp-encoder'd maps will be translated, and simple maps will be converted * to AMQP Fields type as defined in the spec. - * @param {function} cb Callback to invoke on every receipt. Called with (error, message). * * @return {Promise} */ -AMQPClient.prototype.createReceiver = function(address, options, cb) { +AMQPClient.prototype.createReceiver = function(address, options) { if (!this._connection) { throw new Error('Must connect before receiving'); } - var self = this; - if (cb === undefined) { - if (typeof address === 'function') { - cb = address; - address = this._defaultQueue; - options = undefined; - } else if (typeof address !== 'string') { - cb = options; - options = address; - address = this._defaultQueue; - } else { - if (typeof options !== 'object') { - cb = options; - options = undefined; - } - } - } + address = address || this._defaultQueue; + options = options || {}; + var self = this; var linkName = address + '_RX'; if (this._attaching[linkName]) { - self._onReceipt[linkName].push(cb); - return new Promise(function(resolve, reject) { var attachingListener = function(link) { if (link.name === linkName) { @@ -391,7 +374,7 @@ AMQPClient.prototype.createReceiver = function(address, options, cb) { } if (options.policy) { - u.deepMerge(linkPolicy, options.policy); + linkPolicy = u.deepMerge(linkPolicy, options.policy); } } @@ -400,14 +383,9 @@ AMQPClient.prototype.createReceiver = function(address, options, cb) { return resolve(self._attached[linkName]); // otherwise create the link, and set initial state - if (self._onReceipt[linkName] === undefined) self._onReceipt[linkName] = []; if (self._attached[linkName] === undefined) self._attached[linkName] = null; if (self._attaching[linkName] === undefined) self._attaching[linkName] = false; - if (!!cb) { - self._onReceipt[linkName].push(cb); - } - var attach = function() { self._attaching[linkName] = true; @@ -419,30 +397,11 @@ AMQPClient.prototype.createReceiver = function(address, options, cb) { self._attached[linkName] = l; l.on(Link.ErrorReceived, function(err) { - var cbs = self._onReceipt[linkName]; - if (cbs && cbs.length > 0) { - cbs.forEach(function (cb) { - cb(err); - }); - } - // @todo: most likely never called, research a way to properly // resolve or reject this promise. reject(err); }); - l.on(Link.MessageReceived, function(message) { - var payload = message.body[0] || message.body; - message.body = l.policy.decoder ? l.policy.decoder(payload) : payload; - debug('received from (' + address + '): ' + message.body); - var cbs = self._onReceipt[linkName]; - if (cbs && cbs.length > 0) { - cbs.forEach(function (cb) { - cb(null, message); - }); - } - }); - l.on(Link.Detached, function(details) { debug('link detached: ' + (details ? details.error : 'No details')); self._attached[linkName] = undefined; @@ -513,7 +472,6 @@ AMQPClient.prototype._clearConnectionState = function(saveReconnectDetails) { if (!saveReconnectDetails) { this._pendingSends = {}; - this._onReceipt = {}; this._reattach = {}; this._reconnect = null; } @@ -532,7 +490,6 @@ AMQPClient.prototype._preventReconnect = function() { this._reconnect = null; this._reattach = {}; this._pendingSends = {}; - this._onReceipt = {}; }; AMQPClient.prototype._shouldReconnect = function() { diff --git a/lib/receiver_link.js b/lib/receiver_link.js index 1d31164..605f187 100644 --- a/lib/receiver_link.js +++ b/lib/receiver_link.js @@ -107,7 +107,14 @@ ReceiverLink.prototype._messageReceived = function(transferFrame) { this.accept(transferFrame.message); } - this.emit(Link.MessageReceived, transferFrame.message); + // optionally decode message based on policy + var message = transferFrame.message, + payload = message.body[0] || message.body; + + message.body = this.policy.decoder ? this.policy.decoder(payload) : payload; + debug('received from (' + this.name + '): ' + message.body); + + this.emit(Link.MessageReceived, message); this._checkCredit(); }; diff --git a/test/integration/qpid/client.test.js b/test/integration/qpid/client.test.js index d3faa3b..c810892 100644 --- a/test/integration/qpid/client.test.js +++ b/test/integration/qpid/client.test.js @@ -73,7 +73,7 @@ describe('Client', function() { it('should be able to create multiple receivers for same link', function(done) { var receviedCount = 0; - var messageHandler = function(err, message) { + var messageHandler = function(message) { expect(message.body).to.equal('TESTMESSAGE'); receviedCount++; if (receviedCount === 2) done(); @@ -82,12 +82,14 @@ describe('Client', function() { test.client.connect(config.address) .then(function() { return Promise.all([ - test.client.createReceiver(config.defaultLink, null, messageHandler), - test.client.createReceiver(config.defaultLink, null, messageHandler), + test.client.createReceiver(config.defaultLink), + test.client.createReceiver(config.defaultLink), test.client.createSender(config.defaultLink) ]); }) .spread(function(receiver1, receiver2, senderLink) { + receiver1.on('message', messageHandler); + receiver2.on('message', messageHandler); return senderLink.send('TESTMESSAGE'); }); }); diff --git a/test/integration/servicebus/queue.test.js b/test/integration/servicebus/queue.test.js index c9b3b0e..052bdc7 100644 --- a/test/integration/servicebus/queue.test.js +++ b/test/integration/servicebus/queue.test.js @@ -75,7 +75,7 @@ describe('ServiceBus', function() { it('should be able to create multiple receivers for same link', function(done) { var receviedCount = 0; - var messageHandler = function(err, message) { + var messageHandler = function(message) { expect(message.body).to.equal('TESTMESSAGE'); receviedCount++; if (receviedCount === 2) done(); @@ -84,12 +84,14 @@ describe('ServiceBus', function() { test.client.connect(config.address) .then(function() { return Promise.all([ - test.client.createReceiver(config.defaultLink, null, messageHandler), - test.client.createReceiver(config.defaultLink, null, messageHandler), + test.client.createReceiver(config.defaultLink), + test.client.createReceiver(config.defaultLink), test.client.createSender(config.defaultLink) ]); }) .spread(function(receiver1, receiver2, senderLink) { + receiver1.on('message', messageHandler); + receiver2.on('message', messageHandler); return senderLink.send('TESTMESSAGE'); }); }); diff --git a/test/unit/test_amqpclient.js b/test/unit/test_amqpclient.js index 8896e83..da68b06 100644 --- a/test/unit/test_amqpclient.js +++ b/test/unit/test_amqpclient.js @@ -187,16 +187,16 @@ describe('AMQPClient', function() { return client.connect(mock_uri) .then(function() { // create but don't wait so we can simulate an attaching link - client.createReceiver(queue, function(err, payload, annotations) {}) + client.createReceiver(queue) .then(function(link) { originalLink = link; }); }) .then(function() { return Promise.all([ - client.createReceiver(queue, function(err, payload, annotations) {}), - client.createReceiver(queue, function(err, payload, annotations) {}), - client.createReceiver(queue, function(err, payload, annotations) {}) + client.createReceiver(queue), + client.createReceiver(queue), + client.createReceiver(queue) ]); }) .spread(function(link1, link2, link3) { @@ -244,7 +244,7 @@ describe('AMQPClient', function() { return client.connect(mock_uri) .then(function() { - return client.createReceiver(queue, function(err, payload, annotations) {}); + return client.createReceiver(queue); }) .then(function() { expect(c._created).to.eql(1); @@ -298,8 +298,8 @@ describe('AMQPClient', function() { return client.connect(mock_uri) .then(function() { return Promise.all([ - client.createReceiver('queue1', function(err, payload, annotations) {}), - client.createReceiver('queue2', function(err, payload, annotations) {}) + client.createReceiver('queue1'), + client.createReceiver('queue2') ]); }) .then(function() { @@ -354,7 +354,7 @@ describe('AMQPClient', function() { return client.connect(mock_uri) .then(function() { - return client.createReceiver(queue, function() {}); + return client.createReceiver(queue); }) .then(function() { process.nextTick(function() { @@ -409,7 +409,7 @@ describe('AMQPClient', function() { return client.connect(mock_uri) .then(function() { - return client.createReceiver(queue, function() {}); + return client.createReceiver(queue); }) .then(function() { process.nextTick(function() {