From 1d86f7fb074fc4d53ab5967963f47a80cb2e22d3 Mon Sep 17 00:00:00 2001 From: Anna Henningsen Date: Tue, 27 Apr 2021 14:25:08 +0200 Subject: [PATCH 1/3] fix(shell-api): handle errors in cursor.map/.forEach MONGOSH-703 Handle exceptions from callbacks passed to `cursor.map()` and `cursor.forEach()`. Since `cursor.forEach()` returns a Promise, use that to propagate the error to the user. For `cursor.map()`, this is a bit trickier, since the method itself does not return any results to the user. Therefore, we add checks before and after accessing cursor methods that read data which forward the exception if one has occurred. Both of these feel like situations that the driver should take care of explicitly, so NODE tickets have been opened for both cases. (One alternative would have been to add async context/Node.js domain propagation to these methods. However, I decided against that, because that solution would be a) Node.js-specific, and b) because it would still lead to undesirable results, like the Promise returned by `.forEach()` never resolving.) --- packages/shell-api/src/abstract-cursor.ts | 66 +++++++++++++++++-- .../shell-api/src/change-stream-cursor.ts | 2 +- packages/shell-api/src/helpers.ts | 9 ++- packages/shell-api/src/integration.spec.ts | 48 ++++++++++++++ 4 files changed, 112 insertions(+), 13 deletions(-) diff --git a/packages/shell-api/src/abstract-cursor.ts b/packages/shell-api/src/abstract-cursor.ts index 049567ef5a..fae8da4ac8 100644 --- a/packages/shell-api/src/abstract-cursor.ts +++ b/packages/shell-api/src/abstract-cursor.ts @@ -24,12 +24,37 @@ export abstract class AbstractCursor extends ShellApiClass { abstract _cursor: ServiceProviderAggregationCursor | ServiceProviderCursor; _currentIterationResult: CursorIterationResult | null = null; _batchSize: number | null = null; + _mapError: Error | null = null; constructor(mongo: Mongo) { super(); this._mongo = mongo; } + // Wrap a function with checks before and after that verify whether a .map() + // callback has resulted in an exception. Such an error would otherwise result + // in an uncaught exception, bringing the whole process down. + // The downside to this is that errors will not actually be visible until + // the caller tries to interact with this cursor in a way that triggers + // these checks. Since that is also the behavior for errors coming from the + // database server, it makes sense to match that. + // Ideally, this kind of code could be lifted into the driver (NODE-3231 and + // NODE-3232 are the tickets for that). + async _withCheckMapError(fn: () => Ret): Promise { + if (this._mapError) { + // If an error has already occurred, we don't want to call the function + // at all. + throw this._mapError; + } + const ret = await fn(); + if (this._mapError) { + // If an error occurred during the function, we don't want to forward its + // results. + throw this._mapError; + } + return ret; + } + /** * Internal method to determine what is printed for this class. */ @@ -39,7 +64,7 @@ export abstract class AbstractCursor extends ShellApiClass { async _it(): Promise { const results = this._currentIterationResult = new CursorIterationResult(); - await iterate(results, this._cursor, this._batchSize ?? await this._mongo._batchSize()); + await iterate(results, this, this._batchSize ?? await this._mongo._batchSize()); results.cursorHasMore = !this.isExhausted(); return results; } @@ -57,17 +82,31 @@ export abstract class AbstractCursor extends ShellApiClass { @returnsPromise async forEach(f: (doc: Document) => void): Promise { - return this._cursor.forEach(f); + // Work around https://jira.mongodb.org/browse/NODE-3231 + let exception; + const wrapped = (doc: Document): boolean | undefined => { + try { + f(doc); + return undefined; + } catch (err) { + exception = err; + return false; // Stop iteration. + } + }; + await this._cursor.forEach(wrapped); + if (exception) { + throw exception; + } } @returnsPromise async hasNext(): Promise { - return this._cursor.hasNext(); + return this._withCheckMapError(() => this._cursor.hasNext()); } @returnsPromise async tryNext(): Promise { - return this._cursor.tryNext(); + return this._withCheckMapError(() => this._cursor.tryNext()); } async* [Symbol.asyncIterator]() { @@ -96,7 +135,7 @@ export abstract class AbstractCursor extends ShellApiClass { @returnsPromise async toArray(): Promise { - return this._cursor.toArray(); + return this._withCheckMapError(() => this._cursor.toArray()); } @returnType('this') @@ -106,7 +145,20 @@ export abstract class AbstractCursor extends ShellApiClass { @returnType('this') map(f: (doc: Document) => Document): this { - this._cursor.map(f); + // Work around https://jira.mongodb.org/browse/NODE-3232 + const wrapped = (doc: Document): Document => { + if (this._mapError) { + // These errors should never become visible to the user. + return { __errored: true }; + } + try { + return f(doc); + } catch (err) { + this._mapError = err; + return { __errored: true }; + } + }; + this._cursor.map(wrapped); return this; } @@ -118,7 +170,7 @@ export abstract class AbstractCursor extends ShellApiClass { @returnsPromise async next(): Promise { - return this._cursor.next(); + return this._withCheckMapError(() => this._cursor.next()); } @returnType('this') diff --git a/packages/shell-api/src/change-stream-cursor.ts b/packages/shell-api/src/change-stream-cursor.ts index 85b3f5333e..eb87a401c6 100644 --- a/packages/shell-api/src/change-stream-cursor.ts +++ b/packages/shell-api/src/change-stream-cursor.ts @@ -43,7 +43,7 @@ export default class ChangeStreamCursor extends ShellApiClass { throw new MongoshRuntimeError('ChangeStreamCursor is closed'); } const result = this._currentIterationResult = new CursorIterationResult(); - return iterate(result, this._cursor, this._batchSize ?? await this._mongo._batchSize()); + return iterate(result, this, this._batchSize ?? await this._mongo._batchSize()); } /** diff --git a/packages/shell-api/src/helpers.ts b/packages/shell-api/src/helpers.ts index 6ba0a62dca..f316fa0ecb 100644 --- a/packages/shell-api/src/helpers.ts +++ b/packages/shell-api/src/helpers.ts @@ -3,12 +3,9 @@ import type { DbOptions, Document, ExplainVerbosityLike, - FindCursor, - AggregationCursor as SPAggregationCursor, FindAndModifyOptions, DeleteOptions, MapReduceOptions, - ChangeStream, KMSProviders, ExplainOptions } from '@mongosh/service-provider-core'; @@ -22,6 +19,8 @@ import { BinaryType, ReplPlatform } from '@mongosh/service-provider-core'; import { ClientSideFieldLevelEncryptionOptions } from './field-level-encryption'; import { AutoEncryptionOptions } from 'mongodb'; import { shellApiType } from './enums'; +import type { AbstractCursor } from './abstract-cursor'; +import type ChangeStreamCursor from './change-stream-cursor'; /** * Helper method to adapt aggregation pipeline options. @@ -525,9 +524,9 @@ export function addHiddenDataProperty(target: T, key: string|symbol, va export async function iterate( results: CursorIterationResult, - cursor: FindCursor | SPAggregationCursor | ChangeStream, + cursor: AbstractCursor | ChangeStreamCursor, batchSize: number): Promise { - if (cursor.closed) { + if (cursor.isClosed()) { return results; } diff --git a/packages/shell-api/src/integration.spec.ts b/packages/shell-api/src/integration.spec.ts index 8d04cfedbc..4a2b2293f7 100644 --- a/packages/shell-api/src/integration.spec.ts +++ b/packages/shell-api/src/integration.spec.ts @@ -1959,4 +1959,52 @@ describe('Shell API (integration)', function() { expect((await collection.find().batchSize(50)._it()).documents).to.have.lengthOf(50); }); }); + + describe('cursor map/forEach', () => { + beforeEach(async() => { + await collection.insertMany([...Array(10).keys()].map(i => ({ i }))); + }); + + it('forEach() iterates over input but does not return anything', async() => { + let value = 0; + const result = await collection.find().forEach(({ i }) => { value += i; }); + expect(result).to.equal(undefined); + expect(value).to.equal(45); + }); + + it('map() iterates over input and changes documents in-place', async() => { + const cursor = collection.find(); + cursor.map(({ i }) => ({ j: i })); + expect((await cursor._it()).documents[0]).to.deep.equal({ j: 0 }); + }); + + it('forEach() errors lead to a rejected promise', async() => { + const error = new Error(); + let calls = 0; + try { + await collection.find().forEach(() => { calls++; throw error; }); + expect.fail('missed exception'); + } catch (err) { + expect(err).to.equal(error); + } + expect(calls).to.equal(1); + }); + + it('map() errors show up when reading the cursor', async() => { + const error = new Error(); + const cursor = collection.find(); + let calls = 0; + cursor.map(() => { calls++; throw error; }); + for (let i = 0; i < 2; i++) { + // Try reading twice to make sure .map() is not called again for the second attempt. + try { + await cursor.tryNext(); + expect.fail('missed exception'); + } catch (err) { + expect(err).to.equal(error); + } + } + expect(calls).to.equal(1); + }); + }); }); From a4a454e00944492974a4f1ec51173e6aa0d967a4 Mon Sep 17 00:00:00 2001 From: Anna Henningsen Date: Tue, 27 Apr 2021 17:00:32 +0200 Subject: [PATCH 2/3] fixup! fix(shell-api): handle errors in cursor.map/.forEach MONGOSH-703 --- packages/shell-api/src/aggregation-cursor.spec.ts | 2 +- packages/shell-api/src/cursor.spec.ts | 2 +- packages/shell-api/src/explainable-cursor.spec.ts | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/packages/shell-api/src/aggregation-cursor.spec.ts b/packages/shell-api/src/aggregation-cursor.spec.ts index 16a8009e1c..c5f048b653 100644 --- a/packages/shell-api/src/aggregation-cursor.spec.ts +++ b/packages/shell-api/src/aggregation-cursor.spec.ts @@ -67,7 +67,7 @@ describe('AggregationCursor', () => { it('calls wrappee.map with arguments', () => { const arg = {}; cursor.map(arg); - expect(wrappee.map.calledWith(arg)).to.equal(true); + expect(wrappee.map).to.have.callCount(1); }); }); diff --git a/packages/shell-api/src/cursor.spec.ts b/packages/shell-api/src/cursor.spec.ts index 71ab6c3b1d..0558759242 100644 --- a/packages/shell-api/src/cursor.spec.ts +++ b/packages/shell-api/src/cursor.spec.ts @@ -71,7 +71,7 @@ describe('Cursor', () => { it('calls wrappee.map with arguments', () => { const arg = {}; cursor.map(arg); - expect(wrappee.map.calledWith(arg)).to.equal(true); + expect(wrappee.map).to.have.callCount(1); }); it('has the correct metadata', () => { diff --git a/packages/shell-api/src/explainable-cursor.spec.ts b/packages/shell-api/src/explainable-cursor.spec.ts index 65d0a8fec8..6cf851c3a2 100644 --- a/packages/shell-api/src/explainable-cursor.spec.ts +++ b/packages/shell-api/src/explainable-cursor.spec.ts @@ -56,7 +56,7 @@ describe('ExplainableCursor', () => { it('calls wrappee.map with arguments', () => { const arg = () => {}; eCursor.map(arg); - expect(wrappee.map.calledWith(arg)).to.equal(true); + expect(wrappee.map).to.have.callCount(1); }); it('has the correct metadata', () => { From bff8149b2adc1262a0c5527936eb729cb12a1912 Mon Sep 17 00:00:00 2001 From: Anna Henningsen Date: Tue, 27 Apr 2021 18:25:06 +0200 Subject: [PATCH 3/3] fixup --- .../java-shell/src/test/resources/cursor/forEach.expected.txt | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/packages/java-shell/src/test/resources/cursor/forEach.expected.txt b/packages/java-shell/src/test/resources/cursor/forEach.expected.txt index b132d22632..ff1d39b61f 100644 --- a/packages/java-shell/src/test/resources/cursor/forEach.expected.txt +++ b/packages/java-shell/src/test/resources/cursor/forEach.expected.txt @@ -1,2 +1 @@ -null -[ { "_id": 1, "name": "Vasya" }, { "_id": 2, "name": "Petya" }, { "_id": 3, "name": "Lyusya" } ] \ No newline at end of file +[ { "_id": 1, "name": "Vasya" }, { "_id": 2, "name": "Petya" }, { "_id": 3, "name": "Lyusya" } ]