Skip to content
94 changes: 83 additions & 11 deletions src/change_stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,7 @@ import type { AggregateOptions } from './operations/aggregate';
import type { OperationParent } from './operations/command';
import type { ServerSessionId } from './sessions';
import { CSOTTimeoutContext, type TimeoutContext } from './timeout';
import { filterOptions, getTopology, type MongoDBNamespace, squashError } from './utils';

const CHANGE_STREAM_OPTIONS = [
'resumeAfter',
'startAfter',
'startAtOperationTime',
'fullDocument',
'fullDocumentBeforeChange',
'showExpandedEvents'
] as const;
import { type AnyOptions, getTopology, type MongoDBNamespace, squashError } from './utils';

const CHANGE_DOMAIN_TYPES = {
COLLECTION: Symbol('Collection'),
Expand All @@ -43,6 +34,14 @@ const NO_RESUME_TOKEN_ERROR =
'A change stream document has been received that lacks a resume token (_id).';
const CHANGESTREAM_CLOSED_ERROR = 'ChangeStream is closed';

const INVALID_STAGE_OPTIONS = buildDisallowedChangeStreamOptions();

export function filterOutOptions(options: AnyOptions): AnyOptions {
return Object.fromEntries(
Object.entries(options).filter(([k, _]) => !INVALID_STAGE_OPTIONS.has(k))
);
}

/**
* Represents the logical starting point for a new ChangeStream or resuming a ChangeStream on the server.
* @see https://www.mongodb.com/docs/manual/changeStreams/#std-label-change-stream-resume
Expand Down Expand Up @@ -898,7 +897,7 @@ export class ChangeStream<
private _createChangeStreamCursor(
options: ChangeStreamOptions | ChangeStreamCursorOptions
): ChangeStreamCursor<TSchema, TChange> {
const changeStreamStageOptions = filterOptions(options, CHANGE_STREAM_OPTIONS);
const changeStreamStageOptions: Document = filterOutOptions(options);
if (this.type === CHANGE_DOMAIN_TYPES.CLUSTER) {
changeStreamStageOptions.allChangesForCluster = true;
}
Expand Down Expand Up @@ -1084,3 +1083,76 @@ export class ChangeStream<
}
}
}

/**
* This function returns a list of options that are *not* supported by the $changeStream
* aggregation stage. This is best-effort - it uses the options "officially supported" by the driver
* to derive a list of known, unsupported options for the $changeStream stage.
*
* Notably, at runtime, users can still provide options unknown to the driver and the driver will
* *not* filter them out of the options object (see NODE-5510).
*/
function buildDisallowedChangeStreamOptions(): Set<string> {
/** hard-coded list of allowed ChangeStream options */
type CSOptions =
| 'resumeAfter'
| 'startAfter'
| 'startAtOperationTime'
| 'fullDocument'
| 'fullDocumentBeforeChange'
| 'showExpandedEvents';

/**
* a type representing all known options that the driver supports that are *not* change stream stage options.
*
* each known key is mapped to a non-optional string, so that if new driver-specific options are added, the
* instantiation of `denyList` below results in a TS error.
*/
type DisallowedOptions = {
[k in Exclude<
keyof ChangeStreamOptions & { timeoutContext: TimeoutContext },
CSOptions
>]: string;
};

const denyList: DisallowedOptions = {
allowDiskUse: '',
authdb: '',
batchSize: '',
bsonRegExp: '',
bypassDocumentValidation: '',
bypassPinningCheck: '',
checkKeys: '',
collation: '',
comment: '',
cursor: '',
dbName: '',
enableUtf8Validation: '',
explain: '',
fieldsAsRaw: '',
hint: '',
ignoreUndefined: '',
let: '',
maxAwaitTimeMS: '',
maxTimeMS: '',
omitMaxTimeMS: '',
out: '',
promoteBuffers: '',
promoteLongs: '',
promoteValues: '',
raw: '',
rawData: '',
readConcern: '',
readPreference: '',
serializeFunctions: '',
session: '',
timeoutContext: '',
timeoutMS: '',
timeoutMode: '',
useBigInt64: '',
willRetryWrite: '',
writeConcern: ''
};

return new Set(Object.keys(denyList));
}
37 changes: 5 additions & 32 deletions test/integration/change-streams/change_stream.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -157,10 +157,10 @@ describe('Change Streams', function () {
});
});

it('ignores any invalid option values', function () {
it('allows invalid option values', function () {
const changeStream = collection.watch([], { invalidOption: true });

expect(changeStream).not.to.have.nested.property(
expect(changeStream).to.have.nested.property(
'cursor.pipeline[0].$changeStream.invalidOption'
);
});
Expand Down Expand Up @@ -1809,7 +1809,7 @@ describe('Change Streams', function () {
});

context('invalid options', function () {
it('does not send invalid options on the aggregate command', {
it('server errors on invalid options on the initialize', {
metadata: { requires: { topology: '!single' } },
test: async function () {
const started: CommandStartedEvent[] = [];
Expand All @@ -1819,35 +1819,8 @@ describe('Change Streams', function () {
// @ts-expect-error: checking for invalid options
cs = collection.watch([], doc);

const willBeChange = once(cs, 'change').then(args => args[0]);
await once(cs.cursor, 'init');

const result = await collection.insertOne({ a: Long.fromNumber(0) });
expect(result).to.exist;

await willBeChange;
expect(started[0].command).not.to.haveOwnProperty('invalidBSONOption');
}
});

it('does not send invalid options on the getMore command', {
metadata: { requires: { topology: '!single' } },
test: async function () {
const started: CommandStartedEvent[] = [];

client.on('commandStarted', filterForCommands(['aggregate'], started));
const doc = { invalidBSONOption: true };
// @ts-expect-error: checking for invalid options
cs = collection.watch([], doc);

const willBeChange = once(cs, 'change').then(args => args[0]);
await once(cs.cursor, 'init');

const result = await collection.insertOne({ a: Long.fromNumber(0) });
expect(result).to.exist;

await willBeChange;
expect(started[0].command).not.to.haveOwnProperty('invalidBSONOption');
const error = await once(cs, 'change').catch(error => error);
expect(error).to.be.instanceOf(MongoServerError);
}
});
});
Expand Down
23 changes: 23 additions & 0 deletions test/unit/change_stream.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { Long, Timestamp } from 'bson';
import { expect } from 'chai';
import * as sinon from 'sinon';

import { filterOutOptions } from '../../src/change_stream';
import { ChangeStreamCursor } from '../../src/cursor/change_stream_cursor';
import { MongoClient } from '../../src/mongo_client';
import { MongoDBNamespace } from '../../src/utils';
Expand All @@ -11,6 +12,28 @@ describe('ChangeStreamCursor', function () {
sinon.restore();
});

describe('#filterOutOptions', function () {
const options = {
raw: false,
useBigInt64: false,
promoteLongs: true,
promoteValues: true,
promoteBuffers: false,
ignoreUndefined: false,
bsonRegExp: false,
serializeFunctions: false,
fieldsAsRaw: {},
enableUtf8Validation: true,
fullDocument: true
};

it('filters out all invalid options', function () {
expect(filterOutOptions(options)).to.deep.equal({
fullDocument: true
});
});
});

describe('get resumeOptions()', function () {
context('when there is a cached resumeToken', function () {
it('copies all non-resume related options from the original cursor', function () {
Expand Down