Skip to content

Commit

Permalink
fix: allow event loop to process during wait queue processing
Browse files Browse the repository at this point in the history
Running `processWaitQueue` on the next tick allows the event loop
to process while the connection pool is processing large numbers of
wait queue members. This also uncovered a few issues with timing
in our tests, and in some cases our top-level API:
  - `commitTransaction` / `abortTransaction` use `maybePromise` now
  - `endSession` must wait for all the machinery behind the scenes to
     check out a connection and write a message before considering its
     job finished
   - internal calls to `kill` a cursor now await the the process of fully
     sending that command, even if they ignore the response

NODE-2803
  • Loading branch information
mbroadst committed Sep 14, 2020
1 parent e225ee5 commit cd91139
Show file tree
Hide file tree
Showing 7 changed files with 82 additions and 83 deletions.
11 changes: 3 additions & 8 deletions src/cmap/connection_pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,6 @@ export class ConnectionPool extends EventEmitter {
return;
}

// add this request to the wait queue
const waitQueueMember: WaitQueueMember = { callback };
const waitQueueTimeoutMS = this.options.waitQueueTimeoutMS;
if (waitQueueTimeoutMS) {
Expand All @@ -299,11 +298,8 @@ export class ConnectionPool extends EventEmitter {
}, waitQueueTimeoutMS);
}

// place the member at the end of the wait queue
this[kWaitQueue].push(waitQueueMember);

// process the wait queue
processWaitQueue(this);
setImmediate(() => processWaitQueue(this));
}

/**
Expand All @@ -316,7 +312,6 @@ export class ConnectionPool extends EventEmitter {
const stale = connectionIsStale(this, connection);
const willDestroy = !!(poolClosed || stale || connection.closed);

// Properly adjust state of connection
if (!willDestroy) {
connection.markAvailable();
this[kConnections].push(connection);
Expand All @@ -329,7 +324,7 @@ export class ConnectionPool extends EventEmitter {
destroyConnection(this, connection, reason);
}

processWaitQueue(this);
setImmediate(() => processWaitQueue(this));
}

/**
Expand Down Expand Up @@ -503,7 +498,7 @@ function createConnection(pool: ConnectionPool, callback?: Callback<Connection>)

// otherwise add it to the pool for later acquisition, and try to process the wait queue
pool[kConnections].push(connection);
processWaitQueue(pool);
setImmediate(() => processWaitQueue(pool));
});
}

Expand Down
25 changes: 15 additions & 10 deletions src/cursor/core_cursor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -701,9 +701,10 @@ function nextFunction(self: CoreCursor, callback: Callback) {

if (self.cursorState.limit > 0 && self.cursorState.currentLimit >= self.cursorState.limit) {
// Ensure we kill the cursor on the server
self.kill();
// Set cursor in dead and notified state
return setCursorDeadAndNotified(self, callback);
return self.kill(() =>
// Set cursor in dead and notified state
setCursorDeadAndNotified(self, callback)
);
} else if (
self.cursorState.cursorIndex === self.cursorState.documents.length &&
!Long.ZERO.equals(cursorId)
Expand Down Expand Up @@ -775,9 +776,12 @@ function nextFunction(self: CoreCursor, callback: Callback) {
} else {
if (self.cursorState.limit > 0 && self.cursorState.currentLimit >= self.cursorState.limit) {
// Ensure we kill the cursor on the server
self.kill();
// Set cursor in dead and notified state
return setCursorDeadAndNotified(self, callback);
self.kill(() =>
// Set cursor in dead and notified state
setCursorDeadAndNotified(self, callback)
);

return;
}

// Increment the current cursor limit
Expand All @@ -789,11 +793,12 @@ function nextFunction(self: CoreCursor, callback: Callback) {
// Doc overflow
if (!doc || doc.$err) {
// Ensure we kill the cursor on the server
self.kill();
// Set cursor in dead and notified state
return setCursorDeadAndNotified(self, () =>
callback(new MongoError(doc ? doc.$err : undefined))
self.kill(() =>
// Set cursor in dead and notified state
setCursorDeadAndNotified(self, () => callback(new MongoError(doc ? doc.$err : undefined)))
);

return;
}

// Transform the doc with passed in transformation method if provided
Expand Down
41 changes: 9 additions & 32 deletions src/cursor/cursor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -656,46 +656,23 @@ export class Cursor<
forEach(iterator: (doc: Document) => void): Promise<Document>;
forEach(iterator: (doc: Document) => void, callback: Callback): void;
forEach(iterator: (doc: Document) => void, callback?: Callback): Promise<Document> | void {
const Promise = PromiseProvider.get();
if (typeof iterator !== 'function') {
throw new TypeError('Missing required parameter `iterator`');
}

// Rewind cursor state
this.rewind();

// Set current cursor to INIT
this.s.state = CursorState.INIT;

if (typeof callback === 'function') {
return maybePromise(callback, done => {
each(this, (err, doc) => {
if (err) {
callback(err);
return false;
}

if (doc != null) {
iterator(doc);
return true;
}

if (doc == null) {
callback(undefined);
return false;
}
if (err) return done(err);
if (doc != null) return iterator(doc);
done();
});
} else {
return new Promise<Document>((fulfill, reject) => {
each(this, (err, doc) => {
if (err) {
reject(err);
return false;
} else if (doc == null) {
fulfill();
return false;
} else {
iterator(doc);
return true;
}
});
});
}
});
}

/**
Expand Down
66 changes: 44 additions & 22 deletions src/sessions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ export interface ClientSessionOptions {
/** @public */
export type WithTransactionCallback = (session: ClientSession) => Promise<any> | void;

