Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(NODE-3144): pool clear event ordering and retryability tests #3407

Merged
merged 15 commits into from
Sep 15, 2022
Merged
3 changes: 1 addition & 2 deletions src/cmap/connection_pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -423,11 +423,10 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
this[kPoolState] = PoolState.paused;

this.clearMinPoolSizeTimer();
this.processWaitQueue();

if (!alreadyPaused) {
this.emit(ConnectionPool.CONNECTION_POOL_CLEARED, new ConnectionPoolClearedEvent(this));
}
this.processWaitQueue();
}

/** Close the pool */
Expand Down
1 change: 0 additions & 1 deletion src/cmap/errors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ export class PoolClosedError extends MongoDriverError {
* @category Error
*/
export class PoolClearedError extends MongoNetworkError {
// TODO(NODE-3144): needs to extend RetryableError or be marked retryable in some other way per spec
/** The address of the connection pool */
address: string;

Expand Down
136 changes: 136 additions & 0 deletions test/integration/retryable-reads/retryable_reads.spec.prose.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
import { expect } from 'chai';

import { Collection, MongoClient } from '../../../src';

describe('Retryable Reads Spec Prose', () => {
let client: MongoClient, failPointName;

afterEach(async () => {
try {
if (failPointName) {
await client.db('admin').command({ configureFailPoint: failPointName, mode: 'off' });
}
} finally {
failPointName = undefined;
await client?.close();
}
});

describe('PoolClearedError Retryability Test', () => {
// This test will be used to ensure drivers properly retry after encountering PoolClearedErrors.
// It MUST be implemented by any driver that implements the CMAP specification.
// This test requires MongoDB 4.2.9+ for blockConnection support in the failpoint.

let cmapEvents: Array<{ name: string; event: Record<string, any> }>;
let commandStartedEvents: Array<Record<string, any>>;
let testCollection: Collection;
beforeEach(async function () {
// 1. Create a client with maxPoolSize=1 and retryReads=true.
// If testing against a sharded deployment, be sure to connect to only a single mongos. <-- TODO: what does that look like?
durran marked this conversation as resolved.
Show resolved Hide resolved
durran marked this conversation as resolved.
Show resolved Hide resolved
client = this.configuration.newClient({
maxPoolSize: 1,
retryReads: true,
monitorCommands: true
});

testCollection = client.db('retryable-reads-prose').collection('pool-clear-retry');
await testCollection.drop().catch(() => null);
await testCollection.insertMany([{ test: 1 }, { test: 2 }]);

// 2. Enable the following failpoint:
// NOTE: "9. Disable the failpoint" is done in afterEach
failPointName = 'failCommand';
const failPoint = await client.db('admin').command({
configureFailPoint: failPointName,
mode: { times: 1 },
data: {
failCommands: ['find'],
errorCode: 91,
blockConnection: true,
blockTimeMS: 1000
}
});

expect(failPoint).to.have.property('ok', 1);

cmapEvents = [];
durran marked this conversation as resolved.
Show resolved Hide resolved
for (const observedEvent of [
'connectionCheckOutStarted',
'connectionCheckedOut',
'connectionCheckOutFailed',
'connectionPoolCleared'
]) {
client.on(observedEvent, ev => {
cmapEvents.push({ name: observedEvent, event: ev });
});
}

client.on('commandStarted', ev => {
commandStartedEvents.push(ev);
});
});

it('should emit events in the expected sequence', {
metadata: { requires: { mongodb: '>=4.2.9', topology: '!load-balanced' } },
test: async function () {
// 3. Start two threads and attempt to perform a findOne simultaneously on both.
const results = await Promise.all([
testCollection.findOne({ test: 1 }),
testCollection.findOne({ test: 2 })
]);

client.removeAllListeners();
// 4. Verify that both findOne attempts succeed.
expect(results[0]).to.have.property('test', 1);
expect(results[1]).to.have.property('test', 2);

// NOTE: For the subsequent checks, we rely on the exact sequence of ALL events
// for ease of readability; however, only the relative order matters for
// the purposes of this test, so if this ever becomes an issue, the test
// can be refactored to assert on relative index values instead

// 5. Via CMAP monitoring, assert that the first check out succeeds.
durran marked this conversation as resolved.
Show resolved Hide resolved
expect(cmapEvents.shift()).to.have.property(
'name',
'connectionCheckOutStarted',
'expected 1) checkout 1 to start'
);
expect(cmapEvents.shift()).to.have.property(
'name',
'connectionCheckOutStarted',
'expected 2) checkout 2 to start'
);
expect(cmapEvents.shift()).to.have.property(
'name',
'connectionCheckOutStarted',
'expected 3) first checkout to succeed'
);

// 6. Via CMAP monitoring, assert that a PoolClearedEvent is then emitted.
expect(cmapEvents.shift()).to.have.property(
'name',
'connectionPoolCleared',
'expected 4) pool to clear'
);

// 7. Via CMAP monitoring, assert that the second check out then fails due to a connection error.
const nextEvent = cmapEvents.shift();
expect(nextEvent).to.have.property(
'name',
'connectionCheckOutFailed',
'expected 5) checkout 2 to fail'
);
expect(nextEvent).to.have.deep.property('event.reason', 'connectionError');

// 8. Via Command Monitoring, assert that exactly three find CommandStartedEvents were observed in total.
const observedFindCommandStartedEvents = commandStartedEvents.filter(
({ commandName }) => commandName === 'find'
);
expect(observedFindCommandStartedEvents).to.have.lengthOf(
3,
'expected 3 find command started events'
);
}
});
});
});
225 changes: 175 additions & 50 deletions test/integration/retryable-writes/retryable_writes.spec.prose.test.ts
Original file line number Diff line number Diff line change
@@ -1,64 +1,189 @@
import { expect } from 'chai';

import { MongoError, MongoServerError, TopologyType } from '../../../src';
import { Collection, MongoClient, MongoError, MongoServerError } from '../../../src';

describe('Retryable Writes Spec Prose', () => {
/**
* 1 Test that retryable writes raise an exception when using the MMAPv1 storage engine.
* For this test, execute a write operation, such as insertOne, which should generate an exception and the error code is 20.
* Assert that the error message is the replacement error message:
*
* ```
* This MongoDB deployment does not support retryable writes. Please add
* retryWrites=false to your connection string.
* ```
* Note: Drivers that rely on serverStatus to determine the storage engine in use MAY skip this test for sharded clusters, since mongos does not report this information in its serverStatus response.
*/
let client;

beforeEach(async function () {
if (
this.configuration.buildInfo.versionArray[0] < 4 ||
this.configuration.topologyType !== TopologyType.ReplicaSetWithPrimary
) {
this.currentTest.skipReason =
'configureFailPoint only works on server versions greater than 4';
this.skip();
let client: MongoClient, failPointName;

afterEach(async () => {
try {
if (failPointName) {
await client.db('admin').command({ configureFailPoint: failPointName, mode: 'off' });
}
} finally {
failPointName = undefined;
await client?.close();
}
client = this.configuration.newClient();
await client.connect();
});

afterEach(async () => {
await client?.close();
describe('1. Test that retryable writes raise an exception when using the MMAPv1 storage engine.', () => {
/**
* For this test, execute a write operation, such as insertOne, which should generate an exception and the error code is 20.
* Assert that the error message is the replacement error message:
*
* ```
* This MongoDB deployment does not support retryable writes. Please add
* retryWrites=false to your connection string.
* ```
* Note: Drivers that rely on serverStatus to determine the storage engine in use MAY skip this test for sharded clusters, since mongos does not report this information in its serverStatus response.
*/
beforeEach(async function () {
client = this.configuration.newClient();
await client.connect();

failPointName = 'failCommand';
const failPoint = await client.db('admin').command({
configureFailPoint: failPointName,
mode: { times: 1 },
data: {
failCommands: ['insert'],
errorCode: 20, // MMAP Error code,
closeConnection: false
}
});

expect(failPoint).to.have.property('ok', 1);
});

for (const testTopology of ['replicaset', 'sharded']) {
const minFailPointVersion = testTopology === 'replicaset' ? '>=4.0.0' : '>=4.1.5';
it(`should error with the correct error message when topology is ${testTopology}`, {
metadata: { requires: { mongodb: minFailPointVersion, topology: [testTopology as any] } },
test: async function () {
const error = await client
.db('test')
.collection('test')
.insertOne({ a: 1 })
.catch(error => error);

expect(error).to.exist;
expect(error).that.is.instanceOf(MongoServerError);
expect(error).to.have.property('originalError').that.instanceOf(MongoError);
expect(error.originalError).to.have.property('code', 20);
expect(error).to.have.property(
'message',
'This MongoDB deployment does not support retryable writes. Please add retryWrites=false to your connection string.'
);
}
});
}
});

it('retryable writes raise an exception when using the MMAPv1 storage engine', async () => {
const failPoint = await client.db('admin').command({
configureFailPoint: 'failCommand',
mode: { times: 1 },
data: {
failCommands: ['insert'],
errorCode: 20, // MMAP Error code,
closeConnection: false
describe('2. Test that drivers properly retry after encountering PoolClearedErrors.', () => {
// This test MUST be implemented by any driver that implements the CMAP specification.
// This test requires MongoDB 4.2.9+ for blockConnection support in the failpoint.

let cmapEvents: Array<{ name: string; event: Record<string, any> }>;
let commandStartedEvents: Array<Record<string, any>>;
let testCollection: Collection;
beforeEach(async function () {
// i. Create a client with maxPoolSize=1 and retryWrites=true.
// If testing against a sharded deployment, be sure to connect to only a single mongos. <-- TODO: what does that look like?
client = this.configuration.newClient({
maxPoolSize: 1,
retryWrites: true,
monitorCommands: true
});

testCollection = client.db('retryable-writes-prose').collection('pool-clear-retry');
await testCollection.drop().catch(() => null);

// ii. Enable the following failpoint:
// NOTE: "ix. Disable the failpoint" is done in afterEach
failPointName = 'failCommand';
const failPoint = await client.db('admin').command({
configureFailPoint: failPointName,
mode: { times: 1 },
data: {
failCommands: ['insert'],
errorCode: 91,
blockConnection: true,
blockTimeMS: 1000,
errorLabels: ['RetryableWriteError']
}
});

expect(failPoint).to.have.property('ok', 1);

cmapEvents = [];
durran marked this conversation as resolved.
Show resolved Hide resolved
for (const observedEvent of [
'connectionCheckOutStarted',
'connectionCheckedOut',
'connectionCheckOutFailed',
'connectionPoolCleared'
]) {
client.on(observedEvent, ev => {
cmapEvents.push({ name: observedEvent, event: ev });
});
}
client.on('commandStarted', ev => {
commandStartedEvents.push(ev);
});
});

expect(failPoint).to.have.property('ok', 1);

const error = await client
.db('test')
.collection('test')
.insertOne({ a: 1 })
.catch(error => error);

expect(error).to.exist;
expect(error).that.is.instanceOf(MongoServerError);
expect(error).to.have.property('originalError').that.instanceOf(MongoError);
expect(error.originalError).to.have.property('code', 20);
expect(error).to.have.property(
'message',
'This MongoDB deployment does not support retryable writes. Please add retryWrites=false to your connection string.'
);
it('should emit events in the expected sequence', {
metadata: { requires: { mongodb: '>=4.2.9', topology: ['replicaset', 'sharded'] } },
test: async function () {
// iii. Start two threads and attempt to perform an insertOne simultaneously on both.
await Promise.all([
testCollection.insertOne({ test: 1 }),
testCollection.insertOne({ test: 2 })
]);

client.removeAllListeners();
// iv. Verify that both insertOne attempts succeed.
const result = await testCollection.find().toArray();
expect(result).to.have.lengthOf(2);
const mappedAndSortedResult = result.map(item => item.test).sort();
expect(mappedAndSortedResult).to.deep.equal([1, 2]);

// NOTE: For the subsequent checks, we rely on the exact sequence of ALL events
// for ease of readability; however, only the relative order matters for
// the purposes of this test, so if this ever becomes an issue, the test
// can be refactored to assert on relative index values instead

// v. Via CMAP monitoring, assert that the first check out succeeds.
durran marked this conversation as resolved.
Show resolved Hide resolved
expect(cmapEvents.shift()).to.have.property(
'name',
'connectionCheckOutStarted',
'expected 1) checkout 1 to start'
);
expect(cmapEvents.shift()).to.have.property(
'name',
'connectionCheckOutStarted',
'expected 2) checkout 2 to start'
);
expect(cmapEvents.shift()).to.have.property(
'name',
'connectionCheckOutStarted',
'expected 3) first checkout to succeed'
);

// vi. Via CMAP monitoring, assert that a PoolClearedEvent is then emitted.
expect(cmapEvents.shift()).to.have.property(
'name',
'connectionPoolCleared',
'expected 4) pool to clear'
);

// vii. Via CMAP monitoring, assert that the second check out then fails due to a connection error.
const nextEvent = cmapEvents.shift();
expect(nextEvent).to.have.property(
'name',
'connectionCheckOutFailed',
'expected 5) checkout 2 to fail'
);
expect(nextEvent).to.have.deep.property('event.reason', 'connectionError');

// viii. Via Command Monitoring, assert that exactly three insert CommandStartedEvents were observed in total.
const observedInsertCommandStartedEvents = commandStartedEvents.filter(
({ commandName }) => commandName === 'insert'
);
expect(observedInsertCommandStartedEvents).to.have.lengthOf(
3,
'expected 3 insert command started events'
);
}
});
});
});