diff --git a/src/change_stream/event.rs b/src/change_stream/event.rs index 9a154cef7..2feee4fdd 100644 --- a/src/change_stream/event.rs +++ b/src/change_stream/event.rs @@ -6,7 +6,7 @@ use crate::{cursor::CursorSpecification, options::ChangeStreamOptions}; #[cfg(test)] use bson::Bson; -use bson::{Document, RawBson, RawDocumentBuf, Timestamp}; +use bson::{DateTime, Document, RawBson, RawDocumentBuf, Timestamp}; use serde::{Deserialize, Serialize}; /// An opaque token used for resuming an interrupted @@ -87,6 +87,9 @@ pub struct ChangeStreamEvent { /// The cluster time at which the change occurred. pub cluster_time: Option, + /// The wall time from the mongod that the change event originated from. + pub wall_time: Option, + /// The `Document` created or modified by the `insert`, `replace`, `delete`, `update` /// operations (i.e. CRUD operations). /// diff --git a/src/test/spec/json/change-streams/unified/change-streams.json b/src/test/spec/json/change-streams/unified/change-streams.json index dfc86a913..4faf2c8e0 100644 --- a/src/test/spec/json/change-streams/unified/change-streams.json +++ b/src/test/spec/json/change-streams/unified/change-streams.json @@ -1428,6 +1428,48 @@ ] } ] + }, + { + "description": "Test wallTime field is set in a change event", + "runOnRequirements": [ + { + "minServerVersion": "6.0.0" + } + ], + "operations": [ + { + "name": "createChangeStream", + "object": "collection0", + "arguments": { + "pipeline": [] + }, + "saveResultAsEntity": "changeStream0" + }, + { + "name": "insertOne", + "object": "collection0", + "arguments": { + "document": { + "_id": 1, + "a": 1 + } + } + }, + { + "name": "iterateUntilDocumentOrError", + "object": "changeStream0", + "expectResult": { + "operationType": "insert", + "ns": { + "db": "database0", + "coll": "collection0" + }, + "wallTime": { + "$$exists": true + } + } + } + ] } ] } diff --git a/src/test/spec/json/change-streams/unified/change-streams.yml b/src/test/spec/json/change-streams/unified/change-streams.yml index 629fbe1bf..afaa77ca7 100644 --- a/src/test/spec/json/change-streams/unified/change-streams.yml +++ b/src/test/spec/json/change-streams/unified/change-streams.yml @@ -737,3 +737,24 @@ tests: pipeline: [ { $changeStream: {} } ] commandName: aggregate databaseName: *database0 + + - description: "Test wallTime field is set in a change event" + runOnRequirements: + - minServerVersion: "6.0.0" + operations: + - name: createChangeStream + object: *collection0 + arguments: { pipeline: [] } + saveResultAsEntity: &changeStream0 changeStream0 + - name: insertOne + object: *collection0 + arguments: + document: { "_id": 1, "a": 1 } + - name: iterateUntilDocumentOrError + object: *changeStream0 + expectResult: + operationType: "insert" + ns: + db: *database0 + coll: *collection0 + wallTime: { $$exists: true }