diff --git a/Cargo.lock b/Cargo.lock index e2e8be3c120..16c9c9ce22e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -14842,6 +14842,7 @@ dependencies = [ "chacha20poly1305", "ciborium", "daft", + "debug-ignore", "derive_more 0.99.20", "dropshot", "futures", diff --git a/trust-quorum/Cargo.toml b/trust-quorum/Cargo.toml index a778dbe6daa..5a7f6e24dc1 100644 --- a/trust-quorum/Cargo.toml +++ b/trust-quorum/Cargo.toml @@ -16,6 +16,7 @@ camino.workspace = true chacha20poly1305.workspace = true ciborium.workspace = true daft.workspace = true +debug-ignore.workspace = true derive_more.workspace = true futures.workspace = true gfss.workspace = true diff --git a/trust-quorum/protocol/src/node.rs b/trust-quorum/protocol/src/node.rs index 0571b58e35c..aad4bd41a8e 100644 --- a/trust-quorum/protocol/src/node.rs +++ b/trust-quorum/protocol/src/node.rs @@ -32,7 +32,9 @@ use crate::{ use daft::{Diffable, Leaf}; use gfss::shamir::Share; use omicron_uuid_kinds::RackUuid; +use serde::{Deserialize, Serialize}; use slog::{Logger, error, info, o, warn}; +use slog_error_chain::SlogInlineError; /// An entity capable of participating in trust quorum /// @@ -379,6 +381,16 @@ impl Node { } /// A peer node has disconnected from this one + /// + /// Note: It is safe if a call to `on_disconnect` is missed due to a new + /// connection replacing the existing connection and resulting in a call + /// to `on_connect` while the node thinks it still maintains a connection. + /// + /// All active behavior such as retries occur in `on_connect`. If the + /// contents of this method change such that it is no longer safe to call + /// `on_connect` without first calling `on_disconnect` for an already + /// connected peer, then we can call `on_disconnect` first from directly + /// within `on_connect`. For now that is unnecessary. pub fn on_disconnect( &mut self, ctx: &mut impl NodeHandlerCtx, @@ -1063,7 +1075,16 @@ impl Node { } } -#[derive(Debug, Clone, thiserror::Error, PartialEq, Eq)] +#[derive( + Debug, + Clone, + thiserror::Error, + PartialEq, + Eq, + SlogInlineError, + Serialize, + Deserialize, +)] pub enum CommitError { #[error("invalid rack id")] InvalidRackId( @@ -1077,7 +1098,16 @@ pub enum CommitError { Expunged { epoch: Epoch, from: BaseboardId }, } -#[derive(Debug, Clone, thiserror::Error, PartialEq, Eq)] +#[derive( + Debug, + Clone, + thiserror::Error, + PartialEq, + Eq, + SlogInlineError, + Serialize, + Deserialize, +)] pub enum PrepareAndCommitError { #[error("invalid rack id")] InvalidRackId( diff --git a/trust-quorum/protocol/src/validators.rs b/trust-quorum/protocol/src/validators.rs index 9301e1cff01..ccc8049f82b 100644 --- a/trust-quorum/protocol/src/validators.rs +++ b/trust-quorum/protocol/src/validators.rs @@ -12,6 +12,7 @@ use crate::{ }; use daft::{BTreeSetDiff, Diffable, Leaf}; use omicron_uuid_kinds::RackUuid; +use serde::{Deserialize, Serialize}; use slog::{Logger, error, info, warn}; use std::collections::BTreeSet; @@ -57,7 +58,9 @@ pub struct SledExpungedError { last_prepared_epoch: Option, } -#[derive(Debug, Clone, thiserror::Error, PartialEq, Eq)] +#[derive( + Debug, Clone, thiserror::Error, PartialEq, Eq, Serialize, Deserialize, +)] #[error("mismatched rack id: expected {expected:?}, got {got:?}")] pub struct MismatchedRackIdError { pub expected: RackUuid, diff --git a/trust-quorum/src/connection_manager.rs b/trust-quorum/src/connection_manager.rs index 48803e6f794..a23ae2a030e 100644 --- a/trust-quorum/src/connection_manager.rs +++ b/trust-quorum/src/connection_manager.rs @@ -5,6 +5,7 @@ //! 
A mechanism for maintaining a full mesh of trust quorum node connections use crate::established_conn::EstablishedConn; +use crate::proxy; use trust_quorum_protocol::{BaseboardId, Envelope, PeerMsg}; // TODO: Move to this crate @@ -12,6 +13,7 @@ use trust_quorum_protocol::{BaseboardId, Envelope, PeerMsg}; use bootstore::schemes::v0::NetworkConfig; use camino::Utf8PathBuf; +use derive_more::From; use iddqd::{ BiHashItem, BiHashMap, TriHashItem, TriHashMap, bi_upcast, tri_upcast, }; @@ -53,6 +55,19 @@ pub enum MainToConnMsg { Msg(WireMsg), } +/// The task for this sprockets connection just exited. If +/// `ConnectionManager::step` returns this value and `peer_id` is `Some` than +/// it means no new connection for the peer has yet been established. It is +/// safe to cleanup state for the given `peer_id`, by, for instance, calling +/// `Node::on_disconnect`. +/// +/// By always returning the `task_id`, we allow cleanup of proxy requests for +/// stale connections that will never complete. +pub struct DisconnectedPeer { + pub task_id: task::Id, + pub peer_id: Option, +} + /// All possible messages sent over established connections /// /// This include trust quorum related `PeerMsg`s, but also ancillary network @@ -60,7 +75,7 @@ pub enum MainToConnMsg { /// /// All `WireMsg`s sent between nodes is prefixed with a 4 byte size header used /// for framing. -#[derive(Debug, Serialize, Deserialize)] +#[derive(Debug, Serialize, Deserialize, From)] pub enum WireMsg { /// Used for connection keep alive Ping, @@ -79,6 +94,12 @@ pub enum WireMsg { /// of tiny information layered on top of trust quorum. You can still think /// of it as a bootstore, although, we no longer use that name. NetworkConfig(NetworkConfig), + + /// Requests proxied to other nodes + ProxyRequest(proxy::WireRequest), + + /// Responses to proxy requests + ProxyResponse(proxy::WireResponse), } /// Messages sent from connection managing tasks to the main peer task @@ -98,7 +119,8 @@ pub enum ConnToMainMsgInner { Connected { addr: SocketAddrV6, peer_id: BaseboardId }, Received { from: BaseboardId, msg: PeerMsg }, ReceivedNetworkConfig { from: BaseboardId, config: NetworkConfig }, - Disconnected { peer_id: BaseboardId }, + ProxyRequestReceived { from: BaseboardId, req: proxy::WireRequest }, + ProxyResponseReceived { from: BaseboardId, rsp: proxy::WireResponse }, } pub struct TaskHandle { @@ -120,15 +142,11 @@ impl TaskHandle { self.abort_handle.abort() } - pub async fn send(&self, msg: PeerMsg) { - let _ = self.tx.send(MainToConnMsg::Msg(WireMsg::Tq(msg))).await; - } - - pub async fn send_network_config(&self, config: NetworkConfig) { - let _ = self - .tx - .send(MainToConnMsg::Msg(WireMsg::NetworkConfig(config))) - .await; + pub async fn send(&self, msg: T) + where + T: Into, + { + let _ = self.tx.send(MainToConnMsg::Msg(msg.into())).await; } } @@ -148,7 +166,7 @@ impl BiHashItem for TaskHandle { } pub struct EstablishedTaskHandle { - baseboard_id: BaseboardId, + pub baseboard_id: BaseboardId, task_handle: TaskHandle, } @@ -172,7 +190,10 @@ impl EstablishedTaskHandle { self.task_handle.abort(); } - pub async fn send(&self, msg: PeerMsg) { + pub async fn send(&self, msg: T) + where + T: Into, + { let _ = self.task_handle.send(msg).await; } } @@ -235,6 +256,12 @@ pub struct ConnMgrStatus { pub total_tasks_spawned: u64, } +/// The state of a proxy connection +pub enum ProxyConnState { + Connected(task::Id), + Disconnected, +} + /// A structure to manage all sprockets connections to peer nodes /// /// Each sprockets connection runs in its 
own task which communicates with the @@ -399,7 +426,7 @@ impl ConnMgr { "peer_id" => %h.baseboard_id, "generation" => network_config.generation ); - h.task_handle.send_network_config(network_config.clone()).await; + h.send(network_config.clone()).await; } } @@ -415,38 +442,78 @@ impl ConnMgr { "peer_id" => %h.baseboard_id, "generation" => network_config.generation ); - h.task_handle.send_network_config(network_config.clone()).await; + h.send(network_config.clone()).await; + } + } + + /// Forward an API request to another node + /// + /// Return the state of the connection at this point in time so that the + /// [`proxy::Tracker`] can manage the outstanding request on behalf of the + /// user. + pub async fn proxy_request( + &mut self, + destination: &BaseboardId, + req: proxy::WireRequest, + ) -> ProxyConnState { + if let Some(h) = self.established.get1(destination) { + info!(self.log, "Sending {req:?}"; "peer_id" => %destination); + h.send(req).await; + ProxyConnState::Connected(h.task_id()) + } else { + ProxyConnState::Disconnected + } + } + + /// Return a response to a proxied request to another node + /// + /// There is no need to track whether this succeeds or fails. If the + /// connection goes away the client on the other side will notice it and + /// retry if needed. + pub async fn proxy_response( + &mut self, + destination: &BaseboardId, + rsp: proxy::WireResponse, + ) { + if let Some(h) = self.established.get1(destination) { + info!(self.log, "Sending {rsp:?}"; "peer_id" => %destination); + h.send(rsp).await; } } /// Perform any polling related operations that the connection /// manager must perform concurrently. + /// + /// Return `Ok(Some(DisconnectedPeer))` if an `EstablishedConnectionTask` + /// that was still the exclusive connection task for a specific peer has + /// just exited. pub async fn step( &mut self, corpus: Vec, - ) -> Result<(), AcceptError> { - tokio::select! { + ) -> Result, AcceptError> { + let disconnected_peer = tokio::select! { acceptor = self.server.accept(corpus.clone()) => { self.accept(acceptor?).await?; + None } Some(res) = self.join_set.join_next_with_id() => { match res { Ok((task_id, _)) => { - self.on_task_exit(task_id).await; + Some(self.on_task_exit(task_id)) } Err(err) => { warn!(self.log, "Connection task panic: {err}"); - self.on_task_exit(err.id()).await; + Some(self.on_task_exit(err.id())) } - } } _ = self.reconnect_interval.tick() => { self.reconnect(corpus.clone()).await; + None } - } + }; - Ok(()) + Ok(disconnected_peer) } pub async fn accept( @@ -636,22 +703,6 @@ impl ConnMgr { } } - /// The established connection task has asynchronously exited. - pub async fn on_disconnected( - &mut self, - task_id: task::Id, - peer_id: BaseboardId, - ) { - if let Some(established_task_handle) = self.established.get1(&peer_id) { - if established_task_handle.task_id() != task_id { - // This was a stale disconnect - return; - } - } - warn!(self.log, "peer disconnected"; "peer_id" => %peer_id); - let _ = self.established.remove1(&peer_id); - } - /// Initiate connections if a corresponding task doesn't already exist. This /// must be called periodically to handle transient disconnections which /// cause tasks to exit. @@ -690,9 +741,9 @@ impl ConnMgr { &mut self, addrs: BTreeSet, corpus: Vec, - ) -> BTreeSet { + ) -> Vec { if self.bootstrap_addrs == addrs { - return BTreeSet::new(); + return vec![]; } // We don't try to compare addresses from accepted nodes. 
If DDMD @@ -720,10 +771,10 @@ impl ConnMgr { self.connect_client(corpus.clone(), addr).await; } - let mut disconnected_peers = BTreeSet::new(); + let mut disconnected_peers = Vec::new(); for addr in to_disconnect { - if let Some(peer_id) = self.disconnect_client(addr).await { - disconnected_peers.insert(peer_id); + if let Some(handle) = self.disconnect_client(addr).await { + disconnected_peers.push(handle); } } disconnected_peers @@ -811,7 +862,7 @@ impl ConnMgr { async fn disconnect_client( &mut self, addr: SocketAddrV6, - ) -> Option { + ) -> Option { if let Some(handle) = self.connecting.remove2(&addr) { // The connection has not yet completed its handshake info!( @@ -830,7 +881,7 @@ impl ConnMgr { "peer_id" => %handle.baseboard_id ); handle.abort(); - Some(handle.baseboard_id) + Some(handle) } else { None } @@ -838,7 +889,9 @@ impl ConnMgr { } /// Remove any references to the given task - async fn on_task_exit(&mut self, task_id: task::Id) { + /// + /// Return a `DisconnectedPeer` for the given `task_id`. + fn on_task_exit(&mut self, task_id: task::Id) -> DisconnectedPeer { // We're most likely to find the task as established so we start with that if let Some(handle) = self.established.remove2(&task_id) { info!( @@ -848,6 +901,10 @@ impl ConnMgr { "peer_addr" => %handle.addr(), "peer_id" => %handle.baseboard_id ); + return DisconnectedPeer { + task_id, + peer_id: Some(handle.baseboard_id), + }; } else if let Some(handle) = self.accepting.remove1(&task_id) { info!( self.log, @@ -869,6 +926,8 @@ impl ConnMgr { "task_id" => ?task_id ); } + + DisconnectedPeer { task_id, peer_id: None } } } diff --git a/trust-quorum/src/established_conn.rs b/trust-quorum/src/established_conn.rs index b75b0576949..8e38245ace4 100644 --- a/trust-quorum/src/established_conn.rs +++ b/trust-quorum/src/established_conn.rs @@ -155,14 +155,6 @@ impl EstablishedConn { } async fn close(&mut self) { - if let Err(_) = self.main_tx.try_send(ConnToMainMsg { - task_id: self.task_id, - msg: ConnToMainMsgInner::Disconnected { - peer_id: self.peer_id.clone(), - }, - }) { - warn!(self.log, "Failed to send to main task"); - } let _ = self.writer.shutdown().await; } @@ -198,7 +190,7 @@ impl EstablishedConn { debug!(self.log, "Received {msg:?}"); match msg { WireMsg::Tq(msg) => { - if let Err(_) = self.main_tx.try_send(ConnToMainMsg { + if let Err(e) = self.main_tx.try_send(ConnToMainMsg { task_id: self.task_id, msg: ConnToMainMsgInner::Received { from: self.peer_id.clone(), @@ -209,7 +201,7 @@ impl EstablishedConn { self.log, "Failed to send received fsm msg to main task" ); - panic!("Connection to main task channel full"); + panic!("Connection to main task channel error: {e:#?}"); } } WireMsg::Ping => { @@ -218,7 +210,7 @@ impl EstablishedConn { } WireMsg::NetworkConfig(config) => { let generation = config.generation; - if let Err(_) = self.main_tx.try_send(ConnToMainMsg { + if let Err(e) = self.main_tx.try_send(ConnToMainMsg { task_id: self.task_id, msg: ConnToMainMsgInner::ReceivedNetworkConfig { from: self.peer_id.clone(), @@ -230,7 +222,37 @@ impl EstablishedConn { "Failed to send received NetworkConfig with generation {generation} to main task" ); - panic!("Connection to main task channnel full"); + panic!("Connection to main task channel error: {e:#?}"); + } + } + WireMsg::ProxyRequest(req) => { + if let Err(e) = self.main_tx.try_send(ConnToMainMsg { + task_id: self.task_id, + msg: ConnToMainMsgInner::ProxyRequestReceived { + from: self.peer_id.clone(), + req, + }, + }) { + error!( + self.log, + "Failed to send received 
proxy request to the main task" + ); + panic!("Connection to main task channel error: {e:#?}"); + } + } + WireMsg::ProxyResponse(rsp) => { + if let Err(e) = self.main_tx.try_send(ConnToMainMsg { + task_id: self.task_id, + msg: ConnToMainMsgInner::ProxyResponseReceived { + from: self.peer_id.clone(), + rsp, + }, + }) { + error!( + self.log, + "Failed to send received proxy resposne to the main task" + ); + panic!("Connection to main task channel error: {e:#?}"); } } } diff --git a/trust-quorum/src/ledgers.rs b/trust-quorum/src/ledgers.rs index b8830141d59..1929e5799e5 100644 --- a/trust-quorum/src/ledgers.rs +++ b/trust-quorum/src/ledgers.rs @@ -17,7 +17,7 @@ use slog::{Logger, info}; use trust_quorum_protocol::PersistentState; /// A wrapper type around [`PersistentState`] for use as a [`Ledger`] -#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize)] pub struct PersistentStateLedger { pub generation: u64, pub state: PersistentState, diff --git a/trust-quorum/src/lib.rs b/trust-quorum/src/lib.rs index dec9b3608ed..042944955a4 100644 --- a/trust-quorum/src/lib.rs +++ b/trust-quorum/src/lib.rs @@ -7,9 +7,12 @@ mod connection_manager; pub(crate) mod established_conn; mod ledgers; +mod proxy; mod task; +pub use proxy::Proxy; + pub(crate) use connection_manager::{ ConnToMainMsg, ConnToMainMsgInner, MainToConnMsg, WireMsg, }; -pub use task::NodeTask; +pub use task::{CommitStatus, Config, NodeApiError, NodeTask, NodeTaskHandle}; diff --git a/trust-quorum/src/proxy.rs b/trust-quorum/src/proxy.rs new file mode 100644 index 00000000000..c44bcd59b91 --- /dev/null +++ b/trust-quorum/src/proxy.rs @@ -0,0 +1,620 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. + +//! A mechanism for proxying API requests from one trust quorum node to another +//! over sprockets. +//! +//! This is necessary in the case when there is no sled-agent with which Nexus +//! can directly communicate on the underlay network. Since Nexus is not on the +//! bootstrap network it cannot talk directly to trust quorum nodes. +//! +//! This proxy mechanism is also useful during RSS and for general debugging +//! purposes. + +use crate::{ + CommitStatus, + task::{NodeApiRequest, NodeStatus}, +}; +use debug_ignore::DebugIgnore; +use derive_more::From; +use iddqd::{IdHashItem, IdHashMap, id_upcast}; +use omicron_uuid_kinds::RackUuid; +use serde::{Deserialize, Serialize}; +use slog_error_chain::{InlineErrorChain, SlogInlineError}; +use tokio::sync::{mpsc, oneshot}; +use tokio::task; +use trust_quorum_protocol::{ + BaseboardId, CommitError, Configuration, Epoch, PrepareAndCommitError, +}; +use uuid::Uuid; + +/// Requests that can be proxied to another node. Proxied requests should not be +/// proxied again once receieved. The receiving node should instead immediately +/// respond to the request to limit the time spent in the event loop, like a +/// normal NodeApiRequest. 
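+///
+/// Each `WireRequest` is answered by a [`WireResponse`] carrying the same
+/// `request_id`; the [`Tracker`] on the requesting side uses that id to match
+/// the response to its outstanding request.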
+#[derive(Debug, Serialize, Deserialize)] +pub struct WireRequest { + pub request_id: Uuid, + pub op: WireOp, +} + +#[derive(Debug, Serialize, Deserialize)] +pub enum WireOp { + Commit { rack_id: RackUuid, epoch: Epoch }, + PrepareAndCommit { config: Configuration }, + Status, +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct WireResponse { + pub request_id: Uuid, + pub result: Result, +} + +/// The successful variant of a [`WireResponse`] +#[derive(Debug, Serialize, Deserialize, From)] +pub enum WireValue { + /// The successful response value for a `commit` or `prepare_and_commit` + /// operation + Commit(CommitStatus), + /// The successful response value for a `status` operation + Status(NodeStatus), +} + +/// The error variant of a [`WireResponse`] +#[derive( + Debug, PartialEq, Eq, Serialize, Deserialize, From, thiserror::Error, +)] +pub enum WireError { + #[error(transparent)] + Commit(CommitError), + #[error(transparent)] + PrepareAndCommit(PrepareAndCommitError), + #[error(transparent)] + NoInnerError(NoInnerError), +} + +/// Define an Error for cases where the remote task doesn't return an error +/// +/// This is roughly analagous to `Infallible`, but derives `Error`, `Serialize`, +/// and `Deserialize`. +#[derive( + Debug, + Clone, + thiserror::Error, + PartialEq, + Eq, + SlogInlineError, + Serialize, + Deserialize, +)] +#[error("inner error when none expected")] +pub struct NoInnerError; + +/// A mechanism for proxying API requests to another node +pub struct Proxy { + // A mechanism to send a `WireRequest` to our local task for proxying. + tx: mpsc::Sender, +} + +impl Proxy { + pub fn new(tx: mpsc::Sender) -> Proxy { + Proxy { tx } + } + + pub async fn commit( + &self, + destination: BaseboardId, + rack_id: RackUuid, + epoch: Epoch, + ) -> Result> { + let op = WireOp::Commit { rack_id, epoch }; + let destructure_fn = move |res| match res { + Ok(val) => match val { + WireValue::Commit(cs) => Ok(cs), + other => { + Err(ProxyError::InvalidResponse(format!("{other:#?}"))) + } + }, + Err(err) => match err { + WireError::Commit(e) => Err(e.into()), + other => Err(ProxyError::InvalidResponse( + InlineErrorChain::new(&other).to_string(), + )), + }, + }; + + self.send(destination, op, destructure_fn).await + } + + pub async fn prepare_and_commit( + &self, + destination: BaseboardId, + config: Configuration, + ) -> Result> { + let op = WireOp::PrepareAndCommit { config }; + let destructure_fn = move |res| match res { + Ok(val) => match val { + WireValue::Commit(cs) => Ok(cs), + other => { + Err(ProxyError::InvalidResponse(format!("{other:#?}"))) + } + }, + Err(err) => match err { + WireError::PrepareAndCommit(e) => Err(e.into()), + other => Err(ProxyError::InvalidResponse( + InlineErrorChain::new(&other).to_string(), + )), + }, + }; + self.send(destination, op, destructure_fn).await + } + + pub async fn status( + &self, + destination: BaseboardId, + ) -> Result> { + let op = WireOp::Status; + let destructure_fn = move |res| match res { + Ok(val) => match val { + WireValue::Status(status) => Ok(status), + other => { + Err(ProxyError::InvalidResponse(format!("{other:#?}"))) + } + }, + Err(err) => match err { + WireError::NoInnerError(e) => Err(e.into()), + other => Err(ProxyError::InvalidResponse( + InlineErrorChain::new(&other).to_string(), + )), + }, + }; + self.send(destination, op, destructure_fn).await + } + + /// Send a `WireRequest` to a task and destructure its response to the + /// appropriate type + async fn send( + &self, + destination: BaseboardId, + op: WireOp, + f: F, 
+ ) -> Result> + where + E: std::error::Error, + F: FnOnce(Result) -> Result>, + { + let request_id = Uuid::new_v4(); + let wire_request = WireRequest { request_id, op }; + + // A wrapper for responses from the task + let (task_tx, task_rx) = oneshot::channel(); + + // The message to send to the task + let api_request = + NodeApiRequest::Proxy { destination, wire_request, tx: task_tx }; + + if let Err(e) = self.tx.try_send(api_request) { + match e { + mpsc::error::TrySendError::Full(_) => { + return Err(ProxyError::Busy); + } + mpsc::error::TrySendError::Closed(_) => { + return Err(ProxyError::RecvError); + } + } + } + + let res = task_rx.await.map_err(|_| ProxyError::RecvError)?; + + let res = match res { + Err(TrackerError::Disconnected) => { + return Err(ProxyError::Disconnected); + } + Err(TrackerError::Wire(err)) => Err(err), + Ok(val) => Ok(val), + }; + + f(res) + } +} + +/// The error result of a proxy request +#[derive(Debug, Clone, thiserror::Error, PartialEq, Eq, SlogInlineError)] +pub enum ProxyError { + #[error(transparent)] + Inner(#[from] T), + #[error("disconnected")] + Disconnected, + #[error("response for different type received: {0}")] + InvalidResponse(String), + #[error("task sender dropped")] + RecvError, + #[error("task is busy: channel full")] + Busy, +} + +/// An error returned from a [`Tracker`]. +/// +/// This error wraps a `WireError` so that it can also indicate when no response +/// was received, such as when the sprockets channel was disconnected. +pub enum TrackerError { + Wire(WireError), + Disconnected, +} + +/// A trackable in-flight proxy request, owned by the `Tracker` +#[derive(Debug)] +pub struct TrackableRequest { + /// Each `TrackableRequest` is bound to a + /// [`crate::established_conn::EstablishedConn`] that is uniquely identified + /// by its `tokio::task::Id`. This is useful because it disambiguates + /// connect and disconnect operations for the same `destination` such that + /// they don't have to be totally ordered. It is enough to know that a + /// disconnect for a given `task_id` only occurs after a connect. + task_id: task::Id, + /// A unique id for a given proxy request + request_id: Uuid, + // The option exists so we can take the sender out in `on_disconnect`, when + // the request is borrowed, but about to be discarded. 
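+    // `DebugIgnore` lets `TrackableRequest` derive `Debug` without formatting
+    // (or requiring `Debug` for) the wrapped oneshot sender.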
+ tx: DebugIgnore>>>, +} + +impl TrackableRequest { + pub fn new( + task_id: task::Id, + request_id: Uuid, + tx: oneshot::Sender>, + ) -> TrackableRequest { + TrackableRequest { task_id, request_id, tx: DebugIgnore(Some(tx)) } + } +} + +impl IdHashItem for TrackableRequest { + type Key<'a> = &'a Uuid; + + fn key(&self) -> Self::Key<'_> { + &self.request_id + } + + id_upcast!(); +} + +/// A mechanism to keep track of proxied requests and wait for their replies +pub struct Tracker { + // In flight operations + ops: IdHashMap, +} + +impl Tracker { + pub fn new() -> Tracker { + Tracker { ops: IdHashMap::new() } + } + + /// The number of proxied requests outstanding + pub fn len(&self) -> usize { + self.ops.len() + } + + pub fn insert(&mut self, req: TrackableRequest) { + self.ops.insert_unique(req).expect("no duplicate request IDs"); + } + + /// Handle a proxied response from a peer + pub fn on_response( + &mut self, + request_id: Uuid, + result: Result, + ) { + if let Some(mut req) = self.ops.remove(&request_id) { + let res = result.map_err(TrackerError::Wire); + let _ = req.tx.take().unwrap().send(res); + } + } + + /// A remote peer has disconnected + pub fn on_disconnect(&mut self, task_id: task::Id) { + self.ops.retain(|mut req| { + if req.task_id == task_id { + let tx = req.tx.take().unwrap(); + let _ = tx.send(Err(TrackerError::Disconnected)); + false + } else { + true + } + }); + } +} + +#[cfg(test)] +mod tests { + use super::*; + use assert_matches::assert_matches; + use omicron_test_utils::dev::poll::{CondCheckError, wait_for_condition}; + use std::sync::{ + Arc, + atomic::{AtomicUsize, Ordering}, + }; + use std::time::Duration; + use tokio::spawn; + + /// Recv a message from the proxy and insert it into the tracker. + async fn recv_and_insert( + rx: &mut mpsc::Receiver, + tracker: &mut Tracker, + task_id: task::Id, + ) { + let Some(NodeApiRequest::Proxy { wire_request, tx, .. }) = + rx.recv().await + else { + panic!("Invalid NodeApiRequest") + }; + + let req = TrackableRequest::new(task_id, wire_request.request_id, tx); + tracker.insert(req); + } + + #[tokio::test] + async fn proxy_roundtrip_concurrent() { + let destination = BaseboardId { + part_number: "test".to_string(), + serial_number: "test".to_string(), + }; + let rack_id = RackUuid::new_v4(); + + // In real code, the `tokio::task::ID` is the id of the + // `EstablishedConnectionTask`. However, we are simulating those + // connections here, so just use an ID of an arbitrary task. + let task_id = task::spawn(async {}).id(); + + // Test channel where the sender is usually cloned from the [`crate::NodeTaskHandle`], + // and the receiver is owned by the local [`crate::NodeTask`] + let (tx, mut rx) = mpsc::channel(5); + let proxy = Proxy::new(tx.clone()); + let mut tracker = Tracker::new(); + + // All spawned tasks will increment this value when processing a + // request. The result is polled with `wait_for_condition` at the end + // of the test, and therefore there is no reason to join on any of the + // spawned test tasks. 
+ let requests_completed = Arc::new(AtomicUsize::new(0)); + + // This is the first "user" task that will issue proxy operations + let count = requests_completed.clone(); + let dest = destination.clone(); + spawn(async move { + let s = proxy.commit(dest, rack_id, Epoch(1)).await.unwrap(); + + // The first attempt should succeed + assert_eq!(s, CommitStatus::Committed); + let _ = count.fetch_add(1, Ordering::Relaxed); + }); + + // No requests have been received yet + assert_eq!(tracker.len(), 0); + + // Simulate receiving a request by the [`NodeTask`] + recv_and_insert(&mut rx, &mut tracker, task_id).await; + + // We now have a request in the tracker + assert_eq!(tracker.len(), 1); + + // We haven't actually completed our operation yet + assert_eq!(requests_completed.load(Ordering::Relaxed), 0); + + // Get the first request_id. It's internal to the system, but we need to + // know it here to fake a response. + let request_id_1 = tracker.ops.iter().next().unwrap().request_id; + + // Now spawn a concurrent "user" task that proxies a `Status` request + // to the same node. + let proxy = Proxy::new(tx); + let count = requests_completed.clone(); + spawn(async move { + let s = proxy.status(destination.clone()).await.unwrap(); + assert_matches!(s, NodeStatus { .. }); + let _ = count.fetch_add(1, Ordering::Relaxed); + }); + + // Simulate receiving a request by the [`NodeTask`] + recv_and_insert(&mut rx, &mut tracker, task_id).await; + assert_eq!(tracker.len(), 2); + + // We still haven't actually completed any operations yet + assert_eq!(requests_completed.load(Ordering::Relaxed), 0); + let request_id_2 = tracker + .ops + .iter() + .find(|&r| r.request_id != request_id_1) + .unwrap() + .request_id; + + // Now simulate completion of both requests, in reverse order. + tracker.on_response( + request_id_2, + Ok(WireValue::Status(NodeStatus::default())), + ); + tracker.on_response( + request_id_1, + Ok(WireValue::Commit(CommitStatus::Committed)), + ); + + // Now wait for both responses to be processed + wait_for_condition( + async || { + if requests_completed.load(Ordering::Relaxed) == 2 { + Ok(()) + } else { + Err(CondCheckError::<()>::NotYet) + } + }, + &Duration::from_millis(10), + &Duration::from_secs(10), + ) + .await + .unwrap(); + + assert_eq!(tracker.len(), 0); + } + + #[tokio::test] + async fn proxy_roundtrip_invalid_response() { + let destination = BaseboardId { + part_number: "test".to_string(), + serial_number: "test".to_string(), + }; + let rack_id = RackUuid::new_v4(); + + // Test channel where the sender is usually cloned from the [`crate::NodeTaskHandle`], + // and the receiver is owned by the local [`crate::NodeTask`] + let (tx, mut rx) = mpsc::channel(5); + let proxy = Proxy::new(tx.clone()); + let mut tracker = Tracker::new(); + + // In real code, the `tokio::task::ID` is the id of the + // `EstablishedConnectionTask`. However, we are simulating those + // connections here, so just use an ID of an arbitrary task. + let task_id = task::spawn(async {}).id(); + + // All spawned tasks will increment this value when processing a + // request. The result is polled with `wait_for_condition` at the end + // of the test, and therefore there is no reason to join on any of the + // spawned test tasks. 
+ let requests_completed = Arc::new(AtomicUsize::new(0)); + + // This is the first "user" task that will issue proxy operations + let count = requests_completed.clone(); + let dest = destination.clone(); + spawn(async move { + let s = proxy.commit(dest, rack_id, Epoch(1)).await.unwrap_err(); + assert_matches!(s, ProxyError::InvalidResponse(_)); + let _ = count.fetch_add(1, Ordering::Relaxed); + }); + + // No requests have been received yet + assert_eq!(tracker.len(), 0); + + // Simulate receiving a request by the [`NodeTask`] + recv_and_insert(&mut rx, &mut tracker, task_id).await; + + // We now have a request in the tracker + assert_eq!(tracker.len(), 1); + + // We haven't actually completed our operation yet + assert_eq!(requests_completed.load(Ordering::Relaxed), 0); + + // Get the request_id. It's internal to the system, but we need to know + // it here to fake a response. + let request_id = tracker.ops.iter().next().unwrap().request_id; + + // Now return a successful response, but of the wrong type. + tracker.on_response( + request_id, + Ok(WireValue::Status(NodeStatus::default())), + ); + + // Now wait for the error responses to be processed + wait_for_condition( + async || { + if requests_completed.load(Ordering::Relaxed) == 1 { + Ok(()) + } else { + Err(CondCheckError::<()>::NotYet) + } + }, + &Duration::from_millis(10), + &Duration::from_secs(10), + ) + .await + .unwrap(); + + assert_eq!(tracker.len(), 0); + } + + #[tokio::test] + async fn proxy_roundtrip_disconnected() { + let destination = BaseboardId { + part_number: "test".to_string(), + serial_number: "test".to_string(), + }; + let rack_id = RackUuid::new_v4(); + + // In real code, the `tokio::task::ID` is the id of the + // `EstablishedConnectionTask`. However, we are simulating those + // connections here, so just use an ID of an arbitrary task. + let task_id = task::spawn(async {}).id(); + + // Test channel where the sender is usually cloned from the [`crate::NodeTaskHandle`], + // and the receiver is owned by the local [`crate::NodeTask`] + let (tx, mut rx) = mpsc::channel(5); + let proxy = Proxy::new(tx.clone()); + let mut tracker = Tracker::new(); + + // All spawned tasks will increment this value when processing a + // request. The result is polled with `wait_for_condition` at the end + // of the test, and therefore there is no reason to join on any of the + // spawned test tasks. + let requests_completed = Arc::new(AtomicUsize::new(0)); + + // This is the first "user" task that will issue proxy operations + let count = requests_completed.clone(); + let dest = destination.clone(); + spawn(async move { + let s = proxy.commit(dest, rack_id, Epoch(1)).await.unwrap_err(); + assert_eq!(s, ProxyError::Disconnected); + let _ = count.fetch_add(1, Ordering::Relaxed); + }); + + // No requests have been received yet + assert_eq!(tracker.len(), 0); + + // Simulate receiving a request by the [`NodeTask`] + recv_and_insert(&mut rx, &mut tracker, task_id).await; + + // We now have a request in the tracker + assert_eq!(tracker.len(), 1); + + // We haven't actually completed our operation yet + assert_eq!(requests_completed.load(Ordering::Relaxed), 0); + + // Now spawn a concurrent "user" task that proxies a `Status` request + // to the same node. 
+ let proxy = Proxy::new(tx); + let count = requests_completed.clone(); + let dest = destination.clone(); + spawn(async move { + let s = proxy.status(dest.clone()).await.unwrap_err(); + assert_eq!(s, ProxyError::Disconnected); + let _ = count.fetch_add(1, Ordering::Relaxed); + }); + + // Simulate receiving a request by the [`NodeTask`] + recv_and_insert(&mut rx, &mut tracker, task_id).await; + assert_eq!(tracker.len(), 2); + + // We still haven't actually completed any operations yet + assert_eq!(requests_completed.load(Ordering::Relaxed), 0); + + // Now simulate a disconnection to the proxy destination for this + // specific connection task. + tracker.on_disconnect(task_id); + + // Now wait for both responses to be processed + wait_for_condition( + async || { + if requests_completed.load(Ordering::Relaxed) == 2 { + Ok(()) + } else { + Err(CondCheckError::<()>::NotYet) + } + }, + &Duration::from_millis(10), + &Duration::from_secs(10), + ) + .await + .unwrap(); + + assert_eq!(tracker.len(), 0); + } +} diff --git a/trust-quorum/src/task.rs b/trust-quorum/src/task.rs index df3b34ab074..c47fd644339 100644 --- a/trust-quorum/src/task.rs +++ b/trust-quorum/src/task.rs @@ -7,12 +7,15 @@ use crate::connection_manager::{ ConnMgr, ConnMgrStatus, ConnToMainMsg, ConnToMainMsgInner, + DisconnectedPeer, ProxyConnState, }; use crate::ledgers::PersistentStateLedger; +use crate::proxy; use camino::Utf8PathBuf; use omicron_uuid_kinds::RackUuid; use serde::{Deserialize, Serialize}; use slog::{Logger, debug, error, info, o, warn}; +use slog_error_chain::SlogInlineError; use sprockets_tls::keys::SprocketsConfig; use std::collections::BTreeSet; use std::net::SocketAddrV6; @@ -32,6 +35,7 @@ use trust_quorum_protocol::{ use bootstore::schemes::v0::NetworkConfig; /// Whether or not a configuration has committed or is still underway. +#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)] pub enum CommitStatus { Committed, Pending, @@ -62,27 +66,28 @@ pub struct Config { /// LRTQ upgrade. #[derive(Debug, Clone, Serialize, Deserialize)] pub struct CoordinatorStatus { - config: Configuration, - acked_prepares: BTreeSet, + pub config: Configuration, + pub acked_prepares: BTreeSet, } // Details about a given node's status -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Debug, Clone, Default, Serialize, Deserialize)] pub struct NodeStatus { - connected_peers: BTreeSet, - alarms: BTreeSet, - persistent_state: NodePersistentStateSummary, + pub connected_peers: BTreeSet, + pub alarms: BTreeSet, + pub persistent_state: NodePersistentStateSummary, + pub proxied_requests: u64, } /// A summary of a node's persistent state, leaving out things like key shares /// and hashes. -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Debug, Clone, Default, Serialize, Deserialize)] pub struct NodePersistentStateSummary { - has_lrtq_share: bool, - configs: BTreeSet, - shares: BTreeSet, - commits: BTreeSet, - expunged: Option, + pub has_lrtq_share: bool, + pub configs: BTreeSet, + pub shares: BTreeSet, + pub commits: BTreeSet, + pub expunged: Option, } impl From<&PersistentState> for NodePersistentStateSummary { @@ -160,10 +165,26 @@ pub enum NodeApiRequest { /// Retrieve the current network config NetworkConfig { responder: oneshot::Sender> }, + + /// Proxy a [`proxy::WireRequest`] operation to another node + /// + /// When sled-agent is not running there is no direct way to issue + /// operations from Nexus. 
This occurs when when a node has not yet joined a + /// trust quorum configuration, but the mechanism is also useful during RSS. + /// In these cases, we need to take an existing node that we have access to + /// and proxy requests over sprockets to the `destination` node. + Proxy { + // Where to send the `wire_request` + destination: BaseboardId, + /// The actual request proxied across nodes + wire_request: proxy::WireRequest, + /// A mechanism for responding to the caller + tx: oneshot::Sender>, + }, } /// An error response from a `NodeApiRequest` -#[derive(Error, Debug, PartialEq)] +#[derive(Error, Debug, PartialEq, SlogInlineError)] pub enum NodeApiError { #[error("failed to send request to node task")] Send, @@ -221,6 +242,12 @@ impl NodeTaskHandle { &self.baseboard_id } + /// Return a [`proxy::Proxy`] that allows callers to proxy certain API requests + /// to other nodes. + pub fn proxy(&self) -> proxy::Proxy { + proxy::Proxy::new(self.tx.clone()) + } + /// Initiate a trust quorum reconfiguration at this node pub async fn reconfigure( &self, @@ -374,6 +401,9 @@ pub struct NodeTask { /// channels shared with trust quorum, but is not part of the trust quorum /// protocol. network_config: Option, + + /// A tracker for API requests proxied to other nodes + proxy_tracker: proxy::Tracker, } impl NodeTask { @@ -435,6 +465,7 @@ impl NodeTask { conn_mgr_rx, rx, network_config, + proxy_tracker: proxy::Tracker::new(), }, NodeTaskHandle { baseboard_id, tx, listen_addr }, ) @@ -452,9 +483,15 @@ impl NodeTask { self.on_api_request(request).await; } res = self.conn_mgr.step(corpus.clone()) => { - if let Err(err) = res { - error!(self.log, "Failed to accept connection"; &err); - continue; + match res { + Ok(Some(disconnected_peer)) => { + self.on_disconnect(disconnected_peer); + } + Ok(None) => {} + Err(err) => { + error!(self.log, "Failed to accept connection"; &err); + continue; + } } } Some(msg) = self.conn_mgr_rx.recv() => { @@ -468,6 +505,14 @@ impl NodeTask { } } + /// A task managing an established connenction to a peer has just exited + fn on_disconnect(&mut self, disconnected_peer: DisconnectedPeer) { + self.proxy_tracker.on_disconnect(disconnected_peer.task_id); + if let Some(peer_id) = disconnected_peer.peer_id { + self.node.on_disconnect(&mut self.ctx, peer_id); + } + } + // Handle messages from connection management tasks // // We persist state at the end of this method, which always occurs before @@ -490,10 +535,6 @@ impl NodeTask { self.send_network_config(&peer_id).await; self.node.on_connect(&mut self.ctx, peer_id); } - ConnToMainMsgInner::Disconnected { peer_id } => { - self.conn_mgr.on_disconnected(task_id, peer_id.clone()).await; - self.node.on_disconnect(&mut self.ctx, peer_id); - } ConnToMainMsgInner::Received { from, msg } => { self.node.handle(&mut self.ctx, from, msg); } @@ -522,10 +563,57 @@ impl NodeTask { self.broadcast_network_config(Some(&from)).await; } } + ConnToMainMsgInner::ProxyRequestReceived { from, req } => { + info!( + self.log, + "Received proxy request : {req:#?}"; + "peer_id" => %from + ); + self.handle_proxy_request(from, req).await; + } + ConnToMainMsgInner::ProxyResponseReceived { from, rsp } => { + info!( + self.log, + "Received proxy response: {rsp:#?}"; + "peer_id" => %from + ); + let proxy::WireResponse { request_id, result } = rsp; + self.proxy_tracker.on_response(request_id, result); + } } self.save_persistent_state().await; } + // Handle these requests exactly like we handle `NodeApiRequests` but then + // respond to the proxy node over the network 
rather than oneshot channel + // used by the API. + async fn handle_proxy_request( + &mut self, + from: BaseboardId, + req: proxy::WireRequest, + ) { + let proxy::WireRequest { request_id, op } = req; + match op { + proxy::WireOp::Commit { rack_id, epoch } => { + let res = self.commit(rack_id, epoch).await; + let result = res.map(Into::into).map_err(Into::into); + let rsp = proxy::WireResponse { request_id, result }; + self.conn_mgr.proxy_response(&from, rsp).await; + } + proxy::WireOp::PrepareAndCommit { config } => { + let res = self.prepare_and_commit(config).await; + let result = res.map(Into::into).map_err(Into::into); + let rsp = proxy::WireResponse { request_id, result }; + self.conn_mgr.proxy_response(&from, rsp).await; + } + proxy::WireOp::Status => { + let result = Ok(self.status().into()); + let rsp = proxy::WireResponse { request_id, result }; + self.conn_mgr.proxy_response(&from, rsp).await; + } + } + } + // Handle API requests from sled-agent // // NOTE: We persist state where necessary before responding to clients. Any @@ -541,26 +629,16 @@ impl NodeTask { .conn_mgr .update_bootstrap_connections(addrs, corpus) .await; - for peer_id in disconnected { - self.node.on_disconnect(&mut self.ctx, peer_id); + for handle in disconnected { + self.proxy_tracker.on_disconnect(handle.task_id()); + self.node.on_disconnect(&mut self.ctx, handle.baseboard_id); } } NodeApiRequest::ClearSecrets => { self.node.clear_secrets(); } NodeApiRequest::Commit { rack_id, epoch, tx } => { - let res = self - .node - .commit_configuration(&mut self.ctx, rack_id, epoch) - .map(|_| { - if self.ctx.persistent_state().commits.contains(&epoch) - { - CommitStatus::Committed - } else { - CommitStatus::Pending - } - }); - self.save_persistent_state().await; + let res = self.commit(rack_id, epoch).await; let _ = tx.send(res); } NodeApiRequest::ConnMgrStatus { tx } => { @@ -587,26 +665,10 @@ impl NodeTask { let _ = tx.send(res); } NodeApiRequest::NodeStatus { tx } => { - let _ = tx.send(NodeStatus { - connected_peers: self.ctx.connected().clone(), - alarms: self.ctx.alarms().clone(), - persistent_state: self.ctx.persistent_state().into(), - }); + let _ = tx.send(self.status()); } NodeApiRequest::PrepareAndCommit { config, tx } => { - let epoch = config.epoch; - let res = self - .node - .prepare_and_commit(&mut self.ctx, config) - .map(|_| { - if self.ctx.persistent_state().commits.contains(&epoch) - { - CommitStatus::Committed - } else { - CommitStatus::Pending - } - }); - self.save_persistent_state().await; + let res = self.prepare_and_commit(config).await; let _ = tx.send(res); } NodeApiRequest::Reconfigure { msg, tx } => { @@ -626,9 +688,79 @@ impl NodeTask { NodeApiRequest::NetworkConfig { responder } => { let _ = responder.send(self.network_config.clone()); } + NodeApiRequest::Proxy { destination, wire_request, tx } => { + let request_id = wire_request.request_id; + match self + .conn_mgr + .proxy_request(&destination, wire_request) + .await + { + ProxyConnState::Connected(task_id) => { + // Track the request. If the connection is disconnected + // before the response is received, then the caller will + // get notified about this. 
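+                        // Keying the tracked request by the connection task's
+                        // id (rather than only the peer id) means a later
+                        // reconnect, which spawns a new task, can never be
+                        // mistaken for the connection this request was
+                        // actually sent on.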
+ let req = proxy::TrackableRequest::new( + task_id, request_id, tx, + ); + self.proxy_tracker.insert(req); + } + ProxyConnState::Disconnected => { + // Return the fact that the message was not sent immediately + let _ = tx.send(Err(proxy::TrackerError::Disconnected)); + } + } + } } } + /// Return the status of this [`NodeTask`] + fn status(&self) -> NodeStatus { + NodeStatus { + connected_peers: self.ctx.connected().clone(), + alarms: self.ctx.alarms().clone(), + persistent_state: self.ctx.persistent_state().into(), + proxied_requests: self.proxy_tracker.len() as u64, + } + } + + /// Commit a configuration synchronously if possible + async fn commit( + &mut self, + rack_id: RackUuid, + epoch: Epoch, + ) -> Result { + let res = self + .node + .commit_configuration(&mut self.ctx, rack_id, epoch) + .map(|_| { + if self.ctx.persistent_state().commits.contains(&epoch) { + CommitStatus::Committed + } else { + CommitStatus::Pending + } + }); + self.save_persistent_state().await; + res + } + + /// PrepareAndCommit a configuration synchronously if possible + async fn prepare_and_commit( + &mut self, + config: Configuration, + ) -> Result { + let epoch = config.epoch; + let res = + self.node.prepare_and_commit(&mut self.ctx, config).map(|_| { + if self.ctx.persistent_state().commits.contains(&epoch) { + CommitStatus::Committed + } else { + CommitStatus::Pending + } + }); + self.save_persistent_state().await; + res + } + /// Save `PersistentState` to storage if necessary async fn save_persistent_state(&mut self) { if self.ctx.persistent_state_change_check_and_reset() { @@ -747,6 +879,8 @@ mod tests { use crate::connection_manager::{ ConnState, RECONNECT_TIME, platform_id_to_baseboard_id, }; + use crate::proxy::ProxyError; + use assert_matches::assert_matches; use camino::Utf8PathBuf; use dropshot::test_util::{LogContext, log_prefix_for_test}; use omicron_test_utils::dev::poll::{CondCheckError, wait_for_condition}; @@ -1178,7 +1312,7 @@ mod tests { /// Commit an initial configuration at all nodes #[tokio::test] - pub async fn tq_initial_config() { + async fn tq_initial_config() { let num_nodes = 4; let setup = TestSetup::spawn_nodes("tq_initial_config", num_nodes).await; @@ -1227,31 +1361,17 @@ mod tests { .unwrap(); // Commit at each node - // - // Nexus retries this idempotent command until each node acks. So we - // simulate that here. - wait_for_condition( - async || { - let mut acked = 0; - for h in &setup.node_handles { - if matches!( - h.commit(rack_id, Epoch(1)).await.unwrap(), - CommitStatus::Committed - ) { - acked += 1; - } - } - if acked == num_nodes { - Ok(()) - } else { - Err(CondCheckError::<()>::NotYet) - } - }, - &poll_interval, - &poll_max, - ) - .await - .unwrap(); + // This should be immediate, since all nodes have acked prepares. + let mut acked = 0; + for h in &setup.node_handles { + if matches!( + h.commit(rack_id, Epoch(1)).await.unwrap(), + CommitStatus::Committed + ) { + acked += 1; + } + } + assert_eq!(acked, num_nodes); // Now load the rack secret at all nodes setup @@ -1271,7 +1391,7 @@ mod tests { /// at the first 3 nodes. Then we go and issue a `PrepareAndCommit` to the last /// node and ensure it commits. 
#[tokio::test] - pub async fn tq_initial_config_prepare_and_commit() { + async fn tq_initial_config_prepare_and_commit() { let num_nodes = 4; let setup = TestSetup::spawn_nodes( "tq_initial_config_prepare_and_commit", @@ -1335,31 +1455,17 @@ mod tests { coordinator.coordinator_status().await.unwrap().unwrap().config; // Commit at each node - // - // Nexus retries this idempotent command until each node acks. So we - // simulate that here. - wait_for_condition( - async || { - let mut acked = 0; - for h in &setup.node_handles[0..num_nodes - 1] { - if matches!( - h.commit(rack_id, Epoch(1)).await.unwrap(), - CommitStatus::Committed, - ) { - acked += 1; - } - } - if acked == num_nodes - 1 { - Ok(()) - } else { - Err(CondCheckError::<()>::NotYet) - } - }, - &poll_interval, - &poll_max, - ) - .await - .unwrap(); + // This should be immediate, since all nodes have acked prepares. + let mut acked = 0; + for h in &setup.node_handles[0..num_nodes - 1] { + if matches!( + h.commit(rack_id, Epoch(1)).await.unwrap(), + CommitStatus::Committed + ) { + acked += 1; + } + } + assert_eq!(acked, num_nodes - 1); // Now ensure that the last node still hasn't prepared or committed for // epoch 1, and isn't connected to any other node. @@ -1420,7 +1526,7 @@ mod tests { /// the configuration for the prior epoch. This should result in commit /// advancing to the latest epoch. #[tokio::test] - pub async fn tq_reconfig_with_commit_advance() { + async fn tq_reconfig_with_commit_advance() { let num_nodes = 4; let setup = TestSetup::spawn_nodes( "tq_recofnig_with_commit_advance", @@ -1472,31 +1578,17 @@ mod tests { .unwrap(); // Commit at each node - // - // Nexus retries this idempotent command until each node acks. So we - // simulate that here. - wait_for_condition( - async || { - let mut acked = 0; - for h in &setup.node_handles { - if matches!( - h.commit(rack_id, Epoch(1)).await.unwrap(), - CommitStatus::Committed - ) { - acked += 1; - } - } - if acked == num_nodes { - Ok(()) - } else { - Err(CondCheckError::<()>::NotYet) - } - }, - &poll_interval, - &poll_max, - ) - .await - .unwrap(); + // This should be immediate, since all nodes have acked prepares. + let mut acked = 0; + for h in &setup.node_handles { + if matches!( + h.commit(rack_id, Epoch(1)).await.unwrap(), + CommitStatus::Committed + ) { + acked += 1; + } + } + assert_eq!(acked, num_nodes); // Now load the rack secret at all nodes setup @@ -1652,7 +1744,7 @@ mod tests { } #[tokio::test] - pub async fn tq_upgrade_from_lrtq() { + async fn tq_upgrade_from_lrtq() { let num_nodes = 4; let (setup, rack_id) = TestSetup::spawn_nodes_with_lrtq_shares( "tq_upgrade_from_lrtq", @@ -1739,7 +1831,7 @@ mod tests { /// Ensure state is persisted as we expect #[tokio::test] - pub async fn tq_persistent_state() { + async fn tq_persistent_state() { let num_nodes = 4; let mut setup = TestSetup::spawn_nodes("tq_initial_config", num_nodes).await; @@ -2022,4 +2114,164 @@ mod tests { setup.cleanup_successful(); } + + /// Proxy API requests to other nodes + #[tokio::test] + async fn tq_proxy() { + let num_nodes = 4; + let mut setup = TestSetup::spawn_nodes("tq_proxy", num_nodes).await; + let rack_id = RackUuid::new_v4(); + + // Trigger an initial configuration by using the first node as a + // coordinator. We're pretending to be the sled-agent with instruction from + // Nexus here. 
+ let initial_config = ReconfigureMsg { + rack_id, + epoch: Epoch(1), + last_committed_epoch: None, + members: setup.members().cloned().collect(), + threshold: trust_quorum_protocol::Threshold(3), + }; + + // Tell nodes how to reach each other + for h in &setup.node_handles { + h.load_peer_addresses(setup.listen_addrs.iter().cloned().collect()) + .await + .unwrap(); + } + + let coordinator = setup.node_handles.first().unwrap(); + coordinator.reconfigure(initial_config).await.unwrap(); + + let poll_interval = Duration::from_millis(10); + let poll_max = Duration::from_secs(10); + + // Wait for the coordinator to see `PrepareAck`s from all nodes + wait_for_condition( + async || { + let Ok(Some(s)) = coordinator.coordinator_status().await else { + return Err(CondCheckError::<()>::NotYet); + }; + if s.acked_prepares.len() == num_nodes { + Ok(()) + } else { + Err(CondCheckError::<()>::NotYet) + } + }, + &poll_interval, + &poll_max, + ) + .await + .unwrap(); + + // Save the configuration as if we were nexus + let config = + coordinator.coordinator_status().await.unwrap().unwrap().config; + + // Commit at each node except the last one + // Commit should be immediate since all nodes have acked prepares + let mut acked = 0; + for h in &setup.node_handles[0..num_nodes - 1] { + if matches!( + h.commit(rack_id, Epoch(1)).await.unwrap(), + CommitStatus::Committed + ) { + acked += 1; + } + } + assert_eq!(acked, num_nodes - 1); + + // Proxy a commit through the first node to the last node + // It should commit immediately since it has prepared already. + let proxy = &setup.node_handles[0].proxy(); + let destination = setup.members().last().unwrap().clone(); + let status = proxy + .commit(destination.clone(), rack_id, Epoch(1)) + .await + .expect("successful proxy op"); + assert_eq!(status, CommitStatus::Committed); + + // Commit should be idempotent + let status = proxy + .commit(destination.clone(), rack_id, Epoch(1)) + .await + .expect("successful proxy op"); + assert_eq!(status, CommitStatus::Committed); + + // PrepareAndCommit should also be idempotent since the configuration is + // already committed + let status = proxy + .prepare_and_commit(destination.clone(), config.clone()) + .await + .expect("successful proxy op"); + assert_eq!(status, CommitStatus::Committed); + + // Try to commit a configuration that doesn't exist + let err = proxy + .commit(destination.clone(), rack_id, Epoch(2)) + .await + .expect_err("expected to fail proxy commit"); + assert_eq!( + err, + ProxyError::::Inner(CommitError::NotPrepared(Epoch( + 2 + ))) + ); + + // PrepareAndCommit should return pending, because it has to compute + // it's own keyshare for the new config, which will eventually fail. + // + // Nexus will never actually send a `PrepareAndCommit` when there hasn't + // been a commit. This is just here to check the behavior of the proxy + // code. + let mut config2 = config.clone(); + config2.epoch = Epoch(2); + let status = proxy + .prepare_and_commit(destination.clone(), config2) + .await + .expect("successful proxy op"); + assert_eq!(status, CommitStatus::Pending); + + // Let's get the status for a remote node + let status = proxy + .status(destination.clone()) + .await + .expect("successful status request"); + assert_matches!(status, NodeStatus { .. 
}); + + // Let's stop the last node and ensure we get an error + setup.simulate_crash_of_last_node().await; + let err = proxy + .status(destination.clone()) + .await + .expect_err("status request failed"); + assert_eq!(err, ProxyError::Disconnected); + + setup.simulate_restart_of_last_node().await; + + // Inform all nodes about the last node. Restarting changes the network + // address because we use ephemeral ports. + for h in &setup.node_handles { + h.load_peer_addresses(setup.listen_addrs.iter().cloned().collect()) + .await + .unwrap(); + } + // Now load the rack secret at all nodes + setup + .wait_for_rack_secrets_and_assert_equality( + (0..num_nodes).collect(), + Epoch(1), + ) + .await + .unwrap(); + + // Now ensure we can get the status for the last node again. + let status = proxy + .status(destination.clone()) + .await + .expect("successful status request"); + assert_matches!(status, NodeStatus { .. }); + + setup.cleanup_successful(); + } }
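Reviewer note (not part of the change): a minimal caller-side sketch of the new proxy path, written as if inside this crate. `commit_via_proxy`, its retry interval, and the surrounding error handling are illustrative assumptions; only `NodeTaskHandle::proxy()`, `Proxy::commit`, `CommitStatus`, and `ProxyError` come from the diff above. The loop mirrors how Nexus treats `commit` as idempotent and simply retries until the destination reports `Committed`.

use std::time::Duration;

use omicron_uuid_kinds::RackUuid;
use trust_quorum_protocol::{BaseboardId, CommitError, Epoch};

use crate::proxy::ProxyError;
use crate::{CommitStatus, NodeTaskHandle};

/// Drive a remote commit through a proxying node until it is acknowledged.
async fn commit_via_proxy(
    handle: &NodeTaskHandle,
    destination: BaseboardId,
    rack_id: RackUuid,
    epoch: Epoch,
) -> Result<(), ProxyError<CommitError>> {
    let proxy = handle.proxy();
    loop {
        match proxy.commit(destination.clone(), rack_id, epoch).await {
            // The destination already holds (or just recorded) the commit.
            Ok(CommitStatus::Committed) => return Ok(()),
            // Not committed yet, or the sprockets connection dropped before a
            // response arrived: back off and retry the idempotent request.
            Ok(CommitStatus::Pending) | Err(ProxyError::Disconnected) => {
                tokio::time::sleep(Duration::from_millis(100)).await;
            }
            // Anything else (wrong rack id, expunged peer, busy channel,
            // invalid response) is surfaced to the caller.
            Err(e) => return Err(e),
        }
    }
}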