Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,2 +1 @@
null
[ { "_id": 1, "name": "Vasya" }, { "_id": 2, "name": "Petya" }, { "_id": 3, "name": "Lyusya" } ]
[ { "_id": 1, "name": "Vasya" }, { "_id": 2, "name": "Petya" }, { "_id": 3, "name": "Lyusya" } ]
66 changes: 59 additions & 7 deletions packages/shell-api/src/abstract-cursor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,37 @@ export abstract class AbstractCursor extends ShellApiClass {
abstract _cursor: ServiceProviderAggregationCursor | ServiceProviderCursor;
_currentIterationResult: CursorIterationResult | null = null;
_batchSize: number | null = null;
_mapError: Error | null = null;

constructor(mongo: Mongo) {
super();
this._mongo = mongo;
}

// Wrap a function with checks before and after that verify whether a .map()
// callback has resulted in an exception. Such an error would otherwise result
// in an uncaught exception, bringing the whole process down.
// The downside to this is that errors will not actually be visible until
// the caller tries to interact with this cursor in a way that triggers
// these checks. Since that is also the behavior for errors coming from the
// database server, it makes sense to match that.
// Ideally, this kind of code could be lifted into the driver (NODE-3231 and
// NODE-3232 are the tickets for that).
async _withCheckMapError<Ret>(fn: () => Ret): Promise<Ret> {
if (this._mapError) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice trick! is also pretty clear, i thought it would have been more hacky

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah … would still be nice if the driver had something here :)

// If an error has already occurred, we don't want to call the function
// at all.
throw this._mapError;
}
const ret = await fn();
if (this._mapError) {
// If an error occurred during the function, we don't want to forward its
// results.
throw this._mapError;
}
return ret;
}

