Skip to content

Commit

Permalink
fix: move session support check to operation layer (#2750)
Browse files Browse the repository at this point in the history
Session support check is now performed after server selection
this ensures the monitor was able to update the topology description

NODE-3100
  • Loading branch information
nbbeeken authored and ljhaywar committed Nov 9, 2021
1 parent a67c847 commit e296c99
Show file tree
Hide file tree
Showing 5 changed files with 46 additions and 47 deletions.
4 changes: 0 additions & 4 deletions src/mongo_client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -464,10 +464,6 @@ export class MongoClient extends EventEmitter {
throw new MongoError('Must connect to a server before calling this method');
}

if (!this.topology.hasSessionSupport()) {
throw new MongoError('Current topology does not support sessions');
}

return this.topology.startSession(options, this.s.options);
}

Expand Down
43 changes: 21 additions & 22 deletions src/operations/execute_operation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,33 +61,32 @@ export function executeOperation<
throw new TypeError('This method requires a valid operation instance');
}

if (topology.shouldCheckForSessionSupport()) {
return maybePromise(callback, cb => {
topology.selectServer(ReadPreference.primaryPreferred, err => {
if (err) {
cb(err);
return;
}
return maybePromise(callback, cb => {
if (topology.shouldCheckForSessionSupport()) {
return topology.selectServer(ReadPreference.primaryPreferred, err => {
if (err) return cb(err);

executeOperation<T, TResult>(topology, operation, cb);
});
});
}
}

// The driver sessions spec mandates that we implicitly create sessions for operations
// that are not explicitly provided with a session.
let session = operation.session;
let owner: symbol;
if (topology.hasSessionSupport()) {
if (session == null) {
owner = Symbol();
session = topology.startSession({ owner, explicit: false });
} else if (operation.session.hasEnded) {
throw new MongoError('Use of expired sessions is not permitted');
// The driver sessions spec mandates that we implicitly create sessions for operations
// that are not explicitly provided with a session.
let session: ClientSession | undefined = operation.session;
let owner: symbol | undefined;
if (topology.hasSessionSupport()) {
if (session == null) {
owner = Symbol();
session = topology.startSession({ owner, explicit: false });
} else if (session.hasEnded) {
return cb(new MongoError('Use of expired sessions is not permitted'));
}
} else if (session) {
// If the user passed an explicit session and we are still, after server selection,
// trying to run against a topology that doesn't support sessions we error out.
return cb(new MongoError('Current topology does not support sessions'));
}
}

return maybePromise(callback, cb => {
try {
executeWithServerSelection(topology, session, operation, (err, result) => {
if (session && session.owner && session.owner === owner) {
Expand All @@ -113,7 +112,7 @@ function supportsRetryableReads(server: Server) {
function executeWithServerSelection(
topology: Topology,
session: ClientSession,
operation: any,
operation: AbstractOperation,
callback: Callback
) {
const readPreference = operation.readPreference || ReadPreference.primary;
Expand Down
9 changes: 7 additions & 2 deletions src/operations/operation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ const kSession = Symbol('session');
* a specific aspect.
* @internal
*/
export abstract class AbstractOperation<T> {
export abstract class AbstractOperation<TResult = any> {
ns!: MongoDBNamespace;
cmd!: Document;
readPreference: ReadPreference;
Expand All @@ -48,6 +48,9 @@ export abstract class AbstractOperation<T> {
// BSON serialization options
bsonOptions?: BSONSerializeOptions;

// TODO: Each operation defines its own options, there should be better typing here
options: Document;

[kSession]: ClientSession;

constructor(options: OperationOptions = {}) {
Expand All @@ -61,9 +64,11 @@ export abstract class AbstractOperation<T> {
if (options.session) {
this[kSession] = options.session;
}

this.options = options;
}

abstract execute(server: Server, session: ClientSession, callback: Callback<T>): void;
abstract execute(server: Server, session: ClientSession, callback: Callback<TResult>): void;

hasAspect(aspect: symbol): boolean {
const ctor = this.constructor as OperationConstructor;
Expand Down
4 changes: 1 addition & 3 deletions src/sessions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -571,16 +571,14 @@ function endTransaction(session: ClientSession, commandName: string, callback: C
});
}

executeOperation(
return executeOperation(
session.topology,
new RunAdminCommandOperation(undefined, command, {
session,
readPreference: ReadPreference.primary
}),
(_err, _reply) => commandHandler(_err as MongoError, _reply)
);

return;
}

commandHandler(err as MongoError, reply);
Expand Down
33 changes: 17 additions & 16 deletions test/unit/sessions/client.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@ describe('Sessions - client/unit', function () {
});
});

it('should throw an exception if sessions are not supported', {
it('should not throw a synchronous exception if sessions are not supported', {
metadata: { requires: { topology: 'single' } },
test: function (done) {
test() {
test.server.setMessageHandler(request => {
var doc = request.document;
if (doc.ismaster) {
Expand All @@ -27,13 +27,11 @@ describe('Sessions - client/unit', function () {
});

const client = this.configuration.newClient(`mongodb://${test.server.uri()}/test`);
client.connect(function (err, client) {
expect(err).to.not.exist;
expect(() => {
client.startSession();
}).to.throw(/Current topology does not support sessions/);

client.close(done);
return client.connect().then(() => {
expect(() => client.startSession()).to.not.throw(
'Current topology does not support sessions'
);
return client.close();
});
}
});
Expand Down Expand Up @@ -93,15 +91,18 @@ describe('Sessions - client/unit', function () {
return replicaSetMock.uri();
})
.then(uri => {
const client = this.configuration.newClient(uri);
return client.connect();
testClient = this.configuration.newClient(uri);
return testClient.connect();
})
.then(client => {
testClient = client;
expect(client.topology.s.description.logicalSessionTimeoutMinutes).to.not.exist;
expect(() => {
client.startSession();
}).to.throw(/Current topology does not support sessions/);
const session = client.startSession();
return client.db().collection('t').insertOne({ a: 1 }, { session });
})
.then(() => {
expect.fail('Expected an error to be thrown about not supporting sessions');
})
.catch(error => {
expect(error.message).to.equal('Current topology does not support sessions');
})
.finally(() => (testClient ? testClient.close() : null));
}
Expand Down

0 comments on commit e296c99

Please sign in to comment.