Skip to content

Commit

Permalink
feat(NODE-3697): reduce serverSession allocation
Browse files Browse the repository at this point in the history
  • Loading branch information
nbbeeken committed Mar 15, 2022
1 parent ff26b12 commit 8f7c72a
Show file tree
Hide file tree
Showing 8 changed files with 247 additions and 50 deletions.
2 changes: 1 addition & 1 deletion src/cmap/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ export interface CommandOptions extends BSONSerializeOptions {
// Applying a session to a command should happen as part of command construction,
// most likely in the CommandOperation#executeCommand method, where we have access to
// the details we need to determine if a txnNum should also be applied.
willRetryWrite?: true;
willRetryWrite?: boolean;

writeConcern?: WriteConcern;
}
Expand Down
5 changes: 2 additions & 3 deletions src/operations/operation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ export interface OperationConstructor extends Function {
export interface OperationOptions extends BSONSerializeOptions {
/** Specify ClientSession for this command */
session?: ClientSession;
willRetryWrites?: boolean;
willRetryWrite?: boolean;

/** The preferred read preference (ReadPreference.primary, ReadPreference.primary_preferred, ReadPreference.secondary, ReadPreference.secondary_preferred, ReadPreference.nearest). */
readPreference?: ReadPreferenceLike;
Expand Down Expand Up @@ -56,8 +56,7 @@ export abstract class AbstractOperation<TResult = any> {
// BSON serialization options
bsonOptions?: BSONSerializeOptions;

// TODO: Each operation defines its own options, there should be better typing here
options: Document;
options: OperationOptions;

[kSession]: ClientSession | undefined;

Expand Down
106 changes: 63 additions & 43 deletions src/sessions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,20 +42,6 @@ import {

const minWireVersionForShardedTransactions = 8;

function assertAlive(session: ClientSession, callback?: Callback): boolean {
if (session.serverSession == null) {
const error = new MongoExpiredSessionError();
if (typeof callback === 'function') {
callback(error);
return false;
}

throw error;
}

return true;
}

/** @public */
export interface ClientSessionOptions {
/** Whether causal consistency should be enabled on this session */
Expand Down Expand Up @@ -89,6 +75,8 @@ const kSnapshotTime = Symbol('snapshotTime');
const kSnapshotEnabled = Symbol('snapshotEnabled');
/** @internal */
const kPinnedConnection = Symbol('pinnedConnection');
/** @internal Accumulates total number of increments to perform to txnNumber */
const kTxnNumberIncrement = Symbol('txnNumberIncrement');

/** @public */
export interface EndSessionOptions {
Expand Down Expand Up @@ -130,6 +118,8 @@ export class ClientSession extends TypedEventEmitter<ClientSessionEvents> {
[kSnapshotEnabled] = false;
/** @internal */
[kPinnedConnection]?: Connection;
/** @internal Accumulates total number of increments to perform to txnNumber */
[kTxnNumberIncrement]: number;

/**
* Create a client session.
Expand Down Expand Up @@ -172,7 +162,10 @@ export class ClientSession extends TypedEventEmitter<ClientSessionEvents> {
this.sessionPool = sessionPool;
this.hasEnded = false;
this.clientOptions = clientOptions;
this[kServerSession] = undefined;

this.explicit = Boolean(options.explicit);
this[kServerSession] = this.explicit ? this.sessionPool.acquire() : undefined;
this[kTxnNumberIncrement] = 0;

this.supports = {
causalConsistency: options.snapshot !== true && options.causalConsistency !== false
Expand All @@ -181,24 +174,27 @@ export class ClientSession extends TypedEventEmitter<ClientSessionEvents> {
this.clusterTime = options.initialClusterTime;

this.operationTime = undefined;
this.explicit = !!options.explicit;
this.owner = options.owner;
this.defaultTransactionOptions = Object.assign({}, options.defaultTransactionOptions);
this.transaction = new Transaction();
}

/** The server id associated with this session */
get id(): ServerSessionId | undefined {
return this.serverSession?.id;
const serverSession = this[kServerSession];
if (serverSession == null) {
return undefined;
}
return serverSession.id;
}

get serverSession(): ServerSession {
if (this[kServerSession] == null) {
this[kServerSession] = this.sessionPool.acquire();
let serverSession = this[kServerSession];
if (serverSession == null) {
serverSession = this.sessionPool.acquire();
this[kServerSession] = serverSession;
}

// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
return this[kServerSession]!;
return serverSession;
}

/** Whether or not this session is configured for snapshot reads */
Expand Down Expand Up @@ -267,9 +263,15 @@ export class ClientSession extends TypedEventEmitter<ClientSessionEvents> {
const completeEndSession = () => {
maybeClearPinnedConnection(this, finalOptions);

// release the server session back to the pool
this.sessionPool.release(this.serverSession);
this[kServerSession] = undefined;
const serverSession = this[kServerSession];
if (serverSession != null) {
// release the server session back to the pool
this.sessionPool.release(serverSession);
// Make sure a new serverSession never makes it on to the ClientSession
Object.defineProperty(this, kServerSession, {
value: ServerSession.clone(serverSession)
});
}

// mark the session as ended, and emit a signal
this.hasEnded = true;
Expand All @@ -279,7 +281,9 @@ export class ClientSession extends TypedEventEmitter<ClientSessionEvents> {
done();
};

if (this.serverSession && this.inTransaction()) {
if (this.inTransaction()) {
// If we've reached endSession and the transaction is still active
// by default we abort it
this.abortTransaction(err => {
if (err) return done(err);
completeEndSession();
Expand Down Expand Up @@ -355,10 +359,7 @@ export class ClientSession extends TypedEventEmitter<ClientSessionEvents> {

/** Increment the transaction number on the internal ServerSession */
incrementTransactionNumber(): void {
if (this.serverSession) {
this.serverSession.txnNumber =
typeof this.serverSession.txnNumber === 'number' ? this.serverSession.txnNumber + 1 : 0;
}
this[kTxnNumberIncrement] += 1;
}

/** @returns whether this session is currently in a transaction or not */
Expand All @@ -376,7 +377,6 @@ export class ClientSession extends TypedEventEmitter<ClientSessionEvents> {
throw new MongoCompatibilityError('Transactions are not allowed with snapshot sessions');
}

assertAlive(this);
if (this.inTransaction()) {
throw new MongoTransactionError('Transaction already in progress');
}
Expand Down Expand Up @@ -627,7 +627,7 @@ function attemptTransaction<TSchema>(
throw err;
}

if (session.transaction.isActive) {
if (session.inTransaction()) {
return session.abortTransaction().then(() => maybeRetryOrThrow(err));
}

Expand All @@ -641,11 +641,6 @@ function endTransaction(
commandName: 'abortTransaction' | 'commitTransaction',
callback: Callback<Document>
) {
if (!assertAlive(session, callback)) {
// checking result in case callback was called
return;
}

// handle any initial problematic cases
const txnState = session.transaction.state;

Expand Down Expand Up @@ -750,7 +745,6 @@ function endTransaction(
callback(error, result);
}

// Assumption here that commandName is "commitTransaction" or "abortTransaction"
if (session.transaction.recoveryToken) {
command.recoveryToken = session.transaction.recoveryToken;
}
Expand Down Expand Up @@ -832,6 +826,30 @@ export class ServerSession {

return idleTimeMinutes > sessionTimeoutMinutes - 1;
}

/**
* @internal
* Cloning meant to keep a readable reference to the server session data
* after ClientSession has ended
*/
static clone(serverSession: ServerSession): Readonly<ServerSession> {
const arrayBuffer = new ArrayBuffer(16);
const idBytes = Buffer.from(arrayBuffer);
idBytes.set(serverSession.id.id.buffer);

const id = new Binary(idBytes, serverSession.id.id.sub_type);

// Manual prototype construction to avoid modifying the constructor of this class
return Object.setPrototypeOf(
{
id: { id },
lastUse: serverSession.lastUse,
txnNumber: serverSession.txnNumber,
isDirty: serverSession.isDirty
},
ServerSession.prototype
);
}
}

/**
Expand Down Expand Up @@ -944,11 +962,11 @@ export function applySession(
command: Document,
options: CommandOptions
): MongoDriverError | undefined {
// TODO: merge this with `assertAlive`, did not want to throw a try/catch here
if (session.hasEnded) {
return new MongoExpiredSessionError();
}

// May acquire serverSession here
const serverSession = session.serverSession;
if (serverSession == null) {
return new MongoRuntimeError('Unable to acquire server session');
Expand All @@ -967,14 +985,16 @@ export function applySession(
command.lsid = serverSession.id;

// first apply non-transaction-specific sessions data
const inTransaction = session.inTransaction() || isTransactionCommand(command);
const isRetryableWrite = options?.willRetryWrite || false;
const inTxnOrTxnCommand = session.inTransaction() || isTransactionCommand(command);
const isRetryableWrite = Boolean(options.willRetryWrite);

if (serverSession.txnNumber && (isRetryableWrite || inTransaction)) {
if (isRetryableWrite || inTxnOrTxnCommand) {
serverSession.txnNumber += session[kTxnNumberIncrement];
session[kTxnNumberIncrement] = 0;
command.txnNumber = Long.fromNumber(serverSession.txnNumber);
}

if (!inTransaction) {
if (!inTxnOrTxnCommand) {
if (session.transaction.state !== TxnState.NO_TRANSACTION) {
session.transaction.transition(TxnState.NO_TRANSACTION);
}
Expand Down
50 changes: 50 additions & 0 deletions test/integration/sessions/sessions.spec.prose.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
import { expect } from 'chai';

import { Collection } from '../../../src/index';

describe('ServerSession', () => {
let client;
let testCollection: Collection<{ _id: number; a?: number }>;
beforeEach(async function () {
const configuration = this.configuration;
client = await configuration.newClient({ maxPoolSize: 1, monitorCommands: true }).connect();

// reset test collection
testCollection = client.db('test').collection('too.many.sessions');
await testCollection.drop().catch(() => null);
});

afterEach(async () => {
await client?.close(true);
});

/**
* TODO(DRIVERS-2218): Refactor tests to align exactly with spec wording. Preliminarily implements:
* Drivers MAY assert that exactly one session is used for all the concurrent operations listed in the test, however this is a race condition if the session isn't released before checkIn (which SHOULD NOT be attempted)
* Drivers SHOULD assert that after repeated runs they are able to achieve the use of exactly one session, this will statistically prove we've reduced the allocation amount
* Drivers MUST assert that the number of allocated sessions never exceeds the number of concurrent operations executing
*/

it('13. may reuse one server session for many operations', async () => {
const events = [];
client.on('commandStarted', ev => events.push(ev));

const operations = [
testCollection.insertOne({ _id: 1 }),
testCollection.deleteOne({ _id: 2 }),
testCollection.updateOne({ _id: 3 }, { $set: { a: 1 } }),
testCollection.bulkWrite([{ updateOne: { filter: { _id: 4 }, update: { $set: { a: 1 } } } }]),
testCollection.findOneAndDelete({ _id: 5 }),
testCollection.findOneAndUpdate({ _id: 6 }, { $set: { a: 1 } }),
testCollection.findOneAndReplace({ _id: 7 }, { a: 8 }),
testCollection.find().toArray()
];

const allResults = await Promise.all(operations);

expect(allResults).to.have.lengthOf(operations.length);
expect(events).to.have.lengthOf(operations.length);

expect(new Set(events.map(ev => ev.command.lsid.id.toString('hex'))).size).to.equal(1); // This is a guarantee in node
});
});
33 changes: 33 additions & 0 deletions test/integration/sessions/sessions.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -367,4 +367,37 @@ describe('Sessions Spec', function () {
});
});
});

describe('Session allocation', () => {
let client;
let testCollection;

beforeEach(async function () {
client = await this.configuration
.newClient({ maxPoolSize: 1, monitorCommands: true })
.connect();
// reset test collection
testCollection = client.db('test').collection('too.many.sessions');
await testCollection.drop().catch(() => null);
});

afterEach(async () => {
await client?.close();
});

it('should only use one session for many operations when maxPoolSize is 1', async () => {
const documents = new Array(50).fill(null).map((_, idx) => ({ _id: idx }));

const events = [];
client.on('commandStarted', ev => events.push(ev));
const allResults = await Promise.all(
documents.map(async doc => testCollection.insertOne(doc))
);

expect(allResults).to.have.lengthOf(documents.length);
expect(events).to.have.lengthOf(documents.length);

expect(new Set(events.map(ev => ev.command.lsid.id.toString('hex'))).size).to.equal(1);
});
});
});
2 changes: 1 addition & 1 deletion test/tools/cluster_setup.sh
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ if [[ $1 == "replica_set" ]]; then
echo "mongodb://bob:pwd123@localhost:31000,localhost:31001,localhost:31002/?replicaSet=rs"
elif [[ $1 == "sharded_cluster" ]]; then
mkdir -p $SHARDED_DIR
mlaunch init --dir $SHARDED_DIR --auth --username "bob" --password "pwd123" --replicaset --nodes 3 --arbiter --name rs --port 51000 --enableMajorityReadConcern --setParameter enableTestCommands=1 --sharded 1 --mongos 2
mlaunch init --dir $SHARDED_DIR --auth --username "bob" --password "pwd123" --replicaset --nodes 3 --name rs --port 51000 --enableMajorityReadConcern --setParameter enableTestCommands=1 --sharded 1 --mongos 2
echo "mongodb://bob:pwd123@localhost:51000,localhost:51001"
elif [[ $1 == "server" ]]; then
mkdir -p $SINGLE_DIR
Expand Down
2 changes: 2 additions & 0 deletions test/tools/spec-runner/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -459,6 +459,8 @@ function validateExpectations(commandEvents, spec, savedSessionData) {
const rawExpectedEvents = spec.expectations.map(x => x.command_started_event);
const expectedEvents = normalizeCommandShapes(rawExpectedEvents);

expect(actualEvents).to.have.lengthOf(expectedEvents.length);

for (const [idx, expectedEvent] of expectedEvents.entries()) {
const actualEvent = actualEvents[idx];

Expand Down

0 comments on commit 8f7c72a

Please sign in to comment.