Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[PORT] fix: move session support check to operation layer #2750

Merged
merged 2 commits into from
Mar 17, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions src/mongo_client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -464,10 +464,6 @@ export class MongoClient extends EventEmitter {
throw new MongoError('Must connect to a server before calling this method');
}

if (!this.topology.hasSessionSupport()) {
throw new MongoError('Current topology does not support sessions');
}

return this.topology.startSession(options, this.s.options);
}

Expand Down
43 changes: 21 additions & 22 deletions src/operations/execute_operation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,33 +61,32 @@ export function executeOperation<
throw new TypeError('This method requires a valid operation instance');
}

if (topology.shouldCheckForSessionSupport()) {
return maybePromise(callback, cb => {
topology.selectServer(ReadPreference.primaryPreferred, err => {
if (err) {
cb(err);
return;
}
return maybePromise(callback, cb => {
if (topology.shouldCheckForSessionSupport()) {
return topology.selectServer(ReadPreference.primaryPreferred, err => {
if (err) return cb(err);

executeOperation<T, TResult>(topology, operation, cb);
});
});
}
}

// The driver sessions spec mandates that we implicitly create sessions for operations
// that are not explicitly provided with a session.
let session = operation.session;
let owner: symbol;
if (topology.hasSessionSupport()) {
if (session == null) {
owner = Symbol();
session = topology.startSession({ owner, explicit: false });
} else if (operation.session.hasEnded) {
throw new MongoError('Use of expired sessions is not permitted');
// The driver sessions spec mandates that we implicitly create sessions for operations
// that are not explicitly provided with a session.
let session: ClientSession | undefined = operation.session;
let owner: symbol | undefined;
if (topology.hasSessionSupport()) {
if (session == null) {
owner = Symbol();
session = topology.startSession({ owner, explicit: false });
} else if (session.hasEnded) {
return cb(new MongoError('Use of expired sessions is not permitted'));
}
} else if (session) {
// If the user passed an explicit session and we are still, after server selection,
// trying to run against a topology that doesn't support sessions we error out.
return cb(new MongoError('Current topology does not support sessions'));
}
}

return maybePromise(callback, cb => {
try {
executeWithServerSelection(topology, session, operation, (err, result) => {
if (session && session.owner && session.owner === owner) {
Expand All @@ -113,7 +112,7 @@ function supportsRetryableReads(server: Server) {
function executeWithServerSelection(
topology: Topology,
session: ClientSession,
operation: any,
operation: AbstractOperation,
callback: Callback
) {
const readPreference = operation.readPreference || ReadPreference.primary;
Expand Down
9 changes: 7 additions & 2 deletions src/operations/operation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ const kSession = Symbol('session');
* a specific aspect.
* @internal
*/
export abstract class AbstractOperation<T> {
export abstract class AbstractOperation<TResult = any> {
ns!: MongoDBNamespace;
cmd!: Document;
readPreference: ReadPreference;
Expand All @@ -48,6 +48,9 @@ export abstract class AbstractOperation<T> {
// BSON serialization options
bsonOptions?: BSONSerializeOptions;

// TODO: Each operation defines its own options, there should be better typing here
options: Document;
Comment on lines +51 to +52
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was necessary type information for executeWithServerSelection


[kSession]: ClientSession;

constructor(options: OperationOptions = {}) {
Expand All @@ -61,9 +64,11 @@ export abstract class AbstractOperation<T> {
if (options.session) {
this[kSession] = options.session;
}

this.options = options;
}

abstract execute(server: Server, session: ClientSession, callback: Callback<T>): void;
abstract execute(server: Server, session: ClientSession, callback: Callback<TResult>): void;

hasAspect(aspect: symbol): boolean {
const ctor = this.constructor as OperationConstructor;
Expand Down
4 changes: 1 addition & 3 deletions src/sessions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -571,16 +571,14 @@ function endTransaction(session: ClientSession, commandName: string, callback: C
});
}

executeOperation(
return executeOperation(
session.topology,
new RunAdminCommandOperation(undefined, command, {
session,
readPreference: ReadPreference.primary
}),
(_err, _reply) => commandHandler(_err as MongoError, _reply)
);

return;
}

commandHandler(err as MongoError, reply);
Expand Down
2 changes: 1 addition & 1 deletion test/functional/index.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -1174,7 +1174,7 @@ describe('Indexes', function () {
metadata: {
requires: {
topology: ['single', 'replicaset', 'sharded', 'ssl', 'heap', 'wiredtiger'],
mongodb: '>=3.0.0'
mongodb: '>=3.0.0 <=4.8.0'
}
},

Expand Down
33 changes: 17 additions & 16 deletions test/unit/sessions/client.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@ describe('Sessions - client/unit', function () {
});
});

it('should throw an exception if sessions are not supported', {
it('should not throw a synchronous exception if sessions are not supported', {
metadata: { requires: { topology: 'single' } },
test: function (done) {
test() {
test.server.setMessageHandler(request => {
var doc = request.document;
if (doc.ismaster) {
Expand All @@ -27,13 +27,11 @@ describe('Sessions - client/unit', function () {
});

const client = this.configuration.newClient(`mongodb://${test.server.uri()}/test`);
client.connect(function (err, client) {
expect(err).to.not.exist;
expect(() => {
client.startSession();
}).to.throw(/Current topology does not support sessions/);

client.close(done);
return client.connect().then(() => {
expect(() => client.startSession()).to.not.throw(
'Current topology does not support sessions'
);
return client.close();
});
}
});
Expand Down Expand Up @@ -93,15 +91,18 @@ describe('Sessions - client/unit', function () {
return replicaSetMock.uri();
})
.then(uri => {
const client = this.configuration.newClient(uri);
return client.connect();
testClient = this.configuration.newClient(uri);
return testClient.connect();
})
.then(client => {
testClient = client;
expect(client.topology.s.description.logicalSessionTimeoutMinutes).to.not.exist;
expect(() => {
client.startSession();
}).to.throw(/Current topology does not support sessions/);
const session = client.startSession();
return client.db().collection('t').insertOne({ a: 1 }, { session });
})
.then(() => {
expect.fail('Expected an error to be thrown about not supporting sessions');
})
.catch(error => {
expect(error.message).to.equal('Current topology does not support sessions');
})
.finally(() => (testClient ? testClient.close() : null));
}
Expand Down