Skip to content

Commit

Permalink
refactor(NODE-5352): refactor AbstractOperation to use async (#3729)
Browse files Browse the repository at this point in the history
  • Loading branch information
malikj2000 committed Jun 29, 2023
1 parent 1d31888 commit a329748
Show file tree
Hide file tree
Showing 44 changed files with 237 additions and 158 deletions.
10 changes: 7 additions & 3 deletions src/bulk/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import type { CollationOptions, CommandOperationOptions } from '../operations/co
import { DeleteOperation, type DeleteStatement, makeDeleteStatement } from '../operations/delete';
import { executeOperation } from '../operations/execute_operation';
import { InsertOperation } from '../operations/insert';
import { AbstractOperation, type Hint } from '../operations/operation';
import { AbstractCallbackOperation, type Hint } from '../operations/operation';
import { makeUpdateStatement, UpdateOperation, type UpdateStatement } from '../operations/update';
import type { Server } from '../sdam/server';
import type { Topology } from '../sdam/topology';
Expand Down Expand Up @@ -881,14 +881,18 @@ export interface BulkWriteOptions extends CommandOperationOptions {
* We would like this logic to simply live inside the BulkWriteOperation class
* @internal
*/
class BulkWriteShimOperation extends AbstractOperation {
class BulkWriteShimOperation extends AbstractCallbackOperation {
bulkOperation: BulkOperationBase;
constructor(bulkOperation: BulkOperationBase, options: BulkWriteOptions) {
super(options);
this.bulkOperation = bulkOperation;
}

execute(server: Server, session: ClientSession | undefined, callback: Callback<any>): void {
executeCallback(
server: Server,
session: ClientSession | undefined,
callback: Callback<any>
): void {
if (this.options.session == null) {
// An implicit session could have been created by 'executeOperation'
// So if we stick it on finalOptions here, each bulk operation
Expand Down
7 changes: 6 additions & 1 deletion src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -419,7 +419,12 @@ export type {
export type { InsertManyResult, InsertOneOptions, InsertOneResult } from './operations/insert';
export type { CollectionInfo, ListCollectionsOptions } from './operations/list_collections';
export type { ListDatabasesOptions, ListDatabasesResult } from './operations/list_databases';
export type { AbstractOperation, Hint, OperationOptions } from './operations/operation';
export type {
AbstractCallbackOperation,
AbstractOperation,
Hint,
OperationOptions
} from './operations/operation';
export type { ProfilingLevelOptions } from './operations/profiling_level';
export type { RemoveUserOptions } from './operations/remove_user';
export type { RenameOptions } from './operations/rename';
Expand Down
2 changes: 1 addition & 1 deletion src/operations/add_user.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ export class AddUserOperation extends CommandOperation<Document> {
this.options = options ?? {};
}

override execute(
override executeCallback(
server: Server,
session: ClientSession | undefined,
callback: Callback<Document>
Expand Down
2 changes: 1 addition & 1 deletion src/operations/aggregate.ts
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ export class AggregateOperation<T = Document> extends CommandOperation<T> {
this.pipeline.push(stage);
}

override execute(
override executeCallback(
server: Server,
session: ClientSession | undefined,
callback: Callback<T>
Expand Down
6 changes: 3 additions & 3 deletions src/operations/bulk_write.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@ import type { Collection } from '../collection';
import type { Server } from '../sdam/server';
import type { ClientSession } from '../sessions';
import type { Callback } from '../utils';
import { AbstractOperation, Aspect, defineAspects } from './operation';
import { AbstractCallbackOperation, Aspect, defineAspects } from './operation';

/** @internal */
export class BulkWriteOperation extends AbstractOperation<BulkWriteResult> {
export class BulkWriteOperation extends AbstractCallbackOperation<BulkWriteResult> {
override options: BulkWriteOptions;
collection: Collection;
operations: AnyBulkWriteOperation[];
Expand All @@ -27,7 +27,7 @@ export class BulkWriteOperation extends AbstractOperation<BulkWriteResult> {
this.operations = operations;
}

override execute(
override executeCallback(
server: Server,
session: ClientSession | undefined,
callback: Callback<BulkWriteResult>
Expand Down
6 changes: 3 additions & 3 deletions src/operations/collections.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,14 @@ import type { Db } from '../db';
import type { Server } from '../sdam/server';
import type { ClientSession } from '../sessions';
import type { Callback } from '../utils';
import { AbstractOperation, type OperationOptions } from './operation';
import { AbstractCallbackOperation, type OperationOptions } from './operation';

export interface CollectionsOptions extends OperationOptions {
nameOnly?: boolean;
}

/** @internal */
export class CollectionsOperation extends AbstractOperation<Collection[]> {
export class CollectionsOperation extends AbstractCallbackOperation<Collection[]> {
override options: CollectionsOptions;
db: Db;

Expand All @@ -20,7 +20,7 @@ export class CollectionsOperation extends AbstractOperation<Collection[]> {
this.db = db;
}

override execute(
override executeCallback(
server: Server,
session: ClientSession | undefined,
callback: Callback<Collection[]>
Expand Down
4 changes: 2 additions & 2 deletions src/operations/command.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import {
} from '../utils';
import { WriteConcern, type WriteConcernOptions } from '../write_concern';
import type { ReadConcernLike } from './../read_concern';
import { AbstractOperation, Aspect, type OperationOptions } from './operation';
import { AbstractCallbackOperation, Aspect, type OperationOptions } from './operation';

/** @public */
export interface CollationOptions {
Expand Down Expand Up @@ -68,7 +68,7 @@ export interface OperationParent {
}

/** @internal */
export abstract class CommandOperation<T> extends AbstractOperation<T> {
export abstract class CommandOperation<T> extends AbstractCallbackOperation<T> {
override options: CommandOperationOptions;
readConcern?: ReadConcern;
writeConcern?: WriteConcern;
Expand Down
2 changes: 1 addition & 1 deletion src/operations/count.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ export class CountOperation extends CommandOperation<number> {
this.query = filter;
}

override execute(
override executeCallback(
server: Server,
session: ClientSession | undefined,
callback: Callback<number>
Expand Down
4 changes: 2 additions & 2 deletions src/operations/count_documents.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,12 @@ export class CountDocumentsOperation extends AggregateOperation<number> {
super(collection.s.namespace, pipeline, options);
}

override execute(
override executeCallback(
server: Server,
session: ClientSession | undefined,
callback: Callback<number>
): void {
super.execute(server, session, (err, result) => {
super.executeCallback(server, session, (err, result) => {
if (err || !result) {
callback(err);
return;
Expand Down
6 changes: 2 additions & 4 deletions src/operations/create_collection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ export class CreateCollectionOperation extends CommandOperation<Collection> {
this.name = name;
}

override execute(
override executeCallback(
server: Server,
session: ClientSession | undefined,
callback: Callback<Collection>
Expand Down Expand Up @@ -170,9 +170,7 @@ export class CreateCollectionOperation extends CommandOperation<Collection> {
if (encryptedFields) {
// Create the required index for queryable encryption support.
const createIndexOp = new CreateIndexOperation(db, name, { __safeContent__: 1 }, {});
await new Promise<void>((resolve, reject) => {
createIndexOp.execute(server, session, err => (err ? reject(err) : resolve()));
});
await createIndexOp.execute(server, session);
}

return coll;
Expand Down
14 changes: 9 additions & 5 deletions src/operations/delete.ts
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,11 @@ export class DeleteOperation extends CommandOperation<DeleteResult> {
return this.statements.every(op => (op.limit != null ? op.limit > 0 : true));
}

override execute(server: Server, session: ClientSession | undefined, callback: Callback): void {
override executeCallback(
server: Server,
session: ClientSession | undefined,
callback: Callback
): void {
const options = this.options ?? {};
const ordered = typeof options.ordered === 'boolean' ? options.ordered : true;
const command: Document = {
Expand Down Expand Up @@ -97,12 +101,12 @@ export class DeleteOneOperation extends DeleteOperation {
super(collection.s.namespace, [makeDeleteStatement(filter, { ...options, limit: 1 })], options);
}

override execute(
override executeCallback(
server: Server,
session: ClientSession | undefined,
callback: Callback<DeleteResult>
): void {
super.execute(server, session, (err, res) => {
super.executeCallback(server, session, (err, res) => {
if (err || res == null) return callback(err);
if (res.code) return callback(new MongoServerError(res));
if (res.writeErrors) return callback(new MongoServerError(res.writeErrors[0]));
Expand All @@ -121,12 +125,12 @@ export class DeleteManyOperation extends DeleteOperation {
super(collection.s.namespace, [makeDeleteStatement(filter, options)], options);
}

override execute(
override executeCallback(
server: Server,
session: ClientSession | undefined,
callback: Callback<DeleteResult>
): void {
super.execute(server, session, (err, res) => {
super.executeCallback(server, session, (err, res) => {
if (err || res == null) return callback(err);
if (res.code) return callback(new MongoServerError(res));
if (res.writeErrors) return callback(new MongoServerError(res.writeErrors[0]));
Expand Down
2 changes: 1 addition & 1 deletion src/operations/distinct.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ export class DistinctOperation extends CommandOperation<any[]> {
this.query = query;
}

override execute(
override executeCallback(
server: Server,
session: ClientSession | undefined,
callback: Callback<any[]>
Expand Down
4 changes: 2 additions & 2 deletions src/operations/drop.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ export class DropCollectionOperation extends CommandOperation<boolean> {
this.name = name;
}

override execute(
override executeCallback(
server: Server,
session: ClientSession | undefined,
callback: Callback<boolean>
Expand Down Expand Up @@ -102,7 +102,7 @@ export class DropDatabaseOperation extends CommandOperation<boolean> {
super(db, options);
this.options = options;
}
override execute(
override executeCallback(
server: Server,
session: ClientSession | undefined,
callback: Callback<boolean>
Expand Down
2 changes: 1 addition & 1 deletion src/operations/estimated_document_count.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ export class EstimatedDocumentCountOperation extends CommandOperation<number> {
this.collectionName = collection.collectionName;
}

override execute(
override executeCallback(
server: Server,
session: ClientSession | undefined,
callback: Callback<number>
Expand Down
2 changes: 1 addition & 1 deletion src/operations/eval.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ export class EvalOperation extends CommandOperation<Document> {
});
}

override execute(
override executeCallback(
server: Server,
session: ClientSession | undefined,
callback: Callback<Document>
Expand Down
26 changes: 13 additions & 13 deletions src/operations/execute_operation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,13 @@ import {
import type { Topology } from '../sdam/topology';
import type { ClientSession } from '../sessions';
import { type Callback, maybeCallback, supportsRetryableWrites } from '../utils';
import { AbstractOperation, Aspect } from './operation';
import { AbstractCallbackOperation, Aspect } from './operation';

const MMAPv1_RETRY_WRITES_ERROR_CODE = MONGODB_ERROR_CODES.IllegalOperation;
const MMAPv1_RETRY_WRITES_ERROR_MESSAGE =
'This MongoDB deployment does not support retryable writes. Please add retryWrites=false to your connection string.';

type ResultTypeFromOperation<TOperation> = TOperation extends AbstractOperation<infer K>
type ResultTypeFromOperation<TOperation> = TOperation extends AbstractCallbackOperation<infer K>
? K
: never;

Expand Down Expand Up @@ -61,29 +61,29 @@ export interface ExecutionResult {
* @param callback - The command result callback
*/
export function executeOperation<
T extends AbstractOperation<TResult>,
T extends AbstractCallbackOperation<TResult>,
TResult = ResultTypeFromOperation<T>
>(client: MongoClient, operation: T): Promise<TResult>;
export function executeOperation<
T extends AbstractOperation<TResult>,
T extends AbstractCallbackOperation<TResult>,
TResult = ResultTypeFromOperation<T>
>(client: MongoClient, operation: T, callback: Callback<TResult>): void;
export function executeOperation<
T extends AbstractOperation<TResult>,
T extends AbstractCallbackOperation<TResult>,
TResult = ResultTypeFromOperation<T>
>(client: MongoClient, operation: T, callback?: Callback<TResult>): Promise<TResult> | void;
export function executeOperation<
T extends AbstractOperation<TResult>,
T extends AbstractCallbackOperation<TResult>,
TResult = ResultTypeFromOperation<T>
>(client: MongoClient, operation: T, callback?: Callback<TResult>): Promise<TResult> | void {
return maybeCallback(() => executeOperationAsync(client, operation), callback);
}

async function executeOperationAsync<
T extends AbstractOperation<TResult>,
T extends AbstractCallbackOperation<TResult>,
TResult = ResultTypeFromOperation<T>
>(client: MongoClient, operation: T): Promise<TResult> {
if (!(operation instanceof AbstractOperation)) {
if (!(operation instanceof AbstractCallbackOperation)) {
// TODO(NODE-3483): Extend MongoRuntimeError
throw new MongoRuntimeError('This method requires a valid operation instance');
}
Expand Down Expand Up @@ -152,13 +152,13 @@ async function executeOperationAsync<

if (session == null) {
// No session also means it is not retryable, early exit
return operation.executeAsync(server, undefined);
return operation.execute(server, undefined);
}

if (!operation.hasAspect(Aspect.RETRYABLE)) {
// non-retryable operation, early exit
try {
return await operation.executeAsync(server, session);
return await operation.execute(server, session);
} finally {
if (session?.owner != null && session.owner === owner) {
await session.endSession().catch(() => null);
Expand All @@ -184,7 +184,7 @@ async function executeOperationAsync<
}

try {
return await operation.executeAsync(server, session);
return await operation.execute(server, session);
} catch (operationError) {
if (willRetry && operationError instanceof MongoError) {
return await retryOperation(operation, operationError, {
Expand All @@ -209,7 +209,7 @@ type RetryOptions = {
};

async function retryOperation<
T extends AbstractOperation<TResult>,
T extends AbstractCallbackOperation<TResult>,
TResult = ResultTypeFromOperation<T>
>(
operation: T,
Expand Down Expand Up @@ -257,7 +257,7 @@ async function retryOperation<
}

try {
return await operation.executeAsync(server, session);
return await operation.execute(server, session);
} catch (retryError) {
if (
retryError instanceof MongoError &&
Expand Down
2 changes: 1 addition & 1 deletion src/operations/find.ts
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ export class FindOperation extends CommandOperation<Document> {
this.filter = filter != null && filter._bsontype === 'ObjectId' ? { _id: filter } : filter;
}

override execute(
override executeCallback(
server: Server,
session: ClientSession | undefined,
callback: Callback<Document>
Expand Down
2 changes: 1 addition & 1 deletion src/operations/find_and_modify.ts
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ class FindAndModifyOperation extends CommandOperation<Document> {
this.query = query;
}

override execute(
override executeCallback(
server: Server,
session: ClientSession | undefined,
callback: Callback<Document>
Expand Down
Loading

0 comments on commit a329748

Please sign in to comment.