diff --git a/packages/java-shell/src/test/resources/cursor/forEach.expected.txt b/packages/java-shell/src/test/resources/cursor/forEach.expected.txt index b132d22632..ff1d39b61f 100644 --- a/packages/java-shell/src/test/resources/cursor/forEach.expected.txt +++ b/packages/java-shell/src/test/resources/cursor/forEach.expected.txt @@ -1,2 +1 @@ -null -[ { "_id": 1, "name": "Vasya" }, { "_id": 2, "name": "Petya" }, { "_id": 3, "name": "Lyusya" } ] \ No newline at end of file +[ { "_id": 1, "name": "Vasya" }, { "_id": 2, "name": "Petya" }, { "_id": 3, "name": "Lyusya" } ] diff --git a/packages/shell-api/src/abstract-cursor.ts b/packages/shell-api/src/abstract-cursor.ts index 049567ef5a..fae8da4ac8 100644 --- a/packages/shell-api/src/abstract-cursor.ts +++ b/packages/shell-api/src/abstract-cursor.ts @@ -24,12 +24,37 @@ export abstract class AbstractCursor extends ShellApiClass { abstract _cursor: ServiceProviderAggregationCursor | ServiceProviderCursor; _currentIterationResult: CursorIterationResult | null = null; _batchSize: number | null = null; + _mapError: Error | null = null; constructor(mongo: Mongo) { super(); this._mongo = mongo; } + // Wrap a function with checks before and after that verify whether a .map() + // callback has resulted in an exception. Such an error would otherwise result + // in an uncaught exception, bringing the whole process down. + // The downside to this is that errors will not actually be visible until + // the caller tries to interact with this cursor in a way that triggers + // these checks. Since that is also the behavior for errors coming from the + // database server, it makes sense to match that. + // Ideally, this kind of code could be lifted into the driver (NODE-3231 and + // NODE-3232 are the tickets for that). + async _withCheckMapError(fn: () => Ret): Promise { + if (this._mapError) { + // If an error has already occurred, we don't want to call the function + // at all. + throw this._mapError; + } + const ret = await fn(); + if (this._mapError) { + // If an error occurred during the function, we don't want to forward its + // results. + throw this._mapError; + } + return ret; + } + /** * Internal method to determine what is printed for this class. */ @@ -39,7 +64,7 @@ export abstract class AbstractCursor extends ShellApiClass { async _it(): Promise { const results = this._currentIterationResult = new CursorIterationResult(); - await iterate(results, this._cursor, this._batchSize ?? await this._mongo._batchSize()); + await iterate(results, this, this._batchSize ?? await this._mongo._batchSize()); results.cursorHasMore = !this.isExhausted(); return results; } @@ -57,17 +82,31 @@ export abstract class AbstractCursor extends ShellApiClass { @returnsPromise async forEach(f: (doc: Document) => void): Promise { - return this._cursor.forEach(f); + // Work around https://jira.mongodb.org/browse/NODE-3231 + let exception; + const wrapped = (doc: Document): boolean | undefined => { + try { + f(doc); + return undefined; + } catch (err) { + exception = err; + return false; // Stop iteration. + } + }; + await this._cursor.forEach(wrapped); + if (exception) { + throw exception; + } } @returnsPromise async hasNext(): Promise { - return this._cursor.hasNext(); + return this._withCheckMapError(() => this._cursor.hasNext()); } @returnsPromise async tryNext(): Promise { - return this._cursor.tryNext(); + return this._withCheckMapError(() => this._cursor.tryNext()); } async* [Symbol.asyncIterator]() { @@ -96,7 +135,7 @@ export abstract class AbstractCursor extends ShellApiClass { @returnsPromise async toArray(): Promise { - return this._cursor.toArray(); + return this._withCheckMapError(() => this._cursor.toArray()); } @returnType('this') @@ -106,7 +145,20 @@ export abstract class AbstractCursor extends ShellApiClass { @returnType('this') map(f: (doc: Document) => Document): this { - this._cursor.map(f); + // Work around https://jira.mongodb.org/browse/NODE-3232 + const wrapped = (doc: Document): Document => { + if (this._mapError) { + // These errors should never become visible to the user. + return { __errored: true }; + } + try { + return f(doc); + } catch (err) { + this._mapError = err; + return { __errored: true }; + } + }; + this._cursor.map(wrapped); return this; } @@ -118,7 +170,7 @@ export abstract class AbstractCursor extends ShellApiClass { @returnsPromise async next(): Promise { - return this._cursor.next(); + return this._withCheckMapError(() => this._cursor.next()); } @returnType('this') diff --git a/packages/shell-api/src/aggregation-cursor.spec.ts b/packages/shell-api/src/aggregation-cursor.spec.ts index 16a8009e1c..c5f048b653 100644 --- a/packages/shell-api/src/aggregation-cursor.spec.ts +++ b/packages/shell-api/src/aggregation-cursor.spec.ts @@ -67,7 +67,7 @@ describe('AggregationCursor', () => { it('calls wrappee.map with arguments', () => { const arg = {}; cursor.map(arg); - expect(wrappee.map.calledWith(arg)).to.equal(true); + expect(wrappee.map).to.have.callCount(1); }); }); diff --git a/packages/shell-api/src/change-stream-cursor.ts b/packages/shell-api/src/change-stream-cursor.ts index 85b3f5333e..eb87a401c6 100644 --- a/packages/shell-api/src/change-stream-cursor.ts +++ b/packages/shell-api/src/change-stream-cursor.ts @@ -43,7 +43,7 @@ export default class ChangeStreamCursor extends ShellApiClass { throw new MongoshRuntimeError('ChangeStreamCursor is closed'); } const result = this._currentIterationResult = new CursorIterationResult(); - return iterate(result, this._cursor, this._batchSize ?? await this._mongo._batchSize()); + return iterate(result, this, this._batchSize ?? await this._mongo._batchSize()); } /** diff --git a/packages/shell-api/src/cursor.spec.ts b/packages/shell-api/src/cursor.spec.ts index 71ab6c3b1d..0558759242 100644 --- a/packages/shell-api/src/cursor.spec.ts +++ b/packages/shell-api/src/cursor.spec.ts @@ -71,7 +71,7 @@ describe('Cursor', () => { it('calls wrappee.map with arguments', () => { const arg = {}; cursor.map(arg); - expect(wrappee.map.calledWith(arg)).to.equal(true); + expect(wrappee.map).to.have.callCount(1); }); it('has the correct metadata', () => { diff --git a/packages/shell-api/src/explainable-cursor.spec.ts b/packages/shell-api/src/explainable-cursor.spec.ts index 65d0a8fec8..6cf851c3a2 100644 --- a/packages/shell-api/src/explainable-cursor.spec.ts +++ b/packages/shell-api/src/explainable-cursor.spec.ts @@ -56,7 +56,7 @@ describe('ExplainableCursor', () => { it('calls wrappee.map with arguments', () => { const arg = () => {}; eCursor.map(arg); - expect(wrappee.map.calledWith(arg)).to.equal(true); + expect(wrappee.map).to.have.callCount(1); }); it('has the correct metadata', () => { diff --git a/packages/shell-api/src/helpers.ts b/packages/shell-api/src/helpers.ts index 6ba0a62dca..f316fa0ecb 100644 --- a/packages/shell-api/src/helpers.ts +++ b/packages/shell-api/src/helpers.ts @@ -3,12 +3,9 @@ import type { DbOptions, Document, ExplainVerbosityLike, - FindCursor, - AggregationCursor as SPAggregationCursor, FindAndModifyOptions, DeleteOptions, MapReduceOptions, - ChangeStream, KMSProviders, ExplainOptions } from '@mongosh/service-provider-core'; @@ -22,6 +19,8 @@ import { BinaryType, ReplPlatform } from '@mongosh/service-provider-core'; import { ClientSideFieldLevelEncryptionOptions } from './field-level-encryption'; import { AutoEncryptionOptions } from 'mongodb'; import { shellApiType } from './enums'; +import type { AbstractCursor } from './abstract-cursor'; +import type ChangeStreamCursor from './change-stream-cursor'; /** * Helper method to adapt aggregation pipeline options. @@ -525,9 +524,9 @@ export function addHiddenDataProperty(target: T, key: string|symbol, va export async function iterate( results: CursorIterationResult, - cursor: FindCursor | SPAggregationCursor | ChangeStream, + cursor: AbstractCursor | ChangeStreamCursor, batchSize: number): Promise { - if (cursor.closed) { + if (cursor.isClosed()) { return results; } diff --git a/packages/shell-api/src/integration.spec.ts b/packages/shell-api/src/integration.spec.ts index 8d04cfedbc..4a2b2293f7 100644 --- a/packages/shell-api/src/integration.spec.ts +++ b/packages/shell-api/src/integration.spec.ts @@ -1959,4 +1959,52 @@ describe('Shell API (integration)', function() { expect((await collection.find().batchSize(50)._it()).documents).to.have.lengthOf(50); }); }); + + describe('cursor map/forEach', () => { + beforeEach(async() => { + await collection.insertMany([...Array(10).keys()].map(i => ({ i }))); + }); + + it('forEach() iterates over input but does not return anything', async() => { + let value = 0; + const result = await collection.find().forEach(({ i }) => { value += i; }); + expect(result).to.equal(undefined); + expect(value).to.equal(45); + }); + + it('map() iterates over input and changes documents in-place', async() => { + const cursor = collection.find(); + cursor.map(({ i }) => ({ j: i })); + expect((await cursor._it()).documents[0]).to.deep.equal({ j: 0 }); + }); + + it('forEach() errors lead to a rejected promise', async() => { + const error = new Error(); + let calls = 0; + try { + await collection.find().forEach(() => { calls++; throw error; }); + expect.fail('missed exception'); + } catch (err) { + expect(err).to.equal(error); + } + expect(calls).to.equal(1); + }); + + it('map() errors show up when reading the cursor', async() => { + const error = new Error(); + const cursor = collection.find(); + let calls = 0; + cursor.map(() => { calls++; throw error; }); + for (let i = 0; i < 2; i++) { + // Try reading twice to make sure .map() is not called again for the second attempt. + try { + await cursor.tryNext(); + expect.fail('missed exception'); + } catch (err) { + expect(err).to.equal(error); + } + } + expect(calls).to.equal(1); + }); + }); });