Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion src/sdam/monitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,11 @@ impl Monitor {
let start = Instant::now();
let result = tokio::select! {
result = execute_hello => match result {
Ok(reply) => HelloResult::Ok(reply),
Ok(mut reply) => {
// Do not propagate server reported cluster time for monitoring hello responses.
reply.cluster_time = None;
HelloResult::Ok(reply)
},
Err(e) => HelloResult::Err(e)
},
r = self.request_receiver.wait_for_cancellation() => {
Expand Down
90 changes: 89 additions & 1 deletion src/test/spec/sessions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ mod sessions_not_supported_skip_local; // requires mongocryptd
use std::{
future::IntoFuture,
sync::{Arc, Mutex},
time::Duration,
};

use futures::TryStreamExt;
Expand All @@ -12,12 +13,18 @@ use futures_util::{future::try_join_all, FutureExt};
use crate::{
bson::{doc, Document},
error::{ErrorKind, Result},
event::command::{CommandEvent, CommandStartedEvent},
event::{
command::{CommandEvent, CommandStartedEvent},
sdam::SdamEvent,
},
test::{
get_client_options,
log_uncaptured,
server_version_gte,
spec::unified_runner::run_unified_tests,
topology_is_load_balanced,
topology_is_sharded,
Event,
},
Client,
};
Expand Down Expand Up @@ -197,3 +204,84 @@ async fn implicit_session_after_connection() {
}

// Prose tests 18 and 19 in sessions_not_supported_skip_local module

// Sessions prose test 20
#[tokio::test]
async fn no_cluster_time_in_sdam() {
if topology_is_load_balanced().await {
log_uncaptured("Skipping no_cluster_time_in_sdam: load-balanced topology");
return;
}
let mut options = get_client_options().await.clone();
options.direct_connection = Some(true);
options.hosts.drain(1..);
let heartbeat_freq = Duration::from_millis(10);
options.heartbeat_freq = Some(heartbeat_freq);
let c1 = Client::for_test()
.options(options)
.min_heartbeat_freq(heartbeat_freq)
.monitor_events()
.await;

// Send a ping on c1
let cluster_time = c1
.database("admin")
.run_command(doc! { "ping": 1 })
.await
.unwrap()
.get("$clusterTime")
.cloned();

// Send a write on c2
let c2 = Client::for_test().await;
c2.database("test")
.collection::<Document>("test")
.insert_one(doc! {"advance": "$clusterTime"})
.await
.unwrap();

// Wait for the next (heartbeat started, heartbeat succeeded) event pair on c1
let mut events = c1.events.stream();
const TIMEOUT: Duration = Duration::from_secs(1);
crate::runtime::timeout(TIMEOUT, async {
loop {
// Find a started event...
let _started = events
.next_match(TIMEOUT, |ev| {
matches!(ev, Event::Sdam(SdamEvent::ServerHeartbeatStarted(_)))
})
.await
.unwrap();
// ... and the next heartbeat event after that ...
let next_hb = events
.next_map(TIMEOUT, |ev| match ev {
Event::Sdam(hb @ SdamEvent::ServerHeartbeatStarted(_)) => Some(hb),
Event::Sdam(hb @ SdamEvent::ServerHeartbeatFailed(_)) => Some(hb),
Event::Sdam(hb @ SdamEvent::ServerHeartbeatSucceeded(_)) => Some(hb),
_ => None,
})
.await
.unwrap();
// ... and see if it was a succeeded event.
if matches!(next_hb, SdamEvent::ServerHeartbeatSucceeded(_)) {
break;
}
}
})
.await
.unwrap();

// Send another ping
let mut events = c1.events.stream();
c1.database("admin")
.run_command(doc! { "ping": 1 })
.await
.unwrap();
let (start, _succeded) = events
.next_successful_command_execution(TIMEOUT, "ping")
.await
.unwrap();

// Assert that the cluster time hasn't changed
assert_eq!(cluster_time.as_ref(), start.command.get("$clusterTime"));
}