Skip to content

Commit

Permalink
feat(NODE-4059): ChangeStreamDocument not fully typed to specification (
Browse files Browse the repository at this point in the history
  • Loading branch information
nbbeeken authored May 4, 2022
1 parent ddb7d81 commit 8b24212
Show file tree
Hide file tree
Showing 10 changed files with 777 additions and 184 deletions.
324 changes: 226 additions & 98 deletions src/change_stream.ts

Large diffs are not rendered by default.

41 changes: 36 additions & 5 deletions src/collection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { BSONSerializeOptions, Document, resolveBSONOptions } from './bson';
import type { AnyBulkWriteOperation, BulkWriteOptions, BulkWriteResult } from './bulk/common';
import { OrderedBulkOperation } from './bulk/ordered';
import { UnorderedBulkOperation } from './bulk/unordered';
import { ChangeStream, ChangeStreamOptions } from './change_stream';
import { ChangeStream, ChangeStreamDocument, ChangeStreamOptions } from './change_stream';
import { AggregationCursor } from './cursor/aggregation_cursor';
import { FindCursor } from './cursor/find_cursor';
import type { Db } from './db';
Expand Down Expand Up @@ -1418,21 +1418,52 @@ export class Collection<TSchema extends Document = Document> {
/**
* Create a new Change Stream, watching for new changes (insertions, updates, replacements, deletions, and invalidations) in this collection.
*
* @since 3.0.0
* @remarks
* watch() accepts two generic arguments for distinct usecases:
* - The first is to override the schema that may be defined for this specific collection
* - The second is to override the shape of the change stream document entirely, if it is not provided the type will default to ChangeStreamDocument of the first argument
* @example
* By just providing the first argument I can type the change to be `ChangeStreamDocument<{ _id: number }>`
* ```ts
* collection.watch<{ _id: number }>()
* .on('change', change => console.log(change._id.toFixed(4)));
* ```
*
* @example
* Passing a second argument provides a way to reflect the type changes caused by an advanced pipeline.
* Here, we are using a pipeline to have MongoDB filter for insert changes only and add a comment.
* No need start from scratch on the ChangeStreamInsertDocument type!
* By using an intersection we can save time and ensure defaults remain the same type!
* ```ts
* collection
* .watch<Schema, ChangeStreamInsertDocument<Schema> & { comment: string }>([
* { $addFields: { comment: 'big changes' } },
* { $match: { operationType: 'insert' } }
* ])
* .on('change', change => {
* change.comment.startsWith('big');
* change.operationType === 'insert';
* // No need to narrow in code because the generics did that for us!
* expectType<Schema>(change.fullDocument);
* });
* ```
*
* @param pipeline - An array of {@link https://docs.mongodb.com/manual/reference/operator/aggregation-pipeline/|aggregation pipeline stages} through which to pass change stream documents. This allows for filtering (using $match) and manipulating the change stream documents.
* @param options - Optional settings for the command
* @typeParam TLocal - Type of the data being detected by the change stream
* @typeParam TChange - Type of the whole change stream document emitted
*/
watch<TLocal extends Document = TSchema>(
watch<TLocal extends Document = TSchema, TChange extends Document = ChangeStreamDocument<TLocal>>(
pipeline: Document[] = [],
options: ChangeStreamOptions = {}
): ChangeStream<TLocal> {
): ChangeStream<TLocal, TChange> {
// Allow optionally not specifying a pipeline
if (!Array.isArray(pipeline)) {
options = pipeline;
pipeline = [];
}

return new ChangeStream<TLocal>(this, pipeline, resolveOptions(this, options));
return new ChangeStream<TLocal, TChange>(this, pipeline, resolveOptions(this, options));
}

/**
Expand Down
19 changes: 13 additions & 6 deletions src/db.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { Admin } from './admin';
import { BSONSerializeOptions, Document, resolveBSONOptions } from './bson';
import { ChangeStream, ChangeStreamOptions } from './change_stream';
import { ChangeStream, ChangeStreamDocument, ChangeStreamOptions } from './change_stream';
import { Collection, CollectionOptions } from './collection';
import * as CONSTANTS from './constants';
import { AggregationCursor } from './cursor/aggregation_cursor';
Expand Down Expand Up @@ -719,20 +719,27 @@ export class Db {
* replacements, deletions, and invalidations) in this database. Will ignore all
* changes to system collections.
*
* @remarks
* watch() accepts two generic arguments for distinct usecases:
* - The first is to provide the schema that may be defined for all the collections within this database
* - The second is to override the shape of the change stream document entirely, if it is not provided the type will default to ChangeStreamDocument of the first argument
*
* @param pipeline - An array of {@link https://docs.mongodb.com/manual/reference/operator/aggregation-pipeline/|aggregation pipeline stages} through which to pass change stream documents. This allows for filtering (using $match) and manipulating the change stream documents.
* @param options - Optional settings for the command
* @typeParam TSchema - Type of the data being detected by the change stream
* @typeParam TChange - Type of the whole change stream document emitted
*/
watch<TSchema extends Document = Document>(
pipeline: Document[] = [],
options: ChangeStreamOptions = {}
): ChangeStream<TSchema> {
watch<
TSchema extends Document = Document,
TChange extends Document = ChangeStreamDocument<TSchema>
>(pipeline: Document[] = [], options: ChangeStreamOptions = {}): ChangeStream<TSchema, TChange> {
// Allow optionally not specifying a pipeline
if (!Array.isArray(pipeline)) {
options = pipeline;
pipeline = [];
}

return new ChangeStream<TSchema>(this, pipeline, resolveOptions(this, options));
return new ChangeStream<TSchema, TChange>(this, pipeline, resolveOptions(this, options));
}

/** Return the db logger */
Expand Down
11 changes: 11 additions & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -173,9 +173,20 @@ export type {
ChangeStreamAggregateRawResult,
ChangeStreamCursor,
ChangeStreamCursorOptions,
ChangeStreamDeleteDocument,
ChangeStreamDocument,
ChangeStreamDocumentCommon,
ChangeStreamDocumentKey,
ChangeStreamDropDatabaseDocument,
ChangeStreamDropDocument,
ChangeStreamEvents,
ChangeStreamInsertDocument,
ChangeStreamInvalidateDocument,
ChangeStreamNameSpace,
ChangeStreamOptions,
ChangeStreamRenameDocument,
ChangeStreamReplaceDocument,
ChangeStreamUpdateDocument,
OperationTime,
PipeOptions,
ResumeOptions,
Expand Down
19 changes: 13 additions & 6 deletions src/mongo_client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import type { TcpNetConnectOpts } from 'net';
import type { ConnectionOptions as TLSConnectionOptions, TLSSocketOptions } from 'tls';

import { BSONSerializeOptions, Document, resolveBSONOptions } from './bson';
import { ChangeStream, ChangeStreamOptions } from './change_stream';
import { ChangeStream, ChangeStreamDocument, ChangeStreamOptions } from './change_stream';
import type { AuthMechanismProperties, MongoCredentials } from './cmap/auth/mongo_credentials';
import type { AuthMechanism } from './cmap/auth/providers';
import type { LEGAL_TCP_SOCKET_OPTIONS, LEGAL_TLS_SOCKET_OPTIONS } from './cmap/connect';
Expand Down Expand Up @@ -590,20 +590,27 @@ export class MongoClient extends TypedEventEmitter<MongoClientEvents> {
* replacements, deletions, and invalidations) in this cluster. Will ignore all
* changes to system collections, as well as the local, admin, and config databases.
*
* @remarks
* watch() accepts two generic arguments for distinct usecases:
* - The first is to provide the schema that may be defined for all the data within the current cluster
* - The second is to override the shape of the change stream document entirely, if it is not provided the type will default to ChangeStreamDocument of the first argument
*
* @param pipeline - An array of {@link https://docs.mongodb.com/manual/reference/operator/aggregation-pipeline/|aggregation pipeline stages} through which to pass change stream documents. This allows for filtering (using $match) and manipulating the change stream documents.
* @param options - Optional settings for the command
* @typeParam TSchema - Type of the data being detected by the change stream
* @typeParam TChange - Type of the whole change stream document emitted
*/
watch<TSchema extends Document = Document>(
pipeline: Document[] = [],
options: ChangeStreamOptions = {}
): ChangeStream<TSchema> {
watch<
TSchema extends Document = Document,
TChange extends Document = ChangeStreamDocument<TSchema>
>(pipeline: Document[] = [], options: ChangeStreamOptions = {}): ChangeStream<TSchema, TChange> {
// Allow optionally not specifying a pipeline
if (!Array.isArray(pipeline)) {
options = pipeline;
pipeline = [];
}

return new ChangeStream<TSchema>(this, pipeline, resolveOptions(this, options));
return new ChangeStream<TSchema, TChange>(this, pipeline, resolveOptions(this, options));
}

/** Return the mongo client logger */
Expand Down
Loading

0 comments on commit 8b24212

Please sign in to comment.