/**
* Internal method to determine what is printed for this class.
*/
Expand All @@ -39,7 +64,7 @@ export abstract class AbstractCursor extends ShellApiClass {

async _it(): Promise<CursorIterationResult> {
const results = this._currentIterationResult = new CursorIterationResult();
await iterate(results, this._cursor, this._batchSize ?? await this._mongo._batchSize());
await iterate(results, this, this._batchSize ?? await this._mongo._batchSize());
results.cursorHasMore = !this.isExhausted();
return results;
}
Expand All @@ -57,17 +82,31 @@ export abstract class AbstractCursor extends ShellApiClass {

@returnsPromise
async forEach(f: (doc: Document) => void): Promise<void> {
return this._cursor.forEach(f);
// Work around https://jira.mongodb.org/browse/NODE-3231
let exception;
const wrapped = (doc: Document): boolean | undefined => {
try {
f(doc);
return undefined;
} catch (err) {
exception = err;
return false; // Stop iteration.
}
};
await this._cursor.forEach(wrapped);
if (exception) {
throw exception;
}
}

@returnsPromise
async hasNext(): Promise<boolean> {
return this._cursor.hasNext();
return this._withCheckMapError(() => this._cursor.hasNext());
}

@returnsPromise
async tryNext(): Promise<Document | null> {
return this._cursor.tryNext();
return this._withCheckMapError(() => this._cursor.tryNext());
}

async* [Symbol.asyncIterator]() {
Expand Down Expand Up @@ -96,7 +135,7 @@ export abstract class AbstractCursor extends ShellApiClass {

@returnsPromise
async toArray(): Promise<Document[]> {
return this._cursor.toArray();
return this._withCheckMapError(() => this._cursor.toArray());
}

@returnType('this')
Expand All @@ -106,7 +145,20 @@ export abstract class AbstractCursor extends ShellApiClass {

@returnType('this')
map(f: (doc: Document) => Document): this {
this._cursor.map(f);
// Work around https://jira.mongodb.org/browse/NODE-3232
const wrapped = (doc: Document): Document => {
if (this._mapError) {
// These errors should never become visible to the user.
return { __errored: true };
}
try {
return f(doc);
} catch (err) {
this._mapError = err;
return { __errored: true };
}
};
this._cursor.map(wrapped);
return this;
}

Expand All @@ -118,7 +170,7 @@ export abstract class AbstractCursor extends ShellApiClass {

@returnsPromise
async next(): Promise<Document | null> {
return this._cursor.next();
return this._withCheckMapError(() => this._cursor.next());
}

@returnType('this')
Expand Down
2 changes: 1 addition & 1 deletion packages/shell-api/src/aggregation-cursor.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ describe('AggregationCursor', () => {
it('calls wrappee.map with arguments', () => {
const arg = {};
cursor.map(arg);
expect(wrappee.map.calledWith(arg)).to.equal(true);
expect(wrappee.map).to.have.callCount(1);
});
});

Expand Down
2 changes: 1 addition & 1 deletion packages/shell-api/src/change-stream-cursor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ export default class ChangeStreamCursor extends ShellApiClass {
throw new MongoshRuntimeError('ChangeStreamCursor is closed');
}
const result = this._currentIterationResult = new CursorIterationResult();
return iterate(result, this._cursor, this._batchSize ?? await this._mongo._batchSize());
return iterate(result, this, this._batchSize ?? await this._mongo._batchSize());
}

/**
Expand Down
2 changes: 1 addition & 1 deletion packages/shell-api/src/cursor.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ describe('Cursor', () => {
it('calls wrappee.map with arguments', () => {
const arg = {};
cursor.map(arg);
expect(wrappee.map.calledWith(arg)).to.equal(true);
expect(wrappee.map).to.have.callCount(1);
});

it('has the correct metadata', () => {
Expand Down
2 changes: 1 addition & 1 deletion packages/shell-api/src/explainable-cursor.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ describe('ExplainableCursor', () => {
it('calls wrappee.map with arguments', () => {
const arg = () => {};
eCursor.map(arg);
expect(wrappee.map.calledWith(arg)).to.equal(true);
expect(wrappee.map).to.have.callCount(1);
});

it('has the correct metadata', () => {
Expand Down
9 changes: 4 additions & 5 deletions packages/shell-api/src/helpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,9 @@ import type {
DbOptions,
Document,
ExplainVerbosityLike,
FindCursor,
AggregationCursor as SPAggregationCursor,
FindAndModifyOptions,
DeleteOptions,
MapReduceOptions,
ChangeStream,
KMSProviders,
ExplainOptions
} from '@mongosh/service-provider-core';
Expand All @@ -22,6 +19,8 @@ import { BinaryType, ReplPlatform } from '@mongosh/service-provider-core';
import { ClientSideFieldLevelEncryptionOptions } from './field-level-encryption';
import { AutoEncryptionOptions } from 'mongodb';
import { shellApiType } from './enums';
import type { AbstractCursor } from './abstract-cursor';
import type ChangeStreamCursor from './change-stream-cursor';

/**
* Helper method to adapt aggregation pipeline options.
Expand Down Expand Up @@ -525,9 +524,9 @@ export function addHiddenDataProperty<T = any>(target: T, key: string|symbol, va

export async function iterate(
results: CursorIterationResult,
cursor: FindCursor | SPAggregationCursor | ChangeStream,
cursor: AbstractCursor | ChangeStreamCursor,
batchSize: number): Promise<CursorIterationResult> {
if (cursor.closed) {
if (cursor.isClosed()) {
return results;
}

Expand Down
48 changes: 48 additions & 0 deletions packages/shell-api/src/integration.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1959,4 +1959,52 @@ describe('Shell API (integration)', function() {
expect((await collection.find().batchSize(50)._it()).documents).to.have.lengthOf(50);
});
});

describe('cursor map/forEach', () => {
beforeEach(async() => {
await collection.insertMany([...Array(10).keys()].map(i => ({ i })));
});

it('forEach() iterates over input but does not return anything', async() => {
let value = 0;
const result = await collection.find().forEach(({ i }) => { value += i; });
expect(result).to.equal(undefined);
expect(value).to.equal(45);
});

it('map() iterates over input and changes documents in-place', async() => {
const cursor = collection.find();
cursor.map(({ i }) => ({ j: i }));
expect((await cursor._it()).documents[0]).to.deep.equal({ j: 0 });
});

it('forEach() errors lead to a rejected promise', async() => {
const error = new Error();
let calls = 0;
try {
await collection.find().forEach(() => { calls++; throw error; });
expect.fail('missed exception');
} catch (err) {
expect(err).to.equal(error);
}
expect(calls).to.equal(1);
});

it('map() errors show up when reading the cursor', async() => {
const error = new Error();
const cursor = collection.find();
let calls = 0;
cursor.map(() => { calls++; throw error; });
for (let i = 0; i < 2; i++) {
// Try reading twice to make sure .map() is not called again for the second attempt.
try {
await cursor.tryNext();
expect.fail('missed exception');
} catch (err) {
expect(err).to.equal(error);
}
}
expect(calls).to.equal(1);
});
});
});