diff --git a/examples/servicebus_reconnect.js b/examples/servicebus_reconnect.js index c34482e..8974bfa 100644 --- a/examples/servicebus_reconnect.js +++ b/examples/servicebus_reconnect.js @@ -1,8 +1,19 @@ +/// +/// A simple example showing ServiceBus' disconnect behavior: +/// - It seems to drop unused send links at around 10 minutes, despite heartbeat frames. +/// - It then drops the connection entirely at around 20 minutes. +/// +/// The Link has logic to deal with re-attaching for this detach at 10 minutes, and +/// the connection has logic to deal with re-connecting and re-establishing links. +/// +/// The code below demonstrates both. +/// 'use strict'; //var AMQPClient = require('amqp10').Client; var AMQPClient = require('../lib').Client, - Policy = require('../lib').Policy; + Policy = require('../lib').Policy, + Promise = require('bluebird'); // Simple argument-checker, you can ignore. function argCheck(settings, options) { @@ -17,44 +28,99 @@ function argCheck(settings, options) { if (process.argv.length < 3) { console.warn('Usage: node ' + process.argv[1] + ' '); -} else { - var settingsFile = process.argv[2]; - var settings = require('./' + settingsFile); - argCheck(settings, ['serviceBusHost', 'SASKeyName', 'SASKey', 'queueName']); - var protocol = settings.protocol || 'amqps'; - var serviceBusHost = settings.serviceBusHost + '.servicebus.windows.net'; - if (settings.serviceBusHost.indexOf(".") !== -1) { - serviceBusHost = settings.serviceBusHost; - } - var sasName = settings.SASKeyName; - var sasKey = settings.SASKey; - var queueName = settings.queueName; - - var uri = protocol + '://' + encodeURIComponent(sasName) + ':' + encodeURIComponent(sasKey) + '@' + serviceBusHost; - - var msgVal = 1; - var delayBetweenSending = 1000 * 60; - var iteration = 0; - var numIters = 1; - var client = new AMQPClient(Policy.ServiceBusQueue); - client.connect(uri).then(function () { - client.createSender(queueName).then(function (sender) { - setInterval(function () { - iteration++; - if (iteration === numIters) { - numIters *= 2; - iteration = 0; - console.log('===> Sending message ' + msgVal + ', ' + numIters + ' mins to next send.'); - sender.send({"DataString": "From Node", "DataValue": msgVal++}).then(function (state) { - console.log('State: ', state); - }).catch(function (eSend) { - console.warn('Failed to send: ' + eSend); - }); - } - }, delayBetweenSending); + process.exit(1); +} + +var settingsFile = process.argv[2]; +var settings = require('./' + settingsFile); +argCheck(settings, ['serviceBusHost', 'SASKeyName', 'SASKey', 'queueName']); +var protocol = settings.protocol || 'amqps'; +var serviceBusHost = settings.serviceBusHost + '.servicebus.windows.net'; +if (settings.serviceBusHost.indexOf(".") !== -1) { + serviceBusHost = settings.serviceBusHost; +} +var sasName = settings.SASKeyName; +var sasKey = settings.SASKey; +var queueName = settings.queueName; + +var uri = protocol + '://' + encodeURIComponent(sasName) + ':' + encodeURIComponent(sasKey) + '@' + serviceBusHost; + +function sendReceiveAndQuit(policy, minutesUntilSend) { + var msgVal = Math.floor(Math.random() * 1000000); + var min = 60 * 1000; + var remaining = minutesUntilSend; + return new Promise(function (resolve, reject) { + var client = new AMQPClient(policy); + client.on('client:errorReceived', function (err) { + console.warn('===> CLIENT ERROR: ', err); + }); + client.on('connection:closed', function() { + console.warn('===> CONNECTION CLOSED'); }); - }).catch(function (e) { - console.warn('Error send/receive: ' + e); + client.connect(uri).then(function() { + return client.createSender(queueName); + }) + .then(function (sender) { + sender.on('errorReceived', function(tx_err) { + console.warn('===> TX ERROR: ', tx_err); + }); + sender.on('detached', function() { + console.warn('===> TX DETACHED'); + }); + sender.session.on('unmapped', function() { + console.warn('===> SESSION UNMAPPED'); + }); + sender.session.on('errorReceived', function(err) { + console.warn('===> SESSION ERROR: ', err); + }); + setTimeout(function() { + client.disconnect().then(function() { + reject('Message not seen in ' + (minutesUntilSend + 1) + ' minutes'); + }); + }, (minutesUntilSend + 1) * min); + var interval = setInterval(function() { + remaining--; + if (remaining === 0) { + clearInterval(interval); + // Note that if you create the receiver at the same time as the sender, + // it doesn't seem to trigger the behavior (i.e. an active receiver link stops the auto-detach). + client.createReceiver(queueName).then(function (receiver) { + receiver.on('errorReceived', function (rx_err) { + console.warn('===> RX ERROR: ', rx_err); + }); + receiver.on('detached', function () { + console.warn('===> RX DETACHED'); + }); + receiver.on('message', function (msg) { + if (msg.body && msg.body.DataValue && msg.body.DataValue === msgVal) { + client.disconnect().then(function () { + console.log("\n=================\nExpected Value Seen\n==================\n"); + resolve(); + }); + } + }); + sender.send({"DataString": "From Node", "DataValue": msgVal}).then(function (state) { + console.log('State: ', state); + }).catch(function (eSend) { + console.warn('===> TX SEND FAILURE: ', eSend); + }); + }); + } else { + console.log('===> ' + remaining + ' minutes remaining until sending.'); + } + }, min); + }) }); } +var noReattach = Policy.merge({ senderLink: { reattach: null }, receiverLink: { reattach: null } }, Policy.ServiceBusQueue); +var reattach = Policy.ServiceBusQueue; + +var minutesUntilServiceBusDropsLink = 10; +var minutesUntilServiceBusDropsConnection = 20; +sendReceiveAndQuit(reattach, minutesUntilServiceBusDropsLink+ 1).then(function() { + sendReceiveAndQuit(noReattach, minutesUntilServiceBusDropsConnection + 1).then(function() { + console.log("\n==================\nComplete\n=================="); + }); +}); + diff --git a/lib/amqp_client.js b/lib/amqp_client.js index 1e2ab7e..2ffca19 100644 --- a/lib/amqp_client.js +++ b/lib/amqp_client.js @@ -74,7 +74,6 @@ function AMQPClient(policy) { this._connection = null; this._session = null; - this._reattach = {}; this._reconnect = null; if (!!this.policy.reconnect) { this._timeouts = u.generateTimeouts(this.policy.reconnect); @@ -133,21 +132,18 @@ AMQPClient.prototype.connect = function(url) { self._connection.on(Connection.Connected, function(c) { debug('connected'); self.emit(AMQPClient.ConnectionOpened); - self._session = self._newSession(c); + if (self._session) { + debug('session already exists, re-using'); + self._session.connection = self._connection; + } else { + self._session = self._newSession(c); + } + self._session.on(Session.Mapped, function(s) { debug('mapped'); resolve(self); }); - self._session.on(Session.Unmapped, function(s) { - debug('unmapped'); - }); - - self._session.on(Session.ErrorReceived, function(e) { - debug('session error: ', e); - self.emit(AMQPClient.ErrorReceived, e); - }); - self._session.begin(self.policy.session); }); @@ -159,8 +155,7 @@ AMQPClient.prototype.connect = function(url) { return reject(new errors.DisconnectedError()); } - if (!self._timeouts.length) - self._timeouts = u.generateTimeouts(self.policy.reconnect); + if (!self._timeouts.length) self._timeouts = u.generateTimeouts(self.policy.reconnect); setTimeout(function() { // @todo: see _attemptReconnection below for more detail, but bail here @@ -169,11 +164,6 @@ AMQPClient.prototype.connect = function(url) { }, self._timeouts.shift()); }); - self._connection.on(Connection.ErrorReceived, function(e) { - debug('connection error: ', e); - self.emit(AMQPClient.ErrorReceived, e); - }); - self._connection.open(address, sasl); }); }; @@ -220,7 +210,6 @@ AMQPClient.prototype.createSender = function(address, options) { link._onAttach.push(attachPromise); }; - self._reattach[linkName] = attach; attach(); }); }; @@ -293,7 +282,6 @@ AMQPClient.prototype.createReceiver = function(address, options) { link._onAttach.push(attachPromise); }; - self._reattach[linkName] = attach; attach(); }); }; @@ -325,28 +313,43 @@ AMQPClient.prototype.disconnect = function() { AMQPClient.prototype._clearConnectionState = function(saveReconnectDetails) { this._connection = null; - this._session = null; // Copy from original to avoid any settings changes "sticking" across connections. this.policy = u.deepMerge(this._originalPolicy); if (!saveReconnectDetails) { - this._reattach = {}; this._reconnect = null; } }; // Helper methods for mocking in tests. AMQPClient.prototype._newConnection = function() { - return new Connection(this.policy.connect); + var self = this; + var connection = new Connection(this.policy.connect); + connection.on(Connection.ErrorReceived, function(e) { + debug('connection error: ', e); + self.emit(AMQPClient.ErrorReceived, e); + }); + + return connection; }; AMQPClient.prototype._newSession = function(conn) { - return new Session(conn); + var self = this; + var session = new Session(conn); + session.on(Session.Unmapped, function(s) { + debug('unmapped'); + }); + + session.on(Session.ErrorReceived, function(e) { + debug('session error: ', e); + self.emit(AMQPClient.ErrorReceived, e); + }); + + return session; }; AMQPClient.prototype._preventReconnect = function() { - this._reattach = {}; this._reconnect = null; }; @@ -361,13 +364,6 @@ AMQPClient.prototype._attemptReconnection = function() { var self = this; return self._reconnect() - .then(function() { - debug('reconnected and remapped, attempting to re-attach links'); - Object.keys(self._reattach).forEach(function(link) { - debug('reattaching link: ' + link); - self._reattach[link](); - }); - }) .catch(function(err) { self.emit(AMQPClient.ErrorReceived, err); diff --git a/lib/link.js b/lib/link.js index c49ea0e..f049227 100644 --- a/lib/link.js +++ b/lib/link.js @@ -9,6 +9,7 @@ var _ = require('lodash'), debug = require('debug')('amqp10:link'), constants = require('./constants'), + u = require('./utilities'), AttachFrame = require('./frames/attach_frame'), DetachFrame = require('./frames/detach_frame'), @@ -22,6 +23,9 @@ function Link(session, handle, linkPolicy) { this.remoteHandle = undefined; this._onAttach = []; + if (this.policy && this.policy.reattach) { + this._timeouts = u.generateTimeouts(this.policy.reattach); + } var self = this; var stateMachine = { @@ -42,6 +46,9 @@ function Link(session, handle, linkPolicy) { detachReceived: function() { self._sendDetach(); return this.DETACHING; + }, + forceDetach: function() { + return this.DETACHED; } }, 'DETACHING': { @@ -55,7 +62,7 @@ function Link(session, handle, linkPolicy) { }; this.linkSM = new StateMachine(stateMachine).bind(function(event, oldState, newState) { - debug('Transitioning from ' + oldState + ' to ' + newState + ' due to ' + event); + debug(self.name + ':' + self.handle + ': Transitioning from ' + oldState + ' to ' + newState + ' due to ' + event); }); } @@ -100,6 +107,7 @@ Link.prototype.attach = function() { Link.prototype.detach = function() { var self = this; + this._timeouts = undefined; // Disable any re-attachment policy. var detachPromise = new Promise(function(resolve, reject) { var onError = function(err) { reject(err); }; self.once(Link.ErrorReceived, onError); @@ -116,6 +124,15 @@ Link.prototype.detach = function() { return detachPromise; }; +/// +/// Force link state to detached without sending detach message - usually due to forcible disconnect or unmap from above. +/// Important bit is that this should not trigger auto-reattach behavior as that'll happen with reconnect. +/// +Link.prototype.forceDetach = function() { + debug('Force-detach for ' + this.name + '. Current state: ' + this.state()); + this.linkSM.forceDetach(); +}; + // private api Link.prototype._resolveAttachPromises = function(err, link) { while (this._onAttach.length) { @@ -130,7 +147,7 @@ Link.prototype._attachReceived = function(attachFrame) { // process params. this.remoteHandle = attachFrame.handle; this.session._linksByRemoteHandle[this.remoteHandle] = this; - debug('attached CH=[' + this.session.channel + '=>' + attachFrame.channel + '], Handle=[' + this.handle + '=>' + attachFrame.handle + ']'); + debug(this.name + ': attached CH=[' + this.session.channel + '=>' + attachFrame.channel + '], Handle=[' + this.handle + '=>' + attachFrame.handle + ']'); this.emit(Link.Attached, this); this._resolveAttachPromises(null, this); @@ -164,10 +181,28 @@ Link.prototype._detached = function(frame) { this.remoteHandle = undefined; } - delete this.session._allocatedHandles[this.policy.options.handle]; - this.emit(Link.Detached, { closed: frame.closed, error: frame.error }); this._resolveAttachPromises(frame.error ? frame.error : 'link closed'); + + var self = this; + if (!self.shouldReattach()) return; + + if (!self._timeouts.length) self._timeouts = u.generateTimeouts(self.policy.reattach); + + setTimeout(function() { + self._attemptReattach(); + }, self._timeouts.shift()); +}; + +Link.prototype.shouldReattach = function() { + if (!this.session || !this._timeouts) return false; + if (!this._timeouts.length && !this.policy.reattach.forever) return false; + return true; +}; + +Link.prototype._attemptReattach = function() { + debug('Attempting to reattach ' + this.name); + this.attach(); }; module.exports = Link; diff --git a/lib/policies/default_policy.js b/lib/policies/default_policy.js index a0c91c1..efb44f5 100644 --- a/lib/policies/default_policy.js +++ b/lib/policies/default_policy.js @@ -77,7 +77,8 @@ module.exports = { initialDeliveryCount: 1 }, callback: putils.SenderCallbackPolicies.OnSettle, - encoder: function(body) { return body; } + encoder: function(body) { return body; }, + reattach: null }, receiverLink: { options: { @@ -88,6 +89,7 @@ module.exports = { }, credit: putils.CreditPolicies.RefreshAtHalf, creditQuantum: 100, - decoder: function(body) { return body; } + decoder: function(body) { return body; }, + reattach: null } }; diff --git a/lib/policies/service_bus_policy.js b/lib/policies/service_bus_policy.js index 66b51a9..a378268 100644 --- a/lib/policies/service_bus_policy.js +++ b/lib/policies/service_bus_policy.js @@ -20,6 +20,11 @@ module.exports = u.deepMerge({ bodyStr = JSON.stringify(body); } return new Buffer(bodyStr, 'utf8'); + }, + reattach: { + retries: 10, + strategy: 'fibonacci', // || 'exponential' + forever: true } }, receiverLink: { @@ -44,6 +49,11 @@ module.exports = u.deepMerge({ } catch (e) { return bodyStr; } + }, + reattach: { + retries: 10, + strategy: 'fibonacci', // || 'exponential' + forever: true } } }, DefaultPolicy); diff --git a/lib/session.js b/lib/session.js index 4f43f24..904bf79 100644 --- a/lib/session.js +++ b/lib/session.js @@ -221,11 +221,7 @@ Session.prototype.createLink = function(linkPolicy) { var self = this; link.on(Link.Detached, function(details) { debug('detached(' + link.name + '): ' + (details ? details.error : 'No details')); - if (policy.options.role === constants.linkRole.sender) { - delete self._senderLinks[policy.options.name]; - } else { - delete self._receiverLinks[policy.options.name]; - } + if (!link.shouldReattach()) self._removeLink(self); }); link.on(Link.ErrorReceived, function(err) { @@ -240,6 +236,20 @@ Session.prototype.createLink = function(linkPolicy) { return link; }; +/// +/// Remove a link from the sender +/// +Session.prototype._removeLink = function(link) { + delete this._allocatedHandles[link.policy.options.handle]; + if (link instanceof SenderLink) { + debug('Removing sender link ' + link.name); + delete this._senderLinks[link.name]; + } else { + debug('Removing receiver link ' + link.name); + delete this._receiverLinks[link.name]; + } +}; + Session.prototype.addWindow = function(windowSize, flowOptions) { var opts = flowOptions || {}; this._sessionParams.incomingWindow += windowSize; @@ -364,6 +374,7 @@ Session.prototype._beginReceived = function(frame) { // attach all links var attachHandler = function(l) { + debug('Attaching link ' + l.name + ':' + l.handle + ' after begin received'); if (l.state() !== 'attached' && l.state() !== 'attaching') l.attach(); }; _.values(this._senderLinks).forEach(attachHandler); @@ -419,6 +430,15 @@ Session.prototype._unmap = function() { this.remoteChannel = undefined; this.channel = undefined; this.mapped = false; + + debug('Session unmapped - force-detaching all links.'); + // force-detach all links (they've already been detached due to unmap, just need to let them know about it) + var detachHandler = function(l) { + if (l.state() === 'attached') l.forceDetach(); + }; + _.values(this._senderLinks).forEach(detachHandler); + _.values(this._receiverLinks).forEach(detachHandler); + this.emit(Session.Unmapped); } }; diff --git a/lib/utilities.js b/lib/utilities.js index f11e69e..c79778e 100644 --- a/lib/utilities.js +++ b/lib/utilities.js @@ -197,6 +197,7 @@ function deepMerge() { } module.exports.deepMerge = deepMerge; +module.exports.deepCopy = deepMerge; function coerce(val, T) { if (val === null || val === undefined) return null; diff --git a/test/unit/test_amqpclient.js b/test/unit/test_amqpclient.js index 9fee661..c85def0 100644 --- a/test/unit/test_amqpclient.js +++ b/test/unit/test_amqpclient.js @@ -130,6 +130,7 @@ describe('AMQPClient', function() { expect(l.messages[0].message).to.eql({my: 'message'}); }); }); + }); describe('#receive()', function() {