From f0e33ed9cb74635966db5f12e48d7d10600a1171 Mon Sep 17 00:00:00 2001 From: Rea Radhika Rustagi Date: Fri, 20 Jan 2023 13:34:47 -0500 Subject: [PATCH 1/7] DOCSP-26099: change stream async iterable --- source/usage-examples/changeStream.txt | 25 ++++++++++++++++++++----- 1 file changed, 20 insertions(+), 5 deletions(-) diff --git a/source/usage-examples/changeStream.txt b/source/usage-examples/changeStream.txt index 9b61e76b5..bdf85d66e 100644 --- a/source/usage-examples/changeStream.txt +++ b/source/usage-examples/changeStream.txt @@ -34,15 +34,29 @@ The ``watch()`` method accepts an ``options`` object as the second parameter. Re section for more information on the settings you can configure with this object. The ``watch()`` method returns an instance of a `ChangeStream <{+api+}/classes/ChangeStream.html>`__. You can read events from -change streams by iterating over them or listening for events. Select the tab that corresponds to the way you want to -read events from the change stream below. +change streams by iterating over them or listening for events. -.. include:: /includes/changestream-paradigm-warning.rst +Select the tab that corresponds to the way you want to +read events from the change stream: .. tabs:: .. tab:: - :tabid: Iterative + :tabid: Idiomatic Iteration + + Starting in version 4.12, the ``ChangeStream`` class is an async + iterable. With this change, you can use ``for-await`` loops to + retrieve events from an open change stream: + + .. code-block:: js + :copyable: false + + for await (const change of changeStream) { + console.log("Received change: ", change); + } + + .. tab:: + :tabid: Manual Iteration You can call methods on the ``ChangeStream`` object such as: @@ -67,7 +81,6 @@ read events from the change stream below. can specify logic in the listener to process the change event document when it is received. - You can control the change stream by calling ``pause()`` to stop emitting events or ``resume()`` to continue to emit events. To stop processing change events, call the `close() <{+api+}/classes/ChangeStream.html#close>`__ method on the @@ -77,6 +90,8 @@ read events from the change stream below. changeStream.close(); +.. include:: /includes/changestream-paradigm-warning.rst + Visit the following resources for additional material on the classes and methods presented above: From 91155afd0ef0f73dc8c179b19c337e6dfcaeb8cd Mon Sep 17 00:00:00 2001 From: Rea Radhika Rustagi Date: Mon, 6 Feb 2023 10:50:00 -0500 Subject: [PATCH 2/7] addtl work --- source/usage-examples/changeStream.txt | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/source/usage-examples/changeStream.txt b/source/usage-examples/changeStream.txt index bdf85d66e..f5de5ed7a 100644 --- a/source/usage-examples/changeStream.txt +++ b/source/usage-examples/changeStream.txt @@ -126,7 +126,7 @@ event and for the listener to complete its execution before closing the ``ChangeStream`` instance using the ``close()`` method. The timers used in this example are only necessary for this demonstration -to make sure there is enough time to register listener and have the +to make sure there is enough time to register the listener and have the listener process the event before exiting. .. include:: /includes/connect-guide-note.rst @@ -154,7 +154,7 @@ listener process the event before exiting. If you run the preceding example, you should see the following output: -.. code-block:: javascript +.. code-block:: none :copyable: false received a change to the collection: { From fd1cd1b75ca4a75e0f34bee045cb9aff203a5f46 Mon Sep 17 00:00:00 2001 From: Rea Radhika Rustagi Date: Tue, 7 Feb 2023 15:21:59 -0500 Subject: [PATCH 3/7] MW PR fixes 1 --- .../usage-examples/changeStream.js | 26 +--- .../usage-examples/changeStream_listener.js | 44 +++++++ source/usage-examples/changeStream.txt | 124 ++++++++++++------ 3 files changed, 131 insertions(+), 63 deletions(-) create mode 100644 source/code-snippets/usage-examples/changeStream_listener.js diff --git a/source/code-snippets/usage-examples/changeStream.js b/source/code-snippets/usage-examples/changeStream.js index a7a3774fa..51e943e48 100644 --- a/source/code-snippets/usage-examples/changeStream.js +++ b/source/code-snippets/usage-examples/changeStream.js @@ -5,38 +5,22 @@ const uri = ""; const client = new MongoClient(uri); -const simulateAsyncPause = () => - new Promise(resolve => { - setTimeout(() => resolve(), 1000); - }); - let changeStream; async function run() { try { const database = client.db("insertDB"); const collection = database.collection("haikus"); - // open a Change Stream on the "haikus" collection + // Open a Change Stream on the "haikus" collection changeStream = collection.watch(); - // set up a listener when change events are emitted - changeStream.on("change", next => { - // process any change event - console.log("received a change to the collection: \t", next); - }); - - await simulateAsyncPause(); - - await collection.insertOne({ - title: "Record of a Shriveled Datum", - content: "No bytes, no problem. Just insert a document, in MongoDB", - }); - - await simulateAsyncPause(); + // Print change events + for await (const change of changeStream) { + console.log("Received change:\n", change); + } await changeStream.close(); - console.log("closed the change stream"); } finally { await client.close(); } diff --git a/source/code-snippets/usage-examples/changeStream_listener.js b/source/code-snippets/usage-examples/changeStream_listener.js new file mode 100644 index 000000000..b85585091 --- /dev/null +++ b/source/code-snippets/usage-examples/changeStream_listener.js @@ -0,0 +1,44 @@ +import { MongoClient } from "mongodb"; + +// Replace the uri string with your MongoDB deployment's connection string. +const uri = ""; + +const client = new MongoClient(uri); + +const simulateAsyncPause = () => + new Promise(resolve => { + setTimeout(() => resolve(), 1000); + }); + +let changeStream; +async function run() { + try { + const database = client.db("insertDB"); + const collection = database.collection("haikus"); + + // open a Change Stream on the "haikus" collection + changeStream = collection.watch(); + + // set up a listener when change events are emitted + changeStream.on("change", next => { + // process any change event + console.log("received a change to the collection: \t", next); + }); + + await simulateAsyncPause(); + + await collection.insertOne({ + title: "Record of a Shriveled Datum", + content: "No bytes, no problem. Just insert a document, in MongoDB", + }); + + await simulateAsyncPause(); + + await changeStream.close(); + + console.log("closed the change stream"); + } finally { + await client.close(); + } +} +run().catch(console.dir); \ No newline at end of file diff --git a/source/usage-examples/changeStream.txt b/source/usage-examples/changeStream.txt index f5de5ed7a..eda10e757 100644 --- a/source/usage-examples/changeStream.txt +++ b/source/usage-examples/changeStream.txt @@ -44,8 +44,8 @@ read events from the change stream: .. tab:: :tabid: Idiomatic Iteration - Starting in version 4.12, the ``ChangeStream`` class is an async - iterable. With this change, you can use ``for-await`` loops to + Starting in version 4.12, ``ChangeStream`` objects are async + iterables. With this change, you can use ``for-await`` loops to retrieve events from an open change stream: .. code-block:: js @@ -92,42 +92,14 @@ read events from the change stream: .. include:: /includes/changestream-paradigm-warning.rst -Visit the following resources for additional material on the classes and -methods presented above: +Examples +-------- -- :manual:`Change streams ` -- :manual:`Change events ` -- :manual:`Aggregation pipeline ` -- :manual:`Aggregation stages ` -- `ChangeStream class API documentation <{+api+}/classes/ChangeStream.html>`__ -- `Collection.watch() <{+api+}/classes/Collection.html#watch>`__, -- `Db.watch() <{+api+}/classes/Db.html#watch>`__, -- `MongoClient.watch() API documentation <{+api+}/classes/MongoClient.html#watch>`__ - -Example -------- +Iteration +~~~~~~~~~ The following example opens a change stream on the ``haikus`` collection in -the ``insertDB`` database. Let's create a listener function to receive and -print change events that occur on the collection. - -First, open the change stream on the collection and then define a listener -on the change stream using the ``on()`` method. Once set, generate a change -event by performing a change to the collection. - -To generate the change event on the collection, let's use ``insertOne()`` -method to add a new document. Since the ``insertOne()`` may run before the -listener function can register, we use a timer, defined as -``simulateAsyncPause`` to wait 1 second before executing the insert. - -We also use ``simulateAsyncPause`` after the insertion of the document -to provide ample time for the listener function to receive the change -event and for the listener to complete its execution before -closing the ``ChangeStream`` instance using the ``close()`` method. - -The timers used in this example are only necessary for this demonstration -to make sure there is enough time to register the listener and have the -listener process the event before exiting. +the ``insertDB`` database and prints change events as they occur: .. include:: /includes/connect-guide-note.rst @@ -152,20 +124,29 @@ listener process the event before exiting. The JavaScript and TypeScript code snippets above are identical. There are no TypeScript specific features of the driver relevant to this use case. -If you run the preceding example, you should see the following output: +When you run this code and then make any changes to the ``haikus`` +collection, such as performing an insert or delete operation, you will +be able to see each change event document in your terminal. + +For example, if you insert a document to the collection, the code prints +the following output: .. code-block:: none :copyable: false - received a change to the collection: { - _id: { _data: '825EC...' }, + { + _id: { + _data: '...' + }, operationType: 'insert', - clusterTime: new Timestamp { ... }, - fullDocument: { _id: new ObjectId(...), title: 'Record of a Shriveled Datum', content: 'No bytes, no problem. Just insert a document, in MongoDB' }, + clusterTime: new Timestamp({ t: 1675800603, i: 31 }), + fullDocument: { + _id: new ObjectId("..."), + ... + }, ns: { db: 'insertDB', coll: 'haikus' }, - documentKey: { _id: new ObjectId(...) } + documentKey: { _id: new ObjectId("...") } } - closed the change stream .. note:: Receive Full Documents From Updates @@ -184,3 +165,62 @@ If you run the preceding example, you should see the following output: const changeStream = collection.watch(pipeline, options); +Listener Function +~~~~~~~~~~~~~~~~~ + +The following example opens a change stream on the ``haikus`` collection in +the ``insertDB`` database. Let's create a listener function to receive and +print change events that occur on the collection. + +First, open the change stream on the collection and then define a listener +on the change stream using the ``on()`` method. Once set, generate a change +event by performing a change to the collection. + +To generate the change event on the collection, let's use ``insertOne()`` +method to add a new document. Since the ``insertOne()`` may run before the +listener function can register, we use a timer, defined as +``simulateAsyncPause`` to wait 1 second before executing the insert. + +We also use ``simulateAsyncPause`` after the insertion of the document +to provide ample time for the listener function to receive the change +event and for the listener to complete its execution before +closing the ``ChangeStream`` instance using the ``close()`` method. + +The timers used in this example are only necessary for this demonstration +to make sure there is enough time to register listener and have the +listener process the event before exiting. + +.. include:: /includes/connect-guide-note.rst + +.. tabs:: + + .. tab:: JavaScript + :tabid: javascript + + .. literalinclude:: /code-snippets/usage-examples/changeStream_listener.js + :language: javascript + :linenos: + + .. tab:: TypeScript + :tabid: typescript + + .. literalinclude:: /code-snippets/usage-examples/changeStream_listener.js + :language: javascript + :linenos: + +.. note:: Identical Code Snippets + + The JavaScript and TypeScript code snippets above are identical. There are no + TypeScript specific features of the driver relevant to this use case. + +Visit the following resources for additional material on the classes and +methods mentioned on this page: + +- :manual:`Change streams ` +- :manual:`Change events ` +- :manual:`Aggregation pipeline ` +- :manual:`Aggregation stages ` +- `ChangeStream class API documentation <{+api+}/classes/ChangeStream.html>`__ +- `Collection.watch() <{+api+}/classes/Collection.html#watch>`__, +- `Db.watch() <{+api+}/classes/Db.html#watch>`__, +- `MongoClient.watch() API documentation <{+api+}/classes/MongoClient.html#watch>`__ \ No newline at end of file From 83204bb5f278a25f43231cb21006ccd5542c0f0c Mon Sep 17 00:00:00 2001 From: Rea Radhika Rustagi Date: Tue, 7 Feb 2023 15:27:48 -0500 Subject: [PATCH 4/7] small fixes --- source/usage-examples/changeStream.txt | 1 + 1 file changed, 1 insertion(+) diff --git a/source/usage-examples/changeStream.txt b/source/usage-examples/changeStream.txt index eda10e757..441952bc6 100644 --- a/source/usage-examples/changeStream.txt +++ b/source/usage-examples/changeStream.txt @@ -134,6 +134,7 @@ the following output: .. code-block:: none :copyable: false + Received change: { _id: { _data: '...' From b957b5ba22fbfbeac5b67e6b067bf5bbd9a19aca Mon Sep 17 00:00:00 2001 From: Rea Radhika Rustagi Date: Wed, 8 Feb 2023 09:40:13 -0500 Subject: [PATCH 5/7] MW PR fixes 2 --- source/usage-examples/changeStream.txt | 25 ++++++++++++++----------- 1 file changed, 14 insertions(+), 11 deletions(-) diff --git a/source/usage-examples/changeStream.txt b/source/usage-examples/changeStream.txt index 441952bc6..362ba5440 100644 --- a/source/usage-examples/changeStream.txt +++ b/source/usage-examples/changeStream.txt @@ -124,9 +124,9 @@ the ``insertDB`` database and prints change events as they occur: The JavaScript and TypeScript code snippets above are identical. There are no TypeScript specific features of the driver relevant to this use case. -When you run this code and then make any changes to the ``haikus`` +When you run this code and then make a change to the ``haikus`` collection, such as performing an insert or delete operation, you will -be able to see each change event document in your terminal. +be able to see the change event document printed in your terminal. For example, if you insert a document to the collection, the code prints the following output: @@ -174,22 +174,25 @@ the ``insertDB`` database. Let's create a listener function to receive and print change events that occur on the collection. First, open the change stream on the collection and then define a listener -on the change stream using the ``on()`` method. Once set, generate a change -event by performing a change to the collection. +on the change stream using the ``on()`` method. Once you set the +listener, generate a change event by performing a change to the collection. -To generate the change event on the collection, let's use ``insertOne()`` -method to add a new document. Since the ``insertOne()`` may run before the +To generate the change event on the collection, let's use the ``insertOne()`` +method to add a new document. Since ``insertOne()`` may run before the listener function can register, we use a timer, defined as ``simulateAsyncPause`` to wait 1 second before executing the insert. -We also use ``simulateAsyncPause`` after the insertion of the document -to provide ample time for the listener function to receive the change +We also use ``simulateAsyncPause`` after the insertion of the document. +This provides ample time for the listener function to receive the change event and for the listener to complete its execution before closing the ``ChangeStream`` instance using the ``close()`` method. -The timers used in this example are only necessary for this demonstration -to make sure there is enough time to register listener and have the -listener process the event before exiting. +.. note:: Reason to include timers + + The timers used in this example are only for demonstration + purposes. They make sure that there is enough time to register + the listener and have the listener process the change event before + exiting. .. include:: /includes/connect-guide-note.rst From a996c2963033da51f4d069390c4fb09ec8748d8d Mon Sep 17 00:00:00 2001 From: Rea Radhika Rustagi Date: Wed, 8 Feb 2023 09:50:36 -0500 Subject: [PATCH 6/7] small fixes --- source/usage-examples/changeStream.txt | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/source/usage-examples/changeStream.txt b/source/usage-examples/changeStream.txt index 362ba5440..dbebbe6f5 100644 --- a/source/usage-examples/changeStream.txt +++ b/source/usage-examples/changeStream.txt @@ -125,8 +125,8 @@ the ``insertDB`` database and prints change events as they occur: TypeScript specific features of the driver relevant to this use case. When you run this code and then make a change to the ``haikus`` -collection, such as performing an insert or delete operation, you will -be able to see the change event document printed in your terminal. +collection, such as performing an insert or delete operation, you can +see the change event document printed in your terminal. For example, if you insert a document to the collection, the code prints the following output: From eadfc13c6e2a900a056c66c917470585b3eec37b Mon Sep 17 00:00:00 2001 From: Rea Radhika Rustagi Date: Wed, 8 Feb 2023 13:08:45 -0500 Subject: [PATCH 7/7] remove redundant note --- source/usage-examples/changeStream.txt | 2 -- 1 file changed, 2 deletions(-) diff --git a/source/usage-examples/changeStream.txt b/source/usage-examples/changeStream.txt index dbebbe6f5..7c32a17ff 100644 --- a/source/usage-examples/changeStream.txt +++ b/source/usage-examples/changeStream.txt @@ -194,8 +194,6 @@ closing the ``ChangeStream`` instance using the ``close()`` method. the listener and have the listener process the change event before exiting. -.. include:: /includes/connect-guide-note.rst - .. tabs:: .. tab:: JavaScript