Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 30 additions & 38 deletions src/mongo_client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { promises as fs } from 'fs';
import type { TcpNetConnectOpts } from 'net';
import type { ConnectionOptions as TLSConnectionOptions, TLSSocketOptions } from 'tls';

import { type ServerCommandOptions, type TimeoutContext } from '.';
import { TopologyType } from '.';
import { type BSONSerializeOptions, type Document, resolveBSONOptions } from './bson';
import { ChangeStream, type ChangeStreamDocument, type ChangeStreamOptions } from './change_stream';
import type { AutoEncrypter, AutoEncryptionOptions } from './client-side-encryption/auto_encrypter';
Expand All @@ -22,7 +22,6 @@ import {
makeClientMetadata
} from './cmap/handshake/client_metadata';
import type { CompressorName } from './cmap/wire_protocol/compression';
import { MongoDBResponse } from './cmap/wire_protocol/responses';
import { parseOptions, resolveSRVRecord } from './connection_string';
import { MONGO_CLIENT_EVENTS } from './constants';
import { type AbstractCursor } from './cursor/abstract_cursor';
Expand All @@ -44,8 +43,8 @@ import {
type ClientBulkWriteResult
} from './operations/client_bulk_write/common';
import { ClientBulkWriteExecutor } from './operations/client_bulk_write/executor';
import { EndSessionsOperation } from './operations/end_sessions';
import { executeOperation } from './operations/execute_operation';
import { AbstractOperation } from './operations/operation';
import type { ReadConcern, ReadConcernLevel, ReadConcernLike } from './read_concern';
import { ReadPreference, type ReadPreferenceMode } from './read_preference';
import { type AsyncDisposable, configureResourceManagement } from './resource_management';
Expand All @@ -63,7 +62,7 @@ import {
type HostAddress,
hostMatchesWildcards,
isHostMatch,
MongoDBNamespace,
type MongoDBNamespace,
noop,
ns,
resolveOptions,
Expand Down Expand Up @@ -795,40 +794,12 @@ export class MongoClient extends TypedEventEmitter<MongoClientEvents> implements
return;
}

// If we would attempt to select a server and get nothing back we short circuit
// to avoid the server selection timeout.
const selector = readPreferenceServerSelector(ReadPreference.primaryPreferred);
const topologyDescription = this.topology.description;
const serverDescriptions = Array.from(topologyDescription.servers.values());
const servers = selector(topologyDescription, serverDescriptions);
if (servers.length !== 0) {
const endSessions = Array.from(this.s.sessionPool.sessions, ({ id }) => id);
if (endSessions.length !== 0) {
try {
class EndSessionsOperation extends AbstractOperation<void> {
override ns = MongoDBNamespace.fromString('admin.$cmd');
override SERVER_COMMAND_RESPONSE_TYPE = MongoDBResponse;
override buildCommand(_connection: Connection, _session?: ClientSession): Document {
return {
endSessions
};
}
override buildOptions(timeoutContext: TimeoutContext): ServerCommandOptions {
return {
timeoutContext,
readPreference: ReadPreference.primaryPreferred,
noResponse: true
};
}
override get commandName(): string {
return 'endSessions';
}
}
await executeOperation(this, new EndSessionsOperation());
} catch (error) {
squashError(error);
}
}
const supportsSessions =
this.topology.description.type === TopologyType.LoadBalanced ||
this.topology.description.logicalSessionTimeoutMinutes != null;

if (supportsSessions) {
await endSessions(this, this.topology);
}

// clear out references to old topology
Expand All @@ -841,6 +812,27 @@ export class MongoClient extends TypedEventEmitter<MongoClientEvents> implements
if (encrypter) {
await encrypter.close(this);
}

async function endSessions(
client: MongoClient,
{ description: topologyDescription }: Topology
) {
// If we would attempt to select a server and get nothing back we short circuit
// to avoid the server selection timeout.
const selector = readPreferenceServerSelector(ReadPreference.primaryPreferred);
const serverDescriptions = Array.from(topologyDescription.servers.values());
const servers = selector(topologyDescription, serverDescriptions);
if (servers.length !== 0) {
const endSessions = Array.from(client.s.sessionPool.sessions, ({ id }) => id);
if (endSessions.length !== 0) {
try {
await executeOperation(client, new EndSessionsOperation(endSessions));
} catch (error) {
squashError(error);
}
}
}
}
}

