From d563f8d3214336e040f2a1cbf08f2d7720891179 Mon Sep 17 00:00:00 2001 From: "Andrew J. Stone" Date: Mon, 17 Nov 2025 00:25:52 +0000 Subject: [PATCH 1/7] TQ: Support proxying Nexus-related API requests When committing configurations, the trust quorum protocol relies on Nexus to be up and for Nexus to interact with each trust quorum node. This allows Nexus to continuously try to commit nodes in an RPW and record in the database which nodes have acked. This mirrors many of our existing designs where Nexus observes and records information about successful operations and retries via RPW. Specifically it matches how we do things in the `Reconfigurator` and the `TUF Repo Depot`. Unfortunately, Nexus cannot communicate with new sleds that are not yet running a sled-agent, but still stuck in the bootstrap-agent. This is because the bootstrap agents (and trust quorum protocol) only communicate over the bootstrap network, which Nexus does not have access to. Nodes must already be part of an existing configuration, running sled-agent, and on the underlay network to talk to Nexus. In this common case, Nexus sends trust quorum related messages to the sled-agent which then calls the api of its local trust quorum `NodeTask`. This is not possible for newly added sleds. While the trust quroum coordinator node will tell new nodes to `Prepare` a configuration over the bootstrap networtk, these new nodes do not have any mechanism to receive commits from Nexus. Therefore we must proxy these commit related operations to an existing member of the trust quorum when adding a new node. We also added the ability to proxy `NodeStatus` requests to aid in debugging. This PR therefore adds the ability to proxy certain requests from one node to another so that we can commit nodes to the latest trust quorum configuration, setup their encrypted storage, and boot their sled-agent. It's worth noting that this is not the only way we could have solved this problem. There are a few possibilities in the design space. 1. We could have had the coordinator always send commit operations and collect acknowledgements as during the `Prepare` phase. Unfortunately, if the coordinator dies before all nodes ack then Nexus would not be able to ensure commit at all nodes. To make this reliable, Nexus would still need to be able to reach out to uncommitted nodes and tell them to commit. Since we already have to do the latter there is no reason to do the former. 2. We could commit at the coordinator (or a few nodes), and then have them gossip around information about commit. This is actually a promising design, and is essentially what we do for the early network config. Nexus could then wait for the sled-agent to start for those nodes and ask them directly if they committed. This would still require talking to all nodes and it adds some extra complexity, but it still seems somewhat reasonable. The rationale for our current choice of proxying was largely one of fitting our existing patterns. It's also very useful for Nexus to be able directly ask a trust quorum node on another sled about its status to diagnose issues. So we went with the proxy mechanism as implemented here. Well, why did we introduce another level of messages at the `Task` layer instead of re-using the `CommitAdvance` functionality or adding new variants to the `PeerMsg` in the `trust_quorum_protocol` crate? The rationale here is largely that the trust quorum protocol as written in RFD 238 and specified in TLA+ doesn't include this behavior. It expects commits from the `Node` "API", meaning from `Nexus`. I didn't want to change that behavior unnecessarily due to urgency, and an existing solid design. It was also easier to build proxy operations this way since tracking operations in async code with oneshot channels is easier than trying to insert similar tracking into the `sans-io` code. In short, we left the `trust-quorum-protocol` crate alone, and added some async helpers to the `trust_quorum` crate. One additional change was made in this PR. While adding the `tq_proxy` test I noticed that we were unnecessarily using `wait_for_condition` on initial commits, after we knew about succesful prepares. These commits should always complete immediately and so I simplified this code in a few existing tests. --- Cargo.lock | 1 + trust-quorum/Cargo.toml | 1 + trust-quorum/protocol/src/node.rs | 24 +- trust-quorum/protocol/src/validators.rs | 5 +- trust-quorum/src/connection_manager.rs | 76 ++- trust-quorum/src/established_conn.rs | 30 ++ trust-quorum/src/ledgers.rs | 2 +- trust-quorum/src/lib.rs | 3 +- trust-quorum/src/proxy.rs | 589 ++++++++++++++++++++++++ trust-quorum/src/task.rs | 482 ++++++++++++++----- 10 files changed, 1076 insertions(+), 137 deletions(-) create mode 100644 trust-quorum/src/proxy.rs diff --git a/Cargo.lock b/Cargo.lock index e2e8be3c120..16c9c9ce22e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -14842,6 +14842,7 @@ dependencies = [ "chacha20poly1305", "ciborium", "daft", + "debug-ignore", "derive_more 0.99.20", "dropshot", "futures", diff --git a/trust-quorum/Cargo.toml b/trust-quorum/Cargo.toml index a778dbe6daa..5a7f6e24dc1 100644 --- a/trust-quorum/Cargo.toml +++ b/trust-quorum/Cargo.toml @@ -16,6 +16,7 @@ camino.workspace = true chacha20poly1305.workspace = true ciborium.workspace = true daft.workspace = true +debug-ignore.workspace = true derive_more.workspace = true futures.workspace = true gfss.workspace = true diff --git a/trust-quorum/protocol/src/node.rs b/trust-quorum/protocol/src/node.rs index 0571b58e35c..dac315bf25f 100644 --- a/trust-quorum/protocol/src/node.rs +++ b/trust-quorum/protocol/src/node.rs @@ -32,7 +32,9 @@ use crate::{ use daft::{Diffable, Leaf}; use gfss::shamir::Share; use omicron_uuid_kinds::RackUuid; +use serde::{Deserialize, Serialize}; use slog::{Logger, error, info, o, warn}; +use slog_error_chain::SlogInlineError; /// An entity capable of participating in trust quorum /// @@ -1063,7 +1065,16 @@ impl Node { } } -#[derive(Debug, Clone, thiserror::Error, PartialEq, Eq)] +#[derive( + Debug, + Clone, + thiserror::Error, + PartialEq, + Eq, + SlogInlineError, + Serialize, + Deserialize, +)] pub enum CommitError { #[error("invalid rack id")] InvalidRackId( @@ -1077,7 +1088,16 @@ pub enum CommitError { Expunged { epoch: Epoch, from: BaseboardId }, } -#[derive(Debug, Clone, thiserror::Error, PartialEq, Eq)] +#[derive( + Debug, + Clone, + thiserror::Error, + PartialEq, + Eq, + SlogInlineError, + Serialize, + Deserialize, +)] pub enum PrepareAndCommitError { #[error("invalid rack id")] InvalidRackId( diff --git a/trust-quorum/protocol/src/validators.rs b/trust-quorum/protocol/src/validators.rs index 9301e1cff01..ccc8049f82b 100644 --- a/trust-quorum/protocol/src/validators.rs +++ b/trust-quorum/protocol/src/validators.rs @@ -12,6 +12,7 @@ use crate::{ }; use daft::{BTreeSetDiff, Diffable, Leaf}; use omicron_uuid_kinds::RackUuid; +use serde::{Deserialize, Serialize}; use slog::{Logger, error, info, warn}; use std::collections::BTreeSet; @@ -57,7 +58,9 @@ pub struct SledExpungedError { last_prepared_epoch: Option, } -#[derive(Debug, Clone, thiserror::Error, PartialEq, Eq)] +#[derive( + Debug, Clone, thiserror::Error, PartialEq, Eq, Serialize, Deserialize, +)] #[error("mismatched rack id: expected {expected:?}, got {got:?}")] pub struct MismatchedRackIdError { pub expected: RackUuid, diff --git a/trust-quorum/src/connection_manager.rs b/trust-quorum/src/connection_manager.rs index 48803e6f794..de1e5f4fd04 100644 --- a/trust-quorum/src/connection_manager.rs +++ b/trust-quorum/src/connection_manager.rs @@ -5,6 +5,7 @@ //! A mechanism for maintaining a full mesh of trust quorum node connections use crate::established_conn::EstablishedConn; +use crate::proxy; use trust_quorum_protocol::{BaseboardId, Envelope, PeerMsg}; // TODO: Move to this crate @@ -12,6 +13,7 @@ use trust_quorum_protocol::{BaseboardId, Envelope, PeerMsg}; use bootstore::schemes::v0::NetworkConfig; use camino::Utf8PathBuf; +use derive_more::From; use iddqd::{ BiHashItem, BiHashMap, TriHashItem, TriHashMap, bi_upcast, tri_upcast, }; @@ -60,7 +62,7 @@ pub enum MainToConnMsg { /// /// All `WireMsg`s sent between nodes is prefixed with a 4 byte size header used /// for framing. -#[derive(Debug, Serialize, Deserialize)] +#[derive(Debug, Serialize, Deserialize, From)] pub enum WireMsg { /// Used for connection keep alive Ping, @@ -79,6 +81,12 @@ pub enum WireMsg { /// of tiny information layered on top of trust quorum. You can still think /// of it as a bootstore, although, we no longer use that name. NetworkConfig(NetworkConfig), + + /// Requests proxied to other nodes + ProxyRequest(proxy::WireRequest), + + /// Responses to proxy requests + ProxyResponse(proxy::WireResponse), } /// Messages sent from connection managing tasks to the main peer task @@ -99,6 +107,8 @@ pub enum ConnToMainMsgInner { Received { from: BaseboardId, msg: PeerMsg }, ReceivedNetworkConfig { from: BaseboardId, config: NetworkConfig }, Disconnected { peer_id: BaseboardId }, + ProxyRequestReceived { from: BaseboardId, req: proxy::WireRequest }, + ProxyResponseReceived { from: BaseboardId, rsp: proxy::WireResponse }, } pub struct TaskHandle { @@ -120,15 +130,11 @@ impl TaskHandle { self.abort_handle.abort() } - pub async fn send(&self, msg: PeerMsg) { - let _ = self.tx.send(MainToConnMsg::Msg(WireMsg::Tq(msg))).await; - } - - pub async fn send_network_config(&self, config: NetworkConfig) { - let _ = self - .tx - .send(MainToConnMsg::Msg(WireMsg::NetworkConfig(config))) - .await; + pub async fn send(&self, msg: T) + where + T: Into, + { + let _ = self.tx.send(MainToConnMsg::Msg(msg.into())).await; } } @@ -172,7 +178,10 @@ impl EstablishedTaskHandle { self.task_handle.abort(); } - pub async fn send(&self, msg: PeerMsg) { + pub async fn send(&self, msg: T) + where + T: Into, + { let _ = self.task_handle.send(msg).await; } } @@ -235,6 +244,12 @@ pub struct ConnMgrStatus { pub total_tasks_spawned: u64, } +/// The state of a proxy connection +pub enum ProxyConnState { + Connected, + Disconnected, +} + /// A structure to manage all sprockets connections to peer nodes /// /// Each sprockets connection runs in its own task which communicates with the @@ -399,7 +414,7 @@ impl ConnMgr { "peer_id" => %h.baseboard_id, "generation" => network_config.generation ); - h.task_handle.send_network_config(network_config.clone()).await; + h.send(network_config.clone()).await; } } @@ -415,7 +430,42 @@ impl ConnMgr { "peer_id" => %h.baseboard_id, "generation" => network_config.generation ); - h.task_handle.send_network_config(network_config.clone()).await; + h.send(network_config.clone()).await; + } + } + + /// Forward an API request to another node + /// + /// Return the state of the connection at this point in time so that the + /// [`proxy::Tracker`] can manage the outstanding request on behalf of the + /// user. + pub async fn proxy_request( + &mut self, + destination: &BaseboardId, + req: proxy::WireRequest, + ) -> ProxyConnState { + if let Some(h) = self.established.get1(destination) { + info!(self.log, "Sending {req:?}"; "peer_id" => %destination); + h.send(req).await; + ProxyConnState::Connected + } else { + ProxyConnState::Disconnected + } + } + + /// Return a response to a proxied request to another node + /// + /// There is no need to track whether this succeeds or fails. If the + /// connection goes away the client on the other side will notice it and + /// retry if needed. + pub async fn proxy_response( + &mut self, + destination: &BaseboardId, + rsp: proxy::WireResponse, + ) { + if let Some(h) = self.established.get1(destination) { + info!(self.log, "Sending {rsp:?}"; "peer_id" => %destination); + h.send(rsp).await; } } diff --git a/trust-quorum/src/established_conn.rs b/trust-quorum/src/established_conn.rs index b75b0576949..a357ecc6d78 100644 --- a/trust-quorum/src/established_conn.rs +++ b/trust-quorum/src/established_conn.rs @@ -233,6 +233,36 @@ impl EstablishedConn { panic!("Connection to main task channnel full"); } } + WireMsg::ProxyRequest(req) => { + if let Err(_) = self.main_tx.try_send(ConnToMainMsg { + task_id: self.task_id, + msg: ConnToMainMsgInner::ProxyRequestReceived { + from: self.peer_id.clone(), + req, + }, + }) { + error!( + self.log, + "Failed to send received proxy msg to the main task" + ); + panic!("Connection to main task channel full"); + } + } + WireMsg::ProxyResponse(rsp) => { + if let Err(_) = self.main_tx.try_send(ConnToMainMsg { + task_id: self.task_id, + msg: ConnToMainMsgInner::ProxyResponseReceived { + from: self.peer_id.clone(), + rsp, + }, + }) { + error!( + self.log, + "Failed to send received proxy msg to the main task" + ); + panic!("Connection to main task channel full"); + } + } } } } diff --git a/trust-quorum/src/ledgers.rs b/trust-quorum/src/ledgers.rs index b8830141d59..1929e5799e5 100644 --- a/trust-quorum/src/ledgers.rs +++ b/trust-quorum/src/ledgers.rs @@ -17,7 +17,7 @@ use slog::{Logger, info}; use trust_quorum_protocol::PersistentState; /// A wrapper type around [`PersistentState`] for use as a [`Ledger`] -#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize)] pub struct PersistentStateLedger { pub generation: u64, pub state: PersistentState, diff --git a/trust-quorum/src/lib.rs b/trust-quorum/src/lib.rs index dec9b3608ed..5220ba3f797 100644 --- a/trust-quorum/src/lib.rs +++ b/trust-quorum/src/lib.rs @@ -7,9 +7,10 @@ mod connection_manager; pub(crate) mod established_conn; mod ledgers; +mod proxy; mod task; pub(crate) use connection_manager::{ ConnToMainMsg, ConnToMainMsgInner, MainToConnMsg, WireMsg, }; -pub use task::NodeTask; +pub use task::{CommitStatus, Config, NodeApiError, NodeTask, NodeTaskHandle}; diff --git a/trust-quorum/src/proxy.rs b/trust-quorum/src/proxy.rs new file mode 100644 index 00000000000..c8536e21ce7 --- /dev/null +++ b/trust-quorum/src/proxy.rs @@ -0,0 +1,589 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. + +//! A mechanism for proxying API requests from one trust quorum node to another +//! over sprockets. +//! +//! This is necessary in the case when there is no sled-agent with which Nexus +//! can directly communicate on the underlay network. Since Nexus is not on the +//! bootstrap network it cannot talk directly to trust quorum nodes. +//! +//! This proxy mechanism is also useful during RSS and for general debugging +//! purposes. + +use crate::{ + CommitStatus, + task::{NodeApiRequest, NodeStatus}, +}; +use debug_ignore::DebugIgnore; +use derive_more::From; +use iddqd::{IdHashItem, IdHashMap, id_upcast}; +use omicron_uuid_kinds::RackUuid; +use serde::{Deserialize, Serialize}; +use slog_error_chain::{InlineErrorChain, SlogInlineError}; +use tokio::sync::{mpsc, oneshot}; +use trust_quorum_protocol::{ + BaseboardId, CommitError, Configuration, Epoch, PrepareAndCommitError, +}; +use uuid::Uuid; + +/// Requests that can be proxied to another node. Proxied requests should not be +/// proxied again once receieved. The receiving node should instead immediately +/// respond to the request to limit the time spent in the event loop, like a +/// normal NodeApiRequest. +#[derive(Debug, Serialize, Deserialize)] +pub struct WireRequest { + pub request_id: Uuid, + pub op: WireOp, +} + +#[derive(Debug, Serialize, Deserialize)] +pub enum WireOp { + Commit { rack_id: RackUuid, epoch: Epoch }, + PrepareAndCommit { config: Configuration }, + Status, +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct WireResponse { + pub request_id: Uuid, + pub result: Result, +} + +/// The successful variant of a [`WireResponse`] +#[derive(Debug, Serialize, Deserialize, From)] +pub enum WireValue { + /// The successful response value for a `commit` or `prepare_and_commit` + /// operation + Commit(CommitStatus), + /// The successful response value for a `status` operation + Status(NodeStatus), +} + +/// The error variant of a [`WireResponse`] +#[derive( + Debug, PartialEq, Eq, Serialize, Deserialize, From, thiserror::Error, +)] +pub enum WireError { + #[error(transparent)] + Commit(CommitError), + #[error(transparent)] + PrepareAndCommit(PrepareAndCommitError), + #[error(transparent)] + NoInnerError(NoInnerError), +} + +/// Define an Error for cases where the remote task doesn't return an error +/// +/// This is roughly analagous to `Infallible`, but derives `Error`, `Serialize`, +/// and `Deserialize`. +#[derive( + Debug, + Clone, + thiserror::Error, + PartialEq, + Eq, + SlogInlineError, + Serialize, + Deserialize, +)] +#[error("inner error when none expected")] +pub struct NoInnerError; + +/// A mechanism for proxying API requests to another node +pub struct Proxy { + // A mechanism to send a `WireRequest` to our local task for proxying. + tx: mpsc::Sender, +} + +impl Proxy { + pub fn new(tx: mpsc::Sender) -> Proxy { + Proxy { tx } + } + + pub async fn commit( + &self, + destination: BaseboardId, + rack_id: RackUuid, + epoch: Epoch, + ) -> Result> { + let op = WireOp::Commit { rack_id, epoch }; + let destructure_fn = move |res| match res { + Ok(val) => match val { + WireValue::Commit(cs) => Ok(cs), + other => { + Err(ProxyError::InvalidResponse(format!("{other:#?}"))) + } + }, + Err(err) => match err { + WireError::Commit(e) => Err(e.into()), + other => Err(ProxyError::InvalidResponse( + InlineErrorChain::new(&other).to_string(), + )), + }, + }; + + self.send(destination, op, destructure_fn).await + } + + pub async fn prepare_and_commit( + &self, + destination: BaseboardId, + config: Configuration, + ) -> Result> { + let op = WireOp::PrepareAndCommit { config }; + let destructure_fn = move |res| match res { + Ok(val) => match val { + WireValue::Commit(cs) => Ok(cs), + other => { + Err(ProxyError::InvalidResponse(format!("{other:#?}"))) + } + }, + Err(err) => match err { + WireError::PrepareAndCommit(e) => Err(e.into()), + other => Err(ProxyError::InvalidResponse( + InlineErrorChain::new(&other).to_string(), + )), + }, + }; + self.send(destination, op, destructure_fn).await + } + + pub async fn status( + &self, + destination: BaseboardId, + ) -> Result> { + let op = WireOp::Status; + let destructure_fn = move |res| match res { + Ok(val) => match val { + WireValue::Status(status) => Ok(status), + other => { + Err(ProxyError::InvalidResponse(format!("{other:#?}"))) + } + }, + Err(err) => match err { + WireError::NoInnerError(e) => Err(e.into()), + other => Err(ProxyError::InvalidResponse( + InlineErrorChain::new(&other).to_string(), + )), + }, + }; + self.send(destination, op, destructure_fn).await + } + + /// Send a `WireRequest` to a task and destructure its response to the + /// appropriate type + async fn send( + &self, + destination: BaseboardId, + op: WireOp, + f: F, + ) -> Result> + where + E: std::error::Error, + F: FnOnce(Result) -> Result>, + { + let request_id = Uuid::new_v4(); + let wire_request = WireRequest { request_id, op }; + + // A wrapper for responses from the task + let (task_tx, task_rx) = oneshot::channel(); + + // The message to send to the task + let api_request = + NodeApiRequest::Proxy { destination, wire_request, tx: task_tx }; + + if let Err(e) = self.tx.try_send(api_request) { + match e { + mpsc::error::TrySendError::Full(_) => { + return Err(ProxyError::Busy); + } + mpsc::error::TrySendError::Closed(_) => { + return Err(ProxyError::RecvError); + } + } + } + + let res = task_rx.await.map_err(|_| ProxyError::RecvError)?; + + let res = match res { + Err(TrackerError::Disconnected) => { + return Err(ProxyError::Disconnected); + } + Err(TrackerError::Wire(err)) => Err(err), + Ok(val) => Ok(val), + }; + + f(res) + } +} + +/// The error result of a proxy request +#[derive(Debug, Clone, thiserror::Error, PartialEq, Eq, SlogInlineError)] +pub enum ProxyError { + #[error(transparent)] + Inner(#[from] T), + #[error("disconnected")] + Disconnected, + #[error("response for different type received: {0}")] + InvalidResponse(String), + #[error("task sender dropped")] + RecvError, + #[error("task is busy: channel full")] + Busy, +} + +/// An error returned from a [`Tracker`]. +/// +/// This error wraps a `WireError` so that it can also indicate when no response +/// was received, such as when the sprockets channel was disconnected. +pub enum TrackerError { + Wire(WireError), + Disconnected, +} + +/// A trackable in-flight proxy request, owned by the `Tracker` +#[derive(Debug)] +pub struct TrackableRequest { + request_id: Uuid, + destination: BaseboardId, + // The option exists so we can take the sender out in `on_disconnect`, when + // the request is borrowed, but about to be discarded. + tx: DebugIgnore>>>, +} + +impl TrackableRequest { + pub fn new( + destination: BaseboardId, + request_id: Uuid, + tx: oneshot::Sender>, + ) -> TrackableRequest { + TrackableRequest { request_id, destination, tx: DebugIgnore(Some(tx)) } + } +} + +impl IdHashItem for TrackableRequest { + type Key<'a> = &'a Uuid; + + fn key(&self) -> Self::Key<'_> { + &self.request_id + } + + id_upcast!(); +} + +/// A mechanism to keep track of proxied requests and wait for their replies +pub struct Tracker { + // In flight operations + ops: IdHashMap, +} + +impl Tracker { + pub fn new() -> Tracker { + Tracker { ops: IdHashMap::new() } + } + + /// The number of proxied requests outstanding + pub fn len(&self) -> usize { + self.ops.len() + } + + pub fn insert(&mut self, req: TrackableRequest) { + self.ops.insert_unique(req).expect("no duplicate request IDs"); + } + + /// Handle a proxied response from a peer + pub fn on_response( + &mut self, + request_id: Uuid, + result: Result, + ) { + if let Some(mut req) = self.ops.remove(&request_id) { + let res = result.map_err(TrackerError::Wire); + let _ = req.tx.take().unwrap().send(res); + } + } + + /// A remote peer has disconnected + pub fn on_disconnect(&mut self, from: &BaseboardId) { + self.ops.retain(|mut req| { + if &req.destination == from { + let tx = req.tx.take().unwrap(); + let _ = tx.send(Err(TrackerError::Disconnected)); + false + } else { + true + } + }); + } +} + +#[cfg(test)] +mod tests { + use super::*; + use assert_matches::assert_matches; + use omicron_test_utils::dev::poll::{CondCheckError, wait_for_condition}; + use std::sync::{ + Arc, + atomic::{AtomicUsize, Ordering}, + }; + use std::time::Duration; + use tokio::spawn; + + /// Recv a message from the proxy and insert it into the tracker. + async fn recv_and_insert( + rx: &mut mpsc::Receiver, + tracker: &mut Tracker, + ) { + let Some(NodeApiRequest::Proxy { destination, wire_request, tx }) = + rx.recv().await + else { + panic!("Invalid NodeApiRequest") + }; + + let req = + TrackableRequest::new(destination, wire_request.request_id, tx); + tracker.insert(req); + } + + #[tokio::test] + async fn proxy_roundtrip_concurrent() { + let destination = BaseboardId { + part_number: "test".to_string(), + serial_number: "test".to_string(), + }; + let rack_id = RackUuid::new_v4(); + + // Test channel where the sender is usually cloned from the [`crate::NodeTaskHandle`], + // and the receiver is owned by the local [`crate::NodeTask`] + let (tx, mut rx) = mpsc::channel(5); + let proxy = Proxy::new(tx.clone()); + let mut tracker = Tracker::new(); + + let requests_completed = Arc::new(AtomicUsize::new(0)); + + // This is the first "user" task that will issue proxy operations + let count = requests_completed.clone(); + let dest = destination.clone(); + let _ = spawn(async move { + let s = proxy.commit(dest, rack_id, Epoch(1)).await.unwrap(); + + // The first attempt should succeed + assert_eq!(s, CommitStatus::Committed); + let _ = count.fetch_add(1, Ordering::Relaxed); + }); + + // No requests have been received yet + assert_eq!(tracker.len(), 0); + + // Simulate receiving a request by the [`NodeTask`] + recv_and_insert(&mut rx, &mut tracker).await; + + // We now have a request in the tracker + assert_eq!(tracker.len(), 1); + + // We haven't actually completed our operation yet + assert_eq!(requests_completed.load(Ordering::Relaxed), 0); + + // Get the first request_id. It's internal to the system, but we need to + // know it here to fake a response. + let request_id_1 = tracker.ops.iter().next().unwrap().request_id; + + // Now spawn a concurrent "user" task that proxies a `Status` request + // to the same node. + let proxy = Proxy::new(tx); + let count = requests_completed.clone(); + let _ = spawn(async move { + let s = proxy.status(destination.clone()).await.unwrap(); + assert_matches!(s, NodeStatus { .. }); + let _ = count.fetch_add(1, Ordering::Relaxed); + }); + + // Simulate receiving a request by the [`NodeTask`] + recv_and_insert(&mut rx, &mut tracker).await; + assert_eq!(tracker.len(), 2); + + // We still haven't actually completed any operations yet + assert_eq!(requests_completed.load(Ordering::Relaxed), 0); + let request_id_2 = tracker + .ops + .iter() + .filter(|&r| r.request_id != request_id_1) + .next() + .unwrap() + .request_id; + + // Now simulate completion of both requests, in reverse order. + tracker.on_response( + request_id_2, + Ok(WireValue::Status(NodeStatus::default())), + ); + tracker.on_response( + request_id_1, + Ok(WireValue::Commit(CommitStatus::Committed)), + ); + + // Now wait for both responses to be processed + wait_for_condition( + async || { + if requests_completed.load(Ordering::Relaxed) == 2 { + Ok(()) + } else { + Err(CondCheckError::<()>::NotYet) + } + }, + &Duration::from_millis(10), + &Duration::from_secs(10), + ) + .await + .unwrap(); + + assert_eq!(tracker.len(), 0); + } + + #[tokio::test] + async fn proxy_roundtrip_invalid_response() { + let destination = BaseboardId { + part_number: "test".to_string(), + serial_number: "test".to_string(), + }; + let rack_id = RackUuid::new_v4(); + + // Test channel where the sender is usually cloned from the [`crate::NodeTaskHandle`], + // and the receiver is owned by the local [`crate::NodeTask`] + let (tx, mut rx) = mpsc::channel(5); + let proxy = Proxy::new(tx.clone()); + let mut tracker = Tracker::new(); + + let requests_completed = Arc::new(AtomicUsize::new(0)); + + // This is the first "user" task that will issue proxy operations + let count = requests_completed.clone(); + let dest = destination.clone(); + let _ = spawn(async move { + let s = proxy.commit(dest, rack_id, Epoch(1)).await.unwrap_err(); + + // The first attempt should succeed + assert_matches!(s, ProxyError::InvalidResponse(_)); + let _ = count.fetch_add(1, Ordering::Relaxed); + }); + + // No requests have been received yet + assert_eq!(tracker.len(), 0); + + // Simulate receiving a request by the [`NodeTask`] + recv_and_insert(&mut rx, &mut tracker).await; + + // We now have a request in the tracker + assert_eq!(tracker.len(), 1); + + // We haven't actually completed our operation yet + assert_eq!(requests_completed.load(Ordering::Relaxed), 0); + + // Get the request_id. It's internal to the system, but we need to know + // it here to fake a response. + let request_id = tracker.ops.iter().next().unwrap().request_id; + + // Now return a successful response, but of the wrong type. + tracker.on_response( + request_id, + Ok(WireValue::Status(NodeStatus::default())), + ); + + // Now wait for the error responses to be processed + wait_for_condition( + async || { + if requests_completed.load(Ordering::Relaxed) == 1 { + Ok(()) + } else { + Err(CondCheckError::<()>::NotYet) + } + }, + &Duration::from_millis(10), + &Duration::from_secs(10), + ) + .await + .unwrap(); + + assert_eq!(tracker.len(), 0); + } + + #[tokio::test] + async fn proxy_roundtrip_disconnected() { + let destination = BaseboardId { + part_number: "test".to_string(), + serial_number: "test".to_string(), + }; + let rack_id = RackUuid::new_v4(); + + // Test channel where the sender is usually cloned from the [`crate::NodeTaskHandle`], + // and the receiver is owned by the local [`crate::NodeTask`] + let (tx, mut rx) = mpsc::channel(5); + let proxy = Proxy::new(tx.clone()); + let mut tracker = Tracker::new(); + + let requests_completed = Arc::new(AtomicUsize::new(0)); + + // This is the first "user" task that will issue proxy operations + let count = requests_completed.clone(); + let dest = destination.clone(); + let _ = spawn(async move { + let s = proxy.commit(dest, rack_id, Epoch(1)).await.unwrap_err(); + + // The first attempt should succeed + assert_eq!(s, ProxyError::Disconnected); + let _ = count.fetch_add(1, Ordering::Relaxed); + }); + + // No requests have been received yet + assert_eq!(tracker.len(), 0); + + // Simulate receiving a request by the [`NodeTask`] + recv_and_insert(&mut rx, &mut tracker).await; + + // We now have a request in the tracker + assert_eq!(tracker.len(), 1); + + // We haven't actually completed our operation yet + assert_eq!(requests_completed.load(Ordering::Relaxed), 0); + + // Now spawn a concurrent "user" task that proxies a `Status` request + // to the same node. + let proxy = Proxy::new(tx); + let count = requests_completed.clone(); + let dest = destination.clone(); + let _ = spawn(async move { + let s = proxy.status(dest.clone()).await.unwrap_err(); + assert_eq!(s, ProxyError::Disconnected); + let _ = count.fetch_add(1, Ordering::Relaxed); + }); + + // Simulate receiving a request by the [`NodeTask`] + recv_and_insert(&mut rx, &mut tracker).await; + assert_eq!(tracker.len(), 2); + + // We still haven't actually completed any operations yet + assert_eq!(requests_completed.load(Ordering::Relaxed), 0); + + // Now simulate a disconnection to the proxy destination + tracker.on_disconnect(&destination); + + // Now wait for both responses to be processed + wait_for_condition( + async || { + if requests_completed.load(Ordering::Relaxed) == 2 { + Ok(()) + } else { + Err(CondCheckError::<()>::NotYet) + } + }, + &Duration::from_millis(10), + &Duration::from_secs(10), + ) + .await + .unwrap(); + + assert_eq!(tracker.len(), 0); + } +} diff --git a/trust-quorum/src/task.rs b/trust-quorum/src/task.rs index df3b34ab074..b3374f56cd6 100644 --- a/trust-quorum/src/task.rs +++ b/trust-quorum/src/task.rs @@ -6,13 +6,15 @@ //! [`trust_quorum_protocol::Node`] use crate::connection_manager::{ - ConnMgr, ConnMgrStatus, ConnToMainMsg, ConnToMainMsgInner, + ConnMgr, ConnMgrStatus, ConnToMainMsg, ConnToMainMsgInner, ProxyConnState, }; use crate::ledgers::PersistentStateLedger; +use crate::proxy; use camino::Utf8PathBuf; use omicron_uuid_kinds::RackUuid; use serde::{Deserialize, Serialize}; use slog::{Logger, debug, error, info, o, warn}; +use slog_error_chain::SlogInlineError; use sprockets_tls::keys::SprocketsConfig; use std::collections::BTreeSet; use std::net::SocketAddrV6; @@ -32,6 +34,7 @@ use trust_quorum_protocol::{ use bootstore::schemes::v0::NetworkConfig; /// Whether or not a configuration has committed or is still underway. +#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)] pub enum CommitStatus { Committed, Pending, @@ -62,27 +65,28 @@ pub struct Config { /// LRTQ upgrade. #[derive(Debug, Clone, Serialize, Deserialize)] pub struct CoordinatorStatus { - config: Configuration, - acked_prepares: BTreeSet, + pub config: Configuration, + pub acked_prepares: BTreeSet, } // Details about a given node's status -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Debug, Clone, Default, Serialize, Deserialize)] pub struct NodeStatus { - connected_peers: BTreeSet, - alarms: BTreeSet, - persistent_state: NodePersistentStateSummary, + pub connected_peers: BTreeSet, + pub alarms: BTreeSet, + pub persistent_state: NodePersistentStateSummary, + pub proxied_requests: u64, } /// A summary of a node's persistent state, leaving out things like key shares /// and hashes. -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Debug, Clone, Default, Serialize, Deserialize)] pub struct NodePersistentStateSummary { - has_lrtq_share: bool, - configs: BTreeSet, - shares: BTreeSet, - commits: BTreeSet, - expunged: Option, + pub has_lrtq_share: bool, + pub configs: BTreeSet, + pub shares: BTreeSet, + pub commits: BTreeSet, + pub expunged: Option, } impl From<&PersistentState> for NodePersistentStateSummary { @@ -160,10 +164,26 @@ pub enum NodeApiRequest { /// Retrieve the current network config NetworkConfig { responder: oneshot::Sender> }, + + /// Proxy a [`proxy::WireRequest`] operation to another node + /// + /// When sled-agent is not running there is no direct way to issue + /// operations from Nexus. This occurs when when a node has not yet joined a + /// trust quorum configuration, but the mechanism is also useful during RSS. + /// In these cases, we need to take an existing node that we have access to + /// and proxy requests over sprockets to the `destination` node. + Proxy { + // Where to send the `wire_request` + destination: BaseboardId, + /// The actual request proxied across nodes + wire_request: proxy::WireRequest, + /// A mechanism for responding to the caller + tx: oneshot::Sender>, + }, } /// An error response from a `NodeApiRequest` -#[derive(Error, Debug, PartialEq)] +#[derive(Error, Debug, PartialEq, SlogInlineError)] pub enum NodeApiError { #[error("failed to send request to node task")] Send, @@ -221,6 +241,12 @@ impl NodeTaskHandle { &self.baseboard_id } + /// Return a [`proxy::Proxy`] that allows callers to proxy certain API requests + /// to other nodes. + pub fn proxy(&self) -> proxy::Proxy { + proxy::Proxy::new(self.tx.clone()) + } + /// Initiate a trust quorum reconfiguration at this node pub async fn reconfigure( &self, @@ -374,6 +400,9 @@ pub struct NodeTask { /// channels shared with trust quorum, but is not part of the trust quorum /// protocol. network_config: Option, + + /// A tracker for API requests proxied to other nodes + proxy_tracker: proxy::Tracker, } impl NodeTask { @@ -435,6 +464,7 @@ impl NodeTask { conn_mgr_rx, rx, network_config, + proxy_tracker: proxy::Tracker::new(), }, NodeTaskHandle { baseboard_id, tx, listen_addr }, ) @@ -492,6 +522,7 @@ impl NodeTask { } ConnToMainMsgInner::Disconnected { peer_id } => { self.conn_mgr.on_disconnected(task_id, peer_id.clone()).await; + self.proxy_tracker.on_disconnect(&peer_id); self.node.on_disconnect(&mut self.ctx, peer_id); } ConnToMainMsgInner::Received { from, msg } => { @@ -522,10 +553,57 @@ impl NodeTask { self.broadcast_network_config(Some(&from)).await; } } + ConnToMainMsgInner::ProxyRequestReceived { from, req } => { + info!( + self.log, + "Received proxy request : {req:#?}"; + "peer_id" => %from + ); + self.handle_proxy_request(from, req).await; + } + ConnToMainMsgInner::ProxyResponseReceived { from, rsp } => { + info!( + self.log, + "Received proxy response: {rsp:#?}"; + "peer_id" => %from + ); + let proxy::WireResponse { request_id, result } = rsp; + self.proxy_tracker.on_response(request_id, result); + } } self.save_persistent_state().await; } + // Handle these requests exactly like we handle `NodeApiRequests` but then + // respond to the proxy node over the network rather than oneshot channel + // used by the API. + async fn handle_proxy_request( + &mut self, + from: BaseboardId, + req: proxy::WireRequest, + ) { + let proxy::WireRequest { request_id, op } = req; + match op { + proxy::WireOp::Commit { rack_id, epoch } => { + let res = self.commit(rack_id, epoch).await; + let result = res.map(Into::into).map_err(Into::into); + let rsp = proxy::WireResponse { request_id, result }; + self.conn_mgr.proxy_response(&from, rsp).await; + } + proxy::WireOp::PrepareAndCommit { config } => { + let res = self.prepare_and_commit(config).await; + let result = res.map(Into::into).map_err(Into::into); + let rsp = proxy::WireResponse { request_id, result }; + self.conn_mgr.proxy_response(&from, rsp).await; + } + proxy::WireOp::Status => { + let result = Ok(self.status().into()); + let rsp = proxy::WireResponse { request_id, result }; + self.conn_mgr.proxy_response(&from, rsp).await; + } + } + } + // Handle API requests from sled-agent // // NOTE: We persist state where necessary before responding to clients. Any @@ -542,6 +620,7 @@ impl NodeTask { .update_bootstrap_connections(addrs, corpus) .await; for peer_id in disconnected { + self.proxy_tracker.on_disconnect(&peer_id); self.node.on_disconnect(&mut self.ctx, peer_id); } } @@ -549,18 +628,7 @@ impl NodeTask { self.node.clear_secrets(); } NodeApiRequest::Commit { rack_id, epoch, tx } => { - let res = self - .node - .commit_configuration(&mut self.ctx, rack_id, epoch) - .map(|_| { - if self.ctx.persistent_state().commits.contains(&epoch) - { - CommitStatus::Committed - } else { - CommitStatus::Pending - } - }); - self.save_persistent_state().await; + let res = self.commit(rack_id, epoch).await; let _ = tx.send(res); } NodeApiRequest::ConnMgrStatus { tx } => { @@ -587,26 +655,10 @@ impl NodeTask { let _ = tx.send(res); } NodeApiRequest::NodeStatus { tx } => { - let _ = tx.send(NodeStatus { - connected_peers: self.ctx.connected().clone(), - alarms: self.ctx.alarms().clone(), - persistent_state: self.ctx.persistent_state().into(), - }); + let _ = tx.send(self.status()); } NodeApiRequest::PrepareAndCommit { config, tx } => { - let epoch = config.epoch; - let res = self - .node - .prepare_and_commit(&mut self.ctx, config) - .map(|_| { - if self.ctx.persistent_state().commits.contains(&epoch) - { - CommitStatus::Committed - } else { - CommitStatus::Pending - } - }); - self.save_persistent_state().await; + let res = self.prepare_and_commit(config).await; let _ = tx.send(res); } NodeApiRequest::Reconfigure { msg, tx } => { @@ -626,9 +678,81 @@ impl NodeTask { NodeApiRequest::NetworkConfig { responder } => { let _ = responder.send(self.network_config.clone()); } + NodeApiRequest::Proxy { destination, wire_request, tx } => { + let request_id = wire_request.request_id; + match self + .conn_mgr + .proxy_request(&destination, wire_request) + .await + { + ProxyConnState::Connected => { + // Track the request. If the connection is disconnected + // before the response is received, then the caller will + // get notified about this. + let req = proxy::TrackableRequest::new( + destination.clone(), + request_id, + tx, + ); + self.proxy_tracker.insert(req); + } + ProxyConnState::Disconnected => { + // Return the fact that the message was not sent immediately + let _ = tx.send(Err(proxy::TrackerError::Disconnected)); + } + } + } + } + } + + /// Return the status of this [`NodeTask`] + fn status(&self) -> NodeStatus { + NodeStatus { + connected_peers: self.ctx.connected().clone(), + alarms: self.ctx.alarms().clone(), + persistent_state: self.ctx.persistent_state().into(), + proxied_requests: self.proxy_tracker.len() as u64, } } + /// Commit a configuration synchronously if possible + async fn commit( + &mut self, + rack_id: RackUuid, + epoch: Epoch, + ) -> Result { + let res = self + .node + .commit_configuration(&mut self.ctx, rack_id, epoch) + .map(|_| { + if self.ctx.persistent_state().commits.contains(&epoch) { + CommitStatus::Committed + } else { + CommitStatus::Pending + } + }); + self.save_persistent_state().await; + res + } + + /// PrepareAndCommit a configuration synchronously if possible + async fn prepare_and_commit( + &mut self, + config: Configuration, + ) -> Result { + let epoch = config.epoch; + let res = + self.node.prepare_and_commit(&mut self.ctx, config).map(|_| { + if self.ctx.persistent_state().commits.contains(&epoch) { + CommitStatus::Committed + } else { + CommitStatus::Pending + } + }); + self.save_persistent_state().await; + res + } + /// Save `PersistentState` to storage if necessary async fn save_persistent_state(&mut self) { if self.ctx.persistent_state_change_check_and_reset() { @@ -747,6 +871,8 @@ mod tests { use crate::connection_manager::{ ConnState, RECONNECT_TIME, platform_id_to_baseboard_id, }; + use crate::proxy::ProxyError; + use assert_matches::assert_matches; use camino::Utf8PathBuf; use dropshot::test_util::{LogContext, log_prefix_for_test}; use omicron_test_utils::dev::poll::{CondCheckError, wait_for_condition}; @@ -1227,31 +1353,17 @@ mod tests { .unwrap(); // Commit at each node - // - // Nexus retries this idempotent command until each node acks. So we - // simulate that here. - wait_for_condition( - async || { - let mut acked = 0; - for h in &setup.node_handles { - if matches!( - h.commit(rack_id, Epoch(1)).await.unwrap(), - CommitStatus::Committed - ) { - acked += 1; - } - } - if acked == num_nodes { - Ok(()) - } else { - Err(CondCheckError::<()>::NotYet) - } - }, - &poll_interval, - &poll_max, - ) - .await - .unwrap(); + // This should be immediate, since all nodes have acked prepares. + let mut acked = 0; + for h in &setup.node_handles { + if matches!( + h.commit(rack_id, Epoch(1)).await.unwrap(), + CommitStatus::Committed + ) { + acked += 1; + } + } + assert_eq!(acked, num_nodes); // Now load the rack secret at all nodes setup @@ -1335,31 +1447,17 @@ mod tests { coordinator.coordinator_status().await.unwrap().unwrap().config; // Commit at each node - // - // Nexus retries this idempotent command until each node acks. So we - // simulate that here. - wait_for_condition( - async || { - let mut acked = 0; - for h in &setup.node_handles[0..num_nodes - 1] { - if matches!( - h.commit(rack_id, Epoch(1)).await.unwrap(), - CommitStatus::Committed, - ) { - acked += 1; - } - } - if acked == num_nodes - 1 { - Ok(()) - } else { - Err(CondCheckError::<()>::NotYet) - } - }, - &poll_interval, - &poll_max, - ) - .await - .unwrap(); + // This should be immediate, since all nodes have acked prepares. + let mut acked = 0; + for h in &setup.node_handles[0..num_nodes - 1] { + if matches!( + h.commit(rack_id, Epoch(1)).await.unwrap(), + CommitStatus::Committed + ) { + acked += 1; + } + } + assert_eq!(acked, num_nodes - 1); // Now ensure that the last node still hasn't prepared or committed for // epoch 1, and isn't connected to any other node. @@ -1472,31 +1570,17 @@ mod tests { .unwrap(); // Commit at each node - // - // Nexus retries this idempotent command until each node acks. So we - // simulate that here. - wait_for_condition( - async || { - let mut acked = 0; - for h in &setup.node_handles { - if matches!( - h.commit(rack_id, Epoch(1)).await.unwrap(), - CommitStatus::Committed - ) { - acked += 1; - } - } - if acked == num_nodes { - Ok(()) - } else { - Err(CondCheckError::<()>::NotYet) - } - }, - &poll_interval, - &poll_max, - ) - .await - .unwrap(); + // This should be immediate, since all nodes have acked prepares. + let mut acked = 0; + for h in &setup.node_handles { + if matches!( + h.commit(rack_id, Epoch(1)).await.unwrap(), + CommitStatus::Committed + ) { + acked += 1; + } + } + assert_eq!(acked, num_nodes); // Now load the rack secret at all nodes setup @@ -2022,4 +2106,164 @@ mod tests { setup.cleanup_successful(); } + + /// Proxy API requests to other nodes + #[tokio::test] + pub async fn tq_proxy() { + let num_nodes = 4; + let mut setup = TestSetup::spawn_nodes("tq_proxy", num_nodes).await; + let rack_id = RackUuid::new_v4(); + + // Trigger an initial configuration by using the first node as a + // coordinator. We're pretending to be the sled-agent with instruction from + // Nexus here. + let initial_config = ReconfigureMsg { + rack_id, + epoch: Epoch(1), + last_committed_epoch: None, + members: setup.members().cloned().collect(), + threshold: trust_quorum_protocol::Threshold(3), + }; + + // Tell nodes how to reach each other + for h in &setup.node_handles { + h.load_peer_addresses(setup.listen_addrs.iter().cloned().collect()) + .await + .unwrap(); + } + + let coordinator = setup.node_handles.first().unwrap(); + coordinator.reconfigure(initial_config).await.unwrap(); + + let poll_interval = Duration::from_millis(10); + let poll_max = Duration::from_secs(10); + + // Wait for the coordinator to see `PrepareAck`s from all nodes + wait_for_condition( + async || { + let Ok(Some(s)) = coordinator.coordinator_status().await else { + return Err(CondCheckError::<()>::NotYet); + }; + if s.acked_prepares.len() == num_nodes { + Ok(()) + } else { + Err(CondCheckError::<()>::NotYet) + } + }, + &poll_interval, + &poll_max, + ) + .await + .unwrap(); + + // Save the configuration as if we were nexus + let config = + coordinator.coordinator_status().await.unwrap().unwrap().config; + + // Commit at each node except the last one + // Commit should be immediate since all nodes have acked prepares + let mut acked = 0; + for h in &setup.node_handles[0..num_nodes - 1] { + if matches!( + h.commit(rack_id, Epoch(1)).await.unwrap(), + CommitStatus::Committed + ) { + acked += 1; + } + } + assert_eq!(acked, num_nodes - 1); + + // Proxy a commit through the first node to the last node + // It should commit immediately since it has prepared already. + let proxy = &setup.node_handles[0].proxy(); + let destination = setup.members().last().unwrap().clone(); + let status = proxy + .commit(destination.clone(), rack_id, Epoch(1)) + .await + .expect("successful proxy op"); + assert_eq!(status, CommitStatus::Committed); + + // Commit should be idempotent + let status = proxy + .commit(destination.clone(), rack_id, Epoch(1)) + .await + .expect("successful proxy op"); + assert_eq!(status, CommitStatus::Committed); + + // PrepareAndCommit should also be idempotent since the configuration is + // already committed + let status = proxy + .prepare_and_commit(destination.clone(), config.clone()) + .await + .expect("successful proxy op"); + assert_eq!(status, CommitStatus::Committed); + + // Try to commit a configuration that doesn't exist + let err = proxy + .commit(destination.clone(), rack_id, Epoch(2)) + .await + .expect_err("expected to fail proxy commit"); + assert_eq!( + err, + ProxyError::::Inner(CommitError::NotPrepared(Epoch( + 2 + ))) + ); + + // PrepareAndCommit should return pending, because it has to compute + // it's own keyshare for the new config, which will eventually fail. + // + // Nexus will never actually send a `PrepareAndCommit` when there hasn't + // been a commit. This is just here to check the behavior of the proxy + // code. + let mut config2 = config.clone(); + config2.epoch = Epoch(2); + let status = proxy + .prepare_and_commit(destination.clone(), config2) + .await + .expect("successful proxy op"); + assert_eq!(status, CommitStatus::Pending); + + // Let's get the status for a remote node + let status = proxy + .status(destination.clone()) + .await + .expect("successful status request"); + assert_matches!(status, NodeStatus { .. }); + + // Let's stop the last node and ensure we get an error + setup.simulate_crash_of_last_node().await; + let err = proxy + .status(destination.clone()) + .await + .expect_err("status request failed"); + assert_eq!(err, ProxyError::Disconnected); + + setup.simulate_restart_of_last_node().await; + + // Inform all nodes about the last node. Restarting changes the network + // address because we use ephemeral ports. + for h in &setup.node_handles { + h.load_peer_addresses(setup.listen_addrs.iter().cloned().collect()) + .await + .unwrap(); + } + // Now load the rack secret at all nodes + setup + .wait_for_rack_secrets_and_assert_equality( + (0..num_nodes).collect(), + Epoch(1), + ) + .await + .unwrap(); + + // Now ensure we can get the status for the last node again. + let status = proxy + .status(destination.clone()) + .await + .expect("successful status request"); + assert_matches!(status, NodeStatus { .. }); + + setup.cleanup_successful(); + } } From 5301dd67d5cbabe0121994dd704efd5b4b89e309 Mon Sep 17 00:00:00 2001 From: "Andrew J. Stone" Date: Sun, 16 Nov 2025 22:13:14 +0000 Subject: [PATCH 2/7] Better management of disconnecting peers Rather than send a message when a peer disconnects and then handle that message asynchronously, we now wait for the task itself to exit and then return a `DisconnectedPeer` from `ConnectionManager::step`. We only return the `PeerId` for the `DisconnectedPeer` if it is still the existing `established connection` for the given peer and it hasn't been replaced by a newer connection. This prevents calling `Node::on_disconnect` for the stale connection when it might have already received an `on_connect` call for the new connection. --- trust-quorum/src/connection_manager.rs | 79 ++++++++++++++------------ trust-quorum/src/established_conn.rs | 8 --- trust-quorum/src/proxy.rs | 54 +++++++++++++----- trust-quorum/src/task.rs | 40 +++++++------ 4 files changed, 107 insertions(+), 74 deletions(-) diff --git a/trust-quorum/src/connection_manager.rs b/trust-quorum/src/connection_manager.rs index de1e5f4fd04..fcaf8743751 100644 --- a/trust-quorum/src/connection_manager.rs +++ b/trust-quorum/src/connection_manager.rs @@ -55,6 +55,19 @@ pub enum MainToConnMsg { Msg(WireMsg), } +/// The task for this sprockets connection just exited. If +/// `ConnectionManager::step` returns this value and `peer_id` is `Some` than +/// it means no new connection for the peer has yet been established. It is +/// safe to cleanup state for the given `peer_id`, by, for instance, calling +/// `Node::on_disconnect`. +/// +/// By always returning the `task_id`, we allow cleanup of proxy requests for +/// stale nodes that will never complete. +pub struct DisconnectedPeer { + pub task_id: task::Id, + pub peer_id: Option, +} + /// All possible messages sent over established connections /// /// This include trust quorum related `PeerMsg`s, but also ancillary network @@ -106,7 +119,6 @@ pub enum ConnToMainMsgInner { Connected { addr: SocketAddrV6, peer_id: BaseboardId }, Received { from: BaseboardId, msg: PeerMsg }, ReceivedNetworkConfig { from: BaseboardId, config: NetworkConfig }, - Disconnected { peer_id: BaseboardId }, ProxyRequestReceived { from: BaseboardId, req: proxy::WireRequest }, ProxyResponseReceived { from: BaseboardId, rsp: proxy::WireResponse }, } @@ -154,7 +166,7 @@ impl BiHashItem for TaskHandle { } pub struct EstablishedTaskHandle { - baseboard_id: BaseboardId, + pub baseboard_id: BaseboardId, task_handle: TaskHandle, } @@ -246,7 +258,7 @@ pub struct ConnMgrStatus { /// The state of a proxy connection pub enum ProxyConnState { - Connected, + Connected(task::Id), Disconnected, } @@ -447,7 +459,7 @@ impl ConnMgr { if let Some(h) = self.established.get1(destination) { info!(self.log, "Sending {req:?}"; "peer_id" => %destination); h.send(req).await; - ProxyConnState::Connected + ProxyConnState::Connected(h.task_id()) } else { ProxyConnState::Disconnected } @@ -471,32 +483,37 @@ impl ConnMgr { /// Perform any polling related operations that the connection /// manager must perform concurrently. + /// + /// Return `Ok(Some(DisconnectedPeer))` if an `EstablishedConnectionTask` + /// that was still the exclusive connection task for a specific peer has + /// just exited. pub async fn step( &mut self, corpus: Vec, - ) -> Result<(), AcceptError> { - tokio::select! { + ) -> Result, AcceptError> { + let disconnected_peer = tokio::select! { acceptor = self.server.accept(corpus.clone()) => { self.accept(acceptor?).await?; + None } Some(res) = self.join_set.join_next_with_id() => { match res { Ok((task_id, _)) => { - self.on_task_exit(task_id).await; + Some(self.on_task_exit(task_id)) } Err(err) => { warn!(self.log, "Connection task panic: {err}"); - self.on_task_exit(err.id()).await; + Some(self.on_task_exit(err.id())) } - } } _ = self.reconnect_interval.tick() => { self.reconnect(corpus.clone()).await; + None } - } + }; - Ok(()) + Ok(disconnected_peer) } pub async fn accept( @@ -686,22 +703,6 @@ impl ConnMgr { } } - /// The established connection task has asynchronously exited. - pub async fn on_disconnected( - &mut self, - task_id: task::Id, - peer_id: BaseboardId, - ) { - if let Some(established_task_handle) = self.established.get1(&peer_id) { - if established_task_handle.task_id() != task_id { - // This was a stale disconnect - return; - } - } - warn!(self.log, "peer disconnected"; "peer_id" => %peer_id); - let _ = self.established.remove1(&peer_id); - } - /// Initiate connections if a corresponding task doesn't already exist. This /// must be called periodically to handle transient disconnections which /// cause tasks to exit. @@ -740,9 +741,9 @@ impl ConnMgr { &mut self, addrs: BTreeSet, corpus: Vec, - ) -> BTreeSet { + ) -> Vec { if self.bootstrap_addrs == addrs { - return BTreeSet::new(); + return vec![]; } // We don't try to compare addresses from accepted nodes. If DDMD @@ -770,10 +771,10 @@ impl ConnMgr { self.connect_client(corpus.clone(), addr).await; } - let mut disconnected_peers = BTreeSet::new(); + let mut disconnected_peers = Vec::new(); for addr in to_disconnect { - if let Some(peer_id) = self.disconnect_client(addr).await { - disconnected_peers.insert(peer_id); + if let Some(handle) = self.disconnect_client(addr).await { + disconnected_peers.push(handle); } } disconnected_peers @@ -861,7 +862,7 @@ impl ConnMgr { async fn disconnect_client( &mut self, addr: SocketAddrV6, - ) -> Option { + ) -> Option { if let Some(handle) = self.connecting.remove2(&addr) { // The connection has not yet completed its handshake info!( @@ -880,7 +881,7 @@ impl ConnMgr { "peer_id" => %handle.baseboard_id ); handle.abort(); - Some(handle.baseboard_id) + Some(handle) } else { None } @@ -888,7 +889,9 @@ impl ConnMgr { } /// Remove any references to the given task - async fn on_task_exit(&mut self, task_id: task::Id) { + /// + /// Return a `DisconnectedPeer` for the given `task_id`. + fn on_task_exit(&mut self, task_id: task::Id) -> DisconnectedPeer { // We're most likely to find the task as established so we start with that if let Some(handle) = self.established.remove2(&task_id) { info!( @@ -898,6 +901,10 @@ impl ConnMgr { "peer_addr" => %handle.addr(), "peer_id" => %handle.baseboard_id ); + return DisconnectedPeer { + task_id, + peer_id: Some(handle.baseboard_id), + }; } else if let Some(handle) = self.accepting.remove1(&task_id) { info!( self.log, @@ -919,6 +926,8 @@ impl ConnMgr { "task_id" => ?task_id ); } + + DisconnectedPeer { task_id, peer_id: None } } } diff --git a/trust-quorum/src/established_conn.rs b/trust-quorum/src/established_conn.rs index a357ecc6d78..ab67cc00dfc 100644 --- a/trust-quorum/src/established_conn.rs +++ b/trust-quorum/src/established_conn.rs @@ -155,14 +155,6 @@ impl EstablishedConn { } async fn close(&mut self) { - if let Err(_) = self.main_tx.try_send(ConnToMainMsg { - task_id: self.task_id, - msg: ConnToMainMsgInner::Disconnected { - peer_id: self.peer_id.clone(), - }, - }) { - warn!(self.log, "Failed to send to main task"); - } let _ = self.writer.shutdown().await; } diff --git a/trust-quorum/src/proxy.rs b/trust-quorum/src/proxy.rs index c8536e21ce7..d351a76984b 100644 --- a/trust-quorum/src/proxy.rs +++ b/trust-quorum/src/proxy.rs @@ -23,6 +23,7 @@ use omicron_uuid_kinds::RackUuid; use serde::{Deserialize, Serialize}; use slog_error_chain::{InlineErrorChain, SlogInlineError}; use tokio::sync::{mpsc, oneshot}; +use tokio::task; use trust_quorum_protocol::{ BaseboardId, CommitError, Configuration, Epoch, PrepareAndCommitError, }; @@ -246,8 +247,15 @@ pub enum TrackerError { /// A trackable in-flight proxy request, owned by the `Tracker` #[derive(Debug)] pub struct TrackableRequest { + /// Each `TrackableRequest` is bound to a + /// [`crate::established_conn::EstablishedConn`] that is uniquely identified + /// by its `tokio::task::Id`. This is useful because it disambiguates + /// connect and disconnect operations for the same `destination` such that + /// they don't have to be totally ordered. It is enough to know that a + /// disconnect for a given `task_id` only occurs after a connect. + task_id: task::Id, + /// A unique id for a given proxy request request_id: Uuid, - destination: BaseboardId, // The option exists so we can take the sender out in `on_disconnect`, when // the request is borrowed, but about to be discarded. tx: DebugIgnore>>>, @@ -255,11 +263,11 @@ pub struct TrackableRequest { impl TrackableRequest { pub fn new( - destination: BaseboardId, + task_id: task::Id, request_id: Uuid, tx: oneshot::Sender>, ) -> TrackableRequest { - TrackableRequest { request_id, destination, tx: DebugIgnore(Some(tx)) } + TrackableRequest { task_id, request_id, tx: DebugIgnore(Some(tx)) } } } @@ -306,9 +314,9 @@ impl Tracker { } /// A remote peer has disconnected - pub fn on_disconnect(&mut self, from: &BaseboardId) { + pub fn on_disconnect(&mut self, task_id: task::Id) { self.ops.retain(|mut req| { - if &req.destination == from { + if req.task_id == task_id { let tx = req.tx.take().unwrap(); let _ = tx.send(Err(TrackerError::Disconnected)); false @@ -335,15 +343,15 @@ mod tests { async fn recv_and_insert( rx: &mut mpsc::Receiver, tracker: &mut Tracker, + task_id: task::Id, ) { - let Some(NodeApiRequest::Proxy { destination, wire_request, tx }) = + let Some(NodeApiRequest::Proxy { wire_request, tx, .. }) = rx.recv().await else { panic!("Invalid NodeApiRequest") }; - let req = - TrackableRequest::new(destination, wire_request.request_id, tx); + let req = TrackableRequest::new(task_id, wire_request.request_id, tx); tracker.insert(req); } @@ -355,6 +363,11 @@ mod tests { }; let rack_id = RackUuid::new_v4(); + // In real code, the `tokio::task::ID` is the id of the + // `EstablishedConnectionTask`. However, we are simulating those + // connections here, so just use an ID of an arbitrary task. + let task_id = task::spawn(async {}).id(); + // Test channel where the sender is usually cloned from the [`crate::NodeTaskHandle`], // and the receiver is owned by the local [`crate::NodeTask`] let (tx, mut rx) = mpsc::channel(5); @@ -378,7 +391,7 @@ mod tests { assert_eq!(tracker.len(), 0); // Simulate receiving a request by the [`NodeTask`] - recv_and_insert(&mut rx, &mut tracker).await; + recv_and_insert(&mut rx, &mut tracker, task_id).await; // We now have a request in the tracker assert_eq!(tracker.len(), 1); @@ -401,7 +414,7 @@ mod tests { }); // Simulate receiving a request by the [`NodeTask`] - recv_and_insert(&mut rx, &mut tracker).await; + recv_and_insert(&mut rx, &mut tracker, task_id).await; assert_eq!(tracker.len(), 2); // We still haven't actually completed any operations yet @@ -456,6 +469,11 @@ mod tests { let proxy = Proxy::new(tx.clone()); let mut tracker = Tracker::new(); + // In real code, the `tokio::task::ID` is the id of the + // `EstablishedConnectionTask`. However, we are simulating those + // connections here, so just use an ID of an arbitrary task. + let task_id = task::spawn(async {}).id(); + let requests_completed = Arc::new(AtomicUsize::new(0)); // This is the first "user" task that will issue proxy operations @@ -473,7 +491,7 @@ mod tests { assert_eq!(tracker.len(), 0); // Simulate receiving a request by the [`NodeTask`] - recv_and_insert(&mut rx, &mut tracker).await; + recv_and_insert(&mut rx, &mut tracker, task_id).await; // We now have a request in the tracker assert_eq!(tracker.len(), 1); @@ -517,6 +535,11 @@ mod tests { }; let rack_id = RackUuid::new_v4(); + // In real code, the `tokio::task::ID` is the id of the + // `EstablishedConnectionTask`. However, we are simulating those + // connections here, so just use an ID of an arbitrary task. + let task_id = task::spawn(async {}).id(); + // Test channel where the sender is usually cloned from the [`crate::NodeTaskHandle`], // and the receiver is owned by the local [`crate::NodeTask`] let (tx, mut rx) = mpsc::channel(5); @@ -540,7 +563,7 @@ mod tests { assert_eq!(tracker.len(), 0); // Simulate receiving a request by the [`NodeTask`] - recv_and_insert(&mut rx, &mut tracker).await; + recv_and_insert(&mut rx, &mut tracker, task_id).await; // We now have a request in the tracker assert_eq!(tracker.len(), 1); @@ -560,14 +583,15 @@ mod tests { }); // Simulate receiving a request by the [`NodeTask`] - recv_and_insert(&mut rx, &mut tracker).await; + recv_and_insert(&mut rx, &mut tracker, task_id).await; assert_eq!(tracker.len(), 2); // We still haven't actually completed any operations yet assert_eq!(requests_completed.load(Ordering::Relaxed), 0); - // Now simulate a disconnection to the proxy destination - tracker.on_disconnect(&destination); + // Now simulate a disconnection to the proxy destination for this + // specific connection task. + tracker.on_disconnect(task_id); // Now wait for both responses to be processed wait_for_condition( diff --git a/trust-quorum/src/task.rs b/trust-quorum/src/task.rs index b3374f56cd6..78069db5e54 100644 --- a/trust-quorum/src/task.rs +++ b/trust-quorum/src/task.rs @@ -6,7 +6,8 @@ //! [`trust_quorum_protocol::Node`] use crate::connection_manager::{ - ConnMgr, ConnMgrStatus, ConnToMainMsg, ConnToMainMsgInner, ProxyConnState, + ConnMgr, ConnMgrStatus, ConnToMainMsg, ConnToMainMsgInner, + DisconnectedPeer, ProxyConnState, }; use crate::ledgers::PersistentStateLedger; use crate::proxy; @@ -482,9 +483,15 @@ impl NodeTask { self.on_api_request(request).await; } res = self.conn_mgr.step(corpus.clone()) => { - if let Err(err) = res { - error!(self.log, "Failed to accept connection"; &err); - continue; + match res { + Ok(Some(disconnected_peer)) => { + self.on_disconnect(disconnected_peer); + } + Ok(None) => {} + Err(err) => { + error!(self.log, "Failed to accept connection"; &err); + continue; + } } } Some(msg) = self.conn_mgr_rx.recv() => { @@ -498,6 +505,14 @@ impl NodeTask { } } + /// A task managing an established connenction to a peer has just exited + fn on_disconnect(&mut self, disconnected_peer: DisconnectedPeer) { + self.proxy_tracker.on_disconnect(disconnected_peer.task_id); + if let Some(peer_id) = disconnected_peer.peer_id { + self.node.on_disconnect(&mut self.ctx, peer_id); + } + } + // Handle messages from connection management tasks // // We persist state at the end of this method, which always occurs before @@ -520,11 +535,6 @@ impl NodeTask { self.send_network_config(&peer_id).await; self.node.on_connect(&mut self.ctx, peer_id); } - ConnToMainMsgInner::Disconnected { peer_id } => { - self.conn_mgr.on_disconnected(task_id, peer_id.clone()).await; - self.proxy_tracker.on_disconnect(&peer_id); - self.node.on_disconnect(&mut self.ctx, peer_id); - } ConnToMainMsgInner::Received { from, msg } => { self.node.handle(&mut self.ctx, from, msg); } @@ -619,9 +629,9 @@ impl NodeTask { .conn_mgr .update_bootstrap_connections(addrs, corpus) .await; - for peer_id in disconnected { - self.proxy_tracker.on_disconnect(&peer_id); - self.node.on_disconnect(&mut self.ctx, peer_id); + for handle in disconnected { + self.proxy_tracker.on_disconnect(handle.task_id()); + self.node.on_disconnect(&mut self.ctx, handle.baseboard_id); } } NodeApiRequest::ClearSecrets => { @@ -685,14 +695,12 @@ impl NodeTask { .proxy_request(&destination, wire_request) .await { - ProxyConnState::Connected => { + ProxyConnState::Connected(task_id) => { // Track the request. If the connection is disconnected // before the response is received, then the caller will // get notified about this. let req = proxy::TrackableRequest::new( - destination.clone(), - request_id, - tx, + task_id, request_id, tx, ); self.proxy_tracker.insert(req); } From e74e09dbe5d6a2a8a9823a4ed5917786d2c3736e Mon Sep 17 00:00:00 2001 From: "Andrew J. Stone" Date: Sun, 16 Nov 2025 22:46:00 +0000 Subject: [PATCH 3/7] comments --- trust-quorum/protocol/src/node.rs | 10 ++++++++++ trust-quorum/src/connection_manager.rs | 2 +- 2 files changed, 11 insertions(+), 1 deletion(-) diff --git a/trust-quorum/protocol/src/node.rs b/trust-quorum/protocol/src/node.rs index dac315bf25f..aad4bd41a8e 100644 --- a/trust-quorum/protocol/src/node.rs +++ b/trust-quorum/protocol/src/node.rs @@ -381,6 +381,16 @@ impl Node { } /// A peer node has disconnected from this one + /// + /// Note: It is safe if a call to `on_disconnect` is missed due to a new + /// connection replacing the existing connection and resulting in a call + /// to `on_connect` while the node thinks it still maintains a connection. + /// + /// All active behavior such as retries occur in `on_connect`. If the + /// contents of this method change such that it is no longer safe to call + /// `on_connect` without first calling `on_disconnect` for an already + /// connected peer, then we can call `on_disconnect` first from directly + /// within `on_connect`. For now that is unnecessary. pub fn on_disconnect( &mut self, ctx: &mut impl NodeHandlerCtx, diff --git a/trust-quorum/src/connection_manager.rs b/trust-quorum/src/connection_manager.rs index fcaf8743751..a23ae2a030e 100644 --- a/trust-quorum/src/connection_manager.rs +++ b/trust-quorum/src/connection_manager.rs @@ -62,7 +62,7 @@ pub enum MainToConnMsg { /// `Node::on_disconnect`. /// /// By always returning the `task_id`, we allow cleanup of proxy requests for -/// stale nodes that will never complete. +/// stale connections that will never complete. pub struct DisconnectedPeer { pub task_id: task::Id, pub peer_id: Option, From 4b6d31061e8367345ed64584a68c3983c8effc27 Mon Sep 17 00:00:00 2001 From: "Andrew J. Stone" Date: Sun, 16 Nov 2025 23:09:41 +0000 Subject: [PATCH 4/7] review fixes --- trust-quorum/src/established_conn.rs | 20 ++++++++++---------- trust-quorum/src/proxy.rs | 16 ++++++++++++---- trust-quorum/src/task.rs | 10 +++++----- 3 files changed, 27 insertions(+), 19 deletions(-) diff --git a/trust-quorum/src/established_conn.rs b/trust-quorum/src/established_conn.rs index ab67cc00dfc..8e38245ace4 100644 --- a/trust-quorum/src/established_conn.rs +++ b/trust-quorum/src/established_conn.rs @@ -190,7 +190,7 @@ impl EstablishedConn { debug!(self.log, "Received {msg:?}"); match msg { WireMsg::Tq(msg) => { - if let Err(_) = self.main_tx.try_send(ConnToMainMsg { + if let Err(e) = self.main_tx.try_send(ConnToMainMsg { task_id: self.task_id, msg: ConnToMainMsgInner::Received { from: self.peer_id.clone(), @@ -201,7 +201,7 @@ impl EstablishedConn { self.log, "Failed to send received fsm msg to main task" ); - panic!("Connection to main task channel full"); + panic!("Connection to main task channel error: {e:#?}"); } } WireMsg::Ping => { @@ -210,7 +210,7 @@ impl EstablishedConn { } WireMsg::NetworkConfig(config) => { let generation = config.generation; - if let Err(_) = self.main_tx.try_send(ConnToMainMsg { + if let Err(e) = self.main_tx.try_send(ConnToMainMsg { task_id: self.task_id, msg: ConnToMainMsgInner::ReceivedNetworkConfig { from: self.peer_id.clone(), @@ -222,11 +222,11 @@ impl EstablishedConn { "Failed to send received NetworkConfig with generation {generation} to main task" ); - panic!("Connection to main task channnel full"); + panic!("Connection to main task channel error: {e:#?}"); } } WireMsg::ProxyRequest(req) => { - if let Err(_) = self.main_tx.try_send(ConnToMainMsg { + if let Err(e) = self.main_tx.try_send(ConnToMainMsg { task_id: self.task_id, msg: ConnToMainMsgInner::ProxyRequestReceived { from: self.peer_id.clone(), @@ -235,13 +235,13 @@ impl EstablishedConn { }) { error!( self.log, - "Failed to send received proxy msg to the main task" + "Failed to send received proxy request to the main task" ); - panic!("Connection to main task channel full"); + panic!("Connection to main task channel error: {e:#?}"); } } WireMsg::ProxyResponse(rsp) => { - if let Err(_) = self.main_tx.try_send(ConnToMainMsg { + if let Err(e) = self.main_tx.try_send(ConnToMainMsg { task_id: self.task_id, msg: ConnToMainMsgInner::ProxyResponseReceived { from: self.peer_id.clone(), @@ -250,9 +250,9 @@ impl EstablishedConn { }) { error!( self.log, - "Failed to send received proxy msg to the main task" + "Failed to send received proxy resposne to the main task" ); - panic!("Connection to main task channel full"); + panic!("Connection to main task channel error: {e:#?}"); } } } diff --git a/trust-quorum/src/proxy.rs b/trust-quorum/src/proxy.rs index d351a76984b..4b376ab3a99 100644 --- a/trust-quorum/src/proxy.rs +++ b/trust-quorum/src/proxy.rs @@ -374,6 +374,10 @@ mod tests { let proxy = Proxy::new(tx.clone()); let mut tracker = Tracker::new(); + // All spawned tasks will increment this value when processing a + // request. The result is polled with `wait_for_condition` at the end + // of the test, and therefore there is no reason to join on any of the + // spawned test tasks. let requests_completed = Arc::new(AtomicUsize::new(0)); // This is the first "user" task that will issue proxy operations @@ -474,6 +478,10 @@ mod tests { // connections here, so just use an ID of an arbitrary task. let task_id = task::spawn(async {}).id(); + // All spawned tasks will increment this value when processing a + // request. The result is polled with `wait_for_condition` at the end + // of the test, and therefore there is no reason to join on any of the + // spawned test tasks. let requests_completed = Arc::new(AtomicUsize::new(0)); // This is the first "user" task that will issue proxy operations @@ -481,8 +489,6 @@ mod tests { let dest = destination.clone(); let _ = spawn(async move { let s = proxy.commit(dest, rack_id, Epoch(1)).await.unwrap_err(); - - // The first attempt should succeed assert_matches!(s, ProxyError::InvalidResponse(_)); let _ = count.fetch_add(1, Ordering::Relaxed); }); @@ -546,6 +552,10 @@ mod tests { let proxy = Proxy::new(tx.clone()); let mut tracker = Tracker::new(); + // All spawned tasks will increment this value when processing a + // request. The result is polled with `wait_for_condition` at the end + // of the test, and therefore there is no reason to join on any of the + // spawned test tasks. let requests_completed = Arc::new(AtomicUsize::new(0)); // This is the first "user" task that will issue proxy operations @@ -553,8 +563,6 @@ mod tests { let dest = destination.clone(); let _ = spawn(async move { let s = proxy.commit(dest, rack_id, Epoch(1)).await.unwrap_err(); - - // The first attempt should succeed assert_eq!(s, ProxyError::Disconnected); let _ = count.fetch_add(1, Ordering::Relaxed); }); diff --git a/trust-quorum/src/task.rs b/trust-quorum/src/task.rs index 78069db5e54..46241d71115 100644 --- a/trust-quorum/src/task.rs +++ b/trust-quorum/src/task.rs @@ -1391,7 +1391,7 @@ mod tests { /// at the first 3 nodes. Then we go and issue a `PrepareAndCommit` to the last /// node and ensure it commits. #[tokio::test] - pub async fn tq_initial_config_prepare_and_commit() { + async fn tq_initial_config_prepare_and_commit() { let num_nodes = 4; let setup = TestSetup::spawn_nodes( "tq_initial_config_prepare_and_commit", @@ -1526,7 +1526,7 @@ mod tests { /// the configuration for the prior epoch. This should result in commit /// advancing to the latest epoch. #[tokio::test] - pub async fn tq_reconfig_with_commit_advance() { + async fn tq_reconfig_with_commit_advance() { let num_nodes = 4; let setup = TestSetup::spawn_nodes( "tq_recofnig_with_commit_advance", @@ -1744,7 +1744,7 @@ mod tests { } #[tokio::test] - pub async fn tq_upgrade_from_lrtq() { + async fn tq_upgrade_from_lrtq() { let num_nodes = 4; let (setup, rack_id) = TestSetup::spawn_nodes_with_lrtq_shares( "tq_upgrade_from_lrtq", @@ -1831,7 +1831,7 @@ mod tests { /// Ensure state is persisted as we expect #[tokio::test] - pub async fn tq_persistent_state() { + async fn tq_persistent_state() { let num_nodes = 4; let mut setup = TestSetup::spawn_nodes("tq_initial_config", num_nodes).await; @@ -2117,7 +2117,7 @@ mod tests { /// Proxy API requests to other nodes #[tokio::test] - pub async fn tq_proxy() { + async fn tq_proxy() { let num_nodes = 4; let mut setup = TestSetup::spawn_nodes("tq_proxy", num_nodes).await; let rack_id = RackUuid::new_v4(); From c2f99e594b7c9b09055c3d883be7b07b9231c1ee Mon Sep 17 00:00:00 2001 From: "Andrew J. Stone" Date: Mon, 17 Nov 2025 00:20:30 +0000 Subject: [PATCH 5/7] clippy --- trust-quorum/src/proxy.rs | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/trust-quorum/src/proxy.rs b/trust-quorum/src/proxy.rs index 4b376ab3a99..c44bcd59b91 100644 --- a/trust-quorum/src/proxy.rs +++ b/trust-quorum/src/proxy.rs @@ -383,7 +383,7 @@ mod tests { // This is the first "user" task that will issue proxy operations let count = requests_completed.clone(); let dest = destination.clone(); - let _ = spawn(async move { + spawn(async move { let s = proxy.commit(dest, rack_id, Epoch(1)).await.unwrap(); // The first attempt should succeed @@ -411,7 +411,7 @@ mod tests { // to the same node. let proxy = Proxy::new(tx); let count = requests_completed.clone(); - let _ = spawn(async move { + spawn(async move { let s = proxy.status(destination.clone()).await.unwrap(); assert_matches!(s, NodeStatus { .. }); let _ = count.fetch_add(1, Ordering::Relaxed); @@ -426,8 +426,7 @@ mod tests { let request_id_2 = tracker .ops .iter() - .filter(|&r| r.request_id != request_id_1) - .next() + .find(|&r| r.request_id != request_id_1) .unwrap() .request_id; @@ -487,7 +486,7 @@ mod tests { // This is the first "user" task that will issue proxy operations let count = requests_completed.clone(); let dest = destination.clone(); - let _ = spawn(async move { + spawn(async move { let s = proxy.commit(dest, rack_id, Epoch(1)).await.unwrap_err(); assert_matches!(s, ProxyError::InvalidResponse(_)); let _ = count.fetch_add(1, Ordering::Relaxed); @@ -561,7 +560,7 @@ mod tests { // This is the first "user" task that will issue proxy operations let count = requests_completed.clone(); let dest = destination.clone(); - let _ = spawn(async move { + spawn(async move { let s = proxy.commit(dest, rack_id, Epoch(1)).await.unwrap_err(); assert_eq!(s, ProxyError::Disconnected); let _ = count.fetch_add(1, Ordering::Relaxed); @@ -584,7 +583,7 @@ mod tests { let proxy = Proxy::new(tx); let count = requests_completed.clone(); let dest = destination.clone(); - let _ = spawn(async move { + spawn(async move { let s = proxy.status(dest.clone()).await.unwrap_err(); assert_eq!(s, ProxyError::Disconnected); let _ = count.fetch_add(1, Ordering::Relaxed); From 4a8e80b52724873923148a8eef3294e776e46de1 Mon Sep 17 00:00:00 2001 From: "Andrew J. Stone" Date: Mon, 17 Nov 2025 00:21:28 +0000 Subject: [PATCH 6/7] whoops, missed one --- trust-quorum/src/task.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/trust-quorum/src/task.rs b/trust-quorum/src/task.rs index 46241d71115..c47fd644339 100644 --- a/trust-quorum/src/task.rs +++ b/trust-quorum/src/task.rs @@ -1312,7 +1312,7 @@ mod tests { /// Commit an initial configuration at all nodes #[tokio::test] - pub async fn tq_initial_config() { + async fn tq_initial_config() { let num_nodes = 4; let setup = TestSetup::spawn_nodes("tq_initial_config", num_nodes).await; From e7def4d07c1600b850693aa44484a8facbd6d8e5 Mon Sep 17 00:00:00 2001 From: "Andrew J. Stone" Date: Mon, 17 Nov 2025 12:01:47 -0500 Subject: [PATCH 7/7] fix docs --- trust-quorum/src/lib.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/trust-quorum/src/lib.rs b/trust-quorum/src/lib.rs index 5220ba3f797..042944955a4 100644 --- a/trust-quorum/src/lib.rs +++ b/trust-quorum/src/lib.rs @@ -10,6 +10,8 @@ mod ledgers; mod proxy; mod task; +pub use proxy::Proxy; + pub(crate) use connection_manager::{ ConnToMainMsg, ConnToMainMsgInner, MainToConnMsg, WireMsg, };