Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 50 additions & 21 deletions src/sessions.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import { setTimeout } from 'timers/promises';

import { Binary, type Document, Long, type Timestamp } from './bson';
import type { CommandOptions, Connection } from './cmap/connection';
import { ConnectionPoolMetrics } from './cmap/metrics';
Expand Down Expand Up @@ -768,15 +770,15 @@ export class ClientSession

if (
fnError.hasErrorLabel(MongoErrorLabel.TransientTransactionError) &&
(this.timeoutContext != null || now() - startTime < MAX_TIMEOUT)
(this.timeoutContext?.csotEnabled() || now() - startTime < MAX_TIMEOUT)
) {
continue;
}

throw fnError;
}

while (!committed) {
for (let retry = 0; !committed; ++retry) {
try {
/*
* We will rely on ClientSession.commitTransaction() to
Expand All @@ -786,26 +788,53 @@ export class ClientSession
await this.commitTransaction();
committed = true;
} catch (commitError) {
/*
* Note: a maxTimeMS error will have the MaxTimeMSExpired
* code (50) and can be reported as a top-level error or
* inside writeConcernError, ex.
* { ok:0, code: 50, codeName: 'MaxTimeMSExpired' }
* { ok:1, writeConcernError: { code: 50, codeName: 'MaxTimeMSExpired' } }
*/
if (
!isMaxTimeMSExpiredError(commitError) &&
commitError.hasErrorLabel(MongoErrorLabel.UnknownTransactionCommitResult) &&
(this.timeoutContext != null || now() - startTime < MAX_TIMEOUT)
) {
continue;
}
const hasNotTimedOut =
this.timeoutContext?.csotEnabled() || now() - startTime < MAX_TIMEOUT;

if (
commitError.hasErrorLabel(MongoErrorLabel.TransientTransactionError) &&
(this.timeoutContext != null || now() - startTime < MAX_TIMEOUT)
) {
break;
/**
* will the provided backoffMS exceed the withTransaction's deadline?
*/
const willExceedTransactionDeadline = (backoffMS: number) => {
return (
(this.timeoutContext?.csotEnabled() &&
backoffMS > this.timeoutContext.remainingTimeMS) ||
now() + backoffMS > startTime + MAX_TIMEOUT
);
};

// If CSOT is enabled, we repeatedly retry until timeoutMS expires.
// If CSOT is not enabled, do we still have time remaining or have we timed out?
if (hasNotTimedOut) {
if (
!isMaxTimeMSExpiredError(commitError) &&
commitError.hasErrorLabel(MongoErrorLabel.UnknownTransactionCommitResult)
) {
/*
* Note: a maxTimeMS error will have the MaxTimeMSExpired
* code (50) and can be reported as a top-level error or
* inside writeConcernError, ex.
* { ok:0, code: 50, codeName: 'MaxTimeMSExpired' }
* { ok:1, writeConcernError: { code: 50, codeName: 'MaxTimeMSExpired' } }
*/

const BACKOFF_INITIAL_MS = 1;
const BACKOFF_MAX_MS = 500;
const jitter = Math.random();
const backoffMS =
jitter * Math.min(BACKOFF_INITIAL_MS * 1.25 ** retry, BACKOFF_MAX_MS);

if (willExceedTransactionDeadline(backoffMS)) {
break;
}

await setTimeout(backoffMS);

continue;
}

if (commitError.hasErrorLabel(MongoErrorLabel.TransientTransactionError)) {
break;
}
}

throw commitError;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
import { expect } from 'chai';
import { test } from 'mocha';

import { type CommandFailedEvent, type MongoClient } from '../../mongodb';
import { configureFailPoint } from '../../tools/utils';
import { filterForCommands } from '../shared';

describe('Retry Backoff is Enforced', function () {
// Drivers should test that retries within `withTransaction` do not occur immediately. Optionally, set BACKOFF_INITIAL to a
// higher value to decrease flakiness of this test. Configure a fail point that forces 30 retries. Check that the total
// time for all retries exceeded 1.25 seconds.

let client: MongoClient;
let failures: Array<CommandFailedEvent>;

beforeEach(async function () {
client = this.configuration.newClient({}, { monitorCommands: true });

failures = [];
client.on('commandFailed', filterForCommands('commitTransaction', failures));

await client.connect();

await configureFailPoint(this.configuration, {
configureFailPoint: 'failCommand',
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need a version check around these tests?

From #4759 (comment)

failCommand requires 4.4+ servers

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Eventually yes. This PR is a work in progress (sorry if that wasn't clear, we usually mark PRs like this as WIP in the title and add the label). This won't merge for while yet (the design isn't finished, and then the spec needs to be implemented)

mode: {
times: 30
},
data: {
failCommands: ['commitTransaction'],
errorCode: 24,
errorLabels: ['UnknownTransactionCommitResult']
}
});
});

afterEach(async function () {
await client?.close();
});

for (let i = 0; i < 250; ++i) {
test.only('works' + i, async function () {
const start = performance.now();

await client.withSession(async s => {
await s.withTransaction(async s => {
await client.db('foo').collection('bar').insertOne({ name: 'bailey' }, { session: s });
});
});

const end = performance.now();

expect(failures).to.have.lengthOf(30);

expect(end - start).to.be.greaterThan(1250);
});
}
});
6 changes: 3 additions & 3 deletions test/tools/unified-spec-runner/entities.ts
Original file line number Diff line number Diff line change
Expand Up @@ -629,10 +629,10 @@ export class EntitiesMap<E = Entity> extends Map<string, E> {
if (entity.client.awaitMinPoolSizeMS) {
if (client.topology?.s?.servers) {
const timeout = Timeout.expires(entity.client.awaitMinPoolSizeMS);
const servers = client.topology.s.servers.values();
const poolSizeChecks = Array.from(servers).map(server =>
checkMinPoolSize(server.pool)
const servers = Array.from(client.topology.s.servers.values()).filter(
({ description: { isDataBearing } }) => isDataBearing
);
const poolSizeChecks = servers.map(server => checkMinPoolSize(server.pool));
try {
await Promise.race([Promise.allSettled(poolSizeChecks), timeout]);
} catch (error) {
Expand Down