Skip to content

Commit

Permalink
Merge pull request #147 from noodlefrenzy/keep-session-links
Browse files Browse the repository at this point in the history
Keep session links
  • Loading branch information
noodlefrenzy committed Aug 7, 2015
2 parents 9b6a47e + f413b72 commit c4e3e8c
Show file tree
Hide file tree
Showing 8 changed files with 212 additions and 81 deletions.
142 changes: 104 additions & 38 deletions examples/servicebus_reconnect.js
Original file line number Diff line number Diff line change
@@ -1,8 +1,19 @@
///
/// A simple example showing ServiceBus' disconnect behavior:
/// - It seems to drop unused send links at around 10 minutes, despite heartbeat frames.
/// - It then drops the connection entirely at around 20 minutes.
///
/// The Link has logic to deal with re-attaching for this detach at 10 minutes, and
/// the connection has logic to deal with re-connecting and re-establishing links.
///
/// The code below demonstrates both.
///

'use strict';
//var AMQPClient = require('amqp10').Client;
var AMQPClient = require('../lib').Client,
Policy = require('../lib').Policy;
Policy = require('../lib').Policy,
Promise = require('bluebird');

// Simple argument-checker, you can ignore.
function argCheck(settings, options) {
Expand All @@ -17,44 +28,99 @@ function argCheck(settings, options) {

if (process.argv.length < 3) {
console.warn('Usage: node ' + process.argv[1] + ' <settings json file>');
} else {
var settingsFile = process.argv[2];
var settings = require('./' + settingsFile);
argCheck(settings, ['serviceBusHost', 'SASKeyName', 'SASKey', 'queueName']);
var protocol = settings.protocol || 'amqps';
var serviceBusHost = settings.serviceBusHost + '.servicebus.windows.net';
if (settings.serviceBusHost.indexOf(".") !== -1) {
serviceBusHost = settings.serviceBusHost;
}
var sasName = settings.SASKeyName;
var sasKey = settings.SASKey;
var queueName = settings.queueName;

var uri = protocol + '://' + encodeURIComponent(sasName) + ':' + encodeURIComponent(sasKey) + '@' + serviceBusHost;

var msgVal = 1;
var delayBetweenSending = 1000 * 60;
var iteration = 0;
var numIters = 1;
var client = new AMQPClient(Policy.ServiceBusQueue);
client.connect(uri).then(function () {
client.createSender(queueName).then(function (sender) {
setInterval(function () {
iteration++;
if (iteration === numIters) {
numIters *= 2;
iteration = 0;
console.log('===> Sending message ' + msgVal + ', ' + numIters + ' mins to next send.');
sender.send({"DataString": "From Node", "DataValue": msgVal++}).then(function (state) {
console.log('State: ', state);
}).catch(function (eSend) {
console.warn('Failed to send: ' + eSend);
});
}
}, delayBetweenSending);
process.exit(1);
}

var settingsFile = process.argv[2];
var settings = require('./' + settingsFile);
argCheck(settings, ['serviceBusHost', 'SASKeyName', 'SASKey', 'queueName']);
var protocol = settings.protocol || 'amqps';
var serviceBusHost = settings.serviceBusHost + '.servicebus.windows.net';
if (settings.serviceBusHost.indexOf(".") !== -1) {
serviceBusHost = settings.serviceBusHost;
}
var sasName = settings.SASKeyName;
var sasKey = settings.SASKey;
var queueName = settings.queueName;

var uri = protocol + '://' + encodeURIComponent(sasName) + ':' + encodeURIComponent(sasKey) + '@' + serviceBusHost;

function sendReceiveAndQuit(policy, minutesUntilSend) {
var msgVal = Math.floor(Math.random() * 1000000);
var min = 60 * 1000;
var remaining = minutesUntilSend;
return new Promise(function (resolve, reject) {
var client = new AMQPClient(policy);
client.on('client:errorReceived', function (err) {
console.warn('===> CLIENT ERROR: ', err);
});
client.on('connection:closed', function() {
console.warn('===> CONNECTION CLOSED');
});
}).catch(function (e) {
console.warn('Error send/receive: ' + e);
client.connect(uri).then(function() {
return client.createSender(queueName);
})
.then(function (sender) {
sender.on('errorReceived', function(tx_err) {
console.warn('===> TX ERROR: ', tx_err);
});
sender.on('detached', function() {
console.warn('===> TX DETACHED');
});
sender.session.on('unmapped', function() {
console.warn('===> SESSION UNMAPPED');
});
sender.session.on('errorReceived', function(err) {
console.warn('===> SESSION ERROR: ', err);
});
setTimeout(function() {
client.disconnect().then(function() {
reject('Message not seen in ' + (minutesUntilSend + 1) + ' minutes');
});
}, (minutesUntilSend + 1) * min);
var interval = setInterval(function() {
remaining--;
if (remaining === 0) {
clearInterval(interval);
// Note that if you create the receiver at the same time as the sender,
// it doesn't seem to trigger the behavior (i.e. an active receiver link stops the auto-detach).
client.createReceiver(queueName).then(function (receiver) {
receiver.on('errorReceived', function (rx_err) {
console.warn('===> RX ERROR: ', rx_err);
});
receiver.on('detached', function () {
console.warn('===> RX DETACHED');
});
receiver.on('message', function (msg) {
if (msg.body && msg.body.DataValue && msg.body.DataValue === msgVal) {
client.disconnect().then(function () {
console.log("\n=================\nExpected Value Seen\n==================\n");
resolve();
});
}
});
sender.send({"DataString": "From Node", "DataValue": msgVal}).then(function (state) {
console.log('State: ', state);
}).catch(function (eSend) {
console.warn('===> TX SEND FAILURE: ', eSend);
});
});
} else {
console.log('===> ' + remaining + ' minutes remaining until sending.');
}
}, min);
})
});
}

var noReattach = Policy.merge({ senderLink: { reattach: null }, receiverLink: { reattach: null } }, Policy.ServiceBusQueue);
var reattach = Policy.ServiceBusQueue;

var minutesUntilServiceBusDropsLink = 10;
var minutesUntilServiceBusDropsConnection = 20;
sendReceiveAndQuit(reattach, minutesUntilServiceBusDropsLink+ 1).then(function() {
sendReceiveAndQuit(noReattach, minutesUntilServiceBusDropsConnection + 1).then(function() {
console.log("\n==================\nComplete\n==================");
});
});

60 changes: 28 additions & 32 deletions lib/amqp_client.js
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,6 @@ function AMQPClient(policy) {
this._connection = null;
this._session = null;

this._reattach = {};
this._reconnect = null;
if (!!this.policy.reconnect) {
this._timeouts = u.generateTimeouts(this.policy.reconnect);
Expand Down Expand Up @@ -133,21 +132,18 @@ AMQPClient.prototype.connect = function(url) {
self._connection.on(Connection.Connected, function(c) {
debug('connected');
self.emit(AMQPClient.ConnectionOpened);
self._session = self._newSession(c);
if (self._session) {
debug('session already exists, re-using');
self._session.connection = self._connection;
} else {
self._session = self._newSession(c);
}

self._session.on(Session.Mapped, function(s) {
debug('mapped');
resolve(self);
});

self._session.on(Session.Unmapped, function(s) {
debug('unmapped');
});

self._session.on(Session.ErrorReceived, function(e) {
debug('session error: ', e);
self.emit(AMQPClient.ErrorReceived, e);
});

self._session.begin(self.policy.session);
});

Expand All @@ -159,8 +155,7 @@ AMQPClient.prototype.connect = function(url) {
return reject(new errors.DisconnectedError());
}

if (!self._timeouts.length)
self._timeouts = u.generateTimeouts(self.policy.reconnect);
if (!self._timeouts.length) self._timeouts = u.generateTimeouts(self.policy.reconnect);

setTimeout(function() {
// @todo: see _attemptReconnection below for more detail, but bail here
Expand All @@ -169,11 +164,6 @@ AMQPClient.prototype.connect = function(url) {
}, self._timeouts.shift());
});

self._connection.on(Connection.ErrorReceived, function(e) {
debug('connection error: ', e);
self.emit(AMQPClient.ErrorReceived, e);
});

self._connection.open(address, sasl);
});
};
Expand Down Expand Up @@ -220,7 +210,6 @@ AMQPClient.prototype.createSender = function(address, options) {
link._onAttach.push(attachPromise);
};

self._reattach[linkName] = attach;
attach();
});
};
Expand Down Expand Up @@ -293,7 +282,6 @@ AMQPClient.prototype.createReceiver = function(address, options) {
link._onAttach.push(attachPromise);
};

self._reattach[linkName] = attach;
attach();
});
};
Expand Down Expand Up @@ -325,28 +313,43 @@ AMQPClient.prototype.disconnect = function() {

AMQPClient.prototype._clearConnectionState = function(saveReconnectDetails) {
this._connection = null;
this._session = null;

// Copy from original to avoid any settings changes "sticking" across connections.
this.policy = u.deepMerge(this._originalPolicy);

if (!saveReconnectDetails) {
this._reattach = {};
this._reconnect = null;
}
};

// Helper methods for mocking in tests.
AMQPClient.prototype._newConnection = function() {
return new Connection(this.policy.connect);
var self = this;
var connection = new Connection(this.policy.connect);
connection.on(Connection.ErrorReceived, function(e) {
debug('connection error: ', e);
self.emit(AMQPClient.ErrorReceived, e);
});

return connection;
};

AMQPClient.prototype._newSession = function(conn) {
return new Session(conn);
var self = this;
var session = new Session(conn);
session.on(Session.Unmapped, function(s) {
debug('unmapped');
});

session.on(Session.ErrorReceived, function(e) {
debug('session error: ', e);
self.emit(AMQPClient.ErrorReceived, e);
});

return session;
};

AMQPClient.prototype._preventReconnect = function() {
this._reattach = {};
this._reconnect = null;
};

Expand All @@ -361,13 +364,6 @@ AMQPClient.prototype._attemptReconnection = function() {

var self = this;
return self._reconnect()
.then(function() {
debug('reconnected and remapped, attempting to re-attach links');
Object.keys(self._reattach).forEach(function(link) {
debug('reattaching link: ' + link);
self._reattach[link]();
});
})
.catch(function(err) {
self.emit(AMQPClient.ErrorReceived, err);

Expand Down
43 changes: 39 additions & 4 deletions lib/link.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ var _ = require('lodash'),

debug = require('debug')('amqp10:link'),
constants = require('./constants'),
u = require('./utilities'),

AttachFrame = require('./frames/attach_frame'),
DetachFrame = require('./frames/detach_frame'),
Expand All @@ -22,6 +23,9 @@ function Link(session, handle, linkPolicy) {
this.remoteHandle = undefined;

this._onAttach = [];
if (this.policy && this.policy.reattach) {
this._timeouts = u.generateTimeouts(this.policy.reattach);
}

var self = this;
var stateMachine = {
Expand All @@ -42,6 +46,9 @@ function Link(session, handle, linkPolicy) {
detachReceived: function() {
self._sendDetach();
return this.DETACHING;
},
forceDetach: function() {
return this.DETACHED;
}
},
'DETACHING': {
Expand All @@ -55,7 +62,7 @@ function Link(session, handle, linkPolicy) {
};

this.linkSM = new StateMachine(stateMachine).bind(function(event, oldState, newState) {
debug('Transitioning from ' + oldState + ' to ' + newState + ' due to ' + event);
debug(self.name + ':' + self.handle + ': Transitioning from ' + oldState + ' to ' + newState + ' due to ' + event);
});
}

Expand Down Expand Up @@ -100,6 +107,7 @@ Link.prototype.attach = function() {

Link.prototype.detach = function() {
var self = this;
this._timeouts = undefined; // Disable any re-attachment policy.
var detachPromise = new Promise(function(resolve, reject) {
var onError = function(err) { reject(err); };
self.once(Link.ErrorReceived, onError);
Expand All @@ -116,6 +124,15 @@ Link.prototype.detach = function() {
return detachPromise;
};

///
/// Force link state to detached without sending detach message - usually due to forcible disconnect or unmap from above.
/// Important bit is that this should not trigger auto-reattach behavior as that'll happen with reconnect.
///
Link.prototype.forceDetach = function() {
debug('Force-detach for ' + this.name + '. Current state: ' + this.state());
this.linkSM.forceDetach();
};

// private api
Link.prototype._resolveAttachPromises = function(err, link) {
while (this._onAttach.length) {
Expand All @@ -130,7 +147,7 @@ Link.prototype._attachReceived = function(attachFrame) {
// process params.
this.remoteHandle = attachFrame.handle;
this.session._linksByRemoteHandle[this.remoteHandle] = this;
debug('attached CH=[' + this.session.channel + '=>' + attachFrame.channel + '], Handle=[' + this.handle + '=>' + attachFrame.handle + ']');
debug(this.name + ': attached CH=[' + this.session.channel + '=>' + attachFrame.channel + '], Handle=[' + this.handle + '=>' + attachFrame.handle + ']');

this.emit(Link.Attached, this);
this._resolveAttachPromises(null, this);
Expand Down Expand Up @@ -164,10 +181,28 @@ Link.prototype._detached = function(frame) {
this.remoteHandle = undefined;
}

delete this.session._allocatedHandles[this.policy.options.handle];

this.emit(Link.Detached, { closed: frame.closed, error: frame.error });
this._resolveAttachPromises(frame.error ? frame.error : 'link closed');

var self = this;
if (!self.shouldReattach()) return;

if (!self._timeouts.length) self._timeouts = u.generateTimeouts(self.policy.reattach);

setTimeout(function() {
self._attemptReattach();
}, self._timeouts.shift());
};

Link.prototype.shouldReattach = function() {
if (!this.session || !this._timeouts) return false;
if (!this._timeouts.length && !this.policy.reattach.forever) return false;
return true;
};

Link.prototype._attemptReattach = function() {
debug('Attempting to reattach ' + this.name);
this.attach();
};

module.exports = Link;

0 comments on commit c4e3e8c

Please sign in to comment.