const kServerSession = Symbol('serverSession');

/**
* A class representing a client session on the server
*
Expand All @@ -62,7 +64,6 @@ class ClientSession extends EventEmitter {
topology: Topology;
sessionPool: ServerSessionPool;
hasEnded: boolean;
serverSession?: ServerSession;
clientOptions?: MongoClientOptions;
supports: { causalConsistency: boolean };
clusterTime?: ClusterTime;
Expand All @@ -71,6 +72,7 @@ class ClientSession extends EventEmitter {
owner: symbol | CoreCursor;
defaultTransactionOptions: TransactionOptions;
transaction: Transaction;
[kServerSession]?: ServerSession;

/**
* Create a client session.
Expand Down Expand Up @@ -102,8 +104,8 @@ class ClientSession extends EventEmitter {
this.topology = topology;
this.sessionPool = sessionPool;
this.hasEnded = false;
this.serverSession = sessionPool.acquire();
this.clientOptions = clientOptions;
this[kServerSession] = undefined;

this.supports = {
causalConsistency:
Expand All @@ -124,41 +126,61 @@ class ClientSession extends EventEmitter {
return this.serverSession?.id;
}

get serverSession(): ServerSession {
if (this[kServerSession] == null) {
this[kServerSession] = this.sessionPool.acquire();
}

// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
return this[kServerSession]!;
}

/**
* Ends this session on the server
*
* @param options - Optional settings. Currently reserved for future use
* @param callback - Optional callback for completion of this operation
*/
endSession(): void;
endSession(): Promise<void>;
endSession(callback: Callback<void>): void;
endSession(options: Record<string, unknown>): Promise<void>;
endSession(options: Record<string, unknown>, callback: Callback<void>): void;
endSession(options?: Record<string, unknown> | Callback<void>, callback?: Callback<void>): void {
if (typeof options === 'function') (callback = options as Callback), (options = {});
endSession(
options?: Record<string, unknown> | Callback<void>,
callback?: Callback<void>
): void | Promise<void> {
if (typeof options === 'function') (callback = options), (options = {});
options = options || {};

if (this.hasEnded) {
if (typeof callback === 'function') callback();
return;
}
return maybePromise(callback, done => {
if (this.hasEnded) {
return done();
}

if (this.serverSession && this.inTransaction()) {
this.abortTransaction(); // pass in callback?
}
const completeEndSession = () => {
// release the server session back to the pool
this.sessionPool.release(this.serverSession);
this[kServerSession] = undefined;

// mark the session as ended, and emit a signal
this.hasEnded = true;
this.emit('ended', this);
// mark the session as ended, and emit a signal
this.hasEnded = true;
this.emit('ended', this);

// release the server session back to the pool
if (this.serverSession) {
this.sessionPool.release(this.serverSession);
}
// spec indicates that we should ignore all errors for `endSessions`
done();
};

this.serverSession = undefined;
if (this.serverSession && this.inTransaction()) {
this.abortTransaction(err => {
if (err) return done(err);
completeEndSession();
});

return;
}

// spec indicates that we should ignore all errors for `endSessions`
if (typeof callback === 'function') callback();
completeEndSession();
});
}

/**
Expand Down
8 changes: 4 additions & 4 deletions test/functional/cursor.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -3997,11 +3997,11 @@ describe('Cursor', function () {
const collection = db.collection('cursor_session_tests2');

const cursor = collection.find();
const promise = cursor.forEach();
const promise = cursor.forEach(() => {});
expect(promise).to.exist.and.to.be.an.instanceof(Promise);
promise.catch(() => {});

cursor.close(() => client.close(() => done()));
promise.then(() => {
cursor.close(() => client.close(() => done()));
});
});
});

Expand Down
11 changes: 6 additions & 5 deletions test/functional/spec-runner/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -344,11 +344,12 @@ function runTestSuiteTest(configuration, spec, context) {
throw err;
})
.then(() => {
if (session0) session0.endSession();
if (session1) session1.endSession();

return validateExpectations(context.commandEvents, spec, savedSessionData);
});
const promises = [];
if (session0) promises.push(session0.endSession());
if (session1) promises.push(session1.endSession());
return Promise.all(promises);
})
.then(() => validateExpectations(context.commandEvents, spec, savedSessionData));
});
}

Expand Down
3 changes: 1 addition & 2 deletions test/unit/cmap/connection_pool.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -139,8 +139,7 @@ describe('Connection Pool', function () {
sinon.stub(pool, 'availableConnectionCount').get(() => 0);
pool.checkIn(conn);

expect(pool).property('waitQueueSize').to.equal(0);

setImmediate(() => expect(pool).property('waitQueueSize').to.equal(0));
done();
});
});
Expand Down

0 comments on commit cd91139

Please sign in to comment.