Skip to content

Commit

Permalink
refactor(NODE-5379): cursor internals to use async-await (#3804)
Browse files Browse the repository at this point in the history
Co-authored-by: Bailey Pearson <bailey.pearson@mongodb.com>
  • Loading branch information
malikj2000 and baileympearson committed Sep 12, 2023
1 parent 2a3de19 commit b1c0eac
Show file tree
Hide file tree
Showing 11 changed files with 186 additions and 234 deletions.
141 changes: 67 additions & 74 deletions src/cursor/abstract_cursor.ts
@@ -1,5 +1,4 @@
import { Readable, Transform } from 'stream';
import { promisify } from 'util';

import { type BSONSerializeOptions, type Document, Long, pluckBSONSerializeOptions } from '../bson';
import {
Expand All @@ -21,7 +20,7 @@ import { ReadConcern, type ReadConcernLike } from '../read_concern';
import { ReadPreference, type ReadPreferenceLike } from '../read_preference';
import type { Server } from '../sdam/server';
import { ClientSession, maybeClearPinnedConnection } from '../sessions';
import { type Callback, List, type MongoDBNamespace, ns } from '../utils';
import { List, type MongoDBNamespace, ns } from '../utils';

/** @internal */
const kId = Symbol('id');
Expand Down Expand Up @@ -310,7 +309,7 @@ export abstract class AbstractCursor<
const message =
'Cursor returned a `null` document, but the cursor is not exhausted. Mapping documents to `null` is not supported in the cursor transform.';

await cleanupCursorAsync(this, { needsToEmitClosed: true }).catch(() => null);
await cleanupCursor(this, { needsToEmitClosed: true }).catch(() => null);

throw new MongoAPIError(message);
}
Expand Down Expand Up @@ -419,7 +418,7 @@ export abstract class AbstractCursor<
async close(): Promise<void> {
const needsToEmitClosed = !this[kClosed];
this[kClosed] = true;
await cleanupCursorAsync(this, { needsToEmitClosed });
await cleanupCursor(this, { needsToEmitClosed });
}

/**
Expand Down Expand Up @@ -613,21 +612,18 @@ export abstract class AbstractCursor<
abstract clone(): AbstractCursor<TSchema>;

/** @internal */
protected abstract _initialize(
session: ClientSession | undefined,
callback: Callback<ExecutionResult>
): void;
protected abstract _initialize(session: ClientSession | undefined): Promise<ExecutionResult>;

/** @internal */
_getMore(batchSize: number, callback: Callback<Document>): void {
async getMore(batchSize: number): Promise<Document | null> {
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
const getMoreOperation = new GetMoreOperation(this[kNamespace], this[kId]!, this[kServer]!, {
...this[kOptions],
session: this[kSession],
batchSize
});

executeOperation(this[kClient], getMoreOperation, callback);
return executeOperation(this[kClient], getMoreOperation);
}

/**
Expand All @@ -637,51 +633,50 @@ export abstract class AbstractCursor<
* operation. We cannot refactor to use the abstract _initialize method without
* a significant refactor.
*/
[kInit](callback: Callback<TSchema | null>): void {
this._initialize(this[kSession], (error, state) => {
if (state) {
const response = state.response;
this[kServer] = state.server;

if (response.cursor) {
// TODO(NODE-2674): Preserve int64 sent from MongoDB
this[kId] =
typeof response.cursor.id === 'number'
? Long.fromNumber(response.cursor.id)
: typeof response.cursor.id === 'bigint'
? Long.fromBigInt(response.cursor.id)
: response.cursor.id;

if (response.cursor.ns) {
this[kNamespace] = ns(response.cursor.ns);
}
async [kInit](): Promise<void> {
try {
const state = await this._initialize(this[kSession]);
const response = state.response;
this[kServer] = state.server;
if (response.cursor) {
// TODO(NODE-2674): Preserve int64 sent from MongoDB
this[kId] =
typeof response.cursor.id === 'number'
? Long.fromNumber(response.cursor.id)
: typeof response.cursor.id === 'bigint'
? Long.fromBigInt(response.cursor.id)
: response.cursor.id;

this[kDocuments].pushMany(response.cursor.firstBatch);
if (response.cursor.ns) {
this[kNamespace] = ns(response.cursor.ns);
}

// When server responses return without a cursor document, we close this cursor
// and return the raw server response. This is often the case for explain commands
// for example
if (this[kId] == null) {
this[kId] = Long.ZERO;
// TODO(NODE-3286): ExecutionResult needs to accept a generic parameter
this[kDocuments].push(state.response as TODO_NODE_3286);
}
this[kDocuments].pushMany(response.cursor.firstBatch);
}

// the cursor is now initialized, even if an error occurred or it is dead
this[kInitialized] = true;

if (error) {
return cleanupCursor(this, { error }, () => callback(error, undefined));
// When server responses return without a cursor document, we close this cursor
// and return the raw server response. This is often the case for explain commands
// for example
if (this[kId] == null) {
this[kId] = Long.ZERO;
// TODO(NODE-3286): ExecutionResult needs to accept a generic parameter
this[kDocuments].push(state.response as TODO_NODE_3286);
}

if (this.isDead) {
return cleanupCursor(this, undefined, () => callback());
}
// the cursor is now initialized, even if it is dead
this[kInitialized] = true;
} catch (error) {
// the cursor is now initialized, even if an error occurred
this[kInitialized] = true;
await cleanupCursor(this, { error });
throw error;
}

callback();
});
if (this.isDead) {
await cleanupCursor(this, undefined);
}

return;
}
}

Expand Down Expand Up @@ -713,7 +708,7 @@ async function next<T>(
do {
if (cursor[kId] == null) {
// All cursors must operate within a session, one must be made implicitly if not explicitly provided
await promisify(cursor[kInit].bind(cursor))();
await cursor[kInit]();
}

if (cursor[kDocuments].length !== 0) {
Expand All @@ -725,7 +720,7 @@ async function next<T>(
} catch (error) {
// `cleanupCursorAsync` should never throw, but if it does we want to throw the original
// error instead.
await cleanupCursorAsync(cursor, { error, needsToEmitClosed: true }).catch(() => null);
await cleanupCursor(cursor, { error, needsToEmitClosed: true }).catch(() => null);
throw error;
}
}
Expand All @@ -737,15 +732,15 @@ async function next<T>(
// if the cursor is dead, we clean it up
// cleanupCursorAsync should never throw, but if it does it indicates a bug in the driver
// and we should surface the error
await cleanupCursorAsync(cursor, {});
await cleanupCursor(cursor, {});
return null;
}

// otherwise need to call getMore
const batchSize = cursor[kOptions].batchSize || 1000;

try {
const response = await promisify(cursor._getMore.bind(cursor))(batchSize);
const response = await cursor.getMore(batchSize);

if (response) {
const cursorId =
Expand All @@ -761,7 +756,7 @@ async function next<T>(
} catch (error) {
// `cleanupCursorAsync` should never throw, but if it does we want to throw the original
// error instead.
await cleanupCursorAsync(cursor, { error }).catch(() => null);
await cleanupCursor(cursor, { error }).catch(() => null);
throw error;
}

Expand All @@ -773,7 +768,7 @@ async function next<T>(
//
// cleanupCursorAsync should never throw, but if it does it indicates a bug in the driver
// and we should surface the error
await cleanupCursorAsync(cursor, {});
await cleanupCursor(cursor, {});
}

if (cursor[kDocuments].length === 0 && blocking === false) {
Expand All @@ -784,13 +779,10 @@ async function next<T>(
return null;
}

const cleanupCursorAsync = promisify(cleanupCursor);

function cleanupCursor(
async function cleanupCursor(
cursor: AbstractCursor,
options: { error?: AnyError | undefined; needsToEmitClosed?: boolean } | undefined,
callback: Callback
): void {
options: { error?: AnyError | undefined; needsToEmitClosed?: boolean } | undefined
): Promise<void> {
const cursorId = cursor[kId];
const cursorNs = cursor[kNamespace];
const server = cursor[kServer];
Expand All @@ -817,9 +809,7 @@ function cleanupCursor(

if (session) {
if (session.owner === cursor) {
session.endSession({ error }).finally(() => {
callback();
});
await session.endSession({ error });
return;
}

Expand All @@ -828,16 +818,17 @@ function cleanupCursor(
}
}

return callback();
return;
}

function completeCleanup() {
async function completeCleanup() {
if (session) {
if (session.owner === cursor) {
session.endSession({ error }).finally(() => {
try {
await session.endSession({ error });
} finally {
cursor.emit(AbstractCursor.CLOSE);
callback();
});
}
return;
}

Expand All @@ -847,7 +838,7 @@ function cleanupCursor(
}

cursor.emit(AbstractCursor.CLOSE);
return callback();
return;
}

cursor[kKilled] = true;
Expand All @@ -856,12 +847,14 @@ function cleanupCursor(
return completeCleanup();
}

executeOperation(
cursor[kClient],
new KillCursorsOperation(cursorId, cursorNs, server, { session })
)
.catch(() => null)
.finally(completeCleanup);
try {
await executeOperation(
cursor[kClient],
new KillCursorsOperation(cursorId, cursorNs, server, { session })
).catch(() => null);
} finally {
await completeCleanup();
}
}

/** @internal */
Expand Down
12 changes: 5 additions & 7 deletions src/cursor/aggregation_cursor.ts
Expand Up @@ -5,7 +5,7 @@ import { AggregateOperation, type AggregateOptions } from '../operations/aggrega
import { executeOperation, type ExecutionResult } from '../operations/execute_operation';
import type { ClientSession } from '../sessions';
import type { Sort } from '../sort';
import type { Callback, MongoDBNamespace } from '../utils';
import type { MongoDBNamespace } from '../utils';
import { mergeOptions } from '../utils';
import type { AbstractCursorOptions } from './abstract_cursor';
import { AbstractCursor, assertUninitialized } from './abstract_cursor';
Expand Down Expand Up @@ -61,19 +61,17 @@ export class AggregationCursor<TSchema = any> extends AbstractCursor<TSchema> {
}

/** @internal */
_initialize(session: ClientSession, callback: Callback<ExecutionResult>): void {
async _initialize(session: ClientSession): Promise<ExecutionResult> {
const aggregateOperation = new AggregateOperation(this.namespace, this[kPipeline], {
...this[kOptions],
...this.cursorOptions,
session
});

executeOperation(this.client, aggregateOperation, (err, response) => {
if (err || response == null) return callback(err);
const response = await executeOperation(this.client, aggregateOperation);

// TODO: NODE-2882
callback(undefined, { server: aggregateOperation.server, session, response });
});
// TODO: NODE-2882
return { server: aggregateOperation.server, session, response };
}

/** Execute the explain for the cursor */
Expand Down

0 comments on commit b1c0eac

Please sign in to comment.