Skip to content

RUST-663 Support $merge and $out executing on secondaries #1360

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
May 1, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 15 additions & 6 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ use crate::{
error::{Error, ErrorKind, Result},
event::command::CommandEvent,
id_set::IdSet,
operation::OverrideCriteriaFn,
options::{ClientOptions, DatabaseOptions, ReadPreference, SelectionCriteria, ServerAddress},
sdam::{
server_selection::{self, attempt_to_select_server},
Expand Down Expand Up @@ -446,8 +447,8 @@ impl Client {
&self,
criteria: Option<&SelectionCriteria>,
) -> Result<ServerAddress> {
let server = self
.select_server(criteria, "Test select server", None)
let (server, _) = self
.select_server(criteria, "Test select server", None, |_, _| None)
.await?;
Ok(server.address.clone())
}
Expand All @@ -460,7 +461,8 @@ impl Client {
#[allow(unused_variables)] // we only use the operation_name for tracing.
operation_name: &str,
deprioritized: Option<&ServerAddress>,
) -> Result<SelectedServer> {
override_criteria: OverrideCriteriaFn,
) -> Result<(SelectedServer, SelectionCriteria)> {
let criteria =
criteria.unwrap_or(&SelectionCriteria::ReadPreference(ReadPreference::Primary));

Expand Down Expand Up @@ -488,9 +490,16 @@ impl Client {
let mut watcher = self.inner.topology.watch();
loop {
let state = watcher.observe_latest();

let override_slot;
let effective_criteria =
if let Some(oc) = override_criteria(criteria, &state.description) {
override_slot = oc;
&override_slot
} else {
criteria
};
let result = server_selection::attempt_to_select_server(
criteria,
effective_criteria,
&state.description,
&state.servers(),
deprioritized,
Expand All @@ -507,7 +516,7 @@ impl Client {
#[cfg(feature = "tracing-unstable")]
event_emitter.emit_succeeded_event(&state.description, &server);

return Ok(server);
return Ok((server, effective_criteria.clone()));
} else {
#[cfg(feature = "tracing-unstable")]
if !emitted_waiting_message {
Expand Down
264 changes: 142 additions & 122 deletions src/client/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ use crate::{
Retryability,
},
options::{ChangeStreamOptions, SelectionCriteria},
sdam::{HandshakePhase, SelectedServer, ServerType, TopologyType, TransactionSupportStatus},
sdam::{HandshakePhase, ServerType, TopologyType, TransactionSupportStatus},
selection_criteria::ReadPreference,
tracking_arc::TrackingArc,
ClusterTime,
Expand Down Expand Up @@ -318,15 +318,16 @@ impl Client {
.and_then(|s| s.transaction.pinned_mongos())
.or_else(|| op.selection_criteria());

let server = match self
let (server, effective_criteria) = match self
.select_server(
selection_criteria,
op.name(),
retry.as_ref().map(|r| &r.first_server),
op.override_criteria(),
)
.await
{
Ok(server) => server,
Ok(out) => out,
Err(mut err) => {
retry.first_error()?;

Expand Down Expand Up @@ -398,6 +399,7 @@ impl Client {
&mut session,
txn_number,
retryability,
effective_criteria,
)
.await
{
Expand Down Expand Up @@ -471,127 +473,21 @@ impl Client {
session: &mut Option<&mut ClientSession>,
txn_number: Option<i64>,
retryability: Retryability,
effective_criteria: SelectionCriteria,
) -> Result<T::O> {
loop {
let stream_description = connection.stream_description()?;
let is_sharded = stream_description.initial_server_type == ServerType::Mongos;
let mut cmd = op.build(stream_description)?;
self.inner.topology.update_command_with_read_pref(
connection.address(),
&mut cmd,
op.selection_criteria(),
);

match session {
Some(ref mut session) if op.supports_sessions() && op.is_acknowledged() => {
cmd.set_session(session);
if let Some(txn_number) = txn_number {
cmd.set_txn_number(txn_number);
}
if session
.options()
.and_then(|opts| opts.snapshot)
.unwrap_or(false)
{
if connection
.stream_description()?
.max_wire_version
.unwrap_or(0)
< 13
{
let labels: Option<Vec<_>> = None;
return Err(Error::new(
ErrorKind::IncompatibleServer {
message: "Snapshot reads require MongoDB 5.0 or later".into(),
},
labels,
));
}
cmd.set_snapshot_read_concern(session);
}
// If this is a causally consistent session, set `readConcern.afterClusterTime`.
// Causal consistency defaults to true, unless snapshot is true.
else if session.causal_consistency()
&& matches!(
session.transaction.state,
TransactionState::None | TransactionState::Starting
)
&& op.supports_read_concern(stream_description)
{
cmd.set_after_cluster_time(session);
}

match session.transaction.state {
TransactionState::Starting => {
cmd.set_start_transaction();
cmd.set_autocommit();
if session.causal_consistency() {
cmd.set_after_cluster_time(session);
}

if let Some(ref options) = session.transaction.options {
if let Some(ref read_concern) = options.read_concern {
cmd.set_read_concern_level(read_concern.level.clone());
}
}
if self.is_load_balanced() {
session.pin_connection(connection.pin()?);
} else if is_sharded {
session.pin_mongos(connection.address().clone());
}
session.transaction.state = TransactionState::InProgress;
}
TransactionState::InProgress => cmd.set_autocommit(),
TransactionState::Committed { .. } | TransactionState::Aborted => {
cmd.set_autocommit();

// Append the recovery token to the command if we are committing or
// aborting on a sharded transaction.
if is_sharded {
if let Some(ref recovery_token) = session.transaction.recovery_token
{
cmd.set_recovery_token(recovery_token);
}
}
}
_ => {}
}
session.update_last_use();
}
Some(ref session) if !op.supports_sessions() && !session.is_implicit() => {
return Err(ErrorKind::InvalidArgument {
message: format!("{} does not support sessions", cmd.name),
}
.into());
}
Some(ref session) if !op.is_acknowledged() && !session.is_implicit() => {
return Err(ErrorKind::InvalidArgument {
message: "Cannot use ClientSessions with unacknowledged write concern"
.to_string(),
}
.into());
}
_ => {}
}

let session_cluster_time = session.as_ref().and_then(|session| session.cluster_time());
let client_cluster_time = self.inner.topology.cluster_time();
let max_cluster_time =
std::cmp::max(session_cluster_time, client_cluster_time.as_ref());
if let Some(cluster_time) = max_cluster_time {
cmd.set_cluster_time(cluster_time);
}
let cmd = self.build_command(
op,
connection,
session,
txn_number,
effective_criteria.clone(),
)?;

let connection_info = connection.info();
let service_id = connection.service_id();
let request_id = next_request_id();

if let Some(ref server_api) = self.inner.options.server_api {
cmd.set_server_api(server_api);
}

let should_redact = cmd.should_redact();

let cmd_name = cmd.name.clone();
let target_db = cmd.target_db.clone();

Expand Down Expand Up @@ -630,8 +526,9 @@ impl Client {
let start_time = Instant::now();
let command_result = match connection.send_message(message).await {
Ok(response) => {
self.handle_response(op, session, is_sharded, response)
.await
let is_sharded =
connection.stream_description()?.initial_server_type == ServerType::Mongos;
self.parse_response(op, session, is_sharded, response).await
}
Err(err) => Err(err),
};
Expand Down Expand Up @@ -706,6 +603,7 @@ impl Client {
let context = ExecutionContext {
connection,
session: session.as_deref_mut(),
effective_criteria: effective_criteria.clone(),
};

match op.handle_response(response, context).await {
Expand Down Expand Up @@ -737,6 +635,128 @@ impl Client {
}
}

fn build_command<T: Operation>(
&self,
op: &mut T,
connection: &mut PooledConnection,
session: &mut Option<&mut ClientSession>,
txn_number: Option<i64>,
effective_criteria: SelectionCriteria,
) -> Result<crate::cmap::Command> {
let stream_description = connection.stream_description()?;
let is_sharded = stream_description.initial_server_type == ServerType::Mongos;
let mut cmd = op.build(stream_description)?;
self.inner.topology.update_command_with_read_pref(
connection.address(),
&mut cmd,
&effective_criteria,
);

match session {
Some(ref mut session) if op.supports_sessions() && op.is_acknowledged() => {
cmd.set_session(session);
if let Some(txn_number) = txn_number {
cmd.set_txn_number(txn_number);
}
if session
.options()
.and_then(|opts| opts.snapshot)
.unwrap_or(false)
{
if connection
.stream_description()?
.max_wire_version
.unwrap_or(0)
< 13
{
let labels: Option<Vec<_>> = None;
return Err(Error::new(
ErrorKind::IncompatibleServer {
message: "Snapshot reads require MongoDB 5.0 or later".into(),
},
labels,
));
}
cmd.set_snapshot_read_concern(session);
}
// If this is a causally consistent session, set `readConcern.afterClusterTime`.
// Causal consistency defaults to true, unless snapshot is true.
else if session.causal_consistency()
&& matches!(
session.transaction.state,
TransactionState::None | TransactionState::Starting
)
&& op.supports_read_concern(stream_description)
{
cmd.set_after_cluster_time(session);
}

match session.transaction.state {
TransactionState::Starting => {
cmd.set_start_transaction();
cmd.set_autocommit();
if session.causal_consistency() {
cmd.set_after_cluster_time(session);
}

if let Some(ref options) = session.transaction.options {
if let Some(ref read_concern) = options.read_concern {
cmd.set_read_concern_level(read_concern.level.clone());
}
}
if self.is_load_balanced() {
session.pin_connection(connection.pin()?);
} else if is_sharded {
session.pin_mongos(connection.address().clone());
}
session.transaction.state = TransactionState::InProgress;
}
TransactionState::InProgress => cmd.set_autocommit(),
TransactionState::Committed { .. } | TransactionState::Aborted => {
cmd.set_autocommit();

// Append the recovery token to the command if we are committing or aborting
// on a sharded transaction.
if is_sharded {
if let Some(ref recovery_token) = session.transaction.recovery_token {
cmd.set_recovery_token(recovery_token);
}
}
}
_ => {}
}
session.update_last_use();
}
Some(ref session) if !op.supports_sessions() && !session.is_implicit() => {
return Err(ErrorKind::InvalidArgument {
message: format!("{} does not support sessions", cmd.name),
}
.into());
}
Some(ref session) if !op.is_acknowledged() && !session.is_implicit() => {
return Err(ErrorKind::InvalidArgument {
message: "Cannot use ClientSessions with unacknowledged write concern"
.to_string(),
}
.into());
}
_ => {}
}

let session_cluster_time = session.as_ref().and_then(|session| session.cluster_time());
let client_cluster_time = self.inner.topology.cluster_time();
let max_cluster_time = std::cmp::max(session_cluster_time, client_cluster_time.as_ref());
if let Some(cluster_time) = max_cluster_time {
cmd.set_cluster_time(cluster_time);
}

if let Some(ref server_api) = self.inner.options.server_api {
cmd.set_server_api(server_api);
}

Ok(cmd)
}

#[cfg(feature = "in-use-encryption")]
fn auto_encrypt<'a>(
&'a self,
Expand Down Expand Up @@ -789,7 +809,7 @@ impl Client {
.await
}

async fn handle_response<T: Operation>(
async fn parse_response<T: Operation>(
&self,
op: &T,
session: &mut Option<&mut ClientSession>,
Expand Down Expand Up @@ -864,8 +884,8 @@ impl Client {
(matches!(topology_type, TopologyType::Single) && server_type.is_available())
|| server_type.is_data_bearing()
}));
let _: SelectedServer = self
.select_server(Some(&criteria), operation_name, None)
let _ = self
.select_server(Some(&criteria), operation_name, None, |_, _| None)
.await?;
Ok(())
}
Expand Down
Loading