Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(session): add support for multiple sessions per client #332

Merged
merged 1 commit into from
Jan 24, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,4 @@ script:
- make coverage

after_script:
- codeclimate-test-reporter < coverage/lcov.info
- codeclimate-test-reporter < coverage/lcov.info
187 changes: 151 additions & 36 deletions lib/amqp_client.js
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ function AMQPClient(policy, policyOverrides) {

this._connection = null;
this._session = null;
this._sessionPromise = null;
this._userSessions = [];

this._reconnect = null;
if (!!this.policy.reconnect) {
Expand Down Expand Up @@ -157,20 +159,54 @@ AMQPClient.prototype.connect = function(url, policyOverrides) {
self._connection.on(Connection.Connected, function(c) {
debug('connected');
self.emit(AMQPClient.ConnectionOpened);

var promises = [];

// Only map the session if it has already been created
if (self._session) {
debug('session already exists, re-using');
self._session.connection = self._connection;
} else {
self._session = self._newSession(c);

promises.push(new Promise(function(resolve, reject) {
self._session.once(Session.Mapped, function(s) {
debug('client session mapped');
resolve();
});

self._session.begin(pu.Merge(self.policy.session, self._session.policy));
}));
}

self._session.once(Session.Mapped, function(s) {
debug('mapped');
// Update any user-generated sessions
promises = promises.concat(self._userSessions.map(function(session) {
session.connection = self._connection;

return new Promise(function(resolve, reject) {
session.once(Session.Mapped, function(s) {
debug('user session mapped');
resolve();
});

// We specifically don't merge the policies here because the policy
// for these sessions is passed in explicitly on create of the
// session. It's very unlikely that there wouldn't be a policy for the
// session at this point, but we fall back to the session policy from
// the client just in case. That being said, the policy specified
// explicitly by the user for that session should take precendence,
// leading to the lack of a merge here.
session.begin(session.policy || self.policy.session);
});
}));

if (promises.length > 0) {
Promise.all(promises).then(function() {
self.emit('connected');
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

self.emit('connected'); [](start = 10, length = 23)

unrelated but puzzling nonetheless to me: why would the "connected" state of the client be attached to sessions rather than connection or links?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah this one is an artifact of how the Promise-based design works. it's trying to make sure things are in a usable state when the promise returns. since the session is baked into the connection by the library, that means both the connection and the session have to be ready. i considered making it lazy, but didn't know that i'd be able to get away with it without some subtle breaking changes

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

actually i did make it lazy. the problem is reconnect because the session is already there

resolve(self);
}).catch(reject);
} else {
self.emit('connected');
resolve(self);
});

self._session.begin(self.policy.session);
}
});

self._connection.on(Connection.Disconnected, function() {
Expand All @@ -194,6 +230,35 @@ AMQPClient.prototype.connect = function(url, policyOverrides) {
});
};

/**
* Creates a session for the current connection that can be associated with any
* new links on that connection
*
* @inner @memberof AMQPClient
*
* @return {Promise<Session>}
*/
AMQPClient.prototype.createSession = function(policyOverrides) {
if (!this._connection) {
throw new Error('Must connect before creating sessions');
}

policyOverrides = policyOverrides || {};

var session = this._newSession(this._connection);
this._userSessions.push(session);

var self = this;
return new Promise(function (resolve, reject) {
session.once(Session.Mapped, function() {
debug('user session mapped');
resolve(session);
});

session.begin(pu.Merge(policyOverrides, self.policy.session));
});
};

/**
* Creates a sender link for the given address, with optional link policy
*
Expand All @@ -204,7 +269,7 @@ AMQPClient.prototype.connect = function(url, policyOverrides) {
*
* @return {Promise<SenderLink>}
*/
AMQPClient.prototype.createSender = function(address, policyOverrides) {
AMQPClient.prototype.createSender = function(address, policyOverrides, session) {
if (!this._connection) {
throw new Error('Must connect before creating links');
}
Expand All @@ -229,19 +294,20 @@ AMQPClient.prototype.createSender = function(address, policyOverrides) {
linkPolicy.defaultSubject = address.subject;
}

var self = this;
return new Promise(function(resolve, reject) {
var attach = function() {
var attachPromise = function(_err, _link) {
if (!!_err) return reject(_err);
return resolve(_link);
};
return this._getSession(session).then(function (session) {
return new Promise(function(resolve, reject) {
var attach = function() {
var attachPromise = function(_err, _link) {
if (!!_err) return reject(_err);
return resolve(_link);
};

var link = self._session.createLink(linkPolicy);
link._onAttach.push(attachPromise);
};
var link = session.createLink(linkPolicy);
link._onAttach.push(attachPromise);
};

attach();
attach();
});
});
};

Expand All @@ -254,8 +320,8 @@ AMQPClient.prototype.createSender = function(address, policyOverrides) {
*
* @return {Promise<SenderStream>}
*/
AMQPClient.prototype.createSenderStream = function(address, policyOverrides) {
return this.createSender(address, policyOverrides)
AMQPClient.prototype.createSenderStream = function(address, policyOverrides, session) {
return this.createSender(address, policyOverrides, session)
.then(function(link) { return new SenderStream(link, policyOverrides); });
};

Expand All @@ -271,7 +337,7 @@ AMQPClient.prototype.createSenderStream = function(address, policyOverrides) {
*
* @return {Promise<ReceiverLink>}
*/
AMQPClient.prototype.createReceiver = function(address, policyOverrides) {
AMQPClient.prototype.createReceiver = function(address, policyOverrides, session) {
if (!this._connection) {
throw new Error('Must connect before creating links');
}
Expand Down Expand Up @@ -304,19 +370,20 @@ AMQPClient.prototype.createReceiver = function(address, policyOverrides) {
translator(['described', ['symbol', filterSymbol], ['string', address.subject]]);
}

var self = this;
return new Promise(function(resolve, reject) {
var attach = function() {
var attachPromise = function(_err, _link) {
if (!!_err) return reject(_err);
return resolve(_link);
};
return this._getSession(session).then(function (session) {
return new Promise(function(resolve, reject) {
var attach = function() {
var attachPromise = function(_err, _link) {
if (!!_err) return reject(_err);
return resolve(_link);
};

var link = self._session.createLink(linkPolicy);
link._onAttach.push(attachPromise);
};
var link = session.createLink(linkPolicy);
link._onAttach.push(attachPromise);
};

attach();
attach();
});
});
};

Expand All @@ -329,14 +396,14 @@ AMQPClient.prototype.createReceiver = function(address, policyOverrides) {
*
* @return {Promise<ReceiverStream>}
*/
AMQPClient.prototype.createReceiverStream = function(address, policyOverrides) {
AMQPClient.prototype.createReceiverStream = function(address, policyOverrides, session) {
// Override default credit behavior, as the stream will handle flow. The
// creditQuantum will be used as the stream's highWatermark by default.
policyOverrides = u.deepMerge({
credit: function() {},
}, policyOverrides || {});

return this.createReceiver(address, policyOverrides)
return this.createReceiver(address, policyOverrides, session)
.then(function(link) { return new ReceiverStream(link); });
};

Expand Down Expand Up @@ -369,6 +436,46 @@ AMQPClient.prototype.disconnect = function() {
});
};

// Use the provided session, falling back to the client's session and creating
// it if it doesn't already exist
AMQPClient.prototype._getSession = function(session) {
// Just return the provided session if one was passed in
if (session) {
return Promise.resolve(session);
}

var self = this;
if (this._session) {
if (!this._session.mapping) {
// If we have a client session and it's not being mapped, return it
return Promise.resolve(this._session);
} else {
// If we have a client and it's currently mapping, wait for it to be mapped
// and then return it
return new Promise(function (resolve, reject) {
self._session.once(Session.Mapped, function() {
resolve(self._session);
});
});
}
}

if (!this._connection) {
throw new Error('Must connect before creating sessions');
}

this._session = this._newSession(this._connection);

return new Promise(function (resolve, reject) {
self._session.once(Session.Mapped, function() {
debug('client session mapped');
resolve(self._session);
});

self._session.begin(self.policy.session);
});
};

AMQPClient.prototype._clearConnectionState = function(saveReconnectDetails) {
if (!!this._connection) this._connection.removeAllListeners();
this._connection = null;
Expand All @@ -377,6 +484,9 @@ AMQPClient.prototype._clearConnectionState = function(saveReconnectDetails) {
}

if (this._session) this._session._resetLinkState();
this._userSessions.forEach(function (session) {
session._resetLinkState();
});
};

// Helper methods for mocking in tests.
Expand All @@ -395,7 +505,12 @@ AMQPClient.prototype._newSession = function(conn) {
var self = this;
var session = new Session(conn);
session.on(Session.Unmapped, function(s) {
debug('unmapped');
debug('session unmapped');
if (session.disposed) {
session.removeAllListeners();
this._userSessions = this._userSessions.filter(function (s) { return s !== session; });
if (this._session === session) this._session = null;
}
});

session.on(Session.ErrorReceived, function(e) {
Expand Down
4 changes: 3 additions & 1 deletion lib/policies/policy.js
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ function Policy(overrides) {
* @property {function} window A function used to calculate how/when the flow control window should change
* @property {number} windowQuantum Quantum used in predefined window policies
* @property {boolean} enableSessionFlowControl Whether or not session flow control should be performed at all
* @property {object|null} reestablish=null Whether the session should attempt to reestablish when ended by the broker
*/
session: {
options: {
Expand All @@ -120,7 +121,8 @@ function Policy(overrides) {

window: putils.WindowPolicies.RefreshAtHalf,
windowQuantum: constants.session.defaultIncomingWindow,
enableSessionFlowControl: true
enableSessionFlowControl: true,
reestablish: null
},

/**
Expand Down
7 changes: 7 additions & 0 deletions lib/policies/service_bus_policy.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,13 @@ var Policy = require('./policy');

module.exports = new Policy({
defaultSubjects: false,
session: {
reestablish: {
retries: 10,
strategy: 'fibonacci', // || 'exponential'
forever: true
}
},
senderLink: {
attach: {
maxMessageSize: 10000, // Arbitrary choice
Expand Down
Loading