diff --git a/src/cursor/abstract_cursor.ts b/src/cursor/abstract_cursor.ts index 2f928c25a7..97f168faee 100644 --- a/src/cursor/abstract_cursor.ts +++ b/src/cursor/abstract_cursor.ts @@ -361,7 +361,7 @@ export abstract class AbstractCursor< return true; } - const doc = await nextAsync(this, true); + const doc = await next(this, { blocking: true, transform: false }); if (doc) { this[kDocuments].unshift(doc); @@ -377,7 +377,7 @@ export abstract class AbstractCursor< throw new MongoCursorExhaustedError(); } - return nextAsync(this, true); + return next(this, { blocking: true, transform: true }); } /** @@ -388,7 +388,7 @@ export abstract class AbstractCursor< throw new MongoCursorExhaustedError(); } - return nextAsync(this, false); + return next(this, { blocking: false, transform: true }); } /** @@ -680,24 +680,6 @@ export abstract class AbstractCursor< } } -function nextDocument(cursor: AbstractCursor): T | null { - const doc = cursor[kDocuments].shift(); - - if (doc && cursor[kTransform]) { - return cursor[kTransform](doc) as T; - } - - return doc; -} - -const nextAsync = promisify( - next as ( - cursor: AbstractCursor, - blocking: boolean, - callback: (e: Error, r: T | null) => void - ) => void -); - /** * @param cursor - the cursor on which to call `next` * @param blocking - a boolean indicating whether or not the cursor should `block` until data @@ -705,63 +687,105 @@ const nextAsync = promisify( * the cursor has been exhausted. In certain scenarios (ChangeStreams, tailable await cursors and * `tryNext`, for example) blocking is necessary because a getMore returning no documents does * not indicate the end of the cursor. - * @param callback - callback to return the result to the caller - * @returns + * @param transform - if true, the cursor's transform function is applied to the result document (if the transform exists) + * @returns the next document in the cursor, or `null`. When `blocking` is `true`, a `null` document means + * the cursor has been exhausted. Otherwise, it means that there is no document available in the cursor's buffer. */ -export function next( +async function next( cursor: AbstractCursor, - blocking: boolean, - callback: Callback -): void { + { + blocking, + transform + }: { + blocking: boolean; + transform: boolean; + } +): Promise { const cursorId = cursor[kId]; if (cursor.closed) { - return callback(undefined, null); + return null; } if (cursor[kDocuments].length !== 0) { - callback(undefined, nextDocument(cursor)); - return; + const doc = cursor[kDocuments].shift(); + + if (doc != null && transform && cursor[kTransform]) { + try { + return cursor[kTransform](doc); + } catch (error) { + await cleanupCursorAsync(cursor, { error, needsToEmitClosed: true }).catch(() => { + // `cleanupCursorAsync` should never throw, but if it does we want to throw the original + // error instead. + }); + throw error; + } + } + + return doc; } if (cursorId == null) { // All cursors must operate within a session, one must be made implicitly if not explicitly provided - cursor[kInit](err => { - if (err) return callback(err); - return next(cursor, blocking, callback); - }); - - return; + const init = promisify(cb => cursor[kInit](cb)); + await init(); + return next(cursor, { blocking, transform }); } if (cursorIsDead(cursor)) { - return cleanupCursor(cursor, undefined, () => callback(undefined, null)); + // if the cursor is dead, we clean it up + // cleanupCursorAsync should never throw, but if it does it indicates a bug in the driver + // and we should surface the error + await cleanupCursorAsync(cursor, {}); + return null; } // otherwise need to call getMore const batchSize = cursor[kOptions].batchSize || 1000; - cursor._getMore(batchSize, (error, response) => { - if (response) { - const cursorId = - typeof response.cursor.id === 'number' - ? Long.fromNumber(response.cursor.id) - : typeof response.cursor.id === 'bigint' - ? Long.fromBigInt(response.cursor.id) - : response.cursor.id; + const getMore = promisify((batchSize: number, cb: Callback) => + cursor._getMore(batchSize, cb) + ); - cursor[kDocuments].pushMany(response.cursor.nextBatch); - cursor[kId] = cursorId; + let response: Document | undefined; + try { + response = await getMore(batchSize); + } catch (error) { + if (error) { + await cleanupCursorAsync(cursor, { error }).catch(() => { + // `cleanupCursorAsync` should never throw, but if it does we want to throw the original + // error instead. + }); + throw error; } + } - if (error || cursorIsDead(cursor)) { - return cleanupCursor(cursor, { error }, () => callback(error, nextDocument(cursor))); - } + if (response) { + const cursorId = + typeof response.cursor.id === 'number' + ? Long.fromNumber(response.cursor.id) + : typeof response.cursor.id === 'bigint' + ? Long.fromBigInt(response.cursor.id) + : response.cursor.id; - if (cursor[kDocuments].length === 0 && blocking === false) { - return callback(undefined, null); - } + cursor[kDocuments].pushMany(response.cursor.nextBatch); + cursor[kId] = cursorId; + } - next(cursor, blocking, callback); - }); + if (cursorIsDead(cursor)) { + // If we successfully received a response from a cursor BUT the cursor indicates that it is exhausted, + // we intentionally clean up the cursor to release its session back into the pool before the cursor + // is iterated. This prevents a cursor that is exhausted on the server from holding + // onto a session indefinitely until the AbstractCursor is iterated. + // + // cleanupCursorAsync should never throw, but if it does it indicates a bug in the driver + // and we should surface the error + await cleanupCursorAsync(cursor, {}); + } + + if (cursor[kDocuments].length === 0 && blocking === false) { + return null; + } + + return next(cursor, { blocking, transform }); } function cursorIsDead(cursor: AbstractCursor): boolean { @@ -781,6 +805,10 @@ function cleanupCursor( const server = cursor[kServer]; const session = cursor[kSession]; const error = options?.error; + + // Cursors only emit closed events once the client-side cursor has been exhausted fully or there + // was an error. Notably, when the server returns a cursor id of 0 and a non-empty batch, we + // cleanup the cursor but don't emit a `close` event. const needsToEmitClosed = options?.needsToEmitClosed ?? cursor[kDocuments].length === 0; if (error) { @@ -881,8 +909,21 @@ class ReadableCursorStream extends Readable { } private _readNext() { - next(this._cursor, true, (err, result) => { - if (err) { + next(this._cursor, { blocking: true, transform: true }).then( + result => { + if (result == null) { + this.push(null); + } else if (this.destroyed) { + this._cursor.close().catch(() => null); + } else { + if (this.push(result)) { + return this._readNext(); + } + + this._readInProgress = false; + } + }, + err => { // NOTE: This is questionable, but we have a test backing the behavior. It seems the // desired behavior is that a stream ends cleanly when a user explicitly closes // a client during iteration. Alternatively, we could do the "right" thing and @@ -911,18 +952,6 @@ class ReadableCursorStream extends Readable { // See NODE-4475. return this.destroy(err); } - - if (result == null) { - this.push(null); - } else if (this.destroyed) { - this._cursor.close().catch(() => null); - } else { - if (this.push(result)) { - return this._readNext(); - } - - this._readInProgress = false; - } - }); + ); } } diff --git a/test/integration/crud/misc_cursors.test.js b/test/integration/crud/misc_cursors.test.js index 28544267d6..0214bc52f1 100644 --- a/test/integration/crud/misc_cursors.test.js +++ b/test/integration/crud/misc_cursors.test.js @@ -1708,7 +1708,6 @@ describe('Cursor', function () { expect(cursor).property('closed', false); const willClose = once(cursor, 'close'); - const willEnd = once(stream, 'end'); const dataEvents = on(stream, 'data'); @@ -1722,13 +1721,9 @@ describe('Cursor', function () { // After 5 successful data events, destroy stream stream.destroy(); - // We should get an end event on the stream and a close event on the cursor - // We should **not** get an 'error' event, + // We should get a close event on the stream and a close event on the cursor + // We should **not** get an 'error' or an 'end' event, // the following will throw if either stream or cursor emitted an 'error' event - await Promise.race([ - willEnd, - sleep(100).then(() => Promise.reject(new Error('end event never emitted'))) - ]); await Promise.race([ willClose, sleep(100).then(() => Promise.reject(new Error('close event never emitted'))) @@ -3589,11 +3584,8 @@ describe('Cursor', function () { await client.close(); }); - it('should return implicit session to pool when client-side cursor exhausts results after a getMore', function (done) { - const configuration = this.configuration; - const client = configuration.newClient({ w: 1 }, { maxPoolSize: 1 }); - - const db = client.db(configuration.db); + it('should return implicit session to pool when client-side cursor exhausts results after a getMore', async function () { + const db = client.db(this.configuration.db); const collection = db.collection('cursor_session_tests2'); const docs = [ @@ -3604,25 +3596,20 @@ describe('Cursor', function () { { a: 9, b: 10 } ]; - collection.insertMany(docs, err => { - expect(err).to.not.exist; - const cursor = collection.find({}, { batchSize: 3 }); - cursor.next(function () { - expect(client.s.activeSessions.size).to.equal(1); - cursor.next(function () { - expect(client.s.activeSessions.size).to.equal(1); - cursor.next(function () { - expect(client.s.activeSessions.size).to.equal(1); - cursor.next(function () { - expect(client.s.activeSessions.size).to.equal(0); - cursor.close(() => { - client.close(done); - }); - }); - }); - }); - }); - }); + await collection.insertMany(docs); + + const cursor = await collection.find({}, { batchSize: 3 }); + for (let i = 0; i < 3; ++i) { + await cursor.next(); + expect(client.s.activeSessions.size).to.equal(1); + } + + await cursor.next(); + expect(client.s.activeSessions.size, 'session not checked in after cursor exhausted').to.equal( + 0 + ); + + await cursor.close(); }); describe('#clone', function () { diff --git a/test/integration/node-specific/abstract_cursor.test.ts b/test/integration/node-specific/abstract_cursor.test.ts index d89f23f3e8..eab72617f0 100644 --- a/test/integration/node-specific/abstract_cursor.test.ts +++ b/test/integration/node-specific/abstract_cursor.test.ts @@ -1,114 +1,236 @@ import { expect } from 'chai'; +import { once } from 'events'; +import * as sinon from 'sinon'; import { inspect } from 'util'; -import { type Collection, MongoAPIError, type MongoClient } from '../../mongodb'; - -const falseyValues = [0, 0n, NaN, '', false, undefined]; +import { type Collection, type FindCursor, MongoAPIError, type MongoClient } from '../../mongodb'; describe('class AbstractCursor', function () { - let client: MongoClient; + describe('regression tests NODE-5372', function () { + let client: MongoClient; + let collection: Collection; + const docs = [{ count: 0 }, { count: 10 }]; + beforeEach(async function () { + client = this.configuration.newClient(); - let collection: Collection; - beforeEach(async function () { - client = this.configuration.newClient(); + collection = client.db('abstract_cursor_integration').collection('test'); - collection = client.db('abstract_cursor_integration').collection('test'); + await collection.insertMany(docs); + }); - await collection.insertMany(Array.from({ length: 5 }, (_, index) => ({ index }))); - }); + afterEach(async function () { + await collection.deleteMany({}); + await client.close(); + }); - afterEach(async function () { - await collection.deleteMany({}); - await client.close(); + it('cursors can be iterated with hasNext+next', async function () { + const cursor = collection + // sort ensures that the docs in the cursor are in the same order as the docs inserted + .find({}, { sort: { count: 1 } }) + .map(doc => ({ ...doc, count: doc.count + 1 })); + + for (let count = 0; await cursor.hasNext(); count++) { + const received = await cursor.next(); + const actual = docs[count]; + + expect(received.count).to.equal(actual.count + 1); + } + }); }); - context('toArray() with custom transforms', function () { - for (const value of falseyValues) { - it(`supports mapping to falsey value '${inspect(value)}'`, async function () { - const cursor = collection.find(); - cursor.map(() => value); + describe('cursor iteration APIs', function () { + let client: MongoClient; + let collection: Collection; + const transformSpy = sinon.spy(doc => ({ ...doc, name: doc.name.toUpperCase() })); + beforeEach(async function () { + client = this.configuration.newClient(); - const result = await cursor.toArray(); + collection = client.db('abstract_cursor_integration').collection('test'); + + await collection.insertMany([{ name: 'john doe' }]); + }); + + afterEach(async function () { + transformSpy.resetHistory(); + + await collection.deleteMany({}); + await client.close(); + }); - const expected = Array.from({ length: 5 }, () => value); - expect(result).to.deep.equal(expected); + context(`hasNext()`, function () { + context('when there is a transform on the cursor', function () { + it(`the transform is NOT called`, async () => { + const cursor = collection.find().map(transformSpy); + + const hasNext = await cursor.hasNext(); + expect(transformSpy).not.to.have.been.called; + expect(hasNext).to.be.true; + }); + }); + }); + + const operations: ReadonlyArray Promise]> = [ + ['tryNext', (cursor: FindCursor) => cursor.tryNext()], + ['next', (cursor: FindCursor) => cursor.next()], + [ + 'Symbol.asyncIterator().next', + async (cursor: FindCursor) => { + const iterator = cursor[Symbol.asyncIterator](); + return iterator.next().then(({ value }) => value); + } + ], + [ + 'Cursor.stream', + (cursor: FindCursor) => { + const stream = cursor.stream(); + return once(stream, 'data').then(([doc]) => doc); + } + ] + ] as const; + + for (const [method, func] of operations) { + context(`${method}()`, function () { + context('when there is a transform on the cursor', function () { + it(`the transform is called`, async () => { + const cursor = collection.find().map(transformSpy); + + const doc = await func(cursor); + expect(transformSpy).to.have.been.calledOnce; + expect(doc.name).to.equal('JOHN DOE'); + }); + context('when the transform throws', function () { + it(`the error is propagated to the user`, async () => { + const cursor = collection.find().map(() => { + throw new Error('error thrown in transform'); + }); + + const error = await func(cursor).catch(e => e); + expect(error) + .to.be.instanceOf(Error) + .to.match(/error thrown in transform/); + expect(cursor.closed).to.be.true; + }); + }); + }); + + context('when there is not a transform on the cursor', function () { + it(`it returns the cursor's documents unmodified`, async () => { + const cursor = collection.find(); + + const doc = await func(cursor); + expect(doc.name).to.equal('john doe'); + }); + }); }); } + }); - it('throws when mapping to `null` and cleans up cursor', async function () { - const cursor = collection.find(); - cursor.map(() => null); + describe('custom transforms with falsy values', function () { + let client: MongoClient; + const falseyValues = [0, 0n, NaN, '', false, undefined]; - const error = await cursor.toArray().catch(e => e); + let collection: Collection; + beforeEach(async function () { + client = this.configuration.newClient(); - expect(error).be.instanceOf(MongoAPIError); - expect(cursor.closed).to.be.true; + collection = client.db('abstract_cursor_integration').collection('test'); + + await collection.insertMany(Array.from({ length: 5 }, (_, index) => ({ index }))); }); - }); - context('Symbol.asyncIterator() with custom transforms', function () { - for (const value of falseyValues) { - it(`supports mapping to falsey value '${inspect(value)}'`, async function () { - const cursor = collection.find(); - cursor.map(() => value); + afterEach(async function () { + await collection.deleteMany({}); + await client.close(); + }); - let count = 0; + context('toArray() with custom transforms', function () { + for (const value of falseyValues) { + it(`supports mapping to falsey value '${inspect(value)}'`, async function () { + const cursor = collection.find(); + cursor.map(() => value); - for await (const document of cursor) { - expect(document).to.deep.equal(value); - count++; - } + const result = await cursor.toArray(); - expect(count).to.equal(5); - }); - } + const expected = Array.from({ length: 5 }, () => value); + expect(result).to.deep.equal(expected); + }); + } - it('throws when mapping to `null` and cleans up cursor', async function () { - const cursor = collection.find(); - cursor.map(() => null); + it('throws when mapping to `null` and cleans up cursor', async function () { + const cursor = collection.find(); + cursor.map(() => null); - try { - // eslint-disable-next-line @typescript-eslint/no-unused-vars - for await (const document of cursor) { - expect.fail('Expected error to be thrown'); - } - } catch (error) { - expect(error).to.be.instanceOf(MongoAPIError); + const error = await cursor.toArray().catch(e => e); + + expect(error).be.instanceOf(MongoAPIError); expect(cursor.closed).to.be.true; - } + }); }); - }); - context('forEach() with custom transforms', function () { - for (const value of falseyValues) { - it(`supports mapping to falsey value '${inspect(value)}'`, async function () { - const cursor = collection.find(); - cursor.map(() => value); + context('Symbol.asyncIterator() with custom transforms', function () { + for (const value of falseyValues) { + it(`supports mapping to falsey value '${inspect(value)}'`, async function () { + const cursor = collection.find(); + cursor.map(() => value); - let count = 0; + let count = 0; - function transform(value) { - expect(value).to.deep.equal(value); - count++; - } + for await (const document of cursor) { + expect(document).to.deep.equal(value); + count++; + } - await cursor.forEach(transform); + expect(count).to.equal(5); + }); + } - expect(count).to.equal(5); + it('throws when mapping to `null` and cleans up cursor', async function () { + const cursor = collection.find(); + cursor.map(() => null); + + try { + // eslint-disable-next-line @typescript-eslint/no-unused-vars + for await (const document of cursor) { + expect.fail('Expected error to be thrown'); + } + } catch (error) { + expect(error).to.be.instanceOf(MongoAPIError); + expect(cursor.closed).to.be.true; + } }); - } + }); + + context('forEach() with custom transforms', function () { + for (const value of falseyValues) { + it(`supports mapping to falsey value '${inspect(value)}'`, async function () { + const cursor = collection.find(); + cursor.map(() => value); - it('throws when mapping to `null` and cleans up cursor', async function () { - const cursor = collection.find(); - cursor.map(() => null); + let count = 0; - function iterator() { - expect.fail('Expected no documents from cursor, received at least one.'); + function transform(value) { + expect(value).to.deep.equal(value); + count++; + } + + await cursor.forEach(transform); + + expect(count).to.equal(5); + }); } - const error = await cursor.forEach(iterator).catch(e => e); - expect(error).to.be.instanceOf(MongoAPIError); - expect(cursor.closed).to.be.true; + it('throws when mapping to `null` and cleans up cursor', async function () { + const cursor = collection.find(); + cursor.map(() => null); + + function iterator() { + expect.fail('Expected no documents from cursor, received at least one.'); + } + + const error = await cursor.forEach(iterator).catch(e => e); + expect(error).to.be.instanceOf(MongoAPIError); + expect(cursor.closed).to.be.true; + }); }); }); }); diff --git a/test/integration/node-specific/cursor_stream.test.js b/test/integration/node-specific/cursor_stream.test.js index 79096838fd..407d121780 100644 --- a/test/integration/node-specific/cursor_stream.test.js +++ b/test/integration/node-specific/cursor_stream.test.js @@ -296,7 +296,6 @@ describe('Cursor Streams', function () { const stream = cursor.stream(); stream.on('error', err => (error = err)); cursor.on('close', function () { - // NOTE: use `setImmediate` here because the stream implementation uses `nextTick` to emit the error setImmediate(() => { expect(error).to.exist; client.close(done);