Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(NODE-5019): add runCursorCommand API #3655

Merged
merged 20 commits into from
May 24, 2023
Merged
Show file tree
Hide file tree
Changes from 19 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 2 additions & 1 deletion .evergreen/run-serverless-tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,5 @@ npx mocha \
test/integration/transactions/transactions.test.ts \
test/integration/versioned-api/versioned_api.spec.test.js \
test/integration/load-balancers/load_balancers.spec.test.js \
test/integration/client-side-encryption/client_side_encryption.spec.test.ts
test/integration/client-side-encryption/client_side_encryption.spec.test.ts \
test/integration/run-command/run_command.spec.test.ts
2 changes: 1 addition & 1 deletion src/cursor/abstract_cursor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -608,7 +608,7 @@ export abstract class AbstractCursor<
abstract clone(): AbstractCursor<TSchema>;

/** @internal */
abstract _initialize(
protected abstract _initialize(
session: ClientSession | undefined,
callback: Callback<ExecutionResult>
): void;
Expand Down
140 changes: 140 additions & 0 deletions src/cursor/run_command_cursor.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
import type { BSONSerializeOptions, Document, Long } from '../bson';
import type { Db } from '../db';
import { MongoAPIError, MongoUnexpectedServerResponseError } from '../error';
import { executeOperation, ExecutionResult } from '../operations/execute_operation';
import { GetMoreOperation } from '../operations/get_more';
import { RunCommandOperation } from '../operations/run_command';
import type { ReadConcernLike } from '../read_concern';
import type { ReadPreferenceLike } from '../read_preference';
import type { ClientSession } from '../sessions';
import { Callback, ns } from '../utils';
import { AbstractCursor } from './abstract_cursor';

/** @public */
export type RunCursorCommandOptions = {
readPreference?: ReadPreferenceLike;
session?: ClientSession;
} & BSONSerializeOptions;

/** @internal */
type RunCursorCommandResponse = {
cursor: { id: bigint | Long | number; ns: string; firstBatch: Document[] };
ok: 1;
};

/** @public */
export class RunCommandCursor extends AbstractCursor {
public readonly command: Readonly<Record<string, any>>;
public readonly getMoreOptions: {
comment?: any;
maxAwaitTimeMS?: number;
batchSize?: number;
} = {};

/**
* Controls the `getMore.comment` field
* @param comment - any BSON value
*/
public setComment(comment: any): this {
this.getMoreOptions.comment = comment;
return this;
}

/**
* Controls the `getMore.maxTimeMS` field. Only valid when cursor is tailable await
* @param maxTimeMS - the number of milliseconds to wait for new data
*/
public setMaxTimeMS(maxTimeMS: number): this {
this.getMoreOptions.maxAwaitTimeMS = maxTimeMS;
return this;
}

/**
* Controls the `getMore.batchSize` field
* @param maxTimeMS - the number documents to return in the `nextBatch`
*/
public setBatchSize(batchSize: number): this {
this.getMoreOptions.batchSize = batchSize;
return this;
}

/** Unsupported for RunCommandCursor */
public override clone(): never {
throw new MongoAPIError('Clone not supported, create a new cursor with db.runCursorCommand');
}

/** Unsupported for RunCommandCursor: readConcern must be configured directly on command document */
public override withReadConcern(_: ReadConcernLike): never {
throw new MongoAPIError(
'RunCommandCursor does not support readConcern it must be attached to the command being run'
);
}

/** Unsupported for RunCommandCursor: various cursor flags must be configured directly on command document */
public override addCursorFlag(_: string, __: boolean): never {
throw new MongoAPIError(
'RunCommandCursor does not support cursor flags, they must be attached to the command being run'
);
}

/** Unsupported for RunCommandCursor: maxTimeMS must be configured directly on command document */
public override maxTimeMS(_: number): never {
dariakp marked this conversation as resolved.
Show resolved Hide resolved
throw new MongoAPIError(
'maxTimeMS must be configured on the command document directly, to configure getMore.maxTimeMS use cursor.setMaxTimeMS()'
);
}

/** Unsupported for RunCommandCursor: batchSize must be configured directly on command document */
public override batchSize(_: number): never {
throw new MongoAPIError(
'batchSize must be configured on the command document directly, to configure getMore.batchSize use cursor.setBatchSize()'
);
}

/** @internal */
private db: Db;

/** @internal */
constructor(db: Db, command: Document, options: RunCursorCommandOptions = {}) {
super(db.s.client, ns(db.namespace), options);
this.db = db;
this.command = Object.freeze({ ...command });
}

/** @internal */
protected _initialize(session: ClientSession, callback: Callback<ExecutionResult>) {
const operation = new RunCommandOperation<RunCursorCommandResponse>(this.db, this.command, {
...this.cursorOptions,
session: session,
readPreference: this.cursorOptions.readPreference
});
executeOperation(this.client, operation).then(
response => {
if (response.cursor == null) {
callback(
new MongoUnexpectedServerResponseError('Expected server to respond with cursor')
);
return;
}
callback(undefined, {
server: operation.server,
session,
response
});
},
err => callback(err)
);
}

/** @internal */
override _getMore(_batchSize: number, callback: Callback<Document>) {
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
const getMoreOperation = new GetMoreOperation(this.namespace, this.id!, this.server!, {
...this.cursorOptions,
session: this.session,
...this.getMoreOptions
});

executeOperation(this.client, getMoreOperation, callback);
}
}
14 changes: 14 additions & 0 deletions src/db.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { Collection, CollectionOptions } from './collection';
import * as CONSTANTS from './constants';
import { AggregationCursor } from './cursor/aggregation_cursor';
import { ListCollectionsCursor } from './cursor/list_collections_cursor';
import { RunCommandCursor, type RunCursorCommandOptions } from './cursor/run_command_cursor';
import { MongoAPIError, MongoInvalidArgumentError } from './error';
import type { MongoClient, PkFactory } from './mongo_client';
import type { TODO_NODE_3286 } from './mongo_types';
Expand Down Expand Up @@ -523,6 +524,19 @@ export class Db {

return new ChangeStream<TSchema, TChange>(this, pipeline, resolveOptions(this, options));
}

/**
* A low level cursor API providing basic driver functionality:
* - ClientSession management
* - ReadPreference for server selection
* - Running getMores automatically when a local batch is exhausted
*
* @param command - The command that will start a cursor on the server.
* @param options - Configurations for running the command, bson options will apply to getMores
*/
runCursorCommand(command: Document, options?: RunCursorCommandOptions): RunCommandCursor {
return new RunCommandCursor(this, command, options);
}
}

