Skip to content

Commit

Permalink
fix(NODE-5374): do not apply cursor transform in Cursor.hasNext (#3746)
Browse files Browse the repository at this point in the history
Co-authored-by: Durran Jordan <durran@gmail.com>
  • Loading branch information
baileympearson and durran committed Jul 5, 2023
1 parent ccc5e30 commit 0668cd8
Show file tree
Hide file tree
Showing 4 changed files with 313 additions and 176 deletions.
169 changes: 99 additions & 70 deletions src/cursor/abstract_cursor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -361,7 +361,7 @@ export abstract class AbstractCursor<
return true;
}

const doc = await nextAsync<TSchema>(this, true);
const doc = await next<TSchema>(this, { blocking: true, transform: false });

if (doc) {
this[kDocuments].unshift(doc);
Expand All @@ -377,7 +377,7 @@ export abstract class AbstractCursor<
throw new MongoCursorExhaustedError();
}

return nextAsync(this, true);
return next(this, { blocking: true, transform: true });
}

/**
Expand All @@ -388,7 +388,7 @@ export abstract class AbstractCursor<
throw new MongoCursorExhaustedError();
}

return nextAsync(this, false);
return next(this, { blocking: false, transform: true });
}

/**
Expand Down Expand Up @@ -680,88 +680,112 @@ export abstract class AbstractCursor<
}
}

function nextDocument<T>(cursor: AbstractCursor<T>): T | null {
const doc = cursor[kDocuments].shift();

if (doc && cursor[kTransform]) {
return cursor[kTransform](doc) as T;
}

return doc;
}

const nextAsync = promisify(
next as <T>(
cursor: AbstractCursor<T>,
blocking: boolean,
callback: (e: Error, r: T | null) => void
) => void
);

/**
* @param cursor - the cursor on which to call `next`
* @param blocking - a boolean indicating whether or not the cursor should `block` until data
* is available. Generally, this flag is set to `false` because if the getMore returns no documents,
* the cursor has been exhausted. In certain scenarios (ChangeStreams, tailable await cursors and
* `tryNext`, for example) blocking is necessary because a getMore returning no documents does
* not indicate the end of the cursor.
* @param callback - callback to return the result to the caller
* @returns
* @param transform - if true, the cursor's transform function is applied to the result document (if the transform exists)
* @returns the next document in the cursor, or `null`. When `blocking` is `true`, a `null` document means
* the cursor has been exhausted. Otherwise, it means that there is no document available in the cursor's buffer.
*/
export function next<T>(
async function next<T>(
cursor: AbstractCursor<T>,
blocking: boolean,
callback: Callback<T | null>
): void {
{
blocking,
transform
}: {
blocking: boolean;
transform: boolean;
}
): Promise<T | null> {
const cursorId = cursor[kId];
if (cursor.closed) {
return callback(undefined, null);
return null;
}

if (cursor[kDocuments].length !== 0) {
callback(undefined, nextDocument<T>(cursor));
return;
const doc = cursor[kDocuments].shift();

if (doc != null && transform && cursor[kTransform]) {
try {
return cursor[kTransform](doc);
} catch (error) {
await cleanupCursorAsync(cursor, { error, needsToEmitClosed: true }).catch(() => {
// `cleanupCursorAsync` should never throw, but if it does we want to throw the original
// error instead.
});
throw error;
}
}

return doc;
}

if (cursorId == null) {
// All cursors must operate within a session, one must be made implicitly if not explicitly provided
cursor[kInit](err => {
if (err) return callback(err);
return next(cursor, blocking, callback);
});

return;
const init = promisify(cb => cursor[kInit](cb));
await init();
return next(cursor, { blocking, transform });
}

if (cursorIsDead(cursor)) {
return cleanupCursor(cursor, undefined, () => callback(undefined, null));
// if the cursor is dead, we clean it up
// cleanupCursorAsync should never throw, but if it does it indicates a bug in the driver
// and we should surface the error
await cleanupCursorAsync(cursor, {});
return null;
}

// otherwise need to call getMore
const batchSize = cursor[kOptions].batchSize || 1000;
cursor._getMore(batchSize, (error, response) => {
if (response) {
const cursorId =
typeof response.cursor.id === 'number'
? Long.fromNumber(response.cursor.id)
: typeof response.cursor.id === 'bigint'
? Long.fromBigInt(response.cursor.id)
: response.cursor.id;
const getMore = promisify((batchSize: number, cb: Callback<Document | undefined>) =>
cursor._getMore(batchSize, cb)
);

cursor[kDocuments].pushMany(response.cursor.nextBatch);
cursor[kId] = cursorId;
let response: Document | undefined;
try {
response = await getMore(batchSize);
} catch (error) {
if (error) {
await cleanupCursorAsync(cursor, { error }).catch(() => {
// `cleanupCursorAsync` should never throw, but if it does we want to throw the original
// error instead.
});
throw error;
}
}

if (error || cursorIsDead(cursor)) {
return cleanupCursor(cursor, { error }, () => callback(error, nextDocument<T>(cursor)));
}
if (response) {
const cursorId =
typeof response.cursor.id === 'number'
? Long.fromNumber(response.cursor.id)
: typeof response.cursor.id === 'bigint'
? Long.fromBigInt(response.cursor.id)
: response.cursor.id;

if (cursor[kDocuments].length === 0 && blocking === false) {
return callback(undefined, null);
}
cursor[kDocuments].pushMany(response.cursor.nextBatch);
cursor[kId] = cursorId;
}

next(cursor, blocking, callback);
});
if (cursorIsDead(cursor)) {
// If we successfully received a response from a cursor BUT the cursor indicates that it is exhausted,
// we intentionally clean up the cursor to release its session back into the pool before the cursor
// is iterated. This prevents a cursor that is exhausted on the server from holding
// onto a session indefinitely until the AbstractCursor is iterated.
//
// cleanupCursorAsync should never throw, but if it does it indicates a bug in the driver
// and we should surface the error
await cleanupCursorAsync(cursor, {});
}

if (cursor[kDocuments].length === 0 && blocking === false) {
return null;
}

return next(cursor, { blocking, transform });
}

function cursorIsDead(cursor: AbstractCursor): boolean {
Expand All @@ -781,6 +805,10 @@ function cleanupCursor(
const server = cursor[kServer];
const session = cursor[kSession];
const error = options?.error;

// Cursors only emit closed events once the client-side cursor has been exhausted fully or there
// was an error. Notably, when the server returns a cursor id of 0 and a non-empty batch, we
// cleanup the cursor but don't emit a `close` event.
const needsToEmitClosed = options?.needsToEmitClosed ?? cursor[kDocuments].length === 0;

if (error) {
Expand Down Expand Up @@ -881,8 +909,21 @@ class ReadableCursorStream extends Readable {
}

private _readNext() {
next(this._cursor, true, (err, result) => {
if (err) {
next(this._cursor, { blocking: true, transform: true }).then(
result => {
if (result == null) {
this.push(null);
} else if (this.destroyed) {
this._cursor.close().catch(() => null);
} else {
if (this.push(result)) {
return this._readNext();
}

this._readInProgress = false;
}
},
err => {
// NOTE: This is questionable, but we have a test backing the behavior. It seems the
// desired behavior is that a stream ends cleanly when a user explicitly closes
// a client during iteration. Alternatively, we could do the "right" thing and
Expand Down Expand Up @@ -911,18 +952,6 @@ class ReadableCursorStream extends Readable {
// See NODE-4475.
return this.destroy(err);
}

if (result == null) {
this.push(null);
} else if (this.destroyed) {
this._cursor.close().catch(() => null);
} else {
if (this.push(result)) {
return this._readNext();
}

this._readInProgress = false;
}
});
);
}
}
49 changes: 18 additions & 31 deletions test/integration/crud/misc_cursors.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -1708,7 +1708,6 @@ describe('Cursor', function () {
expect(cursor).property('closed', false);

const willClose = once(cursor, 'close');
const willEnd = once(stream, 'end');

const dataEvents = on(stream, 'data');

Expand All @@ -1722,13 +1721,9 @@ describe('Cursor', function () {
// After 5 successful data events, destroy stream
stream.destroy();

// We should get an end event on the stream and a close event on the cursor
// We should **not** get an 'error' event,
// We should get a close event on the stream and a close event on the cursor
// We should **not** get an 'error' or an 'end' event,
// the following will throw if either stream or cursor emitted an 'error' event
await Promise.race([
willEnd,
sleep(100).then(() => Promise.reject(new Error('end event never emitted')))
]);
await Promise.race([
willClose,
sleep(100).then(() => Promise.reject(new Error('close event never emitted')))
Expand Down Expand Up @@ -3589,11 +3584,8 @@ describe('Cursor', function () {
await client.close();
});

it('should return implicit session to pool when client-side cursor exhausts results after a getMore', function (done) {
const configuration = this.configuration;
const client = configuration.newClient({ w: 1 }, { maxPoolSize: 1 });

const db = client.db(configuration.db);
it('should return implicit session to pool when client-side cursor exhausts results after a getMore', async function () {
const db = client.db(this.configuration.db);
const collection = db.collection('cursor_session_tests2');

const docs = [
Expand All @@ -3604,25 +3596,20 @@ describe('Cursor', function () {
{ a: 9, b: 10 }
];

collection.insertMany(docs, err => {
expect(err).to.not.exist;
const cursor = collection.find({}, { batchSize: 3 });
cursor.next(function () {
expect(client.s.activeSessions.size).to.equal(1);
cursor.next(function () {
expect(client.s.activeSessions.size).to.equal(1);
cursor.next(function () {
expect(client.s.activeSessions.size).to.equal(1);
cursor.next(function () {
expect(client.s.activeSessions.size).to.equal(0);
cursor.close(() => {
client.close(done);
});
});
});
});
});
});
await collection.insertMany(docs);

const cursor = await collection.find({}, { batchSize: 3 });
for (let i = 0; i < 3; ++i) {
await cursor.next();
expect(client.s.activeSessions.size).to.equal(1);
}

await cursor.next();
expect(client.s.activeSessions.size, 'session not checked in after cursor exhausted').to.equal(
0
);

await cursor.close();
});

describe('#clone', function () {
Expand Down

0 comments on commit 0668cd8

Please sign in to comment.