From 929a6cd3f20e95a70651cada4a4713109ea0588e Mon Sep 17 00:00:00 2001 From: Valeri Karpov Date: Mon, 5 Feb 2024 18:22:37 -0500 Subject: [PATCH 1/2] fix: propagate errors to transformed stream in cursor --- src/cursor/abstract_cursor.ts | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/src/cursor/abstract_cursor.ts b/src/cursor/abstract_cursor.ts index d528d9b5d8..412361d0e8 100644 --- a/src/cursor/abstract_cursor.ts +++ b/src/cursor/abstract_cursor.ts @@ -337,7 +337,7 @@ export abstract class AbstractCursor< const transform = options.transform; const readable = new ReadableCursorStream(this); - return readable.pipe( + const transformedStream = readable.pipe( new Transform({ objectMode: true, highWaterMark: 1, @@ -351,6 +351,12 @@ export abstract class AbstractCursor< } }) ); + + // Bubble errors to transformed stream, because otherwise no way + // to handle this error. + readable.on('error', err => transformedStream.emit('error', err)); + + return transformedStream; } return new ReadableCursorStream(this); From 9deebbddfba08d3134b0cccbd07026b04ed0a8a1 Mon Sep 17 00:00:00 2001 From: Valeri Karpov Date: Thu, 8 Feb 2024 11:57:47 -0500 Subject: [PATCH 2/2] test: add test case covering transform streams throwing a MongoServerError --- .../node-specific/abstract_cursor.test.ts | 44 ++++++++++++++++++- 1 file changed, 43 insertions(+), 1 deletion(-) diff --git a/test/integration/node-specific/abstract_cursor.test.ts b/test/integration/node-specific/abstract_cursor.test.ts index eab72617f0..8a382724c6 100644 --- a/test/integration/node-specific/abstract_cursor.test.ts +++ b/test/integration/node-specific/abstract_cursor.test.ts @@ -1,9 +1,16 @@ import { expect } from 'chai'; import { once } from 'events'; import * as sinon from 'sinon'; +import { Transform } from 'stream'; import { inspect } from 'util'; -import { type Collection, type FindCursor, MongoAPIError, type MongoClient } from '../../mongodb'; +import { + type Collection, + type FindCursor, + MongoAPIError, + type MongoClient, + MongoServerError +} from '../../mongodb'; describe('class AbstractCursor', function () { describe('regression tests NODE-5372', function () { @@ -233,4 +240,39 @@ describe('class AbstractCursor', function () { }); }); }); + + describe('transform stream error handling', function () { + let client: MongoClient; + let collection: Collection; + const docs = [{ count: 0 }]; + beforeEach(async function () { + client = this.configuration.newClient(); + + collection = client.db('abstract_cursor_integration').collection('test'); + + await collection.insertMany(docs); + }); + + afterEach(async function () { + await collection.deleteMany({}); + await client.close(); + }); + + it('propagates errors to transform stream', async function () { + const transform = new Transform({ + transform(data, encoding, callback) { + callback(null, data); + } + }); + + // MongoServerError: unknown operator: $bar + const stream = collection.find({ foo: { $bar: 25 } }).stream({ transform }); + + const error: Error | null = await new Promise(resolve => { + stream.on('error', error => resolve(error)); + stream.on('end', () => resolve(null)); + }); + expect(error).to.be.instanceof(MongoServerError); + }); + }); });