Skip to content

Commit

Permalink
feat(session): add support for multiple sessions per client
Browse files Browse the repository at this point in the history
  • Loading branch information
princjef committed Sep 19, 2017
1 parent fc84946 commit 5edee8f
Show file tree
Hide file tree
Showing 11 changed files with 737 additions and 127 deletions.
16 changes: 6 additions & 10 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,20 +11,16 @@ branches:
except:
- gh-pages

addons:
apt:
sources:
- sourceline: 'ppa:qpid/testing'
packages:
- qpidd
- qpid-tools

before_install:
- npm install -g codeclimate-test-reporter
- sudo sh -c 'printf "start on (started networking and filesystem)\nstop on runlevel [!2345]\n\nexpect fork\npre-start script\n\tmkdir -p /var/lib/qpidd/\nend script\n\nscript\n\texec qpidd -d --config /etc/qpid/qpidd.conf\nend script\n" > /etc/init/qpidd.conf'
- sudo add-apt-repository -y ppa:qpid/testing
- sudo apt-get update -qq
- sudo apt-get install -y qpidd qpid-tools

before_script:
- sudo sh -c 'echo "auth=no" >> /etc/qpid/qpidd.conf'
- sudo /etc/init.d/qpidd restart
- sudo service qpidd restart

# remove these when we can specify link properties
- sudo qpid-config add queue test.disposition.queue
Expand All @@ -34,4 +30,4 @@ script:
- make coverage