/**
Expand Down
42 changes: 42 additions & 0 deletions src/operations/end_sessions.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
import {
type ClientSession,
type Connection,
type ServerCommandOptions,
type ServerSessionId,
type TimeoutContext
} from '..';
import { type Document } from '../bson';
import { MongoDBResponse } from '../cmap/wire_protocol/responses';
import { ReadPreference } from '../read_preference';
import { MongoDBNamespace } from '../utils';
import { AbstractOperation } from './operation';

export class EndSessionsOperation extends AbstractOperation<void> {
override ns = MongoDBNamespace.fromString('admin.$cmd');
override SERVER_COMMAND_RESPONSE_TYPE = MongoDBResponse;

private sessions: Array<ServerSessionId>;

constructor(sessions: Array<ServerSessionId>) {
super();
this.sessions = sessions;
}

override buildCommand(_connection: Connection, _session?: ClientSession): Document {
return {
endSessions: this.sessions
};
}

override buildOptions(timeoutContext: TimeoutContext): ServerCommandOptions {
return {
timeoutContext,
readPreference: ReadPreference.primaryPreferred,
noResponse: true
};
}

override get commandName(): string {
return 'endSessions';
}
}
Original file line number Diff line number Diff line change
@@ -1,11 +1,8 @@
/* Specification prose tests */

import { type ChildProcess, spawn } from 'node:child_process';
import { Readable } from 'node:stream';

