Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 5 additions & 21 deletions source/code-snippets/usage-examples/changeStream.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,38 +5,22 @@ const uri = "<connection string uri>";

const client = new MongoClient(uri);

const simulateAsyncPause = () =>
new Promise(resolve => {
setTimeout(() => resolve(), 1000);
});

let changeStream;
async function run() {
try {
const database = client.db("insertDB");
const collection = database.collection("haikus");

// open a Change Stream on the "haikus" collection
// Open a Change Stream on the "haikus" collection
changeStream = collection.watch();

// set up a listener when change events are emitted
changeStream.on("change", next => {
// process any change event
console.log("received a change to the collection: \t", next);
});

await simulateAsyncPause();

await collection.insertOne({
title: "Record of a Shriveled Datum",
content: "No bytes, no problem. Just insert a document, in MongoDB",
});

await simulateAsyncPause();
// Print change events
for await (const change of changeStream) {
console.log("Received change:\n", change);
}

await changeStream.close();

console.log("closed the change stream");
} finally {
await client.close();
}
Expand Down
44 changes: 44 additions & 0 deletions source/code-snippets/usage-examples/changeStream_listener.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
import { MongoClient } from "mongodb";

// Replace the uri string with your MongoDB deployment's connection string.
const uri = "<connection string uri>";

const client = new MongoClient(uri);

const simulateAsyncPause = () =>
new Promise(resolve => {
setTimeout(() => resolve(), 1000);
});

let changeStream;
async function run() {
try {
const database = client.db("insertDB");
const collection = database.collection("haikus");

// open a Change Stream on the "haikus" collection
changeStream = collection.watch();

// set up a listener when change events are emitted
changeStream.on("change", next => {
// process any change event
console.log("received a change to the collection: \t", next);
});

await simulateAsyncPause();

await collection.insertOne({
title: "Record of a Shriveled Datum",
content: "No bytes, no problem. Just insert a document, in MongoDB",
});

await simulateAsyncPause();

await changeStream.close();

console.log("closed the change stream");
} finally {
await client.close();
}
}
run().catch(console.dir);
147 changes: 102 additions & 45 deletions source/usage-examples/changeStream.txt
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,29 @@ The ``watch()`` method accepts an ``options`` object as the second parameter. Re
section for more information on the settings you can configure with this object.

The ``watch()`` method returns an instance of a `ChangeStream <{+api+}/classes/ChangeStream.html>`__. You can read events from
change streams by iterating over them or listening for events. Select the tab that corresponds to the way you want to
read events from the change stream below.
change streams by iterating over them or listening for events.

.. include:: /includes/changestream-paradigm-warning.rst
Select the tab that corresponds to the way you want to
read events from the change stream:

.. tabs::

.. tab::
:tabid: Iterative
:tabid: Idiomatic Iteration

Starting in version 4.12, ``ChangeStream`` objects are async
iterables. With this change, you can use ``for-await`` loops to
retrieve events from an open change stream:

.. code-block:: js
:copyable: false

for await (const change of changeStream) {
console.log("Received change: ", change);
}

.. tab::
:tabid: Manual Iteration

You can call methods on the ``ChangeStream`` object such as:

Expand All @@ -67,7 +81,6 @@ read events from the change stream below.
can specify logic in the listener to process the change event document
when it is received.


You can control the change stream by calling ``pause()`` to stop emitting events or ``resume()`` to continue to emit events.

To stop processing change events, call the `close() <{+api+}/classes/ChangeStream.html#close>`__ method on the
Expand All @@ -77,42 +90,16 @@ read events from the change stream below.

changeStream.close();

Visit the following resources for additional material on the classes and
methods presented above:
.. include:: /includes/changestream-paradigm-warning.rst

- :manual:`Change streams </changeStreams/>`
- :manual:`Change events </reference/change-events/>`
- :manual:`Aggregation pipeline </reference/operator/aggregation-pipeline/>`
- :manual:`Aggregation stages </changeStreams/#modify-change-stream-output>`
- `ChangeStream class API documentation <{+api+}/classes/ChangeStream.html>`__
- `Collection.watch() <{+api+}/classes/Collection.html#watch>`__,
- `Db.watch() <{+api+}/classes/Db.html#watch>`__,
- `MongoClient.watch() API documentation <{+api+}/classes/MongoClient.html#watch>`__
Examples
--------

Example
-------
Iteration
~~~~~~~~~

The following example opens a change stream on the ``haikus`` collection in
the ``insertDB`` database. Let's create a listener function to receive and
print change events that occur on the collection.

First, open the change stream on the collection and then define a listener
on the change stream using the ``on()`` method. Once set, generate a change
event by performing a change to the collection.

To generate the change event on the collection, let's use ``insertOne()``
method to add a new document. Since the ``insertOne()`` may run before the
listener function can register, we use a timer, defined as
``simulateAsyncPause`` to wait 1 second before executing the insert.

We also use ``simulateAsyncPause`` after the insertion of the document
to provide ample time for the listener function to receive the change
event and for the listener to complete its execution before
closing the ``ChangeStream`` instance using the ``close()`` method.

The timers used in this example are only necessary for this demonstration
to make sure there is enough time to register listener and have the
listener process the event before exiting.
the ``insertDB`` database and prints change events as they occur:

.. include:: /includes/connect-guide-note.rst

Expand All @@ -137,20 +124,30 @@ listener process the event before exiting.
The JavaScript and TypeScript code snippets above are identical. There are no
TypeScript specific features of the driver relevant to this use case.

If you run the preceding example, you should see the following output:
When you run this code and then make a change to the ``haikus``
collection, such as performing an insert or delete operation, you can
see the change event document printed in your terminal.

.. code-block:: javascript
For example, if you insert a document to the collection, the code prints
the following output:

.. code-block:: none
:copyable: false

received a change to the collection: {
_id: { _data: '825EC...' },
Received change:
{
_id: {
_data: '...'
},
operationType: 'insert',
clusterTime: new Timestamp { ... },
fullDocument: { _id: new ObjectId(...), title: 'Record of a Shriveled Datum', content: 'No bytes, no problem. Just insert a document, in MongoDB' },
clusterTime: new Timestamp({ t: 1675800603, i: 31 }),
fullDocument: {
_id: new ObjectId("..."),
...
},
ns: { db: 'insertDB', coll: 'haikus' },
documentKey: { _id: new ObjectId(...) }
documentKey: { _id: new ObjectId("...") }
}
closed the change stream

.. note:: Receive Full Documents From Updates

Expand All @@ -169,3 +166,63 @@ If you run the preceding example, you should see the following output:

const changeStream = collection.watch(pipeline, options);

Listener Function
~~~~~~~~~~~~~~~~~

The following example opens a change stream on the ``haikus`` collection in
the ``insertDB`` database. Let's create a listener function to receive and
print change events that occur on the collection.

First, open the change stream on the collection and then define a listener
on the change stream using the ``on()`` method. Once you set the
listener, generate a change event by performing a change to the collection.

To generate the change event on the collection, let's use the ``insertOne()``
method to add a new document. Since ``insertOne()`` may run before the
listener function can register, we use a timer, defined as
``simulateAsyncPause`` to wait 1 second before executing the insert.

We also use ``simulateAsyncPause`` after the insertion of the document.
This provides ample time for the listener function to receive the change
event and for the listener to complete its execution before
closing the ``ChangeStream`` instance using the ``close()`` method.

.. note:: Reason to include timers

The timers used in this example are only for demonstration
purposes. They make sure that there is enough time to register
the listener and have the listener process the change event before
exiting.

.. tabs::

.. tab:: JavaScript
:tabid: javascript

.. literalinclude:: /code-snippets/usage-examples/changeStream_listener.js
:language: javascript
:linenos:

.. tab:: TypeScript
:tabid: typescript

.. literalinclude:: /code-snippets/usage-examples/changeStream_listener.js
:language: javascript
:linenos:

.. note:: Identical Code Snippets

The JavaScript and TypeScript code snippets above are identical. There are no
TypeScript specific features of the driver relevant to this use case.

Visit the following resources for additional material on the classes and
methods mentioned on this page:

- :manual:`Change streams </changeStreams/>`
- :manual:`Change events </reference/change-events/>`
- :manual:`Aggregation pipeline </reference/operator/aggregation-pipeline/>`
- :manual:`Aggregation stages </changeStreams/#modify-change-stream-output>`
- `ChangeStream class API documentation <{+api+}/classes/ChangeStream.html>`__
- `Collection.watch() <{+api+}/classes/Collection.html#watch>`__,
- `Db.watch() <{+api+}/classes/Db.html#watch>`__,
- `MongoClient.watch() API documentation <{+api+}/classes/MongoClient.html#watch>`__