Skip to content

Commit

Permalink
Merge pull request #332 from princjef/multi-session
Browse files Browse the repository at this point in the history
feat(session): add support for multiple sessions per client
  • Loading branch information
pierreca committed Jan 24, 2018
2 parents 82bd89a + 8068045 commit da6fe5a
Show file tree
Hide file tree
Showing 12 changed files with 669 additions and 118 deletions.
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,4 @@ script:
- make coverage

after_script:
- codeclimate-test-reporter < coverage/lcov.info
- codeclimate-test-reporter < coverage/lcov.info
187 changes: 151 additions & 36 deletions lib/amqp_client.js
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ function AMQPClient(policy, policyOverrides) {

this._connection = null;
this._session = null;
this._sessionPromise = null;
this._userSessions = [];

this._reconnect = null;
if (!!this.policy.reconnect) {
Expand Down Expand Up @@ -157,20 +159,54 @@ AMQPClient.prototype.connect = function(url, policyOverrides) {
self._connection.on(Connection.Connected, function(c) {
debug('connected');
self.emit(AMQPClient.ConnectionOpened);

var promises = [];

// Only map the session if it has already been created
if (self._session) {
debug('session already exists, re-using');
self._session.connection = self._connection;
} else {
self._session = self._newSession(c);

promises.push(new Promise(function(resolve, reject) {
self._session.once(Session.Mapped, function(s) {
debug('client session mapped');
resolve();
});

self._session.begin(pu.Merge(self.policy.session, self._session.policy));
}));
}

self._session.once(Session.Mapped, function(s) {
debug('mapped');
// Update any user-generated sessions
promises = promises.concat(self._userSessions.map(function(session) {
session.connection = self._connection;

return new Promise(function(resolve, reject) {
session.once(Session.Mapped, function(s) {
debug('user session mapped');
resolve();
});

// We specifically don't merge the policies here because the policy
// for these sessions is passed in explicitly on create of the
// session. It's very unlikely that there wouldn't be a policy for the
// session at this point, but we fall back to the session policy from
// the client just in case. That being said, the policy specified
// explicitly by the user for that session should take precendence,
// leading to the lack of a merge here.
session.begin(session.policy || self.policy.session);
});
}));

if (promises.length > 0) {
Promise.all(promises).then(function() {
self.emit('connected');
resolve(self);
}).catch(reject);
} else {
self.emit('connected');
resolve(self);
});

self._session.begin(self.policy.session);
}
});

self._connection.on(Connection.Disconnected, function() {
Expand All @@ -194,6 +230,35 @@ AMQPClient.prototype.connect = function(url, policyOverrides) {
});
};

/**
* Creates a session for the current connection that can be associated with any
* new links on that connection
*
* @inner @memberof AMQPClient
*
* @return {Promise<Session>}
*/
AMQPClient.prototype.createSession = function(policyOverrides) {
if (!this._connection) {
throw new Error('Must connect before creating sessions');
}

policyOverrides = policyOverrides || {};

var session = this._newSession(this._connection);
this._userSessions.push(session);

var self = this;
return new Promise(function (resolve, reject) {
session.once(Session.Mapped, function() {
debug('user session mapped');
resolve(session);
});

session.begin(pu.Merge(policyOverrides, self.policy.session));
});
};

/**
* Creates a sender link for the given address, with optional link policy
*
Expand All @@ -204,7 +269,7 @@ AMQPClient.prototype.connect = function(url, policyOverrides) {
*
* @return {Promise<SenderLink>}
*/
AMQPClient.prototype.createSender = function(address, policyOverrides) {
AMQPClient.prototype.createSender = function(address, policyOverrides, session) {
if (!this._connection) {
throw new Error('Must connect before creating links');
}
Expand All @@ -229,19 +294,20 @@ AMQPClient.prototype.createSender = function(address, policyOverrides) {
linkPolicy.defaultSubject = address.subject;
}

var self = this;
return new Promise(function(resolve, reject) {
var attach = function() {
var attachPromise = function(_err, _link) {
if (!!_err) return reject(_err);
return resolve(_link);
};
return this._getSession(session).then(function (session) {
return new Promise(function(resolve, reject) {
var attach = function() {
var attachPromise = function(_err, _link) {
if (!!_err) return reject(_err);
return resolve(_link);
};

var link = self._session.createLink(linkPolicy);
link._onAttach.push(attachPromise);
};
var link = session.createLink(linkPolicy);
link._onAttach.push(attachPromise);
};

attach();
attach();
});
});
};

Expand All @@ -254,8 +320,8 @@ AMQPClient.prototype.createSender = function(address, policyOverrides) {
*
* @return {Promise<SenderStream>}
*/
AMQPClient.prototype.createSenderStream = function(address, policyOverrides) {
return this.createSender(address, policyOverrides)
AMQPClient.prototype.createSenderStream = function(address, policyOverrides, session) {
return this.createSender(address, policyOverrides, session)
.then(function(link) { return new SenderStream(link, policyOverrides); });
};

Expand All @@ -271,7 +337,7 @@ AMQPClient.prototype.createSenderStream = function(address, policyOverrides) {
*
* @return {Promise<ReceiverLink>}
*/
AMQPClient.prototype.createReceiver = function(address, policyOverrides) {
AMQPClient.prototype.createReceiver = function(address, policyOverrides, session) {
if (!this._connection) {
throw new Error('Must connect before creating links');
}
Expand Down Expand Up @@ -304,19 +370,20 @@ AMQPClient.prototype.createReceiver = function(address, policyOverrides) {
translator(['described', ['symbol', filterSymbol], ['string', address.subject]]);
}

var self = this;
return new Promise(function(resolve, reject) {
var attach = function() {
var attachPromise = function(_err, _link) {
if (!!_err) return reject(_err);
return resolve(_link);
};
return this._getSession(session).then(function (session) {
return new Promise(function(resolve, reject) {
var attach = function() {
var attachPromise = function(_err, _link) {
if (!!_err) return reject(_err);
return resolve(_link);
};

var link = self._session.createLink(linkPolicy);
link._onAttach.push(attachPromise);
};
var link = session.createLink(linkPolicy);
link._onAttach.push(attachPromise);
};

attach();
attach();
});
});
};

Expand All @@ -329,14 +396,14 @@ AMQPClient.prototype.createReceiver = function(address, policyOverrides) {
*
* @return {Promise<ReceiverStream>}
*/
AMQPClient.prototype.createReceiverStream = function(address, policyOverrides) {
AMQPClient.prototype.createReceiverStream = function(address, policyOverrides, session) {
// Override default credit behavior, as the stream will handle flow. The
// creditQuantum will be used as the stream's highWatermark by default.
policyOverrides = u.deepMerge({
credit: function() {},
}, policyOverrides || {});

return this.createReceiver(address, policyOverrides)
return this.createReceiver(address, policyOverrides, session)
.then(function(link) { return new ReceiverStream(link); });
};

Expand Down Expand Up @@ -369,6 +436,46 @@ AMQPClient.prototype.disconnect = function() {
});
};

// Use the provided session, falling back to the client's session and creating
// it if it doesn't already exist
AMQPClient.prototype._getSession = function(session) {
// Just return the provided session if one was passed in
if (session) {
return Promise.resolve(session);
}

var self = this;
if (this._session) {
if (!this._session.mapping) {
// If we have a client session and it's not being mapped, return it
return Promise.resolve(this._session);
} else {
// If we have a client and it's currently mapping, wait for it to be mapped
// and then return it
return new Promise(function (resolve, reject) {
self._session.once(Session.Mapped, function() {
resolve(self._session);
});
});
}
}

if (!this._connection) {
throw new Error('Must connect before creating sessions');
}

this._session = this._newSession(this._connection);

return new Promise(function (resolve, reject) {
self._session.once(Session.Mapped, function() {
debug('client session mapped');
resolve(self._session);
});

self._session.begin(self.policy.session);
});
};

AMQPClient.prototype._clearConnectionState = function(saveReconnectDetails) {
if (!!this._connection) this._connection.removeAllListeners();
this._connection = null;
Expand All @@ -377,6 +484,9 @@ AMQPClient.prototype._clearConnectionState = function(saveReconnectDetails) {
}

if (this._session) this._session._resetLinkState();
this._userSessions.forEach(function (session) {
session._resetLinkState();
});
};

// Helper methods for mocking in tests.
Expand All @@ -395,7 +505,12 @@ AMQPClient.prototype._newSession = function(conn) {
var self = this;
var session = new Session(conn);
session.on(Session.Unmapped, function(s) {
debug('unmapped');
debug('session unmapped');
if (session.disposed) {
session.removeAllListeners();
this._userSessions = this._userSessions.filter(function (s) { return s !== session; });
if (this._session === session) this._session = null;
}
});

session.on(Session.ErrorReceived, function(e) {
Expand Down
4 changes: 3 additions & 1 deletion lib/policies/policy.js
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ function Policy(overrides) {
* @property {function} window A function used to calculate how/when the flow control window should change
* @property {number} windowQuantum Quantum used in predefined window policies
* @property {boolean} enableSessionFlowControl Whether or not session flow control should be performed at all
* @property {object|null} reestablish=null Whether the session should attempt to reestablish when ended by the broker
*/
session: {
options: {
Expand All @@ -120,7 +121,8 @@ function Policy(overrides) {

window: putils.WindowPolicies.RefreshAtHalf,
windowQuantum: constants.session.defaultIncomingWindow,
enableSessionFlowControl: true
enableSessionFlowControl: true,
reestablish: null
},

/**
Expand Down
7 changes: 7 additions & 0 deletions lib/policies/service_bus_policy.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,13 @@ var Policy = require('./policy');

module.exports = new Policy({
defaultSubjects: false,
session: {
reestablish: {
retries: 10,
strategy: 'fibonacci', // || 'exponential'
forever: true
}
},
senderLink: {
attach: {
maxMessageSize: 10000, // Arbitrary choice
Expand Down
Loading

0 comments on commit da6fe5a

Please sign in to comment.