after_script:
- codeclimate-test-reporter < coverage/lcov.info
- codeclimate-test-reporter < coverage/lcov.info
178 changes: 142 additions & 36 deletions lib/amqp_client.js
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ function AMQPClient(policy, policyOverrides) {

this._connection = null;
this._session = null;
this._sessionPromise = null;
this._userSessions = [];

this._reconnect = null;
if (!!this.policy.reconnect) {
Expand Down Expand Up @@ -157,20 +159,47 @@ AMQPClient.prototype.connect = function(url, policyOverrides) {
self._connection.on(Connection.Connected, function(c) {
debug('connected');
self.emit(AMQPClient.ConnectionOpened);

var promises = [];

// Only map the session if it has already been created
if (self._session) {
debug('session already exists, re-using');
self._session.connection = self._connection;
} else {
self._session = self._newSession(c);

promises.push(new Promise(function(resolve, reject) {
self._session.once(Session.Mapped, function(s) {
debug('client session mapped');
resolve();
});

self._session.begin(self.policy.session);
}));
}

self._session.once(Session.Mapped, function(s) {
debug('mapped');
// Update any user-generated sessions
promises = promises.concat(self._userSessions.map(function(session) {
session.connection = self._connection;

return new Promise(function(resolve, reject) {
session.once(Session.Mapped, function(s) {
debug('user session mapped');
resolve();
});

session.begin(self.policy.session);
});
}));

if (promises.length > 0) {
Promise.all(promises).then(function() {
self.emit('connected');
resolve(self);
}).catch(reject);
} else {
self.emit('connected');
resolve(self);
});

self._session.begin(self.policy.session);
}
});

self._connection.on(Connection.Disconnected, function() {
Expand All @@ -194,6 +223,33 @@ AMQPClient.prototype.connect = function(url, policyOverrides) {
});
};

/**
* Creates a session for the current connection that can be associated with any
* new links on that connection
*
* @inner @memberof AMQPClient
*
* @return {Promise<Session>}
*/
AMQPClient.prototype.createSession = function() {
if (!this._connection) {
throw new Error('Must connect before creating sessions');
}

var session = this._newSession(this._connection);
this._userSessions.push(session);

var self = this;
return new Promise(function (resolve, reject) {
session.once(Session.Mapped, function() {
debug('user session mapped');
resolve(session);
});

session.begin(self.policy.session);
});
};

/**
* Creates a sender link for the given address, with optional link policy
*
Expand All @@ -204,7 +260,7 @@ AMQPClient.prototype.connect = function(url, policyOverrides) {
*
* @return {Promise<SenderLink>}
*/
AMQPClient.prototype.createSender = function(address, policyOverrides) {
AMQPClient.prototype.createSender = function(address, policyOverrides, session) {
if (!this._connection) {
throw new Error('Must connect before creating links');
}
Expand All @@ -229,19 +285,20 @@ AMQPClient.prototype.createSender = function(address, policyOverrides) {
linkPolicy.defaultSubject = address.subject;
}

var self = this;
return new Promise(function(resolve, reject) {
var attach = function() {
var attachPromise = function(_err, _link) {
if (!!_err) return reject(_err);
return resolve(_link);
};
return this._getSession(session).then(function (session) {
return new Promise(function(resolve, reject) {
var attach = function() {
var attachPromise = function(_err, _link) {
if (!!_err) return reject(_err);
return resolve(_link);
};

var link = self._session.createLink(linkPolicy);
link._onAttach.push(attachPromise);
};
var link = session.createLink(linkPolicy);
link._onAttach.push(attachPromise);
};

attach();
attach();
});
});
};

Expand All @@ -254,8 +311,8 @@ AMQPClient.prototype.createSender = function(address, policyOverrides) {
*
* @return {Promise<SenderStream>}
*/
AMQPClient.prototype.createSenderStream = function(address, policyOverrides) {
return this.createSender(address, policyOverrides)
AMQPClient.prototype.createSenderStream = function(address, policyOverrides, session) {
return this.createSender(address, policyOverrides, session)
.then(function(link) { return new SenderStream(link, policyOverrides); });
};

Expand All @@ -271,7 +328,7 @@ AMQPClient.prototype.createSenderStream = function(address, policyOverrides) {
*
* @return {Promise<ReceiverLink>}
*/
AMQPClient.prototype.createReceiver = function(address, policyOverrides) {
AMQPClient.prototype.createReceiver = function(address, policyOverrides, session) {
if (!this._connection) {
throw new Error('Must connect before creating links');
}
Expand Down Expand Up @@ -304,19 +361,20 @@ AMQPClient.prototype.createReceiver = function(address, policyOverrides) {
translator(['described', ['symbol', filterSymbol], ['string', address.subject]]);
}

var self = this;
return new Promise(function(resolve, reject) {
var attach = function() {
var attachPromise = function(_err, _link) {
if (!!_err) return reject(_err);
return resolve(_link);
};
return this._getSession(session).then(function (session) {
return new Promise(function(resolve, reject) {
var attach = function() {
var attachPromise = function(_err, _link) {
if (!!_err) return reject(_err);
return resolve(_link);
};

var link = self._session.createLink(linkPolicy);
link._onAttach.push(attachPromise);
};
var link = session.createLink(linkPolicy);
link._onAttach.push(attachPromise);
};

attach();
attach();
});
});
};

Expand All @@ -329,14 +387,14 @@ AMQPClient.prototype.createReceiver = function(address, policyOverrides) {
*
* @return {Promise<ReceiverStream>}
*/
AMQPClient.prototype.createReceiverStream = function(address, policyOverrides) {
AMQPClient.prototype.createReceiverStream = function(address, policyOverrides, session) {
// Override default credit behavior, as the stream will handle flow. The
// creditQuantum will be used as the stream's highWatermark by default.
policyOverrides = u.deepMerge({
credit: function() {},
}, policyOverrides || {});

return this.createReceiver(address, policyOverrides)
return this.createReceiver(address, policyOverrides, session)
.then(function(link) { return new ReceiverStream(link); });
};

Expand Down Expand Up @@ -369,6 +427,46 @@ AMQPClient.prototype.disconnect = function() {
});
};

// Use the provided session, falling back to the client's session and creating
// it if it doesn't already exist
AMQPClient.prototype._getSession = function(session) {
// Just return the provided session if one was passed in
if (session) {
return Promise.resolve(session);
}

var self = this;
if (this._session) {
if (this._session.mapped) {
// If we have a client session and it's already mapped, return it
return Promise.resolve(this._session);
} else if (this._session.mapping) {
// If we have a client and it's currently mapping, wait for it to be mapped
// and then return it
return new Promise(function (resolve, reject) {
self._session.once(Session.Mapped, function() {
resolve(self._session);
});
});
}
}

if (!this._connection) {
throw new Error('Must connect before creating sessions');
}

this._session = this._newSession(this._connection);

return new Promise(function (resolve, reject) {
self._session.once(Session.Mapped, function() {
debug('client session mapped');
resolve(self._session);
});

self._session.begin(self.policy.session);
});
};

AMQPClient.prototype._clearConnectionState = function(saveReconnectDetails) {
if (!!this._connection) this._connection.removeAllListeners();
this._connection = null;
Expand All @@ -377,6 +475,9 @@ AMQPClient.prototype._clearConnectionState = function(saveReconnectDetails) {
}

if (this._session) this._session._resetLinkState();
this._userSessions.forEach(function (session) {
session._resetLinkState();
});
};

// Helper methods for mocking in tests.
Expand All @@ -395,7 +496,12 @@ AMQPClient.prototype._newSession = function(conn) {
var self = this;
var session = new Session(conn);
session.on(Session.Unmapped, function(s) {
debug('unmapped');
debug('session unmapped');
if (session.disposed) {
session.removeAllListeners();
this._userSessions = this._userSessions.filter(s => s !== session);
if (this._session === session) this._session = null;
}
});

session.on(Session.ErrorReceived, function(e) {
Expand Down
6 changes: 5 additions & 1 deletion lib/policies/policy.js
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,8 @@ function Policy(overrides) {
* @property {function} window A function used to calculate how/when the flow control window should change
* @property {number} windowQuantum Quantum used in predefined window policies
* @property {boolean} enableSessionFlowControl Whether or not session flow control should be performed at all
* @property {boolean} endOnLinkDetach Whether or not to close the session when one of its links encounters an error
* @property {object|null} restart=null Whether the session should attempt restart when ended by the broker
*/
session: {
options: {
Expand All @@ -120,7 +122,9 @@ function Policy(overrides) {

window: putils.WindowPolicies.RefreshAtHalf,
windowQuantum: constants.session.defaultIncomingWindow,
enableSessionFlowControl: true
enableSessionFlowControl: true,
endOnLinkDetach: false,
restart: null
},

/**
Expand Down
8 changes: 8 additions & 0 deletions lib/policies/service_bus_policy.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,14 @@ var Policy = require('./policy');

module.exports = new Policy({
defaultSubjects: false,
session: {
endOnLinkDetach: true,
restart: {
retries: 10,
strategy: 'fibonacci', // || 'exponential'
forever: true
}
},
senderLink: {
attach: {
maxMessageSize: 10000, // Arbitrary choice
Expand Down
Loading

0 comments on commit 5edee8f

Please sign in to comment.