import { expect } from 'chai';
import * as os from 'os';
import * as path from 'path';
import * as semver from 'semver';
import * as sinon from 'sinon';
import { pipeline } from 'stream/promises';
Expand All @@ -27,6 +24,7 @@ import {
import {
clearFailPoint,
configureFailPoint,
configureMongocryptdSpawnHooks,
type FailCommandFailPoint,
makeMultiBatchWrite,
measureDuration
Expand Down Expand Up @@ -127,28 +125,16 @@ describe('CSOT spec prose tests', function () {

let client: MongoClient;
const mongocryptdTestPort = '23000';
let childProcess: ChildProcess;
configureMongocryptdSpawnHooks({ port: mongocryptdTestPort });

beforeEach(async function () {
const pidFile = path.join(os.tmpdir(), new ObjectId().toHexString());
childProcess = spawn(
'mongocryptd',
['--port', mongocryptdTestPort, '--ipv6', '--pidfilepath', pidFile],
{
stdio: 'ignore',
detached: true
}
);

childProcess.on('error', error => console.warn(this.currentTest?.fullTitle(), error));
client = new MongoClient(`mongodb://localhost:${mongocryptdTestPort}/?timeoutMS=1000`, {
monitorCommands: true
});
});

afterEach(async function () {
await client.close();
childProcess.kill('SIGKILL');
sinon.restore();
});

Expand Down
60 changes: 48 additions & 12 deletions test/integration/node-specific/client_close.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,14 @@ import * as events from 'node:events';
import { expect } from 'chai';

import { getCSFLEKMSProviders } from '../../csfle-kms-providers';
import { type Collection, type FindCursor, type MongoClient } from '../../mongodb';
import {
type Collection,
type CommandStartedEvent,
type FindCursor,
type MongoClient
} from '../../mongodb';
import { configureMongocryptdSpawnHooks } from '../../tools/utils';
import { filterForCommands } from '../shared';
import { runScriptAndGetProcessInfo } from './resource_tracking_script_builder';

describe('MongoClient.close() Integration', () => {
Expand Down Expand Up @@ -490,20 +497,49 @@ describe('MongoClient.close() Integration', () => {
});

describe('when MongoClient.close is called', function () {
it('sends an endSessions command', async function () {
await client.db('a').collection('a').insertOne({ a: 1 });
await client.db('a').collection('a').insertOne({ a: 1 });
await client.db('a').collection('a').insertOne({ a: 1 });
const endSessionsStarted = events.once(client, 'commandStarted');
const willEndSessions = events.once(client, 'commandSucceeded');
describe('when sessions are supported', function () {
it('sends an endSessions command', async function () {
await client.db('a').collection('a').insertOne({ a: 1 });
await client.db('a').collection('a').insertOne({ a: 1 });
await client.db('a').collection('a').insertOne({ a: 1 });
const endSessionsStarted = events.once(client, 'commandStarted');
const willEndSessions = events.once(client, 'commandSucceeded');

await client.close();
await client.close();

const [startedEv] = await endSessionsStarted;
expect(startedEv).to.have.nested.property('command.endSessions').that.has.lengthOf(1);

const [startedEv] = await endSessionsStarted;
expect(startedEv).to.have.nested.property('command.endSessions').that.has.lengthOf(1);
const [commandEv] = await willEndSessions;
expect(commandEv).to.have.property('commandName', 'endSessions');
});
});

const [commandEv] = await willEndSessions;
expect(commandEv).to.have.property('commandName', 'endSessions');
describe('when sessions are not supported', function () {
const mongocryptdTestPort = '27022';
let client: MongoClient;
const commands: Array<CommandStartedEvent> = [];

configureMongocryptdSpawnHooks({ port: mongocryptdTestPort });

beforeEach('configure cryptd client and prepopulate session pool', async function () {
client = this.configuration.newClient(`mongodb://localhost:${mongocryptdTestPort}`, {
monitorCommands: true
});

client.on('commandStarted', filterForCommands('endSessions', commands));

// run an operation to instantiate an implicit session (which should be omitted) from the
// actual command but still instantiated by the client. See session prose test 18.
await client.db().command({ hello: true });
expect(client.s.sessionPool.sessions).to.have.length.greaterThan(0);
});

it('does not execute endSessions', async function () {
await client.close();

expect(commands).to.deep.equal([]);
});
});
});
});
Expand Down
Original file line number Diff line number Diff line change
@@ -1,35 +1,20 @@
import { type ChildProcess, spawn } from 'node:child_process';

import { expect } from 'chai';
import * as os from 'os';
import * as path from 'path';

import { MongoClient, ObjectId } from '../../mongodb';
import { MongoClient } from '../../mongodb';
import { configureMongocryptdSpawnHooks } from '../../tools/utils';

describe('class ServerDescription', function () {
describe('when connecting to mongocryptd', { requires: { mongodb: '>=4.4' } }, function () {
let client: MongoClient;
const mongocryptdTestPort = '27022';
let childProcess: ChildProcess;

const { port: mongocryptdTestPort } = configureMongocryptdSpawnHooks();

beforeEach(async function () {
const pidFile = path.join(os.tmpdir(), new ObjectId().toHexString());
childProcess = spawn(
'mongocryptd',
['--port', mongocryptdTestPort, '--ipv6', '--pidfilepath', pidFile],
{
stdio: 'ignore',
detached: true
}
);

childProcess.on('error', error => console.warn(this.currentTest?.fullTitle(), error));
client = new MongoClient(`mongodb://localhost:${mongocryptdTestPort}`);
});

afterEach(async function () {
await client?.close();
childProcess.kill('SIGKILL');
});

it('iscryptd is set to true ', async function () {
Expand Down
27 changes: 2 additions & 25 deletions test/integration/sessions/sessions.prose.test.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,11 @@
import { ObjectId } from 'bson';
import { expect } from 'chai';
import { type ChildProcess, spawn } from 'child_process';
import { once } from 'events';
import * as os from 'os';
import * as path from 'path';

import { type CommandStartedEvent } from '../../../src/cmap/command_monitoring_events';
import { type Collection } from '../../../src/collection';
import { MongoDriverError, MongoInvalidArgumentError } from '../../../src/error';
import { MongoClient } from '../../../src/mongo_client';
import { sleep } from '../../tools/utils';
import { configureMongocryptdSpawnHooks, sleep } from '../../tools/utils';

describe('Sessions Prose Tests', () => {
describe('5. Session argument is for the right client', () => {
Expand Down Expand Up @@ -128,23 +124,8 @@ describe('Sessions Prose Tests', () => {
*/
const mongocryptdTestPort = '27022';
let client: MongoClient;
let childProcess: ChildProcess;

before(() => {
const pidFile = path.join(os.tmpdir(), new ObjectId().toHexString());
childProcess = spawn(
'mongocryptd',
['--port', mongocryptdTestPort, '--ipv6', '--pidfilepath', pidFile],
{
stdio: 'ignore',
detached: true
}
);

childProcess.on('error', err => {
console.warn('Sessions prose mongocryptd error:', err);
});
});
configureMongocryptdSpawnHooks({ port: mongocryptdTestPort });

beforeEach(async () => {
client = new MongoClient(`mongodb://localhost:${mongocryptdTestPort}`, {
Expand All @@ -160,10 +141,6 @@ describe('Sessions Prose Tests', () => {
await client?.close();
});

after(() => {
childProcess.kill();
});

it(
'18. Implicit session is ignored if connection does not support sessions',
{
Expand Down
Loading