diff --git a/lib/amqp_client.js b/lib/amqp_client.js index e612e12..f67a989 100644 --- a/lib/amqp_client.js +++ b/lib/amqp_client.js @@ -77,7 +77,6 @@ function AMQPClient(policy) { this._attaching = {}; this._senderAttaching = {}; this._attached = {}; - this._onReceipt = {}; this._pendingSends = {}; this._unsettledSends = {}; @@ -253,6 +252,7 @@ AMQPClient.prototype.createSender = function(address, options) { return; } + // return deferred attach promise if already attaching if (self._senderAttaching[linkName]) { var deferredAttach = function(err, link) { if (!!err) { @@ -334,35 +334,18 @@ AMQPClient.prototype.createSender = function(address, options) { * @param options.filter Filter used in connecting to the source. See AMQP spec for details, and your server's documentation * for possible values. node-amqp-encoder'd maps will be translated, and simple maps will be converted * to AMQP Fields type as defined in the spec. - * @param {function} cb Callback to invoke on every receipt. Called with (error, message). * * @return {Promise} */ -AMQPClient.prototype.createReceiver = function(address, options, cb) { +AMQPClient.prototype.createReceiver = function(address, options) { if (!this._connection) { throw new Error('Must connect before receiving'); } - var self = this; - if (cb === undefined) { - if (typeof address === 'function') { - cb = address; - address = this._defaultQueue; - options = undefined; - } else if (typeof address !== 'string') { - cb = options; - options = address; - address = this._defaultQueue; - } else { - if (typeof options !== 'object') { - cb = options; - options = undefined; - } - } - } + address = address || this._defaultQueue; + options = options || {}; + var self = this; var linkName = address + '_RX'; if (this._attaching[linkName]) { - self._onReceipt[linkName].push(cb); - return new Promise(function(resolve, reject) { var attachingListener = function(link) { if (link.name === linkName) { @@ -391,7 +374,7 @@ AMQPClient.prototype.createReceiver = function(address, options, cb) { } if (options.policy) { - u.deepMerge(linkPolicy, options.policy); + linkPolicy = u.deepMerge(linkPolicy, options.policy); } } @@ -400,14 +383,9 @@ AMQPClient.prototype.createReceiver = function(address, options, cb) { return resolve(self._attached[linkName]); // otherwise create the link, and set initial state - if (self._onReceipt[linkName] === undefined) self._onReceipt[linkName] = []; if (self._attached[linkName] === undefined) self._attached[linkName] = null; if (self._attaching[linkName] === undefined) self._attaching[linkName] = false; - if (!!cb) { - self._onReceipt[linkName].push(cb); - } - var attach = function() { self._attaching[linkName] = true; @@ -419,30 +397,11 @@ AMQPClient.prototype.createReceiver = function(address, options, cb) { self._attached[linkName] = l; l.on(Link.ErrorReceived, function(err) { - var cbs = self._onReceipt[linkName]; - if (cbs && cbs.length > 0) { - cbs.forEach(function (cb) { - cb(err); - }); - } - // @todo: most likely never called, research a way to properly // resolve or reject this promise. reject(err); }); - l.on(Link.MessageReceived, function(message) { - var payload = message.body[0] || message.body; - message.body = l.policy.decoder ? l.policy.decoder(payload) : payload; - debug('received from (' + address + '): ' + message.body); - var cbs = self._onReceipt[linkName]; - if (cbs && cbs.length > 0) { - cbs.forEach(function (cb) { - cb(null, message); - }); - } - }); - l.on(Link.Detached, function(details) { debug('link detached: ' + (details ? details.error : 'No details')); self._attached[linkName] = undefined; @@ -513,7 +472,6 @@ AMQPClient.prototype._clearConnectionState = function(saveReconnectDetails) { if (!saveReconnectDetails) { this._pendingSends = {}; - this._onReceipt = {}; this._reattach = {}; this._reconnect = null; } @@ -532,7 +490,6 @@ AMQPClient.prototype._preventReconnect = function() { this._reconnect = null; this._reattach = {}; this._pendingSends = {}; - this._onReceipt = {}; }; AMQPClient.prototype._shouldReconnect = function() { diff --git a/lib/receiver_link.js b/lib/receiver_link.js index 1d31164..605f187 100644 --- a/lib/receiver_link.js +++ b/lib/receiver_link.js @@ -107,7 +107,14 @@ ReceiverLink.prototype._messageReceived = function(transferFrame) { this.accept(transferFrame.message); } - this.emit(Link.MessageReceived, transferFrame.message); + // optionally decode message based on policy + var message = transferFrame.message, + payload = message.body[0] || message.body; + + message.body = this.policy.decoder ? this.policy.decoder(payload) : payload; + debug('received from (' + this.name + '): ' + message.body); + + this.emit(Link.MessageReceived, message); this._checkCredit(); }; diff --git a/test/integration/qpid/client.test.js b/test/integration/qpid/client.test.js index d3faa3b..c810892 100644 --- a/test/integration/qpid/client.test.js +++ b/test/integration/qpid/client.test.js @@ -73,7 +73,7 @@ describe('Client', function() { it('should be able to create multiple receivers for same link', function(done) { var receviedCount = 0; - var messageHandler = function(err, message) { + var messageHandler = function(message) { expect(message.body).to.equal('TESTMESSAGE'); receviedCount++; if (receviedCount === 2) done(); @@ -82,12 +82,14 @@ describe('Client', function() { test.client.connect(config.address) .then(function() { return Promise.all([ - test.client.createReceiver(config.defaultLink, null, messageHandler), - test.client.createReceiver(config.defaultLink, null, messageHandler), + test.client.createReceiver(config.defaultLink), + test.client.createReceiver(config.defaultLink), test.client.createSender(config.defaultLink) ]); }) .spread(function(receiver1, receiver2, senderLink) { + receiver1.on('message', messageHandler); + receiver2.on('message', messageHandler); return senderLink.send('TESTMESSAGE'); }); }); diff --git a/test/integration/servicebus/queue.test.js b/test/integration/servicebus/queue.test.js index c9b3b0e..052bdc7 100644 --- a/test/integration/servicebus/queue.test.js +++ b/test/integration/servicebus/queue.test.js @@ -75,7 +75,7 @@ describe('ServiceBus', function() { it('should be able to create multiple receivers for same link', function(done) { var receviedCount = 0; - var messageHandler = function(err, message) { + var messageHandler = function(message) { expect(message.body).to.equal('TESTMESSAGE'); receviedCount++; if (receviedCount === 2) done(); @@ -84,12 +84,14 @@ describe('ServiceBus', function() { test.client.connect(config.address) .then(function() { return Promise.all([ - test.client.createReceiver(config.defaultLink, null, messageHandler), - test.client.createReceiver(config.defaultLink, null, messageHandler), + test.client.createReceiver(config.defaultLink), + test.client.createReceiver(config.defaultLink), test.client.createSender(config.defaultLink) ]); }) .spread(function(receiver1, receiver2, senderLink) { + receiver1.on('message', messageHandler); + receiver2.on('message', messageHandler); return senderLink.send('TESTMESSAGE'); }); }); diff --git a/test/unit/test_amqpclient.js b/test/unit/test_amqpclient.js index 8896e83..da68b06 100644 --- a/test/unit/test_amqpclient.js +++ b/test/unit/test_amqpclient.js @@ -187,16 +187,16 @@ describe('AMQPClient', function() { return client.connect(mock_uri) .then(function() { // create but don't wait so we can simulate an attaching link - client.createReceiver(queue, function(err, payload, annotations) {}) + client.createReceiver(queue) .then(function(link) { originalLink = link; }); }) .then(function() { return Promise.all([ - client.createReceiver(queue, function(err, payload, annotations) {}), - client.createReceiver(queue, function(err, payload, annotations) {}), - client.createReceiver(queue, function(err, payload, annotations) {}) + client.createReceiver(queue), + client.createReceiver(queue), + client.createReceiver(queue) ]); }) .spread(function(link1, link2, link3) { @@ -244,7 +244,7 @@ describe('AMQPClient', function() { return client.connect(mock_uri) .then(function() { - return client.createReceiver(queue, function(err, payload, annotations) {}); + return client.createReceiver(queue); }) .then(function() { expect(c._created).to.eql(1); @@ -298,8 +298,8 @@ describe('AMQPClient', function() { return client.connect(mock_uri) .then(function() { return Promise.all([ - client.createReceiver('queue1', function(err, payload, annotations) {}), - client.createReceiver('queue2', function(err, payload, annotations) {}) + client.createReceiver('queue1'), + client.createReceiver('queue2') ]); }) .then(function() { @@ -354,7 +354,7 @@ describe('AMQPClient', function() { return client.connect(mock_uri) .then(function() { - return client.createReceiver(queue, function() {}); + return client.createReceiver(queue); }) .then(function() { process.nextTick(function() { @@ -409,7 +409,7 @@ describe('AMQPClient', function() { return client.connect(mock_uri) .then(function() { - return client.createReceiver(queue, function() {}); + return client.createReceiver(queue); }) .then(function() { process.nextTick(function() {