Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(NODE-3697): reduce serverSession allocation #3171

Merged
merged 14 commits into from
Mar 25, 2022
2 changes: 1 addition & 1 deletion src/cmap/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ export interface CommandOptions extends BSONSerializeOptions {
// Applying a session to a command should happen as part of command construction,
// most likely in the CommandOperation#executeCommand method, where we have access to
// the details we need to determine if a txnNum should also be applied.
willRetryWrite?: true;
willRetryWrite?: boolean;

writeConcern?: WriteConcern;
}
Expand Down
5 changes: 2 additions & 3 deletions src/operations/operation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ export interface OperationConstructor extends Function {
export interface OperationOptions extends BSONSerializeOptions {
/** Specify ClientSession for this command */
session?: ClientSession;
willRetryWrites?: boolean;
willRetryWrite?: boolean;

/** The preferred read preference (ReadPreference.primary, ReadPreference.primary_preferred, ReadPreference.secondary, ReadPreference.secondary_preferred, ReadPreference.nearest). */
readPreference?: ReadPreferenceLike;
Expand Down Expand Up @@ -56,8 +56,7 @@ export abstract class AbstractOperation<TResult = any> {
// BSON serialization options
bsonOptions?: BSONSerializeOptions;

// TODO: Each operation defines its own options, there should be better typing here
options: Document;
options: OperationOptions;

[kSession]: ClientSession | undefined;

Expand Down
120 changes: 74 additions & 46 deletions src/sessions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,20 +42,6 @@ import {

const minWireVersionForShardedTransactions = 8;

function assertAlive(session: ClientSession, callback?: Callback): boolean {
dariakp marked this conversation as resolved.
Show resolved Hide resolved
if (session.serverSession == null) {
const error = new MongoExpiredSessionError();
if (typeof callback === 'function') {
callback(error);
return false;
}

throw error;
}

return true;
}

/** @public */
export interface ClientSessionOptions {
/** Whether causal consistency should be enabled on this session */
Expand Down Expand Up @@ -89,6 +75,8 @@ const kSnapshotTime = Symbol('snapshotTime');
const kSnapshotEnabled = Symbol('snapshotEnabled');
/** @internal */
const kPinnedConnection = Symbol('pinnedConnection');
/** @internal Accumulates total number of increments to add to txnNumber when applying session to command */
const kTxnNumberIncrement = Symbol('txnNumberIncrement');

/** @public */
export interface EndSessionOptions {
Expand Down Expand Up @@ -123,13 +111,15 @@ export class ClientSession extends TypedEventEmitter<ClientSessionEvents> {
defaultTransactionOptions: TransactionOptions;
transaction: Transaction;
/** @internal */
[kServerSession]?: ServerSession;
[kServerSession]: ServerSession | null;
/** @internal */
[kSnapshotTime]?: Timestamp;
/** @internal */
[kSnapshotEnabled] = false;
/** @internal */
[kPinnedConnection]?: Connection;
/** @internal */
[kTxnNumberIncrement]: number;

/**
* Create a client session.
Expand Down Expand Up @@ -172,7 +162,10 @@ export class ClientSession extends TypedEventEmitter<ClientSessionEvents> {
this.sessionPool = sessionPool;
this.hasEnded = false;
this.clientOptions = clientOptions;
this[kServerSession] = undefined;

this.explicit = !!options.explicit;
this[kServerSession] = this.explicit ? this.sessionPool.acquire() : null;
this[kTxnNumberIncrement] = 0;

this.supports = {
causalConsistency: options.snapshot !== true && options.causalConsistency !== false
Expand All @@ -181,24 +174,29 @@ export class ClientSession extends TypedEventEmitter<ClientSessionEvents> {
this.clusterTime = options.initialClusterTime;

this.operationTime = undefined;
this.explicit = !!options.explicit;
this.owner = options.owner;
this.defaultTransactionOptions = Object.assign({}, options.defaultTransactionOptions);
this.transaction = new Transaction();
}

/** The server id associated with this session */
get id(): ServerSessionId | undefined {
return this.serverSession?.id;
return this[kServerSession]?.id;
}

get serverSession(): ServerSession {
if (this[kServerSession] == null) {
this[kServerSession] = this.sessionPool.acquire();
let serverSession = this[kServerSession];
if (serverSession == null) {
if (this.explicit) {
throw new MongoRuntimeError('Unexpected null serverSession for an explicit session');
}
if (this.hasEnded) {
throw new MongoRuntimeError('Unexpected null serverSession for an ended implicit session');
}
serverSession = this.sessionPool.acquire();
this[kServerSession] = serverSession;
}

// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
return this[kServerSession]!;
return serverSession;
}

/** Whether or not this session is configured for snapshot reads */
Expand Down Expand Up @@ -267,9 +265,15 @@ export class ClientSession extends TypedEventEmitter<ClientSessionEvents> {
const completeEndSession = () => {
maybeClearPinnedConnection(this, finalOptions);

// release the server session back to the pool
this.sessionPool.release(this.serverSession);
this[kServerSession] = undefined;
const serverSession = this[kServerSession];
if (serverSession != null) {
// release the server session back to the pool
this.sessionPool.release(serverSession);
// Make sure a new serverSession never makes it on to the ClientSession
nbbeeken marked this conversation as resolved.
Show resolved Hide resolved
Object.defineProperty(this, kServerSession, {
value: ServerSession.clone(serverSession)
});
}

// mark the session as ended, and emit a signal
this.hasEnded = true;
Expand All @@ -279,7 +283,9 @@ export class ClientSession extends TypedEventEmitter<ClientSessionEvents> {
done();
};

if (this.serverSession && this.inTransaction()) {
if (this.inTransaction()) {
// If we've reached endSession and the transaction is still active
// by default we abort it
this.abortTransaction(err => {
if (err) return done(err);
completeEndSession();
Expand Down Expand Up @@ -353,12 +359,16 @@ export class ClientSession extends TypedEventEmitter<ClientSessionEvents> {
return this.id.id.buffer.equals(session.id.id.buffer);
}

/** Increment the transaction number on the internal ServerSession */
/**
* Increment the transaction number on the internal ServerSession
*
* @privateRemarks
* This helper increments a value stored on the client session that will be
* added to the serverSession's txnNumber upon applying it to a command.
* This is because the serverSession is lazily acquired after a connection is obtained
*/
incrementTransactionNumber(): void {
if (this.serverSession) {
this.serverSession.txnNumber =
typeof this.serverSession.txnNumber === 'number' ? this.serverSession.txnNumber + 1 : 0;
}
this[kTxnNumberIncrement] += 1;
dariakp marked this conversation as resolved.
Show resolved Hide resolved
}

/** @returns whether this session is currently in a transaction or not */
Expand All @@ -376,7 +386,6 @@ export class ClientSession extends TypedEventEmitter<ClientSessionEvents> {
throw new MongoCompatibilityError('Transactions are not allowed with snapshot sessions');
}

assertAlive(this);
if (this.inTransaction()) {
throw new MongoTransactionError('Transaction already in progress');
}
Expand Down Expand Up @@ -627,7 +636,7 @@ function attemptTransaction<TSchema>(
throw err;
}

if (session.transaction.isActive) {
if (session.inTransaction()) {
return session.abortTransaction().then(() => maybeRetryOrThrow(err));
}

Expand All @@ -641,11 +650,6 @@ function endTransaction(
commandName: 'abortTransaction' | 'commitTransaction',
callback: Callback<Document>
) {
if (!assertAlive(session, callback)) {
// checking result in case callback was called
return;
}

// handle any initial problematic cases
const txnState = session.transaction.state;

Expand Down Expand Up @@ -750,7 +754,6 @@ function endTransaction(
callback(error, result);
}

// Assumption here that commandName is "commitTransaction" or "abortTransaction"
if (session.transaction.recoveryToken) {
command.recoveryToken = session.transaction.recoveryToken;
}
Expand Down Expand Up @@ -832,6 +835,30 @@ export class ServerSession {

return idleTimeMinutes > sessionTimeoutMinutes - 1;
}

/**
* @internal
* Cloning meant to keep a readable reference to the server session data
* after ClientSession has ended
*/
static clone(serverSession: ServerSession): Readonly<ServerSession> {
const arrayBuffer = new ArrayBuffer(16);
const idBytes = Buffer.from(arrayBuffer);
idBytes.set(serverSession.id.id.buffer);

const id = new Binary(idBytes, serverSession.id.id.sub_type);

// Manual prototype construction to avoid modifying the constructor of this class
return Object.setPrototypeOf(
{
id: { id },
lastUse: serverSession.lastUse,
txnNumber: serverSession.txnNumber,
isDirty: serverSession.isDirty
},
ServerSession.prototype
);
}
}

/**
Expand Down Expand Up @@ -944,11 +971,11 @@ export function applySession(
command: Document,
baileympearson marked this conversation as resolved.
Show resolved Hide resolved
options: CommandOptions
): MongoDriverError | undefined {
// TODO: merge this with `assertAlive`, did not want to throw a try/catch here
if (session.hasEnded) {
return new MongoExpiredSessionError();
}

// May acquire serverSession here
const serverSession = session.serverSession;
if (serverSession == null) {
return new MongoRuntimeError('Unable to acquire server session');
Expand All @@ -966,15 +993,16 @@ export function applySession(
serverSession.lastUse = now();
command.lsid = serverSession.id;

// first apply non-transaction-specific sessions data
const inTransaction = session.inTransaction() || isTransactionCommand(command);
const isRetryableWrite = options?.willRetryWrite || false;
const inTxnOrTxnCommand = session.inTransaction() || isTransactionCommand(command);
const isRetryableWrite = !!options.willRetryWrite;

if (serverSession.txnNumber && (isRetryableWrite || inTransaction)) {
if (isRetryableWrite || inTxnOrTxnCommand) {
serverSession.txnNumber += session[kTxnNumberIncrement];
session[kTxnNumberIncrement] = 0;
command.txnNumber = Long.fromNumber(serverSession.txnNumber);
baileympearson marked this conversation as resolved.
Show resolved Hide resolved
}

if (!inTransaction) {
if (!inTxnOrTxnCommand) {
if (session.transaction.state !== TxnState.NO_TRANSACTION) {
session.transaction.transition(TxnState.NO_TRANSACTION);
}
Expand Down
51 changes: 51 additions & 0 deletions test/integration/sessions/sessions.spec.prose.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
import { expect } from 'chai';

import { Collection } from '../../../src/index';

describe('ServerSession', () => {
let client;
let testCollection: Collection<{ _id: number; a?: number }>;
beforeEach(async function () {
const configuration = this.configuration;
client = await configuration.newClient({ maxPoolSize: 1, monitorCommands: true }).connect();

// reset test collection
testCollection = client.db('test').collection('too.many.sessions');
await testCollection.drop().catch(() => null);
});

afterEach(async () => {
await client?.close(true);
});

/**
* TODO(NODE-4082): Refactor tests to align exactly with spec wording.
* Assert the following across at least 5 retries of the above test: (We do not need to retry in nodejs)
* Drivers MUST assert that exactly one session is used for all operations at least once across the retries of this test.
* Note that it's possible, although rare, for greater than 1 server session to be used because the session is not released until after the connection is checked in.
* Drivers MUST assert that the number of allocated sessions is strictly less than the number of concurrent operations in every retry of this test. In this instance it would less than (but NOT equal to) 8.
*/
it('13. may reuse one server session for many operations', async () => {
const events = [];
client.on('commandStarted', ev => events.push(ev));

const operations = [
nbbeeken marked this conversation as resolved.
Show resolved Hide resolved
testCollection.insertOne({ _id: 1 }),
testCollection.deleteOne({ _id: 2 }),
testCollection.updateOne({ _id: 3 }, { $set: { a: 1 } }),
testCollection.bulkWrite([{ updateOne: { filter: { _id: 4 }, update: { $set: { a: 1 } } } }]),
testCollection.findOneAndDelete({ _id: 5 }),
testCollection.findOneAndUpdate({ _id: 6 }, { $set: { a: 1 } }),
testCollection.findOneAndReplace({ _id: 7 }, { a: 8 }),
testCollection.find().toArray()
];

const allResults = await Promise.all(operations);

expect(allResults).to.have.lengthOf(operations.length);
expect(events).to.have.lengthOf(operations.length);

// This is a guarantee in node, unless you are performing a transaction (which is not being done in this test)
expect(new Set(events.map(ev => ev.command.lsid.id.toString('hex'))).size).to.equal(1);
});
});
33 changes: 33 additions & 0 deletions test/integration/sessions/sessions.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -367,4 +367,37 @@ describe('Sessions Spec', function () {
});
});
});

describe('Session allocation', () => {
let client;
let testCollection;

beforeEach(async function () {
client = await this.configuration
.newClient({ maxPoolSize: 1, monitorCommands: true })
.connect();
// reset test collection
testCollection = client.db('test').collection('too.many.sessions');
await testCollection.drop().catch(() => null);
});

afterEach(async () => {
await client?.close();
});

it('should only use one session for many operations when maxPoolSize is 1', async () => {
const documents = Array.from({ length: 50 }).map((_, idx) => ({ _id: idx }));

const events = [];
client.on('commandStarted', ev => events.push(ev));
const allResults = await Promise.all(
documents.map(async doc => testCollection.insertOne(doc))
);

expect(allResults).to.have.lengthOf(documents.length);
expect(events).to.have.lengthOf(documents.length);

expect(new Set(events.map(ev => ev.command.lsid.id.toString('hex'))).size).to.equal(1);
});
});
});
2 changes: 1 addition & 1 deletion test/tools/cluster_setup.sh
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ if [[ $1 == "replica_set" ]]; then
echo "mongodb://bob:pwd123@localhost:31000,localhost:31001,localhost:31002/?replicaSet=rs"
elif [[ $1 == "sharded_cluster" ]]; then
mkdir -p $SHARDED_DIR
mlaunch init --dir $SHARDED_DIR --auth --username "bob" --password "pwd123" --replicaset --nodes 3 --arbiter --name rs --port 51000 --enableMajorityReadConcern --setParameter enableTestCommands=1 --sharded 1 --mongos 2
mlaunch init --dir $SHARDED_DIR --auth --username "bob" --password "pwd123" --replicaset --nodes 3 --name rs --port 51000 --enableMajorityReadConcern --setParameter enableTestCommands=1 --sharded 1 --mongos 2
echo "mongodb://bob:pwd123@localhost:51000,localhost:51001"
elif [[ $1 == "server" ]]; then
mkdir -p $SINGLE_DIR
Expand Down
2 changes: 2 additions & 0 deletions test/tools/spec-runner/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -459,6 +459,8 @@ function validateExpectations(commandEvents, spec, savedSessionData) {
const rawExpectedEvents = spec.expectations.map(x => x.command_started_event);
const expectedEvents = normalizeCommandShapes(rawExpectedEvents);

expect(actualEvents).to.have.lengthOf(expectedEvents.length);

for (const [idx, expectedEvent] of expectedEvents.entries()) {
const actualEvent = actualEvents[idx];

Expand Down
Loading