diff --git a/packages/i18n/src/locales/en_US.ts b/packages/i18n/src/locales/en_US.ts index 3467c938df..c2ba54a854 100644 --- a/packages/i18n/src/locales/en_US.ts +++ b/packages/i18n/src/locales/en_US.ts @@ -388,6 +388,13 @@ const translations: Catalog = { example: 'db.collection.aggregate(pipeline, options).skip(offsetNumber)', }, + disableBlockWarnings: { + link: '', + description: + 'Disables warnings for blocking operations on the cursor.', + example: + 'db.collection.aggregate(pipeline, options).disableBlockWarnings()', + }, }, }, }, @@ -965,6 +972,17 @@ const translations: Catalog = { description: 'This method is deprecated because because after closing a cursor, the remaining documents in the batch are no longer accessible. If you want to see if the cursor is closed use cursor.isClosed. If you want to see if there are documents left in the batch, use cursor.tryNext. This is a breaking change', }, + disableBlockWarnings: { + link: '', + description: + 'Disables warnings for blocking operations on the cursor.', + }, + batchSize: { + description: 'Not available on change streams', + }, + maxTimeMS: { + description: 'Not available on change streams', + }, }, }, }, @@ -1260,6 +1278,13 @@ const translations: Catalog = { example: 'db.collection.find(query, projection, options).toArray()', }, + disableBlockWarnings: { + link: '', + description: + 'Disables warnings for blocking operations on the cursor.', + example: + 'db.collection.find(query, projection, options).disableBlockWarnings()', + }, }, }, }, @@ -1839,6 +1864,11 @@ const translations: Catalog = { description: "If a document is in the cursor's batch it will be returned, otherwise null will be returned", }, + disableBlockWarnings: { + link: '', + description: + 'Disables warnings for blocking operations on the cursor.', + }, }, }, }, diff --git a/packages/service-provider-core/src/cursors.ts b/packages/service-provider-core/src/cursors.ts index a09b6ddd69..126e71eefd 100644 --- a/packages/service-provider-core/src/cursors.ts +++ b/packages/service-provider-core/src/cursors.ts @@ -9,7 +9,7 @@ import type { ResumeToken, } from './all-transport-types'; -interface ServiceProviderBaseCursor { +export interface ServiceProviderBaseCursor { close(): Promise; hasNext(): Promise; next(): Promise; diff --git a/packages/service-provider-core/src/index.ts b/packages/service-provider-core/src/index.ts index 63151771dc..29af740b2c 100644 --- a/packages/service-provider-core/src/index.ts +++ b/packages/service-provider-core/src/index.ts @@ -23,6 +23,7 @@ export { bson } from './bson-export'; export { ServiceProviderAbstractCursor, ServiceProviderAggregationCursor, + ServiceProviderBaseCursor, ServiceProviderFindCursor, ServiceProviderRunCommandCursor, ServiceProviderChangeStream, diff --git a/packages/shell-api/src/abstract-cursor.ts b/packages/shell-api/src/abstract-cursor.ts index f51f1dad91..83b58d71a4 100644 --- a/packages/shell-api/src/abstract-cursor.ts +++ b/packages/shell-api/src/abstract-cursor.ts @@ -11,52 +11,25 @@ import type { ServiceProviderFindCursor, ServiceProviderAggregationCursor, ServiceProviderRunCommandCursor, + ServiceProviderBaseCursor, } from '@mongosh/service-provider-core'; import { asPrintable } from './enums'; import { CursorIterationResult } from './result'; import { iterate } from './helpers'; @shellApiClassNoHelp -export abstract class AbstractCursor< - CursorType extends - | ServiceProviderAggregationCursor - | ServiceProviderFindCursor - | ServiceProviderRunCommandCursor +export abstract class BaseCursor< + CursorType extends ServiceProviderBaseCursor > extends ShellApiWithMongoClass { _mongo: Mongo; _cursor: CursorType; - _transform: ((doc: any) => any) | null; - - _currentIterationResult: CursorIterationResult | null = null; + _transform: ((doc: any) => any) | null = null; + _blockingWarningDisabled = false; constructor(mongo: Mongo, cursor: CursorType) { super(); this._mongo = mongo; this._cursor = cursor; - this._transform = null; - } - - /** - * Internal method to determine what is printed for this class. - */ - async [asPrintable](): Promise { - return ( - await toShellResult(this._currentIterationResult ?? (await this._it())) - ).printable; - } - - async _it(): Promise { - const results = (this._currentIterationResult = - new CursorIterationResult()); - await iterate(results, this, await this._mongo._displayBatchSize()); - results.cursorHasMore = !this.isExhausted(); - return results; - } - - @returnType('this') - batchSize(size: number): this { - this._cursor.batchSize(size); - return this; } @returnsPromise @@ -136,24 +109,6 @@ export abstract class AbstractCursor< return count; } - @returnsPromise - async toArray(): Promise { - // toArray is always defined for driver cursors, but not necessarily - // in tests - if ( - typeof this._cursor.toArray === 'function' && - this._canDelegateIterationToUnderlyingCursor() - ) { - return await this._cursor.toArray(); - } - - const result = []; - for await (const doc of this) { - result.push(doc); - } - return result; - } - @returnType('this') pretty(): this { return this; @@ -170,12 +125,6 @@ export abstract class AbstractCursor< return this; } - @returnType('this') - maxTimeMS(value: number): this { - this._cursor.maxTimeMS(value); - return this; - } - @returnsPromise async next(): Promise { let result = await this._cursor.next(); @@ -185,6 +134,78 @@ export abstract class AbstractCursor< return result; } + disableBlockWarnings(): this { + this._blockingWarningDisabled = true; + return this; + } + + abstract batchSize(size: number): this; + abstract toArray(): Promise; + abstract maxTimeMS(value: number): this; + abstract objsLeftInBatch(): number; + abstract _it(): Promise; +} + +@shellApiClassNoHelp +export abstract class AbstractFiniteCursor< + CursorType extends + | ServiceProviderAggregationCursor + | ServiceProviderFindCursor + | ServiceProviderRunCommandCursor +> extends BaseCursor { + _currentIterationResult: CursorIterationResult | null = null; + + constructor(mongo: Mongo, cursor: CursorType) { + super(mongo, cursor); + } + + /** + * Internal method to determine what is printed for this class. + */ + async [asPrintable](): Promise { + return ( + await toShellResult(this._currentIterationResult ?? (await this._it())) + ).printable; + } + + override async _it(): Promise { + const results = (this._currentIterationResult = + new CursorIterationResult()); + await iterate(results, this, await this._mongo._displayBatchSize()); + results.cursorHasMore = !this.isExhausted(); + return results; + } + + @returnType('this') + override batchSize(size: number): this { + this._cursor.batchSize(size); + return this; + } + + @returnsPromise + override async toArray(): Promise { + // toArray is always defined for driver cursors, but not necessarily + // in tests + if ( + typeof this._cursor.toArray === 'function' && + this._canDelegateIterationToUnderlyingCursor() + ) { + return await this._cursor.toArray(); + } + + const result = []; + for await (const doc of this) { + result.push(doc); + } + return result; + } + + @returnType('this') + maxTimeMS(value: number): this { + this._cursor.maxTimeMS(value); + return this; + } + objsLeftInBatch(): number { return this._cursor.bufferedCount(); } diff --git a/packages/shell-api/src/aggregate-or-find-cursor.ts b/packages/shell-api/src/aggregate-or-find-cursor.ts index 46cb48e014..0c932ccaea 100644 --- a/packages/shell-api/src/aggregate-or-find-cursor.ts +++ b/packages/shell-api/src/aggregate-or-find-cursor.ts @@ -11,14 +11,14 @@ import type { ServiceProviderAggregationCursor, } from '@mongosh/service-provider-core'; import { validateExplainableVerbosity, markAsExplainOutput } from './helpers'; -import { AbstractCursor } from './abstract-cursor'; +import { AbstractFiniteCursor } from './abstract-cursor'; @shellApiClassNoHelp export abstract class AggregateOrFindCursor< CursorType extends | ServiceProviderAggregationCursor | ServiceProviderFindCursor -> extends AbstractCursor { +> extends AbstractFiniteCursor { @returnType('this') projection(spec: Document): this { this._cursor.project(spec); diff --git a/packages/shell-api/src/change-stream-cursor.spec.ts b/packages/shell-api/src/change-stream-cursor.spec.ts index 5d9bb6a60a..329ff9ec78 100644 --- a/packages/shell-api/src/change-stream-cursor.spec.ts +++ b/packages/shell-api/src/change-stream-cursor.spec.ts @@ -231,6 +231,28 @@ describe('ChangeStreamCursor', function () { ); expect(result).to.equal(1); }); + it('map() works', async function () { + cursor.map((doc) => ({ wrapped: doc })); + await coll.insertOne({ myDoc: 1 }); + const result = await ensureResult( + 100, + async () => await cursor.tryNext(), + (doc) => !!doc?.wrapped, + 'tryNext to return a document' + ); + expect(result.wrapped.fullDocument.myDoc).to.equal(1); + }); + it('forEach() works', async function () { + await coll.insertOne({ myDoc: 1 }); + let foundDoc = false; + await cursor.forEach((doc): boolean | void => { + if (doc?.fullDocument?.myDoc === 1) { + foundDoc = true; + return false; + } + }); + expect(foundDoc).to.equal(true); + }); }); describe('database watch', function () { beforeEach(async function () { @@ -330,12 +352,7 @@ describe('ChangeStreamCursor', function () { ); }); - for (const name of [ - 'map', - 'forEach', - 'toArray', - 'objsLeftInBatch', - ] as const) { + for (const name of ['toArray', 'objsLeftInBatch', 'maxTimeMS'] as const) { it(`${name} fails`, function () { expect(() => cursor[name]()).to.throw(MongoshUnimplementedError); }); diff --git a/packages/shell-api/src/change-stream-cursor.ts b/packages/shell-api/src/change-stream-cursor.ts index cf3c41b260..2d3033fec9 100644 --- a/packages/shell-api/src/change-stream-cursor.ts +++ b/packages/shell-api/src/change-stream-cursor.ts @@ -1,10 +1,4 @@ -import { - shellApiClassDefault, - returnsPromise, - returnType, - deprecated, - ShellApiWithMongoClass, -} from './decorators'; +import { shellApiClassDefault, returnsPromise, deprecated } from './decorators'; import type { ServiceProviderChangeStream, Document, @@ -19,26 +13,19 @@ import { } from '@mongosh/errors'; import { iterate } from './helpers'; import type Mongo from './mongo'; +import { BaseCursor } from './abstract-cursor'; @shellApiClassDefault -export default class ChangeStreamCursor extends ShellApiWithMongoClass { - _mongo: Mongo; - _cursor: ServiceProviderChangeStream; +export default class ChangeStreamCursor extends BaseCursor { _currentIterationResult: CursorIterationResult | null = null; _on: string; - constructor( - cursor: ServiceProviderChangeStream, - on: string, - mongo: Mongo - ) { - super(); - this._cursor = cursor; + constructor(cursor: ServiceProviderChangeStream, on: string, mongo: Mongo) { + super(mongo, cursor); this._on = on; - this._mongo = mongo; } - async _it(): Promise { + override async _it(): Promise { if (this._cursor.closed) { throw new MongoshRuntimeError('ChangeStreamCursor is closed'); } @@ -49,99 +36,72 @@ export default class ChangeStreamCursor extends ShellApiWithMongoClass { /** * Internal method to determine what is printed for this class. */ - [asPrintable](): string { - return `ChangeStreamCursor on ${this._on}`; - } - - @returnsPromise - async close(): Promise { - await this._cursor.close(); + override [asPrintable](): Promise { + return Promise.resolve(`ChangeStreamCursor on ${this._on}`); } @returnsPromise @deprecated - async hasNext(): Promise { - await this._instanceState.printWarning( - 'If there are no documents in the batch, hasNext will block. Use tryNext if you want to check if there ' + - 'are any documents without waiting.' - ); - return this._cursor.hasNext(); + override async hasNext(): Promise { + if (!this._blockingWarningDisabled) { + await this._instanceState.printWarning( + 'If there are no documents in the batch, hasNext will block. Use tryNext if you want to check if there ' + + 'are any documents without waiting, or cursor.disableBlockWarnings() if you want to disable this warning.' + ); + } + return super.hasNext(); } @returnsPromise - async tryNext(): Promise { + override async tryNext(): Promise { if (this._cursor.closed) { throw new MongoshRuntimeError('Cannot call tryNext on closed cursor'); } - return this._cursor.tryNext(); - } - - get [Symbol.for('@@mongosh.syntheticAsyncIterable')]() { - return true; - } - - async *[Symbol.asyncIterator]() { - let doc; - while ((doc = await this.tryNext()) !== null) { - yield doc; - } - } - - isClosed(): boolean { - return this._cursor.closed; + return super.tryNext(); } - isExhausted(): never { + override isExhausted(): never { throw new MongoshInvalidInputError( 'isExhausted is not implemented for ChangeStreams because after closing a cursor, the remaining documents in the batch are no longer accessible. If you want to see if the cursor is closed use isClosed. If you want to see if there are documents left in the batch, use tryNext.' ); } @returnsPromise - async itcount(): Promise { - let count = 0; - while (await this.tryNext()) { - count++; + override async next(): Promise { + if (!this._blockingWarningDisabled) { + await this._instanceState.printWarning( + 'If there are no documents in the batch, next will block. Use tryNext if you want to check if there are ' + + 'any documents without waiting, or cursor.disableBlockWarnings() if you want to disable this warning.' + ); } - return count; - } - - @returnsPromise - async next(): Promise { - await this._instanceState.printWarning( - 'If there are no documents in the batch, next will block. Use tryNext if you want to check if there are ' + - 'any documents without waiting.' - ); - return this._cursor.next(); + return (await super.next()) as Document; } getResumeToken(): ResumeToken { return this._cursor.resumeToken; } - map(): ChangeStreamCursor { - throw new MongoshUnimplementedError( - 'Cannot call map on a change stream cursor' - ); - } - forEach(): Promise { + override toArray(): never { throw new MongoshUnimplementedError( - 'Cannot call forEach on a change stream cursor' + 'Cannot call toArray on a change stream cursor' ); } - toArray(): Promise { + + override batchSize(): never { throw new MongoshUnimplementedError( - 'Cannot call toArray on a change stream cursor' + 'Cannot call batchSize on a change stream cursor' ); } - objsLeftInBatch(): void { + + override objsLeftInBatch(): never { throw new MongoshUnimplementedError( 'Cannot call objsLeftInBatch on a change stream cursor' ); } - @returnType('ChangeStreamCursor') - pretty(): ChangeStreamCursor { - return this; + override maxTimeMS(): never { + throw new MongoshUnimplementedError( + 'Cannot call maxTimeMS on a change stream cursor' + ); } } diff --git a/packages/shell-api/src/cursor.ts b/packages/shell-api/src/cursor.ts index ed4af384b7..2f784ae491 100644 --- a/packages/shell-api/src/cursor.ts +++ b/packages/shell-api/src/cursor.ts @@ -120,10 +120,11 @@ export default class Cursor extends AggregateOrFindCursor { - if (this._tailable) { + if (this._tailable && !this._blockingWarningDisabled) { await this._instanceState.printWarning( 'If this is a tailable cursor with awaitData, and there are no documents in the batch, this method ' + - 'will will block. Use tryNext if you want to check if there are any documents without waiting.' + 'will will block. Use tryNext if you want to check if there are any documents without waiting, or ' + + 'cursor.disableBlockWarnings() if you want to disable this warning.' ); } return super.hasNext(); @@ -162,10 +163,11 @@ export default class Cursor extends AggregateOrFindCursor { - if (this._tailable) { + if (this._tailable && !this._blockingWarningDisabled) { await this._instanceState.printWarning( 'If this is a tailable cursor with awaitData, and there are no documents in the batch, this' + - ' method will will block. Use tryNext if you want to check if there are any documents without waiting.' + ' method will will block. Use tryNext if you want to check if there are any documents without waiting,' + + ' or cursor.disableBlockWarnings() if you want to disable this warning.' ); } return super.next(); diff --git a/packages/shell-api/src/helpers.ts b/packages/shell-api/src/helpers.ts index b2c5f2b5e1..360dab5f8c 100644 --- a/packages/shell-api/src/helpers.ts +++ b/packages/shell-api/src/helpers.ts @@ -28,7 +28,7 @@ import type { import type { ClientSideFieldLevelEncryptionOptions } from './field-level-encryption'; import type { AutoEncryptionOptions, Long, ObjectId, Timestamp } from 'mongodb'; import { shellApiType } from './enums'; -import type { AbstractCursor } from './abstract-cursor'; +import type { AbstractFiniteCursor } from './abstract-cursor'; import type ChangeStreamCursor from './change-stream-cursor'; import type { ShellBson } from './shell-bson'; import { inspect } from 'util'; @@ -861,7 +861,7 @@ export function addHiddenDataProperty( export async function iterate( results: CursorIterationResult, - cursor: AbstractCursor | ChangeStreamCursor, + cursor: AbstractFiniteCursor | ChangeStreamCursor, batchSize: number ): Promise { if (cursor.isClosed()) { diff --git a/packages/shell-api/src/run-command-cursor.ts b/packages/shell-api/src/run-command-cursor.ts index 20fbc992d2..a6edfaccaf 100644 --- a/packages/shell-api/src/run-command-cursor.ts +++ b/packages/shell-api/src/run-command-cursor.ts @@ -1,10 +1,10 @@ import type Mongo from './mongo'; import { shellApiClassDefault } from './decorators'; import type { ServiceProviderRunCommandCursor } from '@mongosh/service-provider-core'; -import { AbstractCursor } from './abstract-cursor'; +import { AbstractFiniteCursor } from './abstract-cursor'; @shellApiClassDefault -export default class RunCommandCursor extends AbstractCursor { +export default class RunCommandCursor extends AbstractFiniteCursor { constructor(mongo: Mongo, cursor: ServiceProviderRunCommandCursor) { super(mongo, cursor); } diff --git a/packages/shell-api/src/shell-api.ts b/packages/shell-api/src/shell-api.ts index 476f9d0535..f07a1b4adf 100644 --- a/packages/shell-api/src/shell-api.ts +++ b/packages/shell-api/src/shell-api.ts @@ -328,7 +328,7 @@ export default class ShellApi extends ShellApiClass { @directShellCommand @returnsPromise - async it(): Promise { + async it(): Promise { if (!this._instanceState.currentCursor) { return new CursorIterationResult(); } diff --git a/packages/shell-api/src/shell-instance-state.ts b/packages/shell-api/src/shell-instance-state.ts index 0f286e949e..4b17e62c9d 100644 --- a/packages/shell-api/src/shell-instance-state.ts +++ b/packages/shell-api/src/shell-instance-state.ts @@ -5,6 +5,7 @@ import type { ConnectionInfo, ServerApi, ServiceProvider, + ServiceProviderBaseCursor, TopologyDescription, } from '@mongosh/service-provider-core'; import { DEFAULT_DB } from '@mongosh/service-provider-core'; @@ -17,17 +18,10 @@ import type { } from '@mongosh/types'; import { EventEmitter } from 'events'; import redactInfo from 'mongodb-redact'; -import type ChangeStreamCursor from './change-stream-cursor'; import { toIgnore } from './decorators'; import { Topologies } from './enums'; import { ShellApiErrors } from './error-codes'; -import type { - AggregationCursor, - Cursor, - RunCommandCursor, - ShellResult, - DatabaseWithSchema, -} from './index'; +import type { ShellResult, DatabaseWithSchema } from './index'; import { getShellApiType, Mongo, ReplicaSet, Shard, ShellApi } from './index'; import { InterruptFlag } from './interruptor'; import { TransformMongoErrorPlugin } from './mongo-errors'; @@ -40,6 +34,7 @@ import { ShellLog } from './shell-log'; import type { AutocompletionContext } from '@mongodb-js/mongodb-ts-autocomplete'; import type { JSONSchema } from 'mongodb-schema'; import { analyzeDocuments } from 'mongodb-schema'; +import type { BaseCursor } from './abstract-cursor'; /** * The subset of CLI options that is relevant for the shell API's behavior itself. @@ -147,12 +142,7 @@ const CONTROL_CHAR_REGEXP = /[\x00-\x1F\x7F-\x9F]/g; * instances). */ export class ShellInstanceState { - public currentCursor: - | Cursor - | AggregationCursor - | ChangeStreamCursor - | RunCommandCursor - | null; + public currentCursor: BaseCursor | null; public currentDb: DatabaseWithSchema; public messageBus: MongoshBus; public initialServiceProvider: ServiceProvider; // the initial service provider