Skip to content

Commit

Permalink
feat(createSender): add createSender to the Client class
Browse files Browse the repository at this point in the history
  • Loading branch information
mbroadst committed Jun 26, 2015
1 parent 7fd6ab3 commit abde6af
Show file tree
Hide file tree
Showing 2 changed files with 224 additions and 17 deletions.
124 changes: 108 additions & 16 deletions lib/amqp_client.js
Expand Up @@ -75,6 +75,7 @@ function AMQPClient(policy) {
this._session = null;
this._sendMsgId = 1;
this._attaching = {};
this._senderAttaching = {};
this._attached = {};
this._onReceipt = {};
this._pendingSends = {};
Expand Down Expand Up @@ -219,6 +220,106 @@ AMQPClient.prototype.connect = function(url) {
});
};

AMQPClient.prototype._resolveDeferredSenderAttaches = function(linkName, err, link) {
// resolve deferred attachments
while (this._senderAttaching[linkName].length) {
var deferredAttach = this._senderAttaching[linkName].shift();
deferredAttach(err, link);
}

this._senderAttaching[linkName] = undefined;
};

/**
* Creates a receiver link for the given address, with optional link options
*
* @method createSender
* @param {string} [address] An address to connect this link to
* @param {*} [options] Options used for creating the link
*
* @return {Promise}
*/
AMQPClient.prototype.createSender = function(address, options) {
if (!this._connection) {
throw new Error('Must connect before creating links');
}

var self = this;
return new Promise(function(resolve, reject) {
var linkName = address + '_TX';

// return cached link if it exists
if (self._attached[linkName]) {
resolve(self._attached[linkName]);
return;
}

if (self._senderAttaching[linkName]) {
var deferredAttach = function(err, link) {
if (!!err) reject(err);
resolve(link);
};

self._senderAttaching[linkName].push(deferredAttach);
return;
}

// otherwise set some initial state for the link.
if (self._attached[linkName] === undefined) self._attached[linkName] = null;
if (self._senderAttaching[linkName] === undefined) self._senderAttaching[linkName] = [];

var attach = function() {
var onAttached = function(link) {
if (link.name !== linkName) {
return;
}

debug('sender link attached: ' + linkName);
self.removeListener(AMQPClient.LinkAttached, onAttached);
self._attached[linkName] = link;

link.on(Link.ErrorReceived, function(err) {
self.emit(AMQPClient.ErrorReceived, err);

this._resolveDeferredSenderAttaches(linkName, err);
reject(err);
});

link.on(Link.Detached, function(details) {
debug('sender link detached: ' + (details ? details.error : 'No details'));
self._attached[linkName] = undefined;

this._resolveDeferredSenderAttaches(linkName, details);
reject(details);
});

// return the attached link
this._resolveDeferredSenderAttaches(linkName, null, link);
resolve(link);
};
self.on(AMQPClient.LinkAttached, onAttached);

var onMapped = function() {
self.removeListener(AMQPClient.SessionMapped, onMapped);
var linkPolicy = u.deepMerge({
options: {
name: linkName,
source: { address: 'localhost' },
target: { address: address }
}
}, self.policy.senderLink);
self._session.attachLink(linkPolicy);
};
(self._session) ? onMapped() : self.on(AMQPClient.SessionMapped, onMapped);

This comment has been minimized.

Copy link
@noodlefrenzy

noodlefrenzy Jun 26, 2015

Owner

I like this.

};

self._reattach[linkName] = attach;

// attempt to attach the link
attach();
});
};


