From 36267b9eaa7b806aaeaa538fb109729def73551d Mon Sep 17 00:00:00 2001
From: "Andrew J. Stone"
Date: Wed, 29 Oct 2025 23:00:15 +0000
Subject: [PATCH] TQ: Implement network config replication

This code essentially re-implements the bootstore's early network
config replication over sprockets channels. The persistence of this
state still lives in the bootstore, but will eventually move to the
`trust-quorum` crate once all customer systems are running only trust
quorum.

This code does not deal with how the switchover from bootstore to
trust-quorum is made. That will come later with trust-quorum /
sled-agent integration.
---
 trust-quorum/src/connection_manager.rs |  71 ++--
 trust-quorum/src/task.rs               | 436 ++++++++++++++++++++++---
 2 files changed, 445 insertions(+), 62 deletions(-)

diff --git a/trust-quorum/src/connection_manager.rs b/trust-quorum/src/connection_manager.rs
index bcc6bbe491..ff3f190d3d 100644
--- a/trust-quorum/src/connection_manager.rs
+++ b/trust-quorum/src/connection_manager.rs
@@ -94,26 +94,11 @@ pub struct ConnToMainMsg {
 
 #[derive(Debug)]
 pub enum ConnToMainMsgInner {
-    Accepted {
-        addr: SocketAddrV6,
-        peer_id: BaseboardId,
-    },
-    Connected {
-        addr: SocketAddrV6,
-        peer_id: BaseboardId,
-    },
-    Received {
-        from: BaseboardId,
-        msg: PeerMsg,
-    },
-    #[expect(unused)]
-    ReceivedNetworkConfig {
-        from: BaseboardId,
-        config: NetworkConfig,
-    },
-    Disconnected {
-        peer_id: BaseboardId,
-    },
+    Accepted { addr: SocketAddrV6, peer_id: BaseboardId },
+    Connected { addr: SocketAddrV6, peer_id: BaseboardId },
+    Received { from: BaseboardId, msg: PeerMsg },
+    ReceivedNetworkConfig { from: BaseboardId, config: NetworkConfig },
+    Disconnected { peer_id: BaseboardId },
 }
 
 pub struct TaskHandle {
@@ -138,6 +123,13 @@ impl TaskHandle {
     pub async fn send(&self, msg: PeerMsg) {
         let _ = self.tx.send(MainToConnMsg::Msg(WireMsg::Tq(msg))).await;
     }
+
+    pub async fn send_network_config(&self, config: NetworkConfig) {
+        let _ = self
+            .tx
+            .send(MainToConnMsg::Msg(WireMsg::NetworkConfig(config)))
+            .await;
+    }
 }
 
 impl BiHashItem for TaskHandle {
@@ -388,6 +380,45 @@ impl ConnMgr {
         }
     }
 
+    // After we have updated our network config, we should send it out to all
+    // peers with established connections, with the exception of the peer we
+    // received it from if this was not a local update.
+    pub async fn broadcast_network_config(
+        &mut self,
+        network_config: &NetworkConfig,
+        excluded_peer: Option<&BaseboardId>,
+    ) {
+        for h in self
+            .established
+            .iter()
+            .filter(|&h| Some(&h.baseboard_id) != excluded_peer)
+        {
+            info!(
+                self.log,
+                "Sending network config";
+                "peer_id" => %h.baseboard_id,
+                "generation" => network_config.generation
+            );
+            h.task_handle.send_network_config(network_config.clone()).await;
+        }
+    }
+
+    pub async fn send_network_config(
+        &mut self,
+        peer_id: &BaseboardId,
+        network_config: &NetworkConfig,
+    ) {
+        if let Some(h) = self.established.get1(peer_id) {
+            info!(
+                self.log,
+                "Sending network config";
+                "peer_id" => %h.baseboard_id,
+                "generation" => network_config.generation
+            );
+            h.task_handle.send_network_config(network_config.clone()).await;
+        }
+    }
+
     /// Perform any polling related operations that the connection
     /// manager must perform concurrently.
     pub async fn step(
diff --git a/trust-quorum/src/task.rs b/trust-quorum/src/task.rs
index 07f13f8534..a94a3214a8 100644
--- a/trust-quorum/src/task.rs
+++ b/trust-quorum/src/task.rs
@@ -12,7 +12,7 @@ use crate::ledgers::PersistentStateLedger;
 use camino::Utf8PathBuf;
 use omicron_uuid_kinds::RackUuid;
 use serde::{Deserialize, Serialize};
-use slog::{Logger, debug, error, info, o};
+use slog::{Logger, debug, error, info, o, warn};
 use sprockets_tls::keys::SprocketsConfig;
 use std::collections::BTreeSet;
 use std::net::SocketAddrV6;
@@ -29,6 +29,10 @@ use trust_quorum_protocol::{
     ReconfigurationError, ReconfigureMsg, ReconstructedRackSecret,
 };
 
+// TODO: Move to this crate
+// https://github.com/oxidecomputer/omicron/issues/9311
+use bootstore::schemes::v0::NetworkConfig;
+
 #[cfg(not(test))]
 const LOAD_RACK_SECRET_RETRY_TIMEOUT: Duration = Duration::from_millis(500);
 #[cfg(test)]
@@ -148,6 +152,15 @@ pub enum NodeApiRequest {
 
     /// Shutdown the node's tokio tasks
     Shutdown,
+
+    /// Update Network Config used to bring up the control plane
+    UpdateNetworkConfig {
+        config: NetworkConfig,
+        responder: oneshot::Sender<Result<(), NodeApiError>>,
+    },
+
+    /// Retrieve the current network config
+    NetworkConfig { responder: oneshot::Sender<Option<NetworkConfig>> },
 }
 
 /// An error response from a `NodeApiRequest`
@@ -167,6 +180,15 @@ pub enum NodeApiError {
     PrepareAndCommit(#[from] PrepareAndCommitError),
     #[error("failed to commit")]
     Commit(#[from] CommitError),
+    #[error(
+        "Network config update failed because it is out of date. Attempted \
+         update generation: {attempted_update_generation}, current generation: \
+         {current_generation}"
+    )]
+    StaleNetworkConfig {
+        attempted_update_generation: u64,
+        current_generation: u64,
+    },
 }
 
 impl From<mpsc::error::SendError<NodeApiRequest>> for NodeApiError {
@@ -337,6 +359,32 @@ impl NodeTaskHandle {
         self.tx.send(NodeApiRequest::Shutdown).await?;
         Ok(())
     }
+
+    /// Update network config needed for bringing up the control plane
+    pub async fn update_network_config(
+        &self,
+        config: NetworkConfig,
+    ) -> Result<(), NodeApiError> {
+        let (tx, rx) = oneshot::channel();
+        self.tx
+            .send(NodeApiRequest::UpdateNetworkConfig { config, responder: tx })
+            .await
+            .map_err(|_| NodeApiError::Send)?;
+        rx.await?
+    }
+
+    /// Retrieve the current network config
+    pub async fn network_config(
+        &self,
+    ) -> Result<Option<NetworkConfig>, NodeApiError> {
+        let (tx, rx) = oneshot::channel();
+        self.tx
+            .send(NodeApiRequest::NetworkConfig { responder: tx })
+            .await
+            .map_err(|_| NodeApiError::Send)?;
+        let res = rx.await?;
+        Ok(res)
+    }
 }
 
 pub struct NodeTask {
@@ -351,6 +399,11 @@ pub struct NodeTask {
 
     // Handle requests received from `PeerHandle`
     rx: mpsc::Receiver<NodeApiRequest>,
+
+    /// Network config needed for early boot. This is gossiped around on network
+    /// channels shared with trust quorum, but is not part of the trust quorum
+    /// protocol.
+    network_config: Option<NetworkConfig>,
 }
 
 impl NodeTask {
@@ -384,6 +437,13 @@ impl NodeTask {
         } else {
             (NodeCtx::new(config.baseboard_id.clone()), 0)
         };
+
+        let network_config = NetworkConfig::load(
+            &log,
+            config.network_config_ledger_paths.clone(),
+        )
+        .await;
+
         let node = Node::new(&log, &mut ctx);
         let conn_mgr = ConnMgr::new(
             &log,
@@ -404,6 +464,7 @@ impl NodeTask {
                 conn_mgr,
                 conn_mgr_rx,
                 rx,
+                network_config,
             },
             NodeTaskHandle { baseboard_id, tx, listen_addr },
         )
@@ -449,12 +510,14 @@ impl NodeTask {
                 self.conn_mgr
                     .server_handshake_completed(task_id, addr, peer_id.clone())
                     .await;
+                self.send_network_config(&peer_id).await;
                 self.node.on_connect(&mut self.ctx, peer_id);
             }
             ConnToMainMsgInner::Connected { addr, peer_id } => {
                 self.conn_mgr
                     .client_handshake_completed(task_id, addr, peer_id.clone())
                     .await;
+                self.send_network_config(&peer_id).await;
                 self.node.on_connect(&mut self.ctx, peer_id);
             }
             ConnToMainMsgInner::Disconnected { peer_id } => {
@@ -464,11 +527,30 @@ impl NodeTask {
             ConnToMainMsgInner::Received { from, msg } => {
                 self.node.handle(&mut self.ctx, from, msg);
             }
-            ConnToMainMsgInner::ReceivedNetworkConfig {
-                from: _,
-                config: _,
-            } => {
-                todo!();
+            ConnToMainMsgInner::ReceivedNetworkConfig { from, config } => {
+                let current_gen =
+                    self.network_config.as_ref().map_or(0, |c| c.generation);
+                let generation = config.generation;
+                info!(
+                    self.log,
+                    concat!(
+                        "Received network config from {} with ",
+                        "generation: {}, current generation: {}"
+                    ),
+                    from,
+                    generation,
+                    current_gen
+                );
+                if generation > current_gen {
+                    self.network_config = Some(config.clone());
+                    NetworkConfig::save(
+                        &self.log,
+                        self.config.network_config_ledger_paths.clone(),
+                        config,
+                    )
+                    .await;
+                    self.broadcast_network_config(Some(&from)).await;
+                }
             }
         }
         self.save_persistent_state().await;
@@ -557,11 +639,18 @@ impl NodeTask {
                 info!(self.log, "Shutting down Node tokio tasks");
                 self.shutdown = true;
             }
+            NodeApiRequest::UpdateNetworkConfig { config, responder } => {
+                let res = self.update_network_config(config).await;
+                let _ = responder.send(res);
+            }
+            NodeApiRequest::NetworkConfig { responder } => {
+                let _ = responder.send(self.network_config.clone());
+            }
         }
     }
 
     /// Save `PersistentState` to storage if necessary
-    pub async fn save_persistent_state(&mut self) {
+    async fn save_persistent_state(&mut self) {
         if self.ctx.persistent_state_change_check_and_reset() {
             self.tq_ledger_generation = PersistentStateLedger::save(
                 &self.log,
@@ -572,6 +661,104 @@ impl NodeTask {
             .await;
         }
     }
+
+    async fn update_network_config(
+        &mut self,
+        config: NetworkConfig,
+    ) -> Result<(), NodeApiError> {
+        let current_gen =
+            self.network_config.as_ref().map_or(0, |c| c.generation);
+        info!(
+            self.log,
+            concat!(
+                "Attempting to update network config with ",
+                "generation: {}, current_generation: {}"
+            ),
+            config.generation,
+            current_gen,
+        );
+        if current_gen > config.generation {
+            error!(
+                self.log,
+                concat!(
+                    "Attempted network config update with ",
+                    "stale generation: attempted_update_generation: {}, ",
+                    "current_generation: {}"
+                ),
+                config.generation,
+                current_gen,
+            );
+            Err(NodeApiError::StaleNetworkConfig {
+                attempted_update_generation: config.generation,
+                current_generation: current_gen,
+            })
+        } else if current_gen == config.generation {
+            warn!(
+                self.log,
+                concat!(
+                    "Not updating network config: generation ",
+                    "{} is current"
+                ),
+                current_gen
+            );
+            // We currently return an error here, because RSS is the
+            // only entity that triggers this code path and we want
+            // the error to show up in
+            // wicket. This indicates that the
+            // `clean-slate` script didn't properly clear out the
+            // `/pool/int/*/cluster` directories residing on the M.2
+            // devices before RSS was run. The fix is to re-run clean-
+            // slate, ensure the cluster directories are empty and then
+            // re-run RSS.
+            //
+            // Eventually, however, we may want to not return an error
+            // on an idempotent update from Nexus via RPW, but we'll
+            // cross that bridge when we have that code written.
+            Err(NodeApiError::StaleNetworkConfig {
+                attempted_update_generation: config.generation,
+                current_generation: current_gen,
+            })
+        } else {
+            self.network_config = Some(config.clone());
+            NetworkConfig::save(
+                &self.log,
+                self.config.network_config_ledger_paths.clone(),
+                config,
+            )
+            .await;
+            // Broadcast the updated config. We only broadcast when we
+            // successfully update it so we don't trigger an endless broadcast
+            // storm.
+            self.broadcast_network_config(None).await;
+            Ok(())
+        }
+    }
+
+    // After we have updated our network config, we should send it out to all
+    // peers, with the exception of the peer we received it from if this was not
+    // a local update.
+    async fn broadcast_network_config(
+        &mut self,
+        excluded_peer: Option<&BaseboardId>,
+    ) {
+        // We only call this method when there has been an update. Otherwise we
+        // have an invariant violation due to programmer error and should panic.
+        let network_config = self.network_config.as_ref().unwrap();
+        info!(
+            self.log,
+            "Broadcasting network config with generation {}",
+            network_config.generation
+        );
+        self.conn_mgr
+            .broadcast_network_config(network_config, excluded_peer)
+            .await;
+    }
+
+    /// We always send our current network config on a peer connection
+    async fn send_network_config(&mut self, peer_id: &BaseboardId) {
+        if let Some(network_config) = self.network_config.as_ref() {
+            self.conn_mgr.send_network_config(peer_id, network_config).await
+        }
+    }
 }
 
 #[cfg(test)]
@@ -630,12 +817,14 @@ mod tests {
             };
             let tq_ledger_paths =
                 vec![dir.join(format!("test-tq-ledger-[{i}]"))];
+            let network_config_ledger_paths =
+                vec![dir.join(format!("test-network-config-ledger-[{i}]"))];
             Config {
                 baseboard_id,
                 listen_addr,
                 sprockets,
                 tq_ledger_paths,
-                network_config_ledger_paths: vec![],
+                network_config_ledger_paths,
             }
         })
         .collect()
@@ -782,6 +971,32 @@ mod tests {
         )
     }
 
+    pub async fn simulate_crash_of_last_node(&mut self) {
+        let join_handle = self.join_handles.pop().unwrap();
+        let node_handle = self.node_handles.pop().unwrap();
+        node_handle.shutdown().await.unwrap();
+        join_handle.await.unwrap();
+        let _ = self.listen_addrs.pop().unwrap();
+    }
+
+    pub async fn simulate_restart_of_last_node(&mut self) {
+        let (mut task, handle) = NodeTask::new(
+            self.configs.last().unwrap().clone(),
+            &self.logctx.log,
+        )
+        .await;
+        let listen_addr = handle.listen_addr();
+        self.node_handles.push(handle);
+        self.join_handles
+            .push(tokio::spawn(async move { task.run().await }));
+        self.listen_addrs.push(listen_addr);
+    }
+
+    pub async fn simulate_crash_and_restart_of_last_node(&mut self) {
+        self.simulate_crash_of_last_node().await;
+        self.simulate_restart_of_last_node().await;
+    }
+
     pub fn members(&self) -> impl Iterator<Item = &BaseboardId> {
         self.configs.iter().map(|c| &c.baseboard_id)
     }
@@ -1535,25 +1750,7 @@ mod tests {
             .await
             .unwrap();
 
-        // Simulate a crash of the last node.
- let join_handle = setup.join_handles.pop().unwrap(); - let node_handle = setup.node_handles.pop().unwrap(); - node_handle.shutdown().await.unwrap(); - join_handle.await.unwrap(); - let _ = setup.listen_addrs.pop().unwrap(); - - // Now Bring it back up with the same persistent state, which contains - // the initial config and prepare. Commit should work and everything - // should pick up as expected. - let (mut task, handle) = NodeTask::new( - setup.configs.last().unwrap().clone(), - &setup.logctx.log, - ) - .await; - let listen_addr = handle.listen_addr(); - setup.node_handles.push(handle); - setup.join_handles.push(tokio::spawn(async move { task.run().await })); - setup.listen_addrs.push(listen_addr); + setup.simulate_crash_and_restart_of_last_node().await; // Tell nodes how to reach each other for h in &setup.node_handles { @@ -1596,21 +1793,7 @@ mod tests { assert_eq!(&rs, secret.as_ref().unwrap()); } - // Simulate crash and restart again - let join_handle = setup.join_handles.pop().unwrap(); - let node_handle = setup.node_handles.pop().unwrap(); - node_handle.shutdown().await.unwrap(); - join_handle.await.unwrap(); - let _ = setup.listen_addrs.pop().unwrap(); - let (mut task, handle) = NodeTask::new( - setup.configs.last().unwrap().clone(), - &setup.logctx.log, - ) - .await; - let listen_addr = handle.listen_addr(); - setup.node_handles.push(handle); - setup.join_handles.push(tokio::spawn(async move { task.run().await })); - setup.listen_addrs.push(listen_addr); + setup.simulate_crash_and_restart_of_last_node().await; // Tell nodes how to reach each other for h in &setup.node_handles { @@ -1631,4 +1814,173 @@ mod tests { setup.cleanup_successful(); } + + #[tokio::test] + async fn test_network_config() { + let num_nodes = 4; + let mut setup = + TestSetup::spawn_nodes("test_network_config", num_nodes).await; + + // Tell all but the last node how to reach each other + for h in setup.node_handles.iter().take(num_nodes - 1) { + h.load_peer_addresses( + setup + .listen_addrs + .iter() + .take(num_nodes - 1) + .cloned() + .collect(), + ) + .await + .unwrap(); + } + + // Ensure there is no network config at any of the nodes + for node in setup.node_handles.iter() { + assert_eq!(None, node.network_config().await.unwrap()); + } + + // Update the network config at node0 and ensure it has taken effect + let network_config = NetworkConfig { + generation: 1, + blob: b"Some network data".to_vec(), + }; + setup.node_handles[0] + .update_network_config(network_config.clone()) + .await + .unwrap(); + + let poll_interval = Duration::from_millis(10); + let poll_max = Duration::from_secs(10); + + // Wait for all nodes except one to learn the network config + wait_for_condition( + async || { + let mut count = 0; + for h in &setup.node_handles { + if let Ok(Some(c)) = h.network_config().await { + if c == network_config { + count += 1; + } + } + } + + if count == num_nodes - 1 { + Ok(()) + } else { + Err(CondCheckError::<()>::NotYet) + } + }, + &poll_interval, + &poll_max, + ) + .await + .unwrap(); + + // Inform all nodes about the last node + for h in &setup.node_handles { + h.load_peer_addresses(setup.listen_addrs.iter().cloned().collect()) + .await + .unwrap(); + } + + // Wait for all nodes to learn the network config + wait_for_condition( + async || { + let mut count = 0; + for h in &setup.node_handles { + if let Ok(Some(c)) = h.network_config().await { + if c == network_config { + count += 1; + } + } + } + + if count == num_nodes { + Ok(()) + } else { + Err(CondCheckError::<()>::NotYet) + } + }, + 
+            &poll_interval,
+            &poll_max,
+        )
+        .await
+        .unwrap();
+
+        setup.simulate_crash_and_restart_of_last_node().await;
+
+        // Ensure after restarting that the node has the same network config
+        assert_eq!(
+            Some(&network_config),
+            setup
+                .node_handles
+                .last()
+                .unwrap()
+                .network_config()
+                .await
+                .unwrap()
+                .as_ref()
+        );
+
+        // Now take down the last node and update the network config
+        setup.simulate_crash_of_last_node().await;
+        let new_config = NetworkConfig {
+            generation: 2,
+            blob: b"Some more network data".to_vec(),
+        };
+        setup.node_handles[0]
+            .update_network_config(new_config.clone())
+            .await
+            .unwrap();
+
+        setup.simulate_restart_of_last_node().await;
+
+        // Inform all nodes about the last node. Restarting changes the network
+        // address because we use ephemeral ports.
+        for h in &setup.node_handles {
+            h.load_peer_addresses(setup.listen_addrs.iter().cloned().collect())
+                .await
+                .unwrap();
+        }
+
+        // Wait for all nodes to see the latest config
+        wait_for_condition(
+            async || {
+                let mut count = 0;
+                for h in &setup.node_handles {
+                    if let Ok(Some(c)) = h.network_config().await {
+                        if c == new_config {
+                            count += 1;
+                        }
+                    }
+                }
+
+                if count == num_nodes {
+                    Ok(())
+                } else {
+                    Err(CondCheckError::<()>::NotYet)
+                }
+            },
+            &poll_interval,
+            &poll_max,
+        )
+        .await
+        .unwrap();
+
+        // Try to update node0 with an old config, and watch it fail
+        let expected = Err(NodeApiError::StaleNetworkConfig {
+            attempted_update_generation: 1,
+            current_generation: 2,
+        });
+        assert_eq!(
+            setup.node_handles[0]
+                .update_network_config(network_config.clone())
+                .await,
+            expected
+        );
+
+        setup.cleanup_successful();
+    }
 }
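
An illustrative sketch for reviewers, not part of the patch: one way a
caller (e.g. the future sled-agent integration that this commit
explicitly defers) might drive the new `NodeTaskHandle` API. The
`push_new_config` helper is hypothetical; `NodeTaskHandle`,
`NodeApiError`, and `NetworkConfig` are the types from the diff above.

    use bootstore::schemes::v0::NetworkConfig;

    /// Hypothetical helper: bump the generation and replicate a new config.
    async fn push_new_config(
        handle: &NodeTaskHandle,
        blob: Vec<u8>,
    ) -> Result<(), NodeApiError> {
        // Read the current config (if any) to pick the next generation.
        // A missing config counts as generation 0, matching the
        // `map_or(0, |c| c.generation)` logic in `NodeTask`.
        let current_gen =
            handle.network_config().await?.map_or(0, |c| c.generation);

        // On success the node persists the config to its ledger and
        // broadcasts it to all established peers; replaying the same or
        // an older generation fails with `NodeApiError::StaleNetworkConfig`.
        handle
            .update_network_config(NetworkConfig {
                generation: current_gen + 1,
                blob,
            })
            .await
    }

Nodes that are down during an update catch up on reconnect: every
completed handshake triggers `send_network_config`, and a node that
receives a newer generation persists it and re-broadcasts to its other
peers, which is the propagation path `test_network_config` exercises.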