diff --git a/src/mongo_client.ts b/src/mongo_client.ts index 474edc44f70..1a7ee47b451 100644 --- a/src/mongo_client.ts +++ b/src/mongo_client.ts @@ -2,7 +2,7 @@ import { promises as fs } from 'fs'; import type { TcpNetConnectOpts } from 'net'; import type { ConnectionOptions as TLSConnectionOptions, TLSSocketOptions } from 'tls'; -import { type ServerCommandOptions, type TimeoutContext } from '.'; +import { TopologyType } from '.'; import { type BSONSerializeOptions, type Document, resolveBSONOptions } from './bson'; import { ChangeStream, type ChangeStreamDocument, type ChangeStreamOptions } from './change_stream'; import type { AutoEncrypter, AutoEncryptionOptions } from './client-side-encryption/auto_encrypter'; @@ -22,7 +22,6 @@ import { makeClientMetadata } from './cmap/handshake/client_metadata'; import type { CompressorName } from './cmap/wire_protocol/compression'; -import { MongoDBResponse } from './cmap/wire_protocol/responses'; import { parseOptions, resolveSRVRecord } from './connection_string'; import { MONGO_CLIENT_EVENTS } from './constants'; import { type AbstractCursor } from './cursor/abstract_cursor'; @@ -44,8 +43,8 @@ import { type ClientBulkWriteResult } from './operations/client_bulk_write/common'; import { ClientBulkWriteExecutor } from './operations/client_bulk_write/executor'; +import { EndSessionsOperation } from './operations/end_sessions'; import { executeOperation } from './operations/execute_operation'; -import { AbstractOperation } from './operations/operation'; import type { ReadConcern, ReadConcernLevel, ReadConcernLike } from './read_concern'; import { ReadPreference, type ReadPreferenceMode } from './read_preference'; import { type AsyncDisposable, configureResourceManagement } from './resource_management'; @@ -63,7 +62,7 @@ import { type HostAddress, hostMatchesWildcards, isHostMatch, - MongoDBNamespace, + type MongoDBNamespace, noop, ns, resolveOptions, @@ -795,40 +794,12 @@ export class MongoClient extends TypedEventEmitter implements return; } - // If we would attempt to select a server and get nothing back we short circuit - // to avoid the server selection timeout. - const selector = readPreferenceServerSelector(ReadPreference.primaryPreferred); - const topologyDescription = this.topology.description; - const serverDescriptions = Array.from(topologyDescription.servers.values()); - const servers = selector(topologyDescription, serverDescriptions); - if (servers.length !== 0) { - const endSessions = Array.from(this.s.sessionPool.sessions, ({ id }) => id); - if (endSessions.length !== 0) { - try { - class EndSessionsOperation extends AbstractOperation { - override ns = MongoDBNamespace.fromString('admin.$cmd'); - override SERVER_COMMAND_RESPONSE_TYPE = MongoDBResponse; - override buildCommand(_connection: Connection, _session?: ClientSession): Document { - return { - endSessions - }; - } - override buildOptions(timeoutContext: TimeoutContext): ServerCommandOptions { - return { - timeoutContext, - readPreference: ReadPreference.primaryPreferred, - noResponse: true - }; - } - override get commandName(): string { - return 'endSessions'; - } - } - await executeOperation(this, new EndSessionsOperation()); - } catch (error) { - squashError(error); - } - } + const supportsSessions = + this.topology.description.type === TopologyType.LoadBalanced || + this.topology.description.logicalSessionTimeoutMinutes != null; + + if (supportsSessions) { + await endSessions(this, this.topology); } // clear out references to old topology @@ -841,6 +812,27 @@ export class MongoClient extends TypedEventEmitter implements if (encrypter) { await encrypter.close(this); } + + async function endSessions( + client: MongoClient, + { description: topologyDescription }: Topology + ) { + // If we would attempt to select a server and get nothing back we short circuit + // to avoid the server selection timeout. + const selector = readPreferenceServerSelector(ReadPreference.primaryPreferred); + const serverDescriptions = Array.from(topologyDescription.servers.values()); + const servers = selector(topologyDescription, serverDescriptions); + if (servers.length !== 0) { + const endSessions = Array.from(client.s.sessionPool.sessions, ({ id }) => id); + if (endSessions.length !== 0) { + try { + await executeOperation(client, new EndSessionsOperation(endSessions)); + } catch (error) { + squashError(error); + } + } + } + } } /** diff --git a/src/operations/end_sessions.ts b/src/operations/end_sessions.ts new file mode 100644 index 00000000000..7340d8b03ff --- /dev/null +++ b/src/operations/end_sessions.ts @@ -0,0 +1,42 @@ +import { + type ClientSession, + type Connection, + type ServerCommandOptions, + type ServerSessionId, + type TimeoutContext +} from '..'; +import { type Document } from '../bson'; +import { MongoDBResponse } from '../cmap/wire_protocol/responses'; +import { ReadPreference } from '../read_preference'; +import { MongoDBNamespace } from '../utils'; +import { AbstractOperation } from './operation'; + +export class EndSessionsOperation extends AbstractOperation { + override ns = MongoDBNamespace.fromString('admin.$cmd'); + override SERVER_COMMAND_RESPONSE_TYPE = MongoDBResponse; + + private sessions: Array; + + constructor(sessions: Array) { + super(); + this.sessions = sessions; + } + + override buildCommand(_connection: Connection, _session?: ClientSession): Document { + return { + endSessions: this.sessions + }; + } + + override buildOptions(timeoutContext: TimeoutContext): ServerCommandOptions { + return { + timeoutContext, + readPreference: ReadPreference.primaryPreferred, + noResponse: true + }; + } + + override get commandName(): string { + return 'endSessions'; + } +} diff --git a/test/integration/client-side-operations-timeout/client_side_operations_timeout.prose.test.ts b/test/integration/client-side-operations-timeout/client_side_operations_timeout.prose.test.ts index 24367eac1c8..eca944afb3a 100644 --- a/test/integration/client-side-operations-timeout/client_side_operations_timeout.prose.test.ts +++ b/test/integration/client-side-operations-timeout/client_side_operations_timeout.prose.test.ts @@ -1,11 +1,8 @@ /* Specification prose tests */ -import { type ChildProcess, spawn } from 'node:child_process'; import { Readable } from 'node:stream'; import { expect } from 'chai'; -import * as os from 'os'; -import * as path from 'path'; import * as semver from 'semver'; import * as sinon from 'sinon'; import { pipeline } from 'stream/promises'; @@ -27,6 +24,7 @@ import { import { clearFailPoint, configureFailPoint, + configureMongocryptdSpawnHooks, type FailCommandFailPoint, makeMultiBatchWrite, measureDuration @@ -127,20 +125,9 @@ describe('CSOT spec prose tests', function () { let client: MongoClient; const mongocryptdTestPort = '23000'; - let childProcess: ChildProcess; + configureMongocryptdSpawnHooks({ port: mongocryptdTestPort }); beforeEach(async function () { - const pidFile = path.join(os.tmpdir(), new ObjectId().toHexString()); - childProcess = spawn( - 'mongocryptd', - ['--port', mongocryptdTestPort, '--ipv6', '--pidfilepath', pidFile], - { - stdio: 'ignore', - detached: true - } - ); - - childProcess.on('error', error => console.warn(this.currentTest?.fullTitle(), error)); client = new MongoClient(`mongodb://localhost:${mongocryptdTestPort}/?timeoutMS=1000`, { monitorCommands: true }); @@ -148,7 +135,6 @@ describe('CSOT spec prose tests', function () { afterEach(async function () { await client.close(); - childProcess.kill('SIGKILL'); sinon.restore(); }); diff --git a/test/integration/node-specific/client_close.test.ts b/test/integration/node-specific/client_close.test.ts index 6fea572de6c..b0179aab60e 100644 --- a/test/integration/node-specific/client_close.test.ts +++ b/test/integration/node-specific/client_close.test.ts @@ -3,7 +3,14 @@ import * as events from 'node:events'; import { expect } from 'chai'; import { getCSFLEKMSProviders } from '../../csfle-kms-providers'; -import { type Collection, type FindCursor, type MongoClient } from '../../mongodb'; +import { + type Collection, + type CommandStartedEvent, + type FindCursor, + type MongoClient +} from '../../mongodb'; +import { configureMongocryptdSpawnHooks } from '../../tools/utils'; +import { filterForCommands } from '../shared'; import { runScriptAndGetProcessInfo } from './resource_tracking_script_builder'; describe('MongoClient.close() Integration', () => { @@ -490,20 +497,49 @@ describe('MongoClient.close() Integration', () => { }); describe('when MongoClient.close is called', function () { - it('sends an endSessions command', async function () { - await client.db('a').collection('a').insertOne({ a: 1 }); - await client.db('a').collection('a').insertOne({ a: 1 }); - await client.db('a').collection('a').insertOne({ a: 1 }); - const endSessionsStarted = events.once(client, 'commandStarted'); - const willEndSessions = events.once(client, 'commandSucceeded'); + describe('when sessions are supported', function () { + it('sends an endSessions command', async function () { + await client.db('a').collection('a').insertOne({ a: 1 }); + await client.db('a').collection('a').insertOne({ a: 1 }); + await client.db('a').collection('a').insertOne({ a: 1 }); + const endSessionsStarted = events.once(client, 'commandStarted'); + const willEndSessions = events.once(client, 'commandSucceeded'); - await client.close(); + await client.close(); + + const [startedEv] = await endSessionsStarted; + expect(startedEv).to.have.nested.property('command.endSessions').that.has.lengthOf(1); - const [startedEv] = await endSessionsStarted; - expect(startedEv).to.have.nested.property('command.endSessions').that.has.lengthOf(1); + const [commandEv] = await willEndSessions; + expect(commandEv).to.have.property('commandName', 'endSessions'); + }); + }); - const [commandEv] = await willEndSessions; - expect(commandEv).to.have.property('commandName', 'endSessions'); + describe('when sessions are not supported', function () { + const mongocryptdTestPort = '27022'; + let client: MongoClient; + const commands: Array = []; + + configureMongocryptdSpawnHooks({ port: mongocryptdTestPort }); + + beforeEach('configure cryptd client and prepopulate session pool', async function () { + client = this.configuration.newClient(`mongodb://localhost:${mongocryptdTestPort}`, { + monitorCommands: true + }); + + client.on('commandStarted', filterForCommands('endSessions', commands)); + + // run an operation to instantiate an implicit session (which should be omitted) from the + // actual command but still instantiated by the client. See session prose test 18. + await client.db().command({ hello: true }); + expect(client.s.sessionPool.sessions).to.have.length.greaterThan(0); + }); + + it('does not execute endSessions', async function () { + await client.close(); + + expect(commands).to.deep.equal([]); + }); }); }); }); diff --git a/test/integration/server-discovery-and-monitoring/server_description.test.ts b/test/integration/server-discovery-and-monitoring/server_description.test.ts index 60aa4614055..0bc9112c4b0 100644 --- a/test/integration/server-discovery-and-monitoring/server_description.test.ts +++ b/test/integration/server-discovery-and-monitoring/server_description.test.ts @@ -1,35 +1,20 @@ -import { type ChildProcess, spawn } from 'node:child_process'; - import { expect } from 'chai'; -import * as os from 'os'; -import * as path from 'path'; -import { MongoClient, ObjectId } from '../../mongodb'; +import { MongoClient } from '../../mongodb'; +import { configureMongocryptdSpawnHooks } from '../../tools/utils'; describe('class ServerDescription', function () { describe('when connecting to mongocryptd', { requires: { mongodb: '>=4.4' } }, function () { let client: MongoClient; - const mongocryptdTestPort = '27022'; - let childProcess: ChildProcess; + + const { port: mongocryptdTestPort } = configureMongocryptdSpawnHooks(); beforeEach(async function () { - const pidFile = path.join(os.tmpdir(), new ObjectId().toHexString()); - childProcess = spawn( - 'mongocryptd', - ['--port', mongocryptdTestPort, '--ipv6', '--pidfilepath', pidFile], - { - stdio: 'ignore', - detached: true - } - ); - - childProcess.on('error', error => console.warn(this.currentTest?.fullTitle(), error)); client = new MongoClient(`mongodb://localhost:${mongocryptdTestPort}`); }); afterEach(async function () { await client?.close(); - childProcess.kill('SIGKILL'); }); it('iscryptd is set to true ', async function () { diff --git a/test/integration/sessions/sessions.prose.test.ts b/test/integration/sessions/sessions.prose.test.ts index 3ee78cdf699..824430109f3 100644 --- a/test/integration/sessions/sessions.prose.test.ts +++ b/test/integration/sessions/sessions.prose.test.ts @@ -1,15 +1,11 @@ -import { ObjectId } from 'bson'; import { expect } from 'chai'; -import { type ChildProcess, spawn } from 'child_process'; import { once } from 'events'; -import * as os from 'os'; -import * as path from 'path'; import { type CommandStartedEvent } from '../../../src/cmap/command_monitoring_events'; import { type Collection } from '../../../src/collection'; import { MongoDriverError, MongoInvalidArgumentError } from '../../../src/error'; import { MongoClient } from '../../../src/mongo_client'; -import { sleep } from '../../tools/utils'; +import { configureMongocryptdSpawnHooks, sleep } from '../../tools/utils'; describe('Sessions Prose Tests', () => { describe('5. Session argument is for the right client', () => { @@ -128,23 +124,8 @@ describe('Sessions Prose Tests', () => { */ const mongocryptdTestPort = '27022'; let client: MongoClient; - let childProcess: ChildProcess; - - before(() => { - const pidFile = path.join(os.tmpdir(), new ObjectId().toHexString()); - childProcess = spawn( - 'mongocryptd', - ['--port', mongocryptdTestPort, '--ipv6', '--pidfilepath', pidFile], - { - stdio: 'ignore', - detached: true - } - ); - childProcess.on('error', err => { - console.warn('Sessions prose mongocryptd error:', err); - }); - }); + configureMongocryptdSpawnHooks({ port: mongocryptdTestPort }); beforeEach(async () => { client = new MongoClient(`mongodb://localhost:${mongocryptdTestPort}`, { @@ -160,10 +141,6 @@ describe('Sessions Prose Tests', () => { await client?.close(); }); - after(() => { - childProcess.kill(); - }); - it( '18. Implicit session is ignored if connection does not support sessions', { diff --git a/test/tools/utils.ts b/test/tools/utils.ts index 79d918a6897..13d0be0ed47 100644 --- a/test/tools/utils.ts +++ b/test/tools/utils.ts @@ -1,6 +1,7 @@ import * as child_process from 'node:child_process'; import { on, once } from 'node:events'; import * as fs from 'node:fs/promises'; +import { tmpdir } from 'node:os'; import * as path from 'node:path'; import { EJSON } from 'bson'; @@ -528,3 +529,33 @@ export const DOMException: { ac.abort(); return ac.signal.reason.constructor; })(); + +export function configureMongocryptdSpawnHooks( + options: { port?: string; pidfilepath?: string } = {} +): { port: string } { + const port = options.port ?? '27022'; + const pidfilepath = options.pidfilepath ?? path.join(tmpdir(), new BSON.ObjectId().toHexString()); + + let childProcess: child_process.ChildProcess; + + beforeEach(async function () { + childProcess = child_process.spawn( + 'mongocryptd', + ['--port', port, '--ipv6', '--pidfilepath', pidfilepath], + { + stdio: 'ignore', + detached: false + } + ); + + childProcess.on('error', error => console.warn(this.currentTest?.fullTitle(), error)); + }); + + afterEach(function () { + childProcess.kill('SIGKILL'); + }); + + return { + port + }; +}