/**
* Sends the given message, with the given options, to the given target.
Expand Down Expand Up @@ -287,7 +388,8 @@ AMQPClient.prototype.send = function(msg, target, options) {
};

if (self._attaching[linkName]) {
// We're connecting, but our link isn't yet attached. Add ourselves to the list for calling when attached.
// We're connecting, but our link isn't yet attached. Add ourselves to
// the list for calling when attached.
self._pendingSends[linkName].push(sender);
return;
}
Expand Down Expand Up @@ -332,7 +434,9 @@ AMQPClient.prototype.send = function(msg, target, options) {
}
};
self.on(AMQPClient.LinkAttached, onAttached);
if (self._session) {

var onMapped = function() {
self.removeListener(AMQPClient.SessionMapped, onMapped);
var linkPolicy = u.deepMerge({
options: {
name: linkName,
Expand All @@ -341,20 +445,8 @@ AMQPClient.prototype.send = function(msg, target, options) {
}
}, self.policy.senderLink);
self._session.attachLink(linkPolicy);
} else {
var onMapped = function() {
self.removeListener(AMQPClient.SessionMapped, onMapped);
var linkPolicy = u.deepMerge({
options: {
name: linkName,
source: {address: 'localhost'},
target: {address: target}
}
}, self.policy.senderLink);
self._session.attachLink(linkPolicy);
};
self.on(AMQPClient.SessionMapped, onMapped);
}
};
(self._session) ? onMapped() : self.on(AMQPClient.SessionMapped, onMapped);
};

self._reattach[linkName] = attach;
Expand Down
117 changes: 116 additions & 1 deletion lib/sender_link.js
Expand Up @@ -7,13 +7,20 @@ var _ = require('lodash'),

errors = require('./errors'),
constants = require('./constants'),
putils = require('./policies/policy_utilities'),

TransferFrame = require('./frames/transfer_frame'),

DeliveryState = require('./types/delivery_state'),
M = require('./types/message'),
Link = require('./link');

function SenderLink(session, handle, linkPolicy) {
SenderLink.super_.call(this, session, handle, linkPolicy);

this._messageId = 1;
this._pendingSends = [];
this._unsettledSends = {};
}
util.inherits(SenderLink, Link);

Expand All @@ -26,13 +33,76 @@ SenderLink.prototype.attach = function() {
SenderLink.super_.prototype.attach.call(this);
};


SenderLink.prototype.canSend = function() {
var sendable = (this.linkCredit >= 1 && (!this.session.policy.enableSessionFlowControl || this.session._sessionParams.remoteIncomingWindow >= 1));
debug('canSend(' + this.linkCredit + ',' + this.session._sessionParams.remoteIncomingWindow + ') = ' + sendable);
return sendable;
};

/**
* Sends the given message, with the given options on this link
*
* @method send
* @param {*} msg Message to send. Will be encoded using sender link policy's encoder.
* @param {*} [options] An object of options to attach to the message including: annotations, properties,
and application properties
* @param options.annotations Annotations for the message, if any. See AMQP spec for details, and server for specific
* annotations that might be relevant (e.g. x-opt-partition-key on EventHub). If node-amqp-encoder'd
* map is given, it will be translated to appropriate internal types. Simple maps will be converted
* to AMQP Fields type as defined in the spec.
*
* @return {Promise}
*/
SenderLink.prototype.send = function(msg, options) {
if (!this.session.connection) {
throw new Error('Must connect before sending');
}

var self = this;
return new Promise(function(resolve, reject) {
var message = new M.Message(options, self.policy.encoder(msg));
var sendMessageId = self._messageId++;

var sendMessage = function(err) {
if (err) {
reject(err);
} else {
debug('sending: ', msg);

// @todo: deliveryTag is NO LONGER UNIQUE, figure out better way to track
// message ids
var messageId = self._sendMessage(message, {
deliveryTag: new Buffer(sendMessageId.toString())
});

var cbPolicy = self.policy.callback;
if (cbPolicy === putils.SenderCallbackPolicies.OnSettle) {
var deferredSender = function(err, state) {
if (!!err) {
reject(err);
} else {
resolve(state);
}
};
self._unsettledSends[messageId] = deferredSender;
} else if (cbPolicy === putils.SenderCallbackPolicies.OnSent) {
resolve();
} else {
reject(new errors.ArgumentError('Invalid sender callback policy: ' + cbPolicy));
}
}
};

if (!self.attached || !self.canSend()) {
self._pendingSends.push(sendMessage);
return;
}

// otherwise send the message
sendMessage(null);
});
};

// private API
SenderLink.prototype._sendMessage = function(message, options) {
// preconditions
Expand Down Expand Up @@ -87,6 +157,51 @@ SenderLink.prototype._flowReceived = function(flowFrame) {
}

this.emit(Link.CreditChange, this);
this._dispatchPendingSends(null);
};

SenderLink.prototype._attachReceived = function(attachFrame) {
SenderLink.super_.prototype._attachReceived.call(this, attachFrame);

this._dispatchPendingSends(null);
};

SenderLink.prototype._detached = function(frame) {
SenderLink.super_.prototype._detached.call(this, frame);

// has an error occurred?
if (frame && frame.error) {
this._dispatchPendingSends(frame.error);
}
};

SenderLink.prototype._dispatchPendingSends = function(err) {
while (this._pendingSends && this._pendingSends.length > 0 && this.canSend()) {
var sendMessage = this._pendingSends.shift();
sendMessage(err);
}
};

SenderLink.prototype._dispositionReceived = function(details) {
if (!details.settled) {
return;
}

var err = null;
if (details.state instanceof DeliveryState.Rejected) {
err = details.state.error;
}

var first = details.first;
var last = details.last || first;
for (var messageId = first; messageId <= last; ++messageId) {
if (!this._unsettledSends[messageId]) {
continue;
}

this._unsettledSends[messageId](err, details.state);
this._unsettledSends[messageId] = undefined;
}
};

module.exports = SenderLink;

0 comments on commit abde6af

Please sign in to comment.