From ef043b203819ed84806691078375cb0fe15759d3 Mon Sep 17 00:00:00 2001 From: Anna Henningsen Date: Mon, 28 Mar 2022 11:32:57 +0200 Subject: [PATCH 1/2] feat(data-service): use split CRUD client/Metadata client model COMPASS-5641 For CSFLE support, we will need Compass to be able to access both server metadata and encrypted collection contents. Using automatic encryption means that a number of administrative commands are not available. As a consequence, for CSFLE-enabled connections, Compass will maintain two separate `MongoClient` instances, and `DataService` methods will pick a specific one depending on what they do. For other connections, there is only a single client. --- .gitignore | 1 + .../compass-crud/src/stores/crud-store.js | 8 +- .../src/utils/cancellable-queries.spec.js | 2 +- .../src/utils/collection-stream.js | 2 +- .../src/connect-mongo-client.spec.ts | 84 ++++-- .../data-service/src/connect-mongo-client.ts | 67 +++-- .../data-service/src/data-service.spec.ts | 12 +- packages/data-service/src/data-service.ts | 266 +++++++++++------- 8 files changed, 283 insertions(+), 159 deletions(-) diff --git a/.gitignore b/.gitignore index c6a1d2b962a..17b4ac2945a 100644 --- a/.gitignore +++ b/.gitignore @@ -26,3 +26,4 @@ storage coverage .ackrc env-vars.sh +mongocryptd.pid diff --git a/packages/compass-crud/src/stores/crud-store.js b/packages/compass-crud/src/stores/crud-store.js index b75b9fe63d9..2399c1dcc31 100644 --- a/packages/compass-crud/src/stores/crud-store.js +++ b/packages/compass-crud/src/stores/crud-store.js @@ -611,7 +611,7 @@ const configureStore = (options = {}) => { } } - const session = this.dataService.startSession(); + const session = this.dataService.startSession('CRUD'); const abortController = new AbortController(); const signal = abortController.signal; @@ -1029,7 +1029,7 @@ const configureStore = (options = {}) => { const fetchShardingKeysOptions = { maxTimeMS: query.maxTimeMS, - session: this.dataService.startSession() + session: this.dataService.startSession('CRUD') }; const countOptions = { @@ -1037,7 +1037,7 @@ const configureStore = (options = {}) => { maxTimeMS: query.maxTimeMS > COUNT_MAX_TIME_MS_CAP ? COUNT_MAX_TIME_MS_CAP : query.maxTimeMS, - session: this.dataService.startSession() + session: this.dataService.startSession('CRUD') }; if (this.isCountHintSafe()) { @@ -1053,7 +1053,7 @@ const configureStore = (options = {}) => { maxTimeMS: query.maxTimeMS, promoteValues: false, bsonRegExp: true, - session: this.dataService.startSession() + session: this.dataService.startSession('CRUD') }; // only set limit if it's > 0, read-only views cannot handle 0 limit. diff --git a/packages/compass-crud/src/utils/cancellable-queries.spec.js b/packages/compass-crud/src/utils/cancellable-queries.spec.js index 1a8f949aa31..143a03bcba7 100644 --- a/packages/compass-crud/src/utils/cancellable-queries.spec.js +++ b/packages/compass-crud/src/utils/cancellable-queries.spec.js @@ -76,7 +76,7 @@ describe('cancellable-queries', function() { beforeEach(function() { sinon.restore(); - session = dataService.startSession(); + session = dataService.startSession('CRUD'); abortController = new AbortController(); signal = abortController.signal; }); diff --git a/packages/compass-import-export/src/utils/collection-stream.js b/packages/compass-import-export/src/utils/collection-stream.js index 3358b8290e0..8a21cff9508 100644 --- a/packages/compass-import-export/src/utils/collection-stream.js +++ b/packages/compass-import-export/src/utils/collection-stream.js @@ -42,7 +42,7 @@ class WritableCollectionStream extends Writable { } _collection() { - return this.dataService._collection(this.ns); + return this.dataService._collection(this.ns, 'CRUD'); } _write(document, _encoding, next) { diff --git a/packages/data-service/src/connect-mongo-client.spec.ts b/packages/data-service/src/connect-mongo-client.spec.ts index 1ddd8337141..00c7ae304c4 100644 --- a/packages/data-service/src/connect-mongo-client.spec.ts +++ b/packages/data-service/src/connect-mongo-client.spec.ts @@ -10,7 +10,11 @@ const setupListeners = () => { }; describe('connectMongoClient', function () { - let toBeClosed: { close: () => Promise }[] = []; + const toBeClosed = new Set<{ close: () => Promise }>(); + + beforeEach(function () { + toBeClosed.clear(); + }); afterEach(async function () { for (const mongoClientOrTunnel of toBeClosed) { @@ -18,8 +22,6 @@ describe('connectMongoClient', function () { await mongoClientOrTunnel.close(); } } - - toBeClosed = []; }); describe('local', function () { @@ -34,32 +36,77 @@ describe('connectMongoClient', function () { }); it('should return connection config when connected successfully', async function () { - const [client, tunnel, { url, options }] = await connectMongoClient( - { - connectionString: 'mongodb://localhost:27018', + const [metadataClient, crudClient, tunnel, { url, options }] = + await connectMongoClient( + { + connectionString: 'mongodb://localhost:27018', + }, + setupListeners + ); + + for (const closeLater of [metadataClient, crudClient, tunnel]) { + toBeClosed.add(closeLater); + } + + expect(metadataClient).to.equal(crudClient); + expect(url).to.equal('mongodb://localhost:27018'); + + expect(options).to.deep.equal({ + monitorCommands: true, + useSystemCA: undefined, + autoEncryption: undefined, + }); + }); + + it('should return two different clients when AutoEncryption is enabled', async function () { + const autoEncryption = { + keyVaultNamespace: 'encryption.__keyVault', + kmsProviders: { + local: { key: Buffer.alloc(96) }, }, - setupListeners - ); + bypassAutoEncryption: true, + }; + const [metadataClient, crudClient, tunnel, { url, options }] = + await connectMongoClient( + { + connectionString: 'mongodb://localhost:27018', + fleOptions: { + storeCredentials: false, + autoEncryption, + }, + }, + setupListeners + ); - toBeClosed.push(client, tunnel); + for (const closeLater of [metadataClient, crudClient, tunnel]) { + toBeClosed.add(closeLater); + } - assert.strictEqual(url, 'mongodb://localhost:27018'); + expect(metadataClient).to.not.equal(crudClient); + expect(metadataClient.options.autoEncryption).to.equal(undefined); + expect(crudClient.options.autoEncryption).to.be.an('object'); + expect(url).to.equal('mongodb://localhost:27018'); - assert.deepStrictEqual(options, { + expect(options).to.deep.equal({ monitorCommands: true, useSystemCA: undefined, + autoEncryption, }); }); it('should not override a user-specified directConnection option', async function () { - const [client, tunnel, { url, options }] = await connectMongoClient( - { - connectionString: 'mongodb://localhost:27018/?directConnection=false', - }, - setupListeners - ); + const [metadataClient, crudClient, tunnel, { url, options }] = + await connectMongoClient( + { + connectionString: + 'mongodb://localhost:27018/?directConnection=false', + }, + setupListeners + ); - toBeClosed.push(client, tunnel); + for (const closeLater of [metadataClient, crudClient, tunnel]) { + toBeClosed.add(closeLater); + } assert.strictEqual( url, @@ -69,6 +116,7 @@ describe('connectMongoClient', function () { assert.deepStrictEqual(options, { monitorCommands: true, useSystemCA: undefined, + autoEncryption: undefined, }); }); diff --git a/packages/data-service/src/connect-mongo-client.ts b/packages/data-service/src/connect-mongo-client.ts index f27e2b5be06..1e4d10cfc00 100644 --- a/packages/data-service/src/connect-mongo-client.ts +++ b/packages/data-service/src/connect-mongo-client.ts @@ -5,6 +5,7 @@ import type { DevtoolsConnectOptions } from '@mongodb-js/devtools-connect'; import type SSHTunnel from '@mongodb-js/ssh-tunnel'; import EventEmitter from 'events'; import { redactConnectionOptions, redactConnectionString } from './redact'; +import _ from 'lodash'; import createLoggerAndTelemetry from '@mongodb-js/compass-logging'; import type { ConnectionOptions } from './connection-options'; @@ -21,9 +22,10 @@ export default async function connectMongoClientCompass( setupListeners: (client: MongoClient) => void ): Promise< [ - MongoClient, - SSHTunnel | undefined, - { url: string; options: MongoClientOptions } + metadataClient: MongoClient, + crudClient: MongoClient, + sshTunnel: SSHTunnel | undefined, + options: { url: string; options: DevtoolsConnectOptions } ] > { debug( @@ -35,6 +37,7 @@ export default async function connectMongoClientCompass( const options: DevtoolsConnectOptions = { monitorCommands: true, useSystemCA: connectionOptions.useSystemCA, + autoEncryption: connectionOptions.fleOptions?.autoEncryption, }; // If connectionOptions.sshTunnel is defined, open an ssh tunnel. @@ -61,32 +64,54 @@ export default async function connectMongoClientCompass( const connectLogger = new EventEmitter(); hookLogger(connectLogger, log.unbound, 'compass', redactConnectionString); - let mongoClient: MongoClient | undefined; + async function connectSingleClient( + overrideOptions: DevtoolsConnectOptions + ): Promise { + const client = await connectMongoClient( + url, + // Deep clone because of https://jira.mongodb.org/browse/NODE-4124, + // the options here are being mutated. + _.cloneDeep({ ...options, ...overrideOptions }), + connectLogger, + CompassMongoClient + ); + await client.db('admin').command({ ping: 1 }); + return client; + } + + let metadataClient: MongoClient | undefined; + let crudClient: MongoClient | undefined; try { debug('waiting for MongoClient to connect ...'); - mongoClient = await Promise.race([ - (async () => { - const mongoClient = await connectMongoClient( - url, - options, - connectLogger, - CompassMongoClient - ); - await mongoClient.db('admin').command({ ping: 1 }); - return mongoClient; - })(), + // Create one or two clients, depending on whether CSFLE + // is enabled. If it is, create one for interacting with + // server metadata (e.g. build info, instance data, etc.) + // and one for interacting with the actual CRUD data. + // If CSFLE is disabled, use a single client for both cases. + [metadataClient, crudClient] = await Promise.race([ + Promise.all([ + connectSingleClient({ autoEncryption: undefined }), + options.autoEncryption ? connectSingleClient({}) : undefined, + ]), waitForTunnelError(tunnel), ]); // waitForTunnel always throws, never resolves - return [mongoClient, tunnel, { url, options }]; + return [ + metadataClient, + crudClient ?? metadataClient, + tunnel, + { url, options }, + ]; } catch (err: any) { debug('connection error', err); debug('force shutting down ssh tunnel ...'); - await Promise.all([forceCloseTunnel(tunnel), mongoClient?.close()]).catch( - () => { - /* ignore errors */ - } - ); + await Promise.all([ + forceCloseTunnel(tunnel), + crudClient?.close(), + metadataClient?.close(), + ]).catch(() => { + /* ignore errors */ + }); throw err; } } diff --git a/packages/data-service/src/data-service.spec.ts b/packages/data-service/src/data-service.spec.ts index b442dd549a7..371c5831bd9 100644 --- a/packages/data-service/src/data-service.spec.ts +++ b/packages/data-service/src/data-service.spec.ts @@ -1311,7 +1311,7 @@ describe('DataService', function () { describe('#startSession', function () { it('returns a new client session', function () { - const session = dataService.startSession(); + const session = dataService.startSession('CRUD'); expect(session.constructor.name).to.equal('ClientSession'); // used by killSessions, must be a bson UUID in order to work @@ -1322,14 +1322,14 @@ describe('DataService', function () { describe('#killSessions', function () { it('does not throw if kill a non existing session', async function () { - const session = dataService.startSession(); + const session = dataService.startSession('CRUD'); await dataService.killSessions(session); }); it('kills a command with a session', async function () { const commandSpy = sinon.spy(); sandbox.replace( - (dataService as any)._client, + (dataService as any)._crudClient, 'db', () => ({ @@ -1337,7 +1337,7 @@ describe('DataService', function () { } as any) ); - const session = dataService.startSession(); + const session = dataService.startSession('CRUD'); await dataService.killSessions(session); expect(commandSpy.args[0][0]).to.deep.equal({ @@ -1353,7 +1353,9 @@ describe('DataService', function () { const dataService = new DataService({ connectionString: 'mongodb://localhost:27020', }); - (dataService as any)._client = createMongoClientMock(clientConfig); + const client = createMongoClientMock(clientConfig); + (dataService as any)._crudClient = client; + (dataService as any)._metadataClient = client; return dataService; } diff --git a/packages/data-service/src/data-service.ts b/packages/data-service/src/data-service.ts index 0b1104a0671..27accceaf6f 100644 --- a/packages/data-service/src/data-service.ts +++ b/packages/data-service/src/data-service.ts @@ -16,7 +16,6 @@ import type { CountDocumentsOptions, CreateCollectionOptions, CreateIndexesOptions, - Db, DeleteOptions, DeleteResult, Document, @@ -90,15 +89,19 @@ function isEmptyObject(obj: Record) { let id = 0; +type ClientType = 'CRUD' | 'META'; +const kSessionClientType = Symbol('kSessionClientType'); + class DataService extends EventEmitter { - private readonly _connectionOptions: ConnectionOptions; + private readonly _connectionOptions: Readonly; private _isConnecting = false; private _mongoClientConnectionOptions?: { url: string; options: MongoClientOptions; }; - private _client?: MongoClient; + private _metadataClient?: MongoClient; + private _crudClient?: MongoClient; private _tunnel?: SshTunnel; /** @@ -111,7 +114,7 @@ class DataService extends EventEmitter { private _topologyType: TopologyType = 'Unknown'; private _id: number; - constructor(connectionOptions: ConnectionOptions) { + constructor(connectionOptions: Readonly) { super(); this._id = id++; this._connectionOptions = connectionOptions; @@ -136,7 +139,7 @@ class DataService extends EventEmitter { } getReadPreference(): ReadPreferenceLike { - return this._initializedClient.readPreference; + return this._initializedClient('CRUD').readPreference; } /** @@ -217,7 +220,11 @@ class DataService extends EventEmitter { 'Fetching collection info', { ns: `${databaseName}.${collectionName}` } ); - const db = this._initializedClient.db(databaseName); + // Note: The collStats command is not supported on CSFLE-enabled + // clients, but the $collStats aggregation stage is. + // When we're doing https://jira.mongodb.org/browse/COMPASS-5583, + // we can switch this over to using the CRUD client instead. + const db = this._initializedClient('META').db(databaseName); db.command({ collStats: collectionName, verbose: true }, (error, data) => { logop(error); if (error && !error.message.includes('is a view, not a collection')) { @@ -262,7 +269,7 @@ class DataService extends EventEmitter { comm: Document, callback: Callback ): void { - const db = this._initializedClient.db(databaseName); + const db = this._initializedClient('META').db(databaseName); db.command(comm, (error, result) => { if (error) { // @ts-expect-error Callback without result... @@ -306,7 +313,7 @@ class DataService extends EventEmitter { 'Running connectionStatus' ); try { - const adminDb = this._initializedClient.db('admin'); + const adminDb = this._initializedClient('META').db('admin'); const result = await runCommand(adminDb, { connectionStatus: 1, showPrivileges: true, @@ -355,7 +362,7 @@ class DataService extends EventEmitter { { db: databaseName, nameOnly: nameOnly ?? false } ); - const db = this._initializedClient.db(databaseName); + const db = this._initializedClient('CRUD').db(databaseName); const listCollections = async () => { try { @@ -443,7 +450,7 @@ class DataService extends EventEmitter { { nameOnly: nameOnly ?? false } ); - const adminDb = this._initializedClient.db('admin'); + const adminDb = this._initializedClient('CRUD').db('admin'); const listDatabases = async () => { try { @@ -514,7 +521,7 @@ class DataService extends EventEmitter { } async connect(): Promise { - if (this._client) { + if (this._metadataClient) { debug('already connected'); return; } @@ -532,10 +539,11 @@ class DataService extends EventEmitter { }); try { - const [client, tunnel, connectionOptions] = await connectMongoClient( - this._connectionOptions, - this.setupListeners.bind(this) - ); + const [metadataClient, crudClient, tunnel, connectionOptions] = + await connectMongoClient( + this._connectionOptions, + this.setupListeners.bind(this) + ); const attr = { isWritable: this.isWritable(), @@ -545,7 +553,8 @@ class DataService extends EventEmitter { log.info(mongoLogId(1_001_000_015), this._logCtx(), 'Connected', attr); debug('connected!', attr); - this._client = client; + this._metadataClient = metadataClient; + this._crudClient = crudClient; this._tunnel = tunnel; this._mongoClientConnectionOptions = connectionOptions; } finally { @@ -570,10 +579,13 @@ class DataService extends EventEmitter { 'Running estimatedCount', { ns } ); - this._collection(ns).estimatedDocumentCount(options, (err, result) => { - logop(err, result); - callback(err, result!); - }); + this._collection(ns, 'CRUD').estimatedDocumentCount( + options, + (err, result) => { + logop(err, result); + callback(err, result!); + } + ); } /** @@ -595,10 +607,14 @@ class DataService extends EventEmitter { 'Running countDocuments', { ns } ); - this._collection(ns).countDocuments(filter, options, (err, result) => { - logop(err, result); - callback(err, result!); - }); + this._collection(ns, 'CRUD').countDocuments( + filter, + options, + (err, result) => { + logop(err, result); + callback(err, result!); + } + ); } /** @@ -614,7 +630,7 @@ class DataService extends EventEmitter { callback: Callback> ): void { const collectionName = this._collectionName(ns); - const db = this._initializedClient.db(this._databaseName(ns)); + const db = this._initializedClient('CRUD').db(this._databaseName(ns)); const logop = this._startLogOp( mongoLogId(1_001_000_036), 'Running createCollection', @@ -650,7 +666,7 @@ class DataService extends EventEmitter { 'Running createIndex', { ns, spec, options } ); - this._collection(ns).createIndex(spec, options, (error, result) => { + this._collection(ns, 'CRUD').createIndex(spec, options, (error, result) => { logop(error); if (error) { // @ts-expect-error Callback without result... @@ -700,7 +716,7 @@ class DataService extends EventEmitter { 'Running deleteOne', { ns } ); - this._collection(ns).deleteOne(filter, options, (error, result) => { + this._collection(ns, 'CRUD').deleteOne(filter, options, (error, result) => { logop(error, result); if (error) { // @ts-expect-error Callback without result... @@ -729,14 +745,18 @@ class DataService extends EventEmitter { 'Running deleteMany', { ns } ); - this._collection(ns).deleteMany(filter, options, (error, result) => { - logop(error, result); - if (error) { - // @ts-expect-error Callback without result... - return callback(this._translateMessage(error)); + this._collection(ns, 'CRUD').deleteMany( + filter, + options, + (error, result) => { + logop(error, result); + if (error) { + // @ts-expect-error Callback without result... + return callback(this._translateMessage(error)); + } + callback(null, result!); } - callback(null, result!); - }); + ); } /** @@ -747,13 +767,18 @@ class DataService extends EventEmitter { log.info(mongoLogId(1_001_000_016), this._logCtx(), 'Disconnecting'); try { - await this._client - ?.close(true) - .catch((err) => debug('failed to close MongoClient', err)); - - await this._tunnel - ?.close() - .catch((err) => debug('failed to close tunnel', err)); + await Promise.all([ + this._metadataClient + ?.close(true) + .catch((err) => debug('failed to close MongoClient', err)), + this._crudClient !== this._metadataClient && + this._crudClient + ?.close(true) + .catch((err) => debug('failed to close MongoClient', err)), + this._tunnel + ?.close() + .catch((err) => debug('failed to close tunnel', err)), + ]); } finally { this._cleanup(); log.info(mongoLogId(1_001_000_017), this._logCtx(), 'Fully closed'); @@ -772,7 +797,7 @@ class DataService extends EventEmitter { 'Running dropCollection', { ns } ); - this._collection(ns).drop((error, result) => { + this._collection(ns, 'CRUD').drop((error, result) => { logop(error, result); if (error) { // @ts-expect-error Callback without result... @@ -794,7 +819,7 @@ class DataService extends EventEmitter { 'Running dropDatabase', { db: name } ); - this._initializedClient + this._initializedClient('CRUD') .db(this._databaseName(name)) .dropDatabase((error, result) => { logop(error, result); @@ -819,7 +844,7 @@ class DataService extends EventEmitter { 'Running dropIndex', { ns, name } ); - this._collection(ns).dropIndex(name, (error, result) => { + this._collection(ns, 'CRUD').dropIndex(name, (error, result) => { logop(error, result); if (error) { // @ts-expect-error Callback without result... @@ -869,7 +894,7 @@ class DataService extends EventEmitter { callback = options; options = undefined; } - const cursor = this._collection(ns).aggregate(pipeline, options); + const cursor = this._collection(ns, 'CRUD').aggregate(pipeline, options); // async when a callback is provided if (isFunction(callback)) { process.nextTick(callback, null, cursor); @@ -896,7 +921,7 @@ class DataService extends EventEmitter { const logop = this._startLogOp(mongoLogId(1_001_000_042), 'Running find', { ns, }); - const cursor = this._collection(ns).find(filter, options); + const cursor = this._collection(ns, 'CRUD').find(filter, options); cursor.toArray((error, documents) => { logop(error); if (error) { @@ -927,7 +952,7 @@ class DataService extends EventEmitter { logop(null); - return this._collection(ns).find(filter, options); + return this._collection(ns, 'CRUD').find(filter, options); } /** @@ -951,7 +976,7 @@ class DataService extends EventEmitter { 'Running findOneAndReplace', { ns } ); - this._collection(ns).findOneAndReplace( + this._collection(ns, 'CRUD').findOneAndReplace( filter, replacement, options, @@ -987,7 +1012,7 @@ class DataService extends EventEmitter { 'Running findOneAndUpdate', { ns } ); - this._collection(ns).findOneAndUpdate( + this._collection(ns, 'CRUD').findOneAndUpdate( filter, update, options, @@ -1023,7 +1048,7 @@ class DataService extends EventEmitter { ); // @todo thomasr: driver explain() does not yet support verbosity, // once it does, should be passed along from the options object. - this._collection(ns) + this._collection(ns, 'CRUD') .find(filter, options) .explain((error, explanation) => { logop(error); @@ -1049,7 +1074,7 @@ class DataService extends EventEmitter { { ns } ); getIndexes( - this._initializedClient, + this._initializedClient('CRUD'), ns, (error: Error | undefined, data: IndexDetails[]) => { logop(error); @@ -1067,7 +1092,7 @@ class DataService extends EventEmitter { */ async instance(): Promise { try { - const instanceData = await getInstance(this._initializedClient); + const instanceData = await getInstance(this._initializedClient('META')); log.info( mongoLogId(1_001_000_024), @@ -1106,7 +1131,7 @@ class DataService extends EventEmitter { 'Running insertOne', { ns } ); - this._collection(ns).insertOne(doc, options, (error, result) => { + this._collection(ns, 'CRUD').insertOne(doc, options, (error, result) => { logop(error, { acknowledged: result?.acknowledged }); if (error) { // @ts-expect-error Callback without result... @@ -1135,7 +1160,7 @@ class DataService extends EventEmitter { 'Running insertOne', { ns } ); - this._collection(ns).insertMany(docs, options, (error, result) => { + this._collection(ns, 'CRUD').insertMany(docs, options, (error, result) => { logop(error, { acknowledged: result?.acknowledged, insertedCount: result?.insertedCount, @@ -1161,7 +1186,7 @@ class DataService extends EventEmitter { docs: Document[], options: BulkWriteOptions ): Promise> { - return this._collection(ns).insertMany(docs, options); + return this._collection(ns, 'CRUD').insertMany(docs, options); } /** @@ -1185,7 +1210,7 @@ class DataService extends EventEmitter { { ns } ); const collectionName = this._collectionName(ns); - const db = this._initializedClient.db(this._databaseName(ns)); + const db = this._initializedClient('CRUD').db(this._databaseName(ns)); // Order of arguments is important here, collMod is a command name and it // should always be the first one in the object const command = { @@ -1223,14 +1248,19 @@ class DataService extends EventEmitter { 'Running updateOne', { ns } ); - this._collection(ns).updateOne(filter, update, options, (error, result) => { - logop(error, result); - if (error) { - // @ts-expect-error Callback without result... - return callback(this._translateMessage(error)); + this._collection(ns, 'CRUD').updateOne( + filter, + update, + options, + (error, result) => { + logop(error, result); + if (error) { + // @ts-expect-error Callback without result... + return callback(this._translateMessage(error)); + } + callback(null, result!); } - callback(null, result!); - }); + ); } /** @@ -1254,7 +1284,7 @@ class DataService extends EventEmitter { 'Running updateMany', { ns } ); - this._collection(ns).updateMany( + this._collection(ns, 'CRUD').updateMany( filter, update, options, @@ -1280,7 +1310,7 @@ class DataService extends EventEmitter { mongoLogId(1_001_000_053), 'Running currentOp' ); - this._initializedClient + this._initializedClient('META') .db('admin') .command({ currentOp: 1, $all: includeAll }, (error, result) => { logop(error); @@ -1289,7 +1319,7 @@ class DataService extends EventEmitter { mongoLogId(1_001_000_054), 'Searching $cmd.sys.inprog manually' ); - this._initializedClient + this._initializedClient('META') .db('admin') .collection('$cmd.sys.inprog') .findOne({ $all: includeAll }, (error2, result2) => { @@ -1323,15 +1353,18 @@ class DataService extends EventEmitter { 'Running serverStats' ); - this._defaultDb.admin().serverStatus((error, result) => { - logop(error); + this._initializedClient('META') + .db() + .admin() + .serverStatus((error, result) => { + logop(error); - if (error) { - // @ts-expect-error Callback without result... - return callback(this._translateMessage(error)); - } - callback(null, result!); - }); + if (error) { + // @ts-expect-error Callback without result... + return callback(this._translateMessage(error)); + } + callback(null, result!); + }); } /** @@ -1341,14 +1374,17 @@ class DataService extends EventEmitter { */ top(callback: Callback): void { const logop = this._startLogOp(mongoLogId(1_001_000_062), 'Running top'); - this._defaultDb.admin().command({ top: 1 }, (error, result) => { - logop(error); - if (error) { - // @ts-expect-error Callback without result... - return callback(this._translateMessage(error)); - } - callback(null, result!); - }); + this._initializedClient('META') + .db() + .admin() + .command({ top: 1 }, (error, result) => { + logop(error); + if (error) { + // @ts-expect-error Callback without result... + return callback(this._translateMessage(error)); + } + callback(null, result!); + }); } /** @@ -1381,7 +1417,7 @@ class DataService extends EventEmitter { } ); - this._initializedClient + this._initializedClient('CRUD') .db(this._databaseName(sourceNs)) .createCollection(name, options, (error, result) => { logop(error, result); @@ -1416,7 +1452,7 @@ class DataService extends EventEmitter { collMod: name, ...options, }; - const db = this._initializedClient.db(this._databaseName(sourceNs)); + const db = this._initializedClient('META').db(this._databaseName(sourceNs)); const logop = this._startLogOp( mongoLogId(1_001_000_056), @@ -1493,8 +1529,10 @@ class DataService extends EventEmitter { /** * Create a ClientSession that can be passed to commands. */ - startSession(): ClientSession { - return this._initializedClient.startSession(); + startSession(clientType: ClientType): ClientSession { + const session = this._initializedClient(clientType).startSession(); + (session as any)[kSessionClientType] = clientType; + return session; } /** @@ -1502,18 +1540,30 @@ class DataService extends EventEmitter { * @param clientSession - a ClientSession (can be created with startSession()) */ killSessions(sessions: ClientSession | ClientSession[]): Promise { - return this._initializedClient.db('admin').command({ - killSessions: Array.isArray(sessions) - ? sessions.map((s) => s.id) - : [sessions.id], - }); + const sessionsArray = Array.isArray(sessions) ? sessions : [sessions]; + const clientTypes = new Set( + sessionsArray.map((s: any) => s[kSessionClientType]) + ); + if (clientTypes.size !== 1) { + throw new Error( + `Cannot kill sessions without a specific client type: [${[ + ...clientTypes, + ].join(', ')}]` + ); + } + const [clientType] = clientTypes; + return this._initializedClient(clientType) + .db('admin') + .command({ + killSessions: sessionsArray.map((s) => s.id), + }); } isConnected(): boolean { // This is better than just returning internal `_isConnecting` as this // actually shows when the client is available on the NativeClient instance // and connected - return !!this._client; + return !!this._metadataClient; } /** @@ -1522,8 +1572,6 @@ class DataService extends EventEmitter { * @param {MongoClient} client - The driver client. */ private setupListeners(client: MongoClient): void { - this._client = client; - if (client) { client.on( 'serverDescriptionChanged', @@ -1694,15 +1742,15 @@ class DataService extends EventEmitter { } } - private get _initializedClient(): MongoClient { - if (!this._client) { - throw new Error('client not yet initialized'); + private _initializedClient(type: ClientType): MongoClient { + if (type !== 'CRUD' && type !== 'META') { + throw new Error(`Invalid client type: ${type as string}`); } - return this._client; - } - - private get _defaultDb(): Db { - return this._initializedClient.db(); + const client = type === 'CRUD' ? this._crudClient : this._metadataClient; + if (!client) { + throw new Error('Client not yet initialized'); + } + return client; } /** @@ -1720,7 +1768,7 @@ class DataService extends EventEmitter { { db: name } ); try { - const db = this._initializedClient.db(name); + const db = this._initializedClient('META').db(name); const stats = await runCommand(db, { dbStats: 1 }); const normalized = adaptDatabaseInfo(stats); return { name, ...normalized }; @@ -1806,8 +1854,8 @@ class DataService extends EventEmitter { * @param ns - The namespace. */ // TODO: this is used directly in compass-import-export/collection-stream - _collection(ns: string): Collection { - return this._initializedClient + _collection(ns: string, type: ClientType): Collection { + return this._initializedClient(type) .db(this._databaseName(ns)) .collection(this._collectionName(ns)); } @@ -1887,10 +1935,10 @@ class DataService extends EventEmitter { } private _cleanup(): void { - if (this._client) { - this._client.removeAllListeners(); - } - this._client = undefined; + this._metadataClient?.removeAllListeners?.(); + this._crudClient?.removeAllListeners?.(); + this._metadataClient = undefined; + this._crudClient = undefined; this._tunnel = undefined; this._mongoClientConnectionOptions = undefined; this._isWritable = false; From 8bfd95df6bb4aa8ccacfa14f58de89d1b08cd1e9 Mon Sep 17 00:00:00 2001 From: Anna Henningsen Date: Tue, 29 Mar 2022 12:01:51 +0200 Subject: [PATCH 2/2] fixup: review comment --- packages/data-service/src/data-service.ts | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/packages/data-service/src/data-service.ts b/packages/data-service/src/data-service.ts index 27accceaf6f..d12ded62ae5 100644 --- a/packages/data-service/src/data-service.ts +++ b/packages/data-service/src/data-service.ts @@ -91,6 +91,9 @@ let id = 0; type ClientType = 'CRUD' | 'META'; const kSessionClientType = Symbol('kSessionClientType'); +interface CompassClientSession extends ClientSession { + [kSessionClientType]: ClientType; +} class DataService extends EventEmitter { private readonly _connectionOptions: Readonly; @@ -1529,9 +1532,11 @@ class DataService extends EventEmitter { /** * Create a ClientSession that can be passed to commands. */ - startSession(clientType: ClientType): ClientSession { - const session = this._initializedClient(clientType).startSession(); - (session as any)[kSessionClientType] = clientType; + startSession(clientType: ClientType): CompassClientSession { + const session = this._initializedClient( + clientType + ).startSession() as CompassClientSession; + session[kSessionClientType] = clientType; return session; } @@ -1539,10 +1544,12 @@ class DataService extends EventEmitter { * Kill a session and terminate all in progress operations. * @param clientSession - a ClientSession (can be created with startSession()) */ - killSessions(sessions: ClientSession | ClientSession[]): Promise { + killSessions( + sessions: CompassClientSession | CompassClientSession[] + ): Promise { const sessionsArray = Array.isArray(sessions) ? sessions : [sessions]; const clientTypes = new Set( - sessionsArray.map((s: any) => s[kSessionClientType]) + sessionsArray.map((s) => s[kSessionClientType]) ); if (clientTypes.size !== 1) { throw new Error(