diff --git a/src/cmap/connection_pool.ts b/src/cmap/connection_pool.ts index 6bd65cce56..d5daf8b186 100644 --- a/src/cmap/connection_pool.ts +++ b/src/cmap/connection_pool.ts @@ -90,6 +90,8 @@ export interface ConnectionPoolOptions extends Omit { maxConnecting: options.maxConnecting ?? 2, maxIdleTimeMS: options.maxIdleTimeMS ?? 0, waitQueueTimeoutMS: options.waitQueueTimeoutMS ?? 0, + minPoolSizeCheckFrequencyMS: options.minPoolSizeCheckFrequencyMS ?? 100, autoEncrypter: options.autoEncrypter, metadata: options.metadata }); @@ -683,12 +686,18 @@ export class ConnectionPool extends TypedEventEmitter { } if (this[kPoolState] === PoolState.ready) { clearTimeout(this[kMinPoolSizeTimer]); - this[kMinPoolSizeTimer] = setTimeout(() => this.ensureMinPoolSize(), 10); + this[kMinPoolSizeTimer] = setTimeout( + () => this.ensureMinPoolSize(), + this.options.minPoolSizeCheckFrequencyMS + ); } }); } else { clearTimeout(this[kMinPoolSizeTimer]); - this[kMinPoolSizeTimer] = setTimeout(() => this.ensureMinPoolSize(), 100); + this[kMinPoolSizeTimer] = setTimeout( + () => this.ensureMinPoolSize(), + this.options.minPoolSizeCheckFrequencyMS + ); } } diff --git a/test/integration/server-selection/server_selection.prose.operation_count.test.ts b/test/integration/server-selection/server_selection.prose.operation_count.test.ts index 21b41ad423..fef521dd24 100644 --- a/test/integration/server-selection/server_selection.prose.operation_count.test.ts +++ b/test/integration/server-selection/server_selection.prose.operation_count.test.ts @@ -1,10 +1,10 @@ import { expect } from 'chai'; -import { setTimeout } from 'timers'; -import { promisify } from 'util'; +import { on } from 'events'; import { CommandStartedEvent } from '../../../src'; import { Collection } from '../../../src/collection'; import { MongoClient } from '../../../src/mongo_client'; +import { sleep } from '../../tools/utils'; const failPoint = { configureFailPoint: 'failCommand', @@ -25,21 +25,14 @@ async function runTaskGroup(collection: Collection, count: 10 | 100 | 1000) { } } -async function ensurePoolIsFull(client: MongoClient) { +async function ensurePoolIsFull(client: MongoClient): Promise { let connectionCount = 0; - const onConnectionCreated = () => connectionCount++; - client.on('connectionCreated', onConnectionCreated); - - // 250ms should be plenty of time to fill the connection pool, - // but just in case we'll loop a couple of times. - for (let i = 0; connectionCount < POOL_SIZE * 2 && i < 10; ++i) { - await promisify(setTimeout)(250); - } - - client.removeListener('connectionCreated', onConnectionCreated); - - if (connectionCount !== POOL_SIZE * 2) { - throw new Error('Connection pool did not fill up'); + // eslint-disable-next-line @typescript-eslint/no-unused-vars + for await (const _event of on(client, 'connectionCreated')) { + connectionCount++; + if (connectionCount === POOL_SIZE * 2) { + break; + } } } @@ -82,7 +75,10 @@ describe('operationCount-based Selection Within Latency Window - Prose Test', fu await client.connect(); // Step 4: Using CMAP events, ensure the client's connection pools for both mongoses have been saturated - await poolIsFullPromise; + const poolIsFull = Promise.race([poolIsFullPromise, sleep(30 * 1000)]); + if (!poolIsFull) { + throw new Error('Timed out waiting for connection pool to fill to minPoolSize'); + } seeds = client.topology.s.seedlist.map(address => address.toString()); diff --git a/test/tools/cmap_spec_runner.ts b/test/tools/cmap_spec_runner.ts index 340d4a1c4c..9919ab8223 100644 --- a/test/tools/cmap_spec_runner.ts +++ b/test/tools/cmap_spec_runner.ts @@ -351,8 +351,11 @@ async function runCmapTest(test: CmapTest, threadContext: ThreadContext) { const poolOptions = test.poolOptions || {}; expect(CMAP_POOL_OPTION_NAMES).to.include.members(Object.keys(poolOptions)); - // TODO(NODE-3255): update condition to only remove option if set to -1 + let minPoolSizeCheckFrequencyMS; if (poolOptions.backgroundThreadIntervalMS) { + if (poolOptions.backgroundThreadIntervalMS !== -1) { + minPoolSizeCheckFrequencyMS = poolOptions.backgroundThreadIntervalMS; + } delete poolOptions.backgroundThreadIntervalMS; } @@ -373,7 +376,7 @@ async function runCmapTest(test: CmapTest, threadContext: ThreadContext) { const mainThread = threadContext.getThread(MAIN_THREAD_KEY); mainThread.start(); - threadContext.createPool({ ...poolOptions, metadata }); + threadContext.createPool({ ...poolOptions, metadata, minPoolSizeCheckFrequencyMS }); // yield control back to the event loop so that the ConnectionPoolCreatedEvent // has a chance to be fired before any synchronously-emitted events from // the queued operations diff --git a/test/unit/cmap/connection_pool.test.js b/test/unit/cmap/connection_pool.test.js index 5b4ac7c5e7..29ae69a661 100644 --- a/test/unit/cmap/connection_pool.test.js +++ b/test/unit/cmap/connection_pool.test.js @@ -9,6 +9,7 @@ const { expect } = require('chai'); const { setImmediate } = require('timers'); const { ns, isHello } = require('../../../src/utils'); const { LEGACY_HELLO_COMMAND } = require('../../../src/constants'); +const { createTimerSandbox } = require('../timer_sandbox'); describe('Connection Pool', function () { let server; @@ -128,6 +129,93 @@ describe('Connection Pool', function () { }); }); + describe('minPoolSize population', function () { + let clock, timerSandbox; + beforeEach(() => { + timerSandbox = createTimerSandbox(); + clock = sinon.useFakeTimers(); + }); + + afterEach(() => { + if (clock) { + timerSandbox.restore(); + clock.restore(); + clock = undefined; + } + }); + + it('should respect the minPoolSizeCheckFrequencyMS option', function () { + const pool = new ConnectionPool(server, { + minPoolSize: 2, + minPoolSizeCheckFrequencyMS: 42, + hostAddress: server.hostAddress() + }); + const ensureSpy = sinon.spy(pool, 'ensureMinPoolSize'); + + // return a fake connection that won't get identified as perished + const createConnStub = sinon + .stub(pool, 'createConnection') + .yields(null, { destroy: () => null, generation: 0 }); + + pool.ready(); + + // expect ensureMinPoolSize to execute immediately + expect(ensureSpy).to.have.been.calledOnce; + expect(createConnStub).to.have.been.calledOnce; + + // check that the successful connection return schedules another run + clock.tick(42); + expect(ensureSpy).to.have.been.calledTwice; + expect(createConnStub).to.have.been.calledTwice; + + // check that the 2nd successful connection return schedules another run + // but don't expect to get a new connection since we are at minPoolSize + clock.tick(42); + expect(ensureSpy).to.have.been.calledThrice; + expect(createConnStub).to.have.been.calledTwice; + + // check that the next scheduled check runs even after we're at minPoolSize + clock.tick(42); + expect(ensureSpy).to.have.callCount(4); + expect(createConnStub).to.have.been.calledTwice; + }); + + it('should default minPoolSizeCheckFrequencyMS to 100ms', function () { + const pool = new ConnectionPool(server, { + minPoolSize: 2, + hostAddress: server.hostAddress() + }); + const ensureSpy = sinon.spy(pool, 'ensureMinPoolSize'); + + // return a fake connection that won't get identified as perished + const createConnStub = sinon + .stub(pool, 'createConnection') + .yields(null, { destroy: () => null, generation: 0 }); + + pool.ready(); + + // expect ensureMinPoolSize to execute immediately + expect(ensureSpy).to.have.been.calledOnce; + expect(createConnStub).to.have.been.calledOnce; + + // check that the successful connection return schedules another run + clock.tick(100); + expect(ensureSpy).to.have.been.calledTwice; + expect(createConnStub).to.have.been.calledTwice; + + // check that the 2nd successful connection return schedules another run + // but don't expect to get a new connection since we are at minPoolSize + clock.tick(100); + expect(ensureSpy).to.have.been.calledThrice; + expect(createConnStub).to.have.been.calledTwice; + + // check that the next scheduled check runs even after we're at minPoolSize + clock.tick(100); + expect(ensureSpy).to.have.callCount(4); + expect(createConnStub).to.have.been.calledTwice; + }); + }); + describe('withConnection', function () { it('should manage a connection for a successful operation', function (done) { server.setMessageHandler(request => {