diff --git a/src/sdam/monitor.rs b/src/sdam/monitor.rs index 9a530cf1d..3a1627a57 100644 --- a/src/sdam/monitor.rs +++ b/src/sdam/monitor.rs @@ -275,7 +275,11 @@ impl Monitor { let start = Instant::now(); let result = tokio::select! { result = execute_hello => match result { - Ok(reply) => HelloResult::Ok(reply), + Ok(mut reply) => { + // Do not propagate server reported cluster time for monitoring hello responses. + reply.cluster_time = None; + HelloResult::Ok(reply) + }, Err(e) => HelloResult::Err(e) }, r = self.request_receiver.wait_for_cancellation() => { diff --git a/src/test/spec/sessions.rs b/src/test/spec/sessions.rs index 798d13a99..5e2c8d3ed 100644 --- a/src/test/spec/sessions.rs +++ b/src/test/spec/sessions.rs @@ -4,6 +4,7 @@ mod sessions_not_supported_skip_local; // requires mongocryptd use std::{ future::IntoFuture, sync::{Arc, Mutex}, + time::Duration, }; use futures::TryStreamExt; @@ -12,12 +13,18 @@ use futures_util::{future::try_join_all, FutureExt}; use crate::{ bson::{doc, Document}, error::{ErrorKind, Result}, - event::command::{CommandEvent, CommandStartedEvent}, + event::{ + command::{CommandEvent, CommandStartedEvent}, + sdam::SdamEvent, + }, test::{ get_client_options, + log_uncaptured, server_version_gte, spec::unified_runner::run_unified_tests, + topology_is_load_balanced, topology_is_sharded, + Event, }, Client, }; @@ -197,3 +204,84 @@ async fn implicit_session_after_connection() { } // Prose tests 18 and 19 in sessions_not_supported_skip_local module + +// Sessions prose test 20 +#[tokio::test] +async fn no_cluster_time_in_sdam() { + if topology_is_load_balanced().await { + log_uncaptured("Skipping no_cluster_time_in_sdam: load-balanced topology"); + return; + } + let mut options = get_client_options().await.clone(); + options.direct_connection = Some(true); + options.hosts.drain(1..); + let heartbeat_freq = Duration::from_millis(10); + options.heartbeat_freq = Some(heartbeat_freq); + let c1 = Client::for_test() + .options(options) + .min_heartbeat_freq(heartbeat_freq) + .monitor_events() + .await; + + // Send a ping on c1 + let cluster_time = c1 + .database("admin") + .run_command(doc! { "ping": 1 }) + .await + .unwrap() + .get("$clusterTime") + .cloned(); + + // Send a write on c2 + let c2 = Client::for_test().await; + c2.database("test") + .collection::("test") + .insert_one(doc! {"advance": "$clusterTime"}) + .await + .unwrap(); + + // Wait for the next (heartbeat started, heartbeat succeeded) event pair on c1 + let mut events = c1.events.stream(); + const TIMEOUT: Duration = Duration::from_secs(1); + crate::runtime::timeout(TIMEOUT, async { + loop { + // Find a started event... + let _started = events + .next_match(TIMEOUT, |ev| { + matches!(ev, Event::Sdam(SdamEvent::ServerHeartbeatStarted(_))) + }) + .await + .unwrap(); + // ... and the next heartbeat event after that ... + let next_hb = events + .next_map(TIMEOUT, |ev| match ev { + Event::Sdam(hb @ SdamEvent::ServerHeartbeatStarted(_)) => Some(hb), + Event::Sdam(hb @ SdamEvent::ServerHeartbeatFailed(_)) => Some(hb), + Event::Sdam(hb @ SdamEvent::ServerHeartbeatSucceeded(_)) => Some(hb), + _ => None, + }) + .await + .unwrap(); + // ... and see if it was a succeeded event. + if matches!(next_hb, SdamEvent::ServerHeartbeatSucceeded(_)) { + break; + } + } + }) + .await + .unwrap(); + + // Send another ping + let mut events = c1.events.stream(); + c1.database("admin") + .run_command(doc! { "ping": 1 }) + .await + .unwrap(); + let (start, _succeded) = events + .next_successful_command_execution(TIMEOUT, "ping") + .await + .unwrap(); + + // Assert that the cluster time hasn't changed + assert_eq!(cluster_time.as_ref(), start.command.get("$clusterTime")); +}