diff --git a/lib/amqp_client.js b/lib/amqp_client.js index 1f089e8..4cb609b 100644 --- a/lib/amqp_client.js +++ b/lib/amqp_client.js @@ -75,6 +75,7 @@ function AMQPClient(policy) { this._session = null; this._sendMsgId = 1; this._attaching = {}; + this._senderAttaching = {}; this._attached = {}; this._onReceipt = {}; this._pendingSends = {}; @@ -219,6 +220,106 @@ AMQPClient.prototype.connect = function(url) { }); }; +AMQPClient.prototype._resolveDeferredSenderAttaches = function(linkName, err, link) { + // resolve deferred attachments + while (this._senderAttaching[linkName].length) { + var deferredAttach = this._senderAttaching[linkName].shift(); + deferredAttach(err, link); + } + + this._senderAttaching[linkName] = undefined; +}; + +/** + * Creates a receiver link for the given address, with optional link options + * + * @method createSender + * @param {string} [address] An address to connect this link to + * @param {*} [options] Options used for creating the link + * + * @return {Promise} + */ +AMQPClient.prototype.createSender = function(address, options) { + if (!this._connection) { + throw new Error('Must connect before creating links'); + } + + var self = this; + return new Promise(function(resolve, reject) { + var linkName = address + '_TX'; + + // return cached link if it exists + if (self._attached[linkName]) { + resolve(self._attached[linkName]); + return; + } + + if (self._senderAttaching[linkName]) { + var deferredAttach = function(err, link) { + if (!!err) reject(err); + resolve(link); + }; + + self._senderAttaching[linkName].push(deferredAttach); + return; + } + + // otherwise set some initial state for the link. + if (self._attached[linkName] === undefined) self._attached[linkName] = null; + if (self._senderAttaching[linkName] === undefined) self._senderAttaching[linkName] = []; + + var attach = function() { + var onAttached = function(link) { + if (link.name !== linkName) { + return; + } + + debug('sender link attached: ' + linkName); + self.removeListener(AMQPClient.LinkAttached, onAttached); + self._attached[linkName] = link; + + link.on(Link.ErrorReceived, function(err) { + self.emit(AMQPClient.ErrorReceived, err); + + this._resolveDeferredSenderAttaches(linkName, err); + reject(err); + }); + + link.on(Link.Detached, function(details) { + debug('sender link detached: ' + (details ? details.error : 'No details')); + self._attached[linkName] = undefined; + + this._resolveDeferredSenderAttaches(linkName, details); + reject(details); + }); + + // return the attached link + this._resolveDeferredSenderAttaches(linkName, null, link); + resolve(link); + }; + self.on(AMQPClient.LinkAttached, onAttached); + + var onMapped = function() { + self.removeListener(AMQPClient.SessionMapped, onMapped); + var linkPolicy = u.deepMerge({ + options: { + name: linkName, + source: { address: 'localhost' }, + target: { address: address } + } + }, self.policy.senderLink); + self._session.attachLink(linkPolicy); + }; + (self._session) ? onMapped() : self.on(AMQPClient.SessionMapped, onMapped); + }; + + self._reattach[linkName] = attach; + + // attempt to attach the link + attach(); + }); +}; + /** * Sends the given message, with the given options, to the given target. @@ -287,7 +388,8 @@ AMQPClient.prototype.send = function(msg, target, options) { }; if (self._attaching[linkName]) { - // We're connecting, but our link isn't yet attached. Add ourselves to the list for calling when attached. + // We're connecting, but our link isn't yet attached. Add ourselves to + // the list for calling when attached. self._pendingSends[linkName].push(sender); return; } @@ -332,7 +434,9 @@ AMQPClient.prototype.send = function(msg, target, options) { } }; self.on(AMQPClient.LinkAttached, onAttached); - if (self._session) { + + var onMapped = function() { + self.removeListener(AMQPClient.SessionMapped, onMapped); var linkPolicy = u.deepMerge({ options: { name: linkName, @@ -341,20 +445,8 @@ AMQPClient.prototype.send = function(msg, target, options) { } }, self.policy.senderLink); self._session.attachLink(linkPolicy); - } else { - var onMapped = function() { - self.removeListener(AMQPClient.SessionMapped, onMapped); - var linkPolicy = u.deepMerge({ - options: { - name: linkName, - source: {address: 'localhost'}, - target: {address: target} - } - }, self.policy.senderLink); - self._session.attachLink(linkPolicy); - }; - self.on(AMQPClient.SessionMapped, onMapped); - } + }; + (self._session) ? onMapped() : self.on(AMQPClient.SessionMapped, onMapped); }; self._reattach[linkName] = attach; diff --git a/lib/sender_link.js b/lib/sender_link.js index 32d8b9a..0fc8dfc 100644 --- a/lib/sender_link.js +++ b/lib/sender_link.js @@ -7,13 +7,20 @@ var _ = require('lodash'), errors = require('./errors'), constants = require('./constants'), + putils = require('./policies/policy_utilities'), TransferFrame = require('./frames/transfer_frame'), + DeliveryState = require('./types/delivery_state'), + M = require('./types/message'), Link = require('./link'); function SenderLink(session, handle, linkPolicy) { SenderLink.super_.call(this, session, handle, linkPolicy); + + this._messageId = 1; + this._pendingSends = []; + this._unsettledSends = {}; } util.inherits(SenderLink, Link); @@ -26,13 +33,76 @@ SenderLink.prototype.attach = function() { SenderLink.super_.prototype.attach.call(this); }; - SenderLink.prototype.canSend = function() { var sendable = (this.linkCredit >= 1 && (!this.session.policy.enableSessionFlowControl || this.session._sessionParams.remoteIncomingWindow >= 1)); debug('canSend(' + this.linkCredit + ',' + this.session._sessionParams.remoteIncomingWindow + ') = ' + sendable); return sendable; }; +/** + * Sends the given message, with the given options on this link + * + * @method send + * @param {*} msg Message to send. Will be encoded using sender link policy's encoder. + * @param {*} [options] An object of options to attach to the message including: annotations, properties, + and application properties + * @param options.annotations Annotations for the message, if any. See AMQP spec for details, and server for specific + * annotations that might be relevant (e.g. x-opt-partition-key on EventHub). If node-amqp-encoder'd + * map is given, it will be translated to appropriate internal types. Simple maps will be converted + * to AMQP Fields type as defined in the spec. + * + * @return {Promise} + */ +SenderLink.prototype.send = function(msg, options) { + if (!this.session.connection) { + throw new Error('Must connect before sending'); + } + + var self = this; + return new Promise(function(resolve, reject) { + var message = new M.Message(options, self.policy.encoder(msg)); + var sendMessageId = self._messageId++; + + var sendMessage = function(err) { + if (err) { + reject(err); + } else { + debug('sending: ', msg); + + // @todo: deliveryTag is NO LONGER UNIQUE, figure out better way to track + // message ids + var messageId = self._sendMessage(message, { + deliveryTag: new Buffer(sendMessageId.toString()) + }); + + var cbPolicy = self.policy.callback; + if (cbPolicy === putils.SenderCallbackPolicies.OnSettle) { + var deferredSender = function(err, state) { + if (!!err) { + reject(err); + } else { + resolve(state); + } + }; + self._unsettledSends[messageId] = deferredSender; + } else if (cbPolicy === putils.SenderCallbackPolicies.OnSent) { + resolve(); + } else { + reject(new errors.ArgumentError('Invalid sender callback policy: ' + cbPolicy)); + } + } + }; + + if (!self.attached || !self.canSend()) { + self._pendingSends.push(sendMessage); + return; + } + + // otherwise send the message + sendMessage(null); + }); +}; + // private API SenderLink.prototype._sendMessage = function(message, options) { // preconditions @@ -87,6 +157,51 @@ SenderLink.prototype._flowReceived = function(flowFrame) { } this.emit(Link.CreditChange, this); + this._dispatchPendingSends(null); +}; + +SenderLink.prototype._attachReceived = function(attachFrame) { + SenderLink.super_.prototype._attachReceived.call(this, attachFrame); + + this._dispatchPendingSends(null); +}; + +SenderLink.prototype._detached = function(frame) { + SenderLink.super_.prototype._detached.call(this, frame); + + // has an error occurred? + if (frame && frame.error) { + this._dispatchPendingSends(frame.error); + } +}; + +SenderLink.prototype._dispatchPendingSends = function(err) { + while (this._pendingSends && this._pendingSends.length > 0 && this.canSend()) { + var sendMessage = this._pendingSends.shift(); + sendMessage(err); + } +}; + +SenderLink.prototype._dispositionReceived = function(details) { + if (!details.settled) { + return; + } + + var err = null; + if (details.state instanceof DeliveryState.Rejected) { + err = details.state.error; + } + + var first = details.first; + var last = details.last || first; + for (var messageId = first; messageId <= last; ++messageId) { + if (!this._unsettledSends[messageId]) { + continue; + } + + this._unsettledSends[messageId](err, details.state); + this._unsettledSends[messageId] = undefined; + } }; module.exports = SenderLink;