// TODO(NODE-3484): Refactor into MongoDBNamespace
Expand Down
3 changes: 3 additions & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import { AggregationCursor } from './cursor/aggregation_cursor';
import { FindCursor } from './cursor/find_cursor';
import { ListCollectionsCursor } from './cursor/list_collections_cursor';
import { ListIndexesCursor } from './cursor/list_indexes_cursor';
import type { RunCommandCursor } from './cursor/run_command_cursor';
import { Db } from './db';
import { GridFSBucket } from './gridfs';
import { GridFSBucketReadStream } from './gridfs/download';
Expand Down Expand Up @@ -87,6 +88,7 @@ export {
ListIndexesCursor,
MongoClient,
OrderedBulkOperation,
RunCommandCursor,
UnorderedBulkOperation
};

Expand Down Expand Up @@ -275,6 +277,7 @@ export type {
ChangeStreamAggregateRawResult,
ChangeStreamCursorOptions
} from './cursor/change_stream_cursor';
export type { RunCursorCommandOptions } from './cursor/run_command_cursor';
export type { DbOptions, DbPrivate } from './db';
export type { AutoEncrypter, AutoEncryptionOptions, AutoEncryptionTlsOptions } from './deps';
export type { Encrypter, EncrypterOptions } from './encrypter';
Expand Down
Empty file.
7 changes: 6 additions & 1 deletion test/integration/run-command/run_command.spec.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,10 @@ import { loadSpecTests } from '../../spec';
import { runUnifiedSuite } from '../../tools/unified-spec-runner/runner';

describe('RunCommand spec', () => {
runUnifiedSuite(loadSpecTests('run-command'));
runUnifiedSuite(loadSpecTests('run-command'), test => {
if (test.description === 'does not attach $readPreference to given command on standalone') {
return 'TODO(NODE-5263): Do not send $readPreference to standalone servers';
}
return false;
});
});
50 changes: 50 additions & 0 deletions test/integration/run-command/run_cursor_command.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
import { expect } from 'chai';

import { Db, MongoClient } from '../../mongodb';

describe('runCursorCommand API', () => {
let client: MongoClient;
let db: Db;

beforeEach(async function () {
client = this.configuration.newClient({}, { monitorCommands: true });
db = client.db();
await db.dropDatabase().catch(() => null);
await db
.collection<{ _id: number }>('collection')
.insertMany([{ _id: 0 }, { _id: 1 }, { _id: 2 }]);
});

afterEach(async function () {
await client.close();
});

it('returns each document only once across multiple iterators', async () => {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this test be on AbstractCursor instead of runCommandCursor, since the behavior comes from the abstract class? This test is not specific to our RunCommandCursor.

Why was it added now?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The functionality is in the AbstractCursor but as an integration test I'm using the public API to reproduce the expected behavior. I was curious what the behavior of multiple iterators (or nested for-await loops) from one cursor is. Returning a document only once regardless of how many iterators there are is supported by nature of our dequeuing from a List. Since it is a data structure we have changed in the past it is possible that we could modify this behavior if, say, we used an offset into a List that is specific to each iterator.

const cursor = db.runCursorCommand({ find: 'collection', filter: {}, batchSize: 1 });
cursor.setBatchSize(1);

const a = cursor[Symbol.asyncIterator]();
const b = cursor[Symbol.asyncIterator]();

// Interleaving calls to A and B
const results = [
await a.next(), // find, first doc
await b.next(), // getMore, second doc

await a.next(), // getMore, third doc
await b.next(), // getMore, no doc & exhausted id, a.k.a. done

await a.next(), // done
await b.next() // done
];

expect(results).to.deep.equal([
{ value: { _id: 0 }, done: false },
{ value: { _id: 1 }, done: false },
{ value: { _id: 2 }, done: false },
{ value: undefined, done: true },
{ value: undefined, done: true },
{ value: undefined, done: true }
]);
});
});
1 change: 1 addition & 0 deletions test/mongodb.ts
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ export * from '../src/cursor/change_stream_cursor';
export * from '../src/cursor/find_cursor';
export * from '../src/cursor/list_collections_cursor';
export * from '../src/cursor/list_indexes_cursor';
export * from '../src/cursor/run_command_cursor';
export * from '../src/db';
export * from '../src/deps';
export * from '../src/encrypter';
Expand Down