From 53dbf918e9637a19c9e4112a029187bffa4323c0 Mon Sep 17 00:00:00 2001 From: Durran Jordan Date: Fri, 10 Oct 2025 11:25:27 -0400 Subject: [PATCH 01/11] feat(NODE-5510): dont filter change stream options --- src/change_stream.ts | 13 ++----------- .../change-streams/change_stream.test.ts | 4 ++-- 2 files changed, 4 insertions(+), 13 deletions(-) diff --git a/src/change_stream.ts b/src/change_stream.ts index 6cb118cb095..ce4aeb795bb 100644 --- a/src/change_stream.ts +++ b/src/change_stream.ts @@ -20,16 +20,7 @@ import type { AggregateOptions } from './operations/aggregate'; import type { OperationParent } from './operations/command'; import type { ServerSessionId } from './sessions'; import { CSOTTimeoutContext, type TimeoutContext } from './timeout'; -import { filterOptions, getTopology, type MongoDBNamespace, squashError } from './utils'; - -const CHANGE_STREAM_OPTIONS = [ - 'resumeAfter', - 'startAfter', - 'startAtOperationTime', - 'fullDocument', - 'fullDocumentBeforeChange', - 'showExpandedEvents' -] as const; +import { getTopology, type MongoDBNamespace, squashError } from './utils'; const CHANGE_DOMAIN_TYPES = { COLLECTION: Symbol('Collection'), @@ -900,7 +891,7 @@ export class ChangeStream< private _createChangeStreamCursor( options: ChangeStreamOptions | ChangeStreamCursorOptions ): ChangeStreamCursor { - const changeStreamStageOptions = filterOptions(options, CHANGE_STREAM_OPTIONS); + const changeStreamStageOptions: Document = { ...options }; if (this.type === CHANGE_DOMAIN_TYPES.CLUSTER) { changeStreamStageOptions.allChangesForCluster = true; } diff --git a/test/integration/change-streams/change_stream.test.ts b/test/integration/change-streams/change_stream.test.ts index 32da9530831..f9d5f7e4d23 100644 --- a/test/integration/change-streams/change_stream.test.ts +++ b/test/integration/change-streams/change_stream.test.ts @@ -160,10 +160,10 @@ describe('Change Streams', function () { }); }); - it('ignores any invalid option values', function () { + it('allows invalid option values', function () { const changeStream = collection.watch([], { invalidOption: true }); - expect(changeStream).not.to.have.nested.property( + expect(changeStream).to.have.nested.property( 'cursor.pipeline[0].$changeStream.invalidOption' ); }); From f759ff7014f5107708a9d08480296eb0bd638bac Mon Sep 17 00:00:00 2001 From: Durran Jordan Date: Wed, 15 Oct 2025 15:38:57 +0200 Subject: [PATCH 02/11] chore: poc blacklist --- src/change_stream.ts | 32 ++++++++++++++++++++++++++++++-- 1 file changed, 30 insertions(+), 2 deletions(-) diff --git a/src/change_stream.ts b/src/change_stream.ts index ce4aeb795bb..92b8077762b 100644 --- a/src/change_stream.ts +++ b/src/change_stream.ts @@ -20,7 +20,7 @@ import type { AggregateOptions } from './operations/aggregate'; import type { OperationParent } from './operations/command'; import type { ServerSessionId } from './sessions'; import { CSOTTimeoutContext, type TimeoutContext } from './timeout'; -import { getTopology, type MongoDBNamespace, squashError } from './utils'; +import { type AnyOptions, getTopology, type MongoDBNamespace, squashError } from './utils'; const CHANGE_DOMAIN_TYPES = { COLLECTION: Symbol('Collection'), @@ -34,6 +34,34 @@ const NO_RESUME_TOKEN_ERROR = 'A change stream document has been received that lacks a resume token (_id).'; const CHANGESTREAM_CLOSED_ERROR = 'ChangeStream is closed'; +const INVALID_STAGE_OPTIONS = [ + 'raw', + 'useBigInt64', + 'promoteLongs', + 'promoteValues', + 'promoteBuffers', + 'ignoreUndefined', + 'bsonRegExp', + 'serializeFunctions', + 'fieldsAsRaw', + 'enableUtf8Validation', + 'timeoutMS', + 'readPreference' +]; + +export function filterOutOptions(options: AnyOptions, names: ReadonlyArray): AnyOptions { + const filterOptions: AnyOptions = {}; + + for (const name in options) { + if (!names.includes(name)) { + filterOptions[name] = options[name]; + } + } + + // Filtered options + return filterOptions; +} + /** * Represents the logical starting point for a new ChangeStream or resuming a ChangeStream on the server. * @see https://www.mongodb.com/docs/manual/changeStreams/#std-label-change-stream-resume @@ -891,7 +919,7 @@ export class ChangeStream< private _createChangeStreamCursor( options: ChangeStreamOptions | ChangeStreamCursorOptions ): ChangeStreamCursor { - const changeStreamStageOptions: Document = { ...options }; + const changeStreamStageOptions: Document = filterOutOptions(options, INVALID_STAGE_OPTIONS); if (this.type === CHANGE_DOMAIN_TYPES.CLUSTER) { changeStreamStageOptions.allChangesForCluster = true; } From a45d1b7eab5cccd30c025925010af2db66bfee43 Mon Sep 17 00:00:00 2001 From: Durran Jordan Date: Mon, 20 Oct 2025 06:13:56 +0200 Subject: [PATCH 03/11] chore: move to set --- src/change_stream.ts | 38 ++++++++++++++++++++++---------------- 1 file changed, 22 insertions(+), 16 deletions(-) diff --git a/src/change_stream.ts b/src/change_stream.ts index 92b8077762b..c42938ccee5 100644 --- a/src/change_stream.ts +++ b/src/change_stream.ts @@ -34,31 +34,37 @@ const NO_RESUME_TOKEN_ERROR = 'A change stream document has been received that lacks a resume token (_id).'; const CHANGESTREAM_CLOSED_ERROR = 'ChangeStream is closed'; -const INVALID_STAGE_OPTIONS = [ - 'raw', - 'useBigInt64', +const INVALID_STAGE_OPTIONS = new Set([ + 'authdb', + 'batchSize', + 'bsonRegExp', + 'collation', + 'comment', + 'dbName', + 'enableUtf8Validation', + 'fieldsAsRaw', + 'ignoreUndefined', + 'maxAwaitTimeMS', + 'maxTimeMS', + 'promoteBuffers', 'promoteLongs', 'promoteValues', - 'promoteBuffers', - 'ignoreUndefined', - 'bsonRegExp', + 'raw', + 'rawData', + 'readPreference', 'serializeFunctions', - 'fieldsAsRaw', - 'enableUtf8Validation', + 'timeoutContext', 'timeoutMS', - 'readPreference' -]; + 'useBigInt64' +]); -export function filterOutOptions(options: AnyOptions, names: ReadonlyArray): AnyOptions { +export function filterOutOptions(options: AnyOptions): AnyOptions { const filterOptions: AnyOptions = {}; - for (const name in options) { - if (!names.includes(name)) { + if (!INVALID_STAGE_OPTIONS.has(name)) { filterOptions[name] = options[name]; } } - - // Filtered options return filterOptions; } @@ -919,7 +925,7 @@ export class ChangeStream< private _createChangeStreamCursor( options: ChangeStreamOptions | ChangeStreamCursorOptions ): ChangeStreamCursor { - const changeStreamStageOptions: Document = filterOutOptions(options, INVALID_STAGE_OPTIONS); + const changeStreamStageOptions: Document = filterOutOptions(options); if (this.type === CHANGE_DOMAIN_TYPES.CLUSTER) { changeStreamStageOptions.allChangesForCluster = true; } From 14ad2ca6876c4d1ed36bf4f61a499b6ca8c58baf Mon Sep 17 00:00:00 2001 From: Durran Jordan Date: Mon, 20 Oct 2025 06:29:00 +0200 Subject: [PATCH 04/11] test: integration test --- src/change_stream.ts | 3 +- .../change-streams/change_stream.test.ts | 35 +++---------------- 2 files changed, 6 insertions(+), 32 deletions(-) diff --git a/src/change_stream.ts b/src/change_stream.ts index c42938ccee5..41073feff1e 100644 --- a/src/change_stream.ts +++ b/src/change_stream.ts @@ -55,7 +55,8 @@ const INVALID_STAGE_OPTIONS = new Set([ 'serializeFunctions', 'timeoutContext', 'timeoutMS', - 'useBigInt64' + 'useBigInt64', + 'writeConcern' ]); export function filterOutOptions(options: AnyOptions): AnyOptions { diff --git a/test/integration/change-streams/change_stream.test.ts b/test/integration/change-streams/change_stream.test.ts index f9d5f7e4d23..1c552fc3e49 100644 --- a/test/integration/change-streams/change_stream.test.ts +++ b/test/integration/change-streams/change_stream.test.ts @@ -60,7 +60,7 @@ async function forcePrimaryStepDown(client: MongoClient) { await sleep(15_000); } -describe('Change Streams', function () { +describe.only('Change Streams', function () { let client: MongoClient; let collection: Collection; let changeStream: ChangeStream; @@ -1833,7 +1833,7 @@ describe('Change Streams', function () { }); context('invalid options', function () { - it('does not send invalid options on the aggregate command', { + it('server errors on invalid options on the initialize', { metadata: { requires: { topology: '!single' } }, test: async function () { const started: CommandStartedEvent[] = []; @@ -1843,35 +1843,8 @@ describe('Change Streams', function () { // @ts-expect-error: checking for invalid options cs = collection.watch([], doc); - const willBeChange = once(cs, 'change').then(args => args[0]); - await once(cs.cursor, 'init'); - - const result = await collection.insertOne({ a: Long.fromNumber(0) }); - expect(result).to.exist; - - await willBeChange; - expect(started[0].command).not.to.haveOwnProperty('invalidBSONOption'); - } - }); - - it('does not send invalid options on the getMore command', { - metadata: { requires: { topology: '!single' } }, - test: async function () { - const started: CommandStartedEvent[] = []; - - client.on('commandStarted', filterForCommands(['aggregate'], started)); - const doc = { invalidBSONOption: true }; - // @ts-expect-error: checking for invalid options - cs = collection.watch([], doc); - - const willBeChange = once(cs, 'change').then(args => args[0]); - await once(cs.cursor, 'init'); - - const result = await collection.insertOne({ a: Long.fromNumber(0) }); - expect(result).to.exist; - - await willBeChange; - expect(started[0].command).not.to.haveOwnProperty('invalidBSONOption'); + const error = await once(cs, 'change').catch(error => error); + expect(error).to.be.instanceOf(MongoServerError); } }); }); From bcb53872cb6939fc35cada2c41e018a337d935ad Mon Sep 17 00:00:00 2001 From: Durran Jordan Date: Mon, 20 Oct 2025 07:27:10 +0200 Subject: [PATCH 05/11] fix: lint --- test/integration/change-streams/change_stream.test.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/integration/change-streams/change_stream.test.ts b/test/integration/change-streams/change_stream.test.ts index 1c552fc3e49..299049706f7 100644 --- a/test/integration/change-streams/change_stream.test.ts +++ b/test/integration/change-streams/change_stream.test.ts @@ -60,7 +60,7 @@ async function forcePrimaryStepDown(client: MongoClient) { await sleep(15_000); } -describe.only('Change Streams', function () { +describe('Change Streams', function () { let client: MongoClient; let collection: Collection; let changeStream: ChangeStream; From fd6cd38659316560f194f0ff4de876c2a68fc04a Mon Sep 17 00:00:00 2001 From: Durran Jordan Date: Mon, 20 Oct 2025 17:28:39 +0200 Subject: [PATCH 06/11] Update src/change_stream.ts Co-authored-by: Sergey Zelenov --- src/change_stream.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/change_stream.ts b/src/change_stream.ts index d97d08202d6..87ad850079b 100644 --- a/src/change_stream.ts +++ b/src/change_stream.ts @@ -61,7 +61,7 @@ const INVALID_STAGE_OPTIONS = new Set([ export function filterOutOptions(options: AnyOptions): AnyOptions { const filterOptions: AnyOptions = {}; - for (const name in options) { + for (const name of Object.keys(options)) { if (!INVALID_STAGE_OPTIONS.has(name)) { filterOptions[name] = options[name]; } From 006eb3b7a633d50ddeac3359b595b4ad1dc83186 Mon Sep 17 00:00:00 2001 From: bailey Date: Mon, 20 Oct 2025 15:09:34 -0600 Subject: [PATCH 07/11] add a TS compile time check --- src/change_stream.ts | 105 ++++++++++++++++++++++++++++++------------- 1 file changed, 73 insertions(+), 32 deletions(-) diff --git a/src/change_stream.ts b/src/change_stream.ts index 87ad850079b..f47b928d04f 100644 --- a/src/change_stream.ts +++ b/src/change_stream.ts @@ -15,7 +15,7 @@ import { MongoRuntimeError } from './error'; import { MongoClient } from './mongo_client'; -import { type InferIdType, TypedEventEmitter } from './mongo_types'; +import { type Abortable, type InferIdType, TypedEventEmitter } from './mongo_types'; import type { AggregateOptions } from './operations/aggregate'; import type { OperationParent } from './operations/command'; import type { ServerSessionId } from './sessions'; @@ -34,39 +34,12 @@ const NO_RESUME_TOKEN_ERROR = 'A change stream document has been received that lacks a resume token (_id).'; const CHANGESTREAM_CLOSED_ERROR = 'ChangeStream is closed'; -const INVALID_STAGE_OPTIONS = new Set([ - 'authdb', - 'batchSize', - 'bsonRegExp', - 'collation', - 'comment', - 'dbName', - 'enableUtf8Validation', - 'fieldsAsRaw', - 'ignoreUndefined', - 'maxAwaitTimeMS', - 'maxTimeMS', - 'promoteBuffers', - 'promoteLongs', - 'promoteValues', - 'raw', - 'rawData', - 'readPreference', - 'serializeFunctions', - 'timeoutContext', - 'timeoutMS', - 'useBigInt64', - 'writeConcern' -]); +const INVALID_STAGE_OPTIONS = buildDisallowedChangeStreamOptions(); export function filterOutOptions(options: AnyOptions): AnyOptions { - const filterOptions: AnyOptions = {}; - for (const name of Object.keys(options)) { - if (!INVALID_STAGE_OPTIONS.has(name)) { - filterOptions[name] = options[name]; - } - } - return filterOptions; + return Object.fromEntries( + Object.entries(options).filter(([k, _]) => INVALID_STAGE_OPTIONS.has(k)) + ); } /** @@ -1110,3 +1083,71 @@ export class ChangeStream< } } } + +/** + * This function returns a list of options that are *not* supported by the $changeStream + * aggregation stage. This is best-effort - it uses the options "officially supported" by the driver + * to derive a list of known, unsupported options for the $changeStream stage. + * + * Notably, at runtime, users can still provide options unknown to the driver and the driver will + * *not* filter them out of the options object (see NODE-5510). + */ +function buildDisallowedChangeStreamOptions(): Set { + /** hard-coded list of allowed ChangeStream options */ + type CSOptions = + | 'resumeAfter' + | 'startAfter' + | 'startAtOperationTime' + | 'fullDocument' + | 'fullDocumentBeforeChange' + | 'showExpandedEvents'; + + /** + * a type representing all known options that the driver supports that are *not* change stream stage options. + * + * each known key is mapped to a non-optional string, so that if new driver-specific options are added, the + * instantiation of `denyList` below results in a TS error. + */ + type DisallowedOptions = { + [k in Exclude]: string; + }; + + const denyList: DisallowedOptions = { + explain: '', + checkKeys: '', + serializeFunctions: '', + ignoreUndefined: '', + useBigInt64: '', + promoteLongs: '', + promoteBuffers: '', + promoteValues: '', + fieldsAsRaw: '', + bsonRegExp: '', + raw: '', + readConcern: '', + collation: '', + maxTimeMS: '', + comment: '', + dbName: '', + authdb: '', + rawData: '', + session: '', + willRetryWrite: '', + readPreference: '', + bypassPinningCheck: '', + omitMaxTimeMS: '', + timeoutMS: '', + enableUtf8Validation: '', + allowDiskUse: '', + batchSize: '', + bypassDocumentValidation: '', + cursor: '', + maxAwaitTimeMS: '', + hint: '', + let: '', + out: '', + timeoutMode: '' + }; + + return new Set(Object.keys(denyList)); +} From 21901b6f5e841526f8f160b504d557ac91fa85f2 Mon Sep 17 00:00:00 2001 From: Durran Jordan Date: Tue, 21 Oct 2025 08:47:37 +0200 Subject: [PATCH 08/11] fix: lint --- src/change_stream.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/change_stream.ts b/src/change_stream.ts index f47b928d04f..b951e7d51c4 100644 --- a/src/change_stream.ts +++ b/src/change_stream.ts @@ -15,7 +15,7 @@ import { MongoRuntimeError } from './error'; import { MongoClient } from './mongo_client'; -import { type Abortable, type InferIdType, TypedEventEmitter } from './mongo_types'; +import { type InferIdType, TypedEventEmitter } from './mongo_types'; import type { AggregateOptions } from './operations/aggregate'; import type { OperationParent } from './operations/command'; import type { ServerSessionId } from './sessions'; From 0658ec1cb2e69b53a5b5d0ab3fbad408f24a0d4c Mon Sep 17 00:00:00 2001 From: Durran Jordan Date: Tue, 21 Oct 2025 09:15:05 +0200 Subject: [PATCH 09/11] fix: options filtering --- src/change_stream.ts | 61 ++++++++++++++++++--------------- test/unit/change_stream.test.ts | 23 +++++++++++++ 2 files changed, 56 insertions(+), 28 deletions(-) diff --git a/src/change_stream.ts b/src/change_stream.ts index b951e7d51c4..88df4a67a70 100644 --- a/src/change_stream.ts +++ b/src/change_stream.ts @@ -38,7 +38,7 @@ const INVALID_STAGE_OPTIONS = buildDisallowedChangeStreamOptions(); export function filterOutOptions(options: AnyOptions): AnyOptions { return Object.fromEntries( - Object.entries(options).filter(([k, _]) => INVALID_STAGE_OPTIONS.has(k)) + Object.entries(options).filter(([k, _]) => !INVALID_STAGE_OPTIONS.has(k)) ); } @@ -898,6 +898,7 @@ export class ChangeStream< options: ChangeStreamOptions | ChangeStreamCursorOptions ): ChangeStreamCursor { const changeStreamStageOptions: Document = filterOutOptions(options); + console.log(changeStreamStageOptions); if (this.type === CHANGE_DOMAIN_TYPES.CLUSTER) { changeStreamStageOptions.allChangesForCluster = true; } @@ -1109,44 +1110,48 @@ function buildDisallowedChangeStreamOptions(): Set { * instantiation of `denyList` below results in a TS error. */ type DisallowedOptions = { - [k in Exclude]: string; + [k in Exclude< + keyof ChangeStreamOptions & { timeoutContext: TimeoutContext }, + CSOptions + >]: string; }; const denyList: DisallowedOptions = { - explain: '', - checkKeys: '', - serializeFunctions: '', - ignoreUndefined: '', - useBigInt64: '', - promoteLongs: '', - promoteBuffers: '', - promoteValues: '', - fieldsAsRaw: '', + allowDiskUse: '', + authdb: '', + batchSize: '', bsonRegExp: '', - raw: '', - readConcern: '', + bypassDocumentValidation: '', + bypassPinningCheck: '', + checkKeys: '', collation: '', - maxTimeMS: '', comment: '', + cursor: '', dbName: '', - authdb: '', - rawData: '', - session: '', - willRetryWrite: '', - readPreference: '', - bypassPinningCheck: '', - omitMaxTimeMS: '', - timeoutMS: '', enableUtf8Validation: '', - allowDiskUse: '', - batchSize: '', - bypassDocumentValidation: '', - cursor: '', - maxAwaitTimeMS: '', + explain: '', + fieldsAsRaw: '', hint: '', + ignoreUndefined: '', let: '', + maxAwaitTimeMS: '', + maxTimeMS: '', + omitMaxTimeMS: '', out: '', - timeoutMode: '' + promoteBuffers: '', + promoteLongs: '', + promoteValues: '', + raw: '', + rawData: '', + readConcern: '', + readPreference: '', + serializeFunctions: '', + session: '', + timeoutContext: '', + timeoutMS: '', + timeoutMode: '', + useBigInt64: '', + willRetryWrite: '' }; return new Set(Object.keys(denyList)); diff --git a/test/unit/change_stream.test.ts b/test/unit/change_stream.test.ts index 36976181bd5..cefdd58a482 100644 --- a/test/unit/change_stream.test.ts +++ b/test/unit/change_stream.test.ts @@ -2,6 +2,7 @@ import { Long, Timestamp } from 'bson'; import { expect } from 'chai'; import * as sinon from 'sinon'; +import { filterOutOptions } from '../../src/change_stream'; import { ChangeStreamCursor } from '../../src/cursor/change_stream_cursor'; import { MongoClient } from '../../src/mongo_client'; import { MongoDBNamespace } from '../../src/utils'; @@ -11,6 +12,28 @@ describe('ChangeStreamCursor', function () { sinon.restore(); }); + describe('#filterOutOptions', function () { + const options = { + raw: false, + useBigInt64: false, + promoteLongs: true, + promoteValues: true, + promoteBuffers: false, + ignoreUndefined: false, + bsonRegExp: false, + serializeFunctions: false, + fieldsAsRaw: {}, + enableUtf8Validation: true, + fullDocument: true + }; + + it('filters out all invalid options', function () { + expect(filterOutOptions(options)).to.deep.equal({ + fullDocument: true + }); + }); + }); + describe('get resumeOptions()', function () { context('when there is a cached resumeToken', function () { it('copies all non-resume related options from the original cursor', function () { From ec383444be87cb476b1b3062cd631f8ccd905573 Mon Sep 17 00:00:00 2001 From: Durran Jordan Date: Tue, 21 Oct 2025 10:02:15 +0200 Subject: [PATCH 10/11] chore: remove log --- src/change_stream.ts | 1 - 1 file changed, 1 deletion(-) diff --git a/src/change_stream.ts b/src/change_stream.ts index 88df4a67a70..afd7d0ae55f 100644 --- a/src/change_stream.ts +++ b/src/change_stream.ts @@ -898,7 +898,6 @@ export class ChangeStream< options: ChangeStreamOptions | ChangeStreamCursorOptions ): ChangeStreamCursor { const changeStreamStageOptions: Document = filterOutOptions(options); - console.log(changeStreamStageOptions); if (this.type === CHANGE_DOMAIN_TYPES.CLUSTER) { changeStreamStageOptions.allChangesForCluster = true; } From e18fb43da613819753f23188347915e17165cf9f Mon Sep 17 00:00:00 2001 From: Durran Jordan Date: Tue, 21 Oct 2025 10:20:46 +0200 Subject: [PATCH 11/11] fix: add write concern to list --- src/change_stream.ts | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/change_stream.ts b/src/change_stream.ts index afd7d0ae55f..f5b94bdd560 100644 --- a/src/change_stream.ts +++ b/src/change_stream.ts @@ -1150,7 +1150,8 @@ function buildDisallowedChangeStreamOptions(): Set { timeoutMS: '', timeoutMode: '', useBigInt64: '', - willRetryWrite: '' + willRetryWrite: '', + writeConcern: '' }; return new Set(Object.keys(denyList));