Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/apm.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ export class Instrumentation extends EventEmitter {
const instrumentation = this;
mongoClientClass.prototype.connect = function (this: MongoClient, callback: Callback) {
// override monitorCommands to be switched on
this.s.options = { ...(this.s.options ?? {}), monitorCommands: true };
this.monitorCommands = true;

this.on(Connection.COMMAND_STARTED, event =>
instrumentation.emit(Instrumentation.STARTED, event)
Expand Down
51 changes: 34 additions & 17 deletions src/mongo_client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -230,14 +230,14 @@ export type WithSessionCallback = (session: ClientSession) => Promise<any> | voi
/** @internal */
export interface MongoClientPrivate {
url: string;
options?: MongoClientOptions;
sessions: Set<ClientSession>;
readConcern?: ReadConcern;
writeConcern?: WriteConcern;
readPreference: ReadPreference;
bsonOptions: BSONSerializeOptions;
namespace: MongoDBNamespace;
logger: Logger;
readonly options?: MongoOptions;
readonly readConcern?: ReadConcern;
readonly writeConcern?: WriteConcern;
readonly readPreference: ReadPreference;
readonly logger: Logger;
}

const kOptions = Symbol('options');
Expand Down Expand Up @@ -293,29 +293,36 @@ export class MongoClient extends EventEmitter {
*/
[kOptions]: MongoOptions;

// debugging
originalUri;
originalOptions;

constructor(url: string, options?: MongoClientOptions) {
super();

this.originalUri = url;
this.originalOptions = options;

this[kOptions] = parseOptions(url, this, options);

// eslint-disable-next-line @typescript-eslint/no-this-alias
const client = this;

// The internal state
this.s = {
url,
options: this[kOptions],
sessions: new Set(),
readConcern: this[kOptions].readConcern,
writeConcern: this[kOptions].writeConcern,
readPreference: this[kOptions].readPreference,
bsonOptions: resolveBSONOptions(this[kOptions]),
namespace: ns('admin'),
logger: this[kOptions].logger

get options() {
return client[kOptions];
},
get readConcern() {
return client[kOptions].readConcern;
},
get writeConcern() {
return client[kOptions].writeConcern;
},
get readPreference() {
return client[kOptions].readPreference;
},
get logger() {
return client[kOptions].logger;
}
};
}

Expand All @@ -326,6 +333,16 @@ export class MongoClient extends EventEmitter {
get serverApi(): Readonly<ServerApi | undefined> {
return this[kOptions].serverApi && Object.freeze({ ...this[kOptions].serverApi });
}
/**
* Intended for APM use only
* @internal
*/
get monitorCommands(): boolean {
return this[kOptions].monitorCommands;
}
set monitorCommands(value: boolean) {
this[kOptions].monitorCommands = value;
}

get autoEncrypter(): AutoEncrypter | undefined {
return this[kOptions].autoEncrypter;
Expand Down
4 changes: 2 additions & 2 deletions src/sdam/topology.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ import type { MongoCredentials } from '../cmap/auth/mongo_credentials';
import type { Transaction } from '../transactions';
import type { CloseOptions } from '../cmap/connection_pool';
import { DestroyOptions, Connection } from '../cmap/connection';
import type { MongoClientOptions, ServerApi } from '../mongo_client';
import type { MongoOptions, ServerApi } from '../mongo_client';
import { DEFAULT_OPTIONS } from '../connection_string';
import { serialize, deserialize } from '../bson';

Expand Down Expand Up @@ -566,7 +566,7 @@ export class Topology extends EventEmitter {
}

/** Start a logical session */
startSession(options: ClientSessionOptions, clientOptions?: MongoClientOptions): ClientSession {
startSession(options: ClientSessionOptions, clientOptions?: MongoOptions): ClientSession {
const session = new ClientSession(this, this.s.sessionPool, options, clientOptions);
session.once('ended', () => {
this.s.sessions.delete(session);
Expand Down
30 changes: 20 additions & 10 deletions src/sessions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import {
maybePromise
} from './utils';
import type { Topology } from './sdam/topology';
import type { MongoClientOptions } from './mongo_client';
import type { MongoOptions } from './mongo_client';
import { executeOperation } from './operations/execute_operation';
import { RunAdminCommandOperation } from './operations/run_command';
import type { AbstractCursor } from './cursor/abstract_cursor';
Expand Down Expand Up @@ -76,7 +76,7 @@ class ClientSession extends EventEmitter {
/** @internal */
sessionPool: ServerSessionPool;
hasEnded: boolean;
clientOptions?: MongoClientOptions;
clientOptions?: MongoOptions;
supports: { causalConsistency: boolean };
clusterTime?: ClusterTime;
operationTime?: Timestamp;
Expand All @@ -98,7 +98,7 @@ class ClientSession extends EventEmitter {
topology: Topology,
sessionPool: ServerSessionPool,
options: ClientSessionOptions,
clientOptions?: MongoClientOptions
clientOptions?: MongoOptions
) {
super();

Expand All @@ -111,7 +111,6 @@ class ClientSession extends EventEmitter {
}

options = options ?? {};
clientOptions = clientOptions || {};

this.topology = topology;
this.sessionPool = sessionPool;
Expand Down Expand Up @@ -263,11 +262,22 @@ class ClientSession extends EventEmitter {

// increment txnNumber
this.incrementTransactionNumber();

// create transaction state
this.transaction = new Transaction(
Object.assign({}, this.clientOptions, options || this.defaultTransactionOptions)
);
this.transaction = new Transaction({
readConcern:
options?.readConcern ??
this.defaultTransactionOptions.readConcern ??
this.clientOptions?.readConcern,
writeConcern:
options?.writeConcern ??
this.defaultTransactionOptions.writeConcern ??
this.clientOptions?.writeConcern,
readPreference:
options?.readPreference ??
this.defaultTransactionOptions.readPreference ??
this.clientOptions?.readPreference,
maxCommitTimeMS: options?.maxCommitTimeMS ?? this.defaultTransactionOptions.maxCommitTimeMS
});

this.transaction.transition(TxnState.STARTING_TRANSACTION);
}
Expand Down Expand Up @@ -503,8 +513,8 @@ function endTransaction(session: ClientSession, commandName: string, callback: C
let writeConcern;
if (session.transaction.options.writeConcern) {
writeConcern = Object.assign({}, session.transaction.options.writeConcern);
} else if (session.clientOptions && session.clientOptions.w) {
writeConcern = { w: session.clientOptions.w };
} else if (session.clientOptions && session.clientOptions.writeConcern) {
writeConcern = { w: session.clientOptions.writeConcern.w };
}

if (txnState === TxnState.TRANSACTION_COMMITTED) {
Expand Down
85 changes: 60 additions & 25 deletions test/functional/unified-spec-runner/entities.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,11 @@
import { MongoClient, Db, Collection, GridFSBucket, Document } from '../../../src/index';
import {
MongoClient,
Db,
Collection,
GridFSBucket,
Document,
HostAddress
} from '../../../src/index';
import { ReadConcern } from '../../../src/read_concern';
import { WriteConcern } from '../../../src/write_concern';
import { ReadPreference } from '../../../src/read_preference';
Expand All @@ -18,10 +25,6 @@ interface UnifiedChangeStream extends ChangeStream {
eventCollector: InstanceType<typeof import('../../tools/utils')['EventCollector']>;
}

interface UnifiedClientSession extends ClientSession {
client: UnifiedMongoClient;
}

export type CommandEvent = CommandStartedEvent | CommandSucceededEvent | CommandFailedEvent;

export class UnifiedMongoClient extends MongoClient {
Expand Down Expand Up @@ -75,23 +78,49 @@ export class UnifiedMongoClient extends MongoClient {
}
return this.events;
}
}

async enableFailPoint(failPoint: Document): Promise<Document> {
const admin = this.db().admin();
export class FailPointMap extends Map<string, Document> {
async enableFailPoint(
addressOrClient: HostAddress | UnifiedMongoClient,
failPoint: Document
): Promise<Document> {
let client: MongoClient;
let address: string;
if (addressOrClient instanceof MongoClient) {
client = addressOrClient;
address = client.topology.s.seedlist.join(',');
} else {
// create a new client
address = addressOrClient.toString();
client = new MongoClient(`mongodb://${address}`);
await client.connect();
}

const admin = client.db('admin');
const result = await admin.command(failPoint);

if (!(addressOrClient instanceof MongoClient)) {
// we created this client
await client.close();
}

expect(result).to.have.property('ok', 1);
this.failPoints.push(failPoint.configureFailPoint);
this.set(address, failPoint.configureFailPoint);
return result;
}

async disableFailPoints(): Promise<Document[]> {
return Promise.all(
this.failPoints.map(configureFailPoint =>
this.db().admin().command({
configureFailPoint,
mode: 'off'
})
)
async disableFailPoints(): Promise<void> {
const entries = Array.from(this.entries());
await Promise.all(
entries.map(async ([hostAddress, configureFailPoint]) => {
const client = new MongoClient(`mongodb://${hostAddress}`);
await client.connect();
const admin = client.db('admin');
const result = await admin.command({ configureFailPoint, mode: 'off' });
expect(result).to.have.property('ok', 1);
await client.close();
})
);
}
}
Expand All @@ -100,7 +129,7 @@ export type Entity =
| UnifiedMongoClient
| Db
| Collection
| UnifiedClientSession
| ClientSession
| UnifiedChangeStream
| GridFSBucket
| Document; // Results from operations
Expand All @@ -124,10 +153,17 @@ ENTITY_CTORS.set('bucket', GridFSBucket);
ENTITY_CTORS.set('stream', ChangeStream);

export class EntitiesMap<E = Entity> extends Map<string, E> {
failPoints: FailPointMap;

constructor(entries?: readonly (readonly [string, E])[] | null) {
super(entries);
this.failPoints = new FailPointMap();
}

mapOf(type: 'client'): EntitiesMap<UnifiedMongoClient>;
mapOf(type: 'db'): EntitiesMap<Db>;
mapOf(type: 'collection'): EntitiesMap<Collection>;
mapOf(type: 'session'): EntitiesMap<UnifiedClientSession>;
mapOf(type: 'session'): EntitiesMap<ClientSession>;
mapOf(type: 'bucket'): EntitiesMap<GridFSBucket>;
mapOf(type: 'stream'): EntitiesMap<UnifiedChangeStream>;
mapOf(type: EntityTypeId): EntitiesMap<Entity> {
Expand All @@ -141,7 +177,7 @@ export class EntitiesMap<E = Entity> extends Map<string, E> {
getEntity(type: 'client', key: string, assertExists?: boolean): UnifiedMongoClient;
getEntity(type: 'db', key: string, assertExists?: boolean): Db;
getEntity(type: 'collection', key: string, assertExists?: boolean): Collection;
getEntity(type: 'session', key: string, assertExists?: boolean): UnifiedClientSession;
getEntity(type: 'session', key: string, assertExists?: boolean): ClientSession;
getEntity(type: 'bucket', key: string, assertExists?: boolean): GridFSBucket;
getEntity(type: 'stream', key: string, assertExists?: boolean): UnifiedChangeStream;
getEntity(type: EntityTypeId, key: string, assertExists = true): Entity {
Expand All @@ -161,8 +197,8 @@ export class EntitiesMap<E = Entity> extends Map<string, E> {
}

async cleanup(): Promise<void> {
await this.failPoints.disableFailPoints();
for (const [, client] of this.mapOf('client')) {
await client.disableFailPoints();
await client.close();
}
for (const [, session] of this.mapOf('session')) {
Expand All @@ -178,7 +214,9 @@ export class EntitiesMap<E = Entity> extends Map<string, E> {
const map = new EntitiesMap();
for (const entity of entities ?? []) {
if ('client' in entity) {
const uri = config.url({ useMultipleMongoses: entity.client.useMultipleMongoses });
const useMultipleMongoses =
config.topologyType === 'Sharded' && entity.client.useMultipleMongoses;
const uri = config.url({ useMultipleMongoses });
const client = new UnifiedMongoClient(uri, entity.client);
await client.connect();
map.set(entity.client.id, client);
Expand Down Expand Up @@ -228,10 +266,7 @@ export class EntitiesMap<E = Entity> extends Map<string, E> {
}
}

const session = client.startSession(options) as UnifiedClientSession;
// targetedFailPoint operations need to access the client the session came from
session.client = client;

const session = client.startSession(options);
map.set(entity.session.id, session);
} else if ('bucket' in entity) {
const db = map.getEntity('db', entity.bucket.database);
Expand Down
10 changes: 6 additions & 4 deletions test/functional/unified-spec-runner/operations.ts
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ operations.set('assertSessionNotDirty', async ({ entities, operation }) => {

operations.set('assertSessionPinned', async ({ entities, operation }) => {
const session = entities.getEntity('session', operation.arguments.session);
expect(session.transaction.isPinned).to.be.false;
expect(session.transaction.isPinned).to.be.true;
});

operations.set('assertSessionUnpinned', async ({ entities, operation }) => {
Expand Down Expand Up @@ -249,7 +249,7 @@ operations.set('findOneAndUpdate', async ({ entities, operation }) => {

operations.set('failPoint', async ({ entities, operation }) => {
const client = entities.getEntity('client', operation.arguments.client);
return client.enableFailPoint(operation.arguments.failPoint);
return entities.failPoints.enableFailPoint(client, operation.arguments.failPoint);
});

operations.set('insertOne', async ({ entities, operation }) => {
Expand Down Expand Up @@ -309,8 +309,10 @@ operations.set('startTransaction', async ({ entities, operation }) => {
operations.set('targetedFailPoint', async ({ entities, operation }) => {
const session = entities.getEntity('session', operation.arguments.session);
expect(session.transaction.isPinned, 'Session must be pinned for a targetedFailPoint').to.be.true;
const client = session.client;
client.enableFailPoint(operation.arguments.failPoint);
await entities.failPoints.enableFailPoint(
session.transaction._pinnedServer.s.description.hostAddress,
operation.arguments.failPoint
);
});

operations.set('delete', async ({ entities, operation }) => {
Expand Down
Loading