Skip to content

Commit

Permalink
fix(NODE-5106): refactor connect locking and add test
Browse files Browse the repository at this point in the history
  • Loading branch information
clemclx committed Mar 13, 2023
1 parent b139f17 commit 9bc400d
Show file tree
Hide file tree
Showing 2 changed files with 103 additions and 44 deletions.
97 changes: 53 additions & 44 deletions src/mongo_client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -407,63 +407,72 @@ export class MongoClient extends TypedEventEmitter<MongoClientEvents> {
* @see docs.mongodb.org/manual/reference/connection-string/
*/
async connect(): Promise<this> {
if (this.topology && this.topology.isConnected()) {
return this;
}

if (this.connectionLock) {
return this.connectionLock;
}

this.connectionLock = (async () => {
const options = this[kOptions];
try {
this.connectionLock = this._connect();
await this.connectionLock;
} finally {
// release
this.connectionLock = undefined;
}

if (typeof options.srvHost === 'string') {
const hosts = await resolveSRVRecord(options);
return this;
}

for (const [index, host] of hosts.entries()) {
options.hosts[index] = host;
}
}
/**
* Create a topology to open the connection, must be locked to avoid topology leaks in concurrency scenario.
* Locking is enforced by the connect method.
*
* When decorators available put implementation back to original connect methods
* and enforce locking via a dedicated decorator.
* @see https://github.com/mongodb/node-mongodb-native/pull/3596#discussion_r1134211500
*/
private async _connect(): Promise<this> {
if (this.topology && this.topology.isConnected()) {
return this;
}

const topology = new Topology(options.hosts, options);
// Events can be emitted before initialization is complete so we have to
// save the reference to the topology on the client ASAP if the event handlers need to access it
this.topology = topology;
topology.client = this;
const options = this[kOptions];

topology.once(Topology.OPEN, () => this.emit('open', this));
if (typeof options.srvHost === 'string') {
const hosts = await resolveSRVRecord(options);

for (const event of MONGO_CLIENT_EVENTS) {
topology.on(event, (...args: any[]) => this.emit(event, ...(args as any)));
for (const [index, host] of hosts.entries()) {
options.hosts[index] = host;
}
}

const topologyConnect = async () => {
try {
await promisify(callback => topology.connect(options, callback))();
} catch (error) {
topology.close({ force: true });
throw error;
}
};

if (this.autoEncrypter) {
const initAutoEncrypter = promisify(callback => this.autoEncrypter?.init(callback));
await initAutoEncrypter();
await topologyConnect();
await options.encrypter.connectInternalClient();
} else {
await topologyConnect();
}
const topology = new Topology(options.hosts, options);
// Events can be emitted before initialization is complete so we have to
// save the reference to the topology on the client ASAP if the event handlers need to access it
this.topology = topology;
topology.client = this;

return this;
})();
topology.once(Topology.OPEN, () => this.emit('open', this));

try {
await this.connectionLock;
} finally {
// release
this.connectionLock = undefined;
for (const event of MONGO_CLIENT_EVENTS) {
topology.on(event, (...args: any[]) => this.emit(event, ...(args as any)));
}

const topologyConnect = async () => {
try {
await promisify(callback => topology.connect(options, callback))();
} catch (error) {
topology.close({ force: true });
throw error;
}
};

if (this.autoEncrypter) {
const initAutoEncrypter = promisify(callback => this.autoEncrypter?.init(callback));
await initAutoEncrypter();
await topologyConnect();
await options.encrypter.connectInternalClient();
} else {
await topologyConnect();
}

return this;
Expand Down
50 changes: 50 additions & 0 deletions test/integration/mongo_client.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
import { expect } from 'chai';
import * as sinon from 'sinon';

import { MongoClient } from '../../src';

describe('MongoClient', () => {
let client: MongoClient;
let topologyOpenEvents;

beforeEach(async function () {
client = this.configuration.newClient();
topologyOpenEvents = [];
client.on('open', event => topologyOpenEvents.push(event));
});

afterEach(async function () {
await client.close();
});

it('Concurrents client connect correctly locked (only one topology created)', async function () {
await Promise.all([client.connect(), client.connect(), client.connect()]);

expect(topologyOpenEvents).to.have.lengthOf(1);
expect(client.topology?.isConnected()).to.be.true;
});

it('Failed client connect must properly release lock', async function () {
const internalConnectStub = sinon.stub(client, '_connect' as keyof MongoClient);
internalConnectStub.onFirstCall().rejects();

// first call rejected to simulate a connection failure
try {
await client.connect();
} catch (err) {
expect(err).to.exist;
}

internalConnectStub.restore();

// second call should connect
try {
await client.connect();
} catch (err) {
expect.fail(`client connect throwed unexpected error`);
}

expect(topologyOpenEvents).to.have.lengthOf(1);
expect(client.topology?.isConnected()).to.be.true;
});
});

0 comments on commit 9bc400d

Please sign in to comment.