diff --git a/Cargo.lock b/Cargo.lock index 31242ad3863..e770d25bce8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3167,6 +3167,7 @@ dependencies = [ "chrono", "clap 3.2.6", "crucible-agent-client", + "ddm-admin-client", "dropshot", "expectorate", "futures", diff --git a/ddm-admin-client/src/lib.rs b/ddm-admin-client/src/lib.rs index a3482c40c52..c0714d0e86d 100644 --- a/ddm-admin-client/src/lib.rs +++ b/ddm-admin-client/src/lib.rs @@ -1,3 +1,5 @@ #![allow(clippy::redundant_closure_call, clippy::needless_lifetimes)] include!(concat!(env!("OUT_DIR"), "/ddm-admin-client.rs")); + +impl Copy for types::Ipv6Prefix {} diff --git a/sled-agent/Cargo.toml b/sled-agent/Cargo.toml index dc254c75e15..2e16d08d4ae 100644 --- a/sled-agent/Cargo.toml +++ b/sled-agent/Cargo.toml @@ -15,6 +15,7 @@ chrono = { version = "0.4", features = [ "serde" ] } clap = { version = "3.2", features = ["derive"] } # Only used by the simulated sled agent. crucible-agent-client = { git = "https://github.com/oxidecomputer/crucible", rev = "8314eeddd228ec0d76cefa40c4a41d3e2611ac18" } +ddm-admin-client = { path = "../ddm-admin-client" } dropshot = { git = "https://github.com/oxidecomputer/dropshot", branch = "main", features = [ "usdt-probes" ] } futures = "0.3.21" ipnetwork = "0.18" diff --git a/sled-agent/src/bootstrap/agent.rs b/sled-agent/src/bootstrap/agent.rs index 507d92baf91..0aa9ba42f6d 100644 --- a/sled-agent/src/bootstrap/agent.rs +++ b/sled-agent/src/bootstrap/agent.rs @@ -6,7 +6,7 @@ use super::client::Client as BootstrapAgentClient; use super::config::{Config, BOOTSTRAP_AGENT_PORT}; -use super::discovery; +use super::ddm_admin_client::{DdmAdminClient, DdmError}; use super::params::SledAgentRequest; use super::rss_handle::RssHandle; use super::server::TrustQuorumMembership; @@ -17,18 +17,26 @@ use crate::illumos::dladm::{self, Dladm, PhysicalLink}; use crate::illumos::zone::Zones; use crate::server::Server as SledServer; use crate::sp::SpHandle; +use ddm_admin_client::types::Ipv6Prefix; use omicron_common::address::get_sled_address; use omicron_common::api::external::{Error as ExternalError, MacAddr}; use omicron_common::backoff::{ internal_service_policy, retry_notify, BackoffError, }; use slog::Logger; +use std::collections::HashSet; use std::net::{Ipv6Addr, SocketAddrV6}; use std::path::{Path, PathBuf}; use std::sync::Arc; use thiserror::Error; use tokio::sync::Mutex; +/// Initial octet of IPv6 for bootstrap addresses. +pub(crate) const BOOTSTRAP_PREFIX: u16 = 0xfdb0; + +/// IPv6 prefix mask for bootstrap addresses. +pub(crate) const BOOTSTRAP_MASK: u8 = 64; + /// Describes errors which may occur while operating the bootstrap service. #[derive(Error, Debug)] pub enum BootstrapError { @@ -39,6 +47,9 @@ pub enum BootstrapError { err: std::io::Error, }, + #[error("Error contacting ddmd: {0}")] + DdmError(#[from] DdmError), + #[error("Error starting sled agent: {0}")] SledError(String), @@ -48,6 +59,9 @@ pub enum BootstrapError { #[error(transparent)] TrustQuorum(#[from] TrustQuorumError), + #[error("Error collecting peer addresses: {0}")] + PeerAddresses(String), + #[error("Failed to initialize bootstrap address: {err}")] BootstrapAddress { err: crate::illumos::zone::EnsureGzAddressError }, } @@ -65,7 +79,7 @@ pub(crate) struct Agent { /// Store the parent log - without "component = BootstrapAgent" - so /// other launched components can set their own value. parent_log: Logger, - peer_monitor: discovery::PeerMonitor, + address: Ipv6Addr, /// Our share of the rack secret, if we have one. share: Mutex>, @@ -86,7 +100,7 @@ fn mac_to_socket_addr(mac: MacAddr) -> SocketAddrV6 { assert_eq!(6, mac_bytes.len()); let address = Ipv6Addr::new( - 0xfdb0, + BOOTSTRAP_PREFIX, ((mac_bytes[0] as u16) << 8) | mac_bytes[1] as u16, ((mac_bytes[2] as u16) << 8) | mac_bytes[3] as u16, ((mac_bytes[4] as u16) << 8) | mac_bytes[5] as u16, @@ -158,16 +172,17 @@ impl Agent { ) .map_err(|err| BootstrapError::BootstrapAddress { err })?; - let peer_monitor = discovery::PeerMonitor::new(&ba_log, address) - .map_err(|err| BootstrapError::Io { - message: format!("Monitoring for peers from {address}"), - err, - })?; + // Start trying to notify ddmd of our bootstrap address so it can + // advertise it to other sleds. + tokio::spawn(advertise_bootstrap_address_via_ddmd( + ba_log.clone(), + address, + )); let agent = Agent { log: ba_log, parent_log: log, - peer_monitor, + address, share: Mutex::new(None), rss: Mutex::new(None), sled_agent: Mutex::new(None), @@ -284,10 +299,33 @@ impl Agent { &self, share: ShareDistribution, ) -> Result { + let ddm_admin_client = DdmAdminClient::new(self.log.clone())?; let rack_secret = retry_notify( internal_service_policy(), || async { - let other_agents = self.peer_monitor.peer_addrs().await; + let other_agents = { + // Manually build up a `HashSet` instead of `.collect()`ing + // so we can log if we see any duplicates. + let mut addrs = HashSet::new(); + for addr in ddm_admin_client + .peer_addrs() + .await + .map_err(BootstrapError::DdmError) + .map_err(|err| BackoffError::transient(err))? + { + // We should never see duplicates; that would mean + // maghemite thinks two different sleds have the same + // bootstrap address! + if !addrs.insert(addr) { + let msg = format!("Duplicate peer addresses received from ddmd: {addr}"); + error!(&self.log, "{}", msg); + return Err(BackoffError::permanent( + BootstrapError::PeerAddresses(msg), + )); + } + } + addrs + }; info!( &self.log, "Bootstrap: Communicating with peers: {:?}", other_agents @@ -300,7 +338,9 @@ impl Agent { "Not enough peers to start establishing quorum" ); return Err(BackoffError::transient( - TrustQuorumError::NotEnoughPeers, + BootstrapError::TrustQuorum( + TrustQuorumError::NotEnoughPeers, + ), )); } info!( @@ -329,14 +369,17 @@ impl Agent { }) .collect(); - // TODO: Parallelize this and keep track of whose shares we've already retrieved and - // don't resend. See https://github.com/oxidecomputer/omicron/issues/514 + // TODO: Parallelize this and keep track of whose shares we've + // already retrieved and don't resend. See + // https://github.com/oxidecomputer/omicron/issues/514 let mut shares = vec![share.share.clone()]; for agent in &other_agents { let share = agent.request_share().await .map_err(|e| { info!(&self.log, "Bootstrap: failed to retreive share from peer: {:?}", e); - BackoffError::transient(e.into()) + BackoffError::transient( + BootstrapError::TrustQuorum(e.into()), + ) })?; info!( &self.log, @@ -360,7 +403,9 @@ impl Agent { // the error returned from `RackSecret::combine_shares`. // See https://github.com/oxidecomputer/omicron/issues/516 BackoffError::transient( - TrustQuorumError::RackSecretConstructionFailed(e), + BootstrapError::TrustQuorum( + TrustQuorumError::RackSecretConstructionFailed(e), + ), ) })?; info!(self.log, "RackSecret computed from shares."); @@ -386,7 +431,7 @@ impl Agent { let rss = RssHandle::start_rss( &self.parent_log, rss_config.clone(), - self.peer_monitor.observer().await, + self.address, self.sp.clone(), // TODO-cleanup: Remove this arg once RSS can discover the trust // quorum members over the management network. @@ -424,6 +469,21 @@ impl Agent { } } +async fn advertise_bootstrap_address_via_ddmd(log: Logger, address: Ipv6Addr) { + let prefix = Ipv6Prefix { addr: address, mask: 64 }; + retry_notify(internal_service_policy(), || async { + let client = DdmAdminClient::new(log.clone())?; + client.advertise_prefix(prefix).await?; + Ok(()) + }, |err, duration| { + info!( + log, + "Failed to notify ddmd of our address (will retry after {duration:?}"; + "err" => %err, + ); + }).await.unwrap(); +} + #[cfg(test)] mod tests { use super::*; diff --git a/sled-agent/src/bootstrap/ddm_admin_client.rs b/sled-agent/src/bootstrap/ddm_admin_client.rs new file mode 100644 index 00000000000..16729543aa2 --- /dev/null +++ b/sled-agent/src/bootstrap/ddm_admin_client.rs @@ -0,0 +1,101 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. + +//! Client to ddmd (the maghemite service running on localhost). + +use ddm_admin_client::types::Ipv6Prefix; +use ddm_admin_client::Client; +use slog::Logger; +use std::net::Ipv6Addr; +use std::net::SocketAddr; +use std::net::SocketAddrV6; +use thiserror::Error; + +use crate::bootstrap::agent::BOOTSTRAP_MASK; +use crate::bootstrap::agent::BOOTSTRAP_PREFIX; + +// TODO-cleanup Is it okay to hardcode this port number and assume ddmd is bound +// to `::1`, or should we move that into our config? +const DDMD_PORT: u16 = 8000; + +#[derive(Debug, Error)] +pub enum DdmError { + #[error("Failed to construct an HTTP client: {0}")] + HttpClient(#[from] reqwest::Error), + + #[error("Failed making HTTP request to ddmd: {0}")] + DdmdApi(#[from] ddm_admin_client::Error), +} + +/// Manages Sled Discovery - both our announcement to other Sleds, +/// as well as our discovery of those sleds. +#[derive(Clone)] +pub struct DdmAdminClient { + client: Client, + log: Logger, +} + +impl DdmAdminClient { + /// Creates a new [`PeerMonitor`]. + pub fn new(log: Logger) -> Result { + let dur = std::time::Duration::from_secs(60); + let ddmd_addr = SocketAddrV6::new(Ipv6Addr::LOCALHOST, DDMD_PORT, 0, 0); + + let client = reqwest::ClientBuilder::new() + .connect_timeout(dur) + .timeout(dur) + .build()?; + let client = Client::new_with_client( + &format!("http://{ddmd_addr}"), + client, + log.new(o!("DdmAdminClient" => SocketAddr::V6(ddmd_addr))), + ); + Ok(DdmAdminClient { client, log }) + } + + /// Instruct ddmd to advertise the given prefix to peer sleds. + pub async fn advertise_prefix( + &self, + prefix: Ipv6Prefix, + ) -> Result<(), DdmError> { + // TODO-cleanup Why does the generated openapi client require a `&Vec` + // instead of a `&[]`? + info!( + self.log, "Sending prefix to ddmd for advertisement"; + "prefix" => ?prefix, + ); + let prefixes = vec![prefix]; + self.client.advertise_prefixes(&prefixes).await?; + Ok(()) + } + + /// Returns the addresses of connected sleds. + /// + /// Note: These sleds have not yet been verified. + pub async fn peer_addrs( + &self, + ) -> Result + '_, DdmError> { + let prefixes = self.client.get_prefixes().await?.into_inner(); + info!(self.log, "Received prefixes from ddmd"; "prefixes" => ?prefixes); + Ok(prefixes.into_iter().filter_map(|(_, prefixes)| { + // If we receive multiple bootstrap prefixes from one peer, trim it + // down to just one. Connections on the bootstrap network are always + // authenticated via sprockets, which only needs one address. + prefixes.into_iter().find_map(|prefix| { + let mut segments = prefix.addr.segments(); + if prefix.mask == BOOTSTRAP_MASK + && segments[0] == BOOTSTRAP_PREFIX + { + // Bootstrap agent IPs always end in ::1; convert the + // `BOOTSTRAP_PREFIX::*/BOOTSTRAP_PREFIX` address we + // received into that specific address. + segments[7] = 1; + Some(Ipv6Addr::from(segments)) + } else { + None + } + }) + })) + } +} diff --git a/sled-agent/src/bootstrap/discovery.rs b/sled-agent/src/bootstrap/discovery.rs deleted file mode 100644 index 384b8fd027e..00000000000 --- a/sled-agent/src/bootstrap/discovery.rs +++ /dev/null @@ -1,169 +0,0 @@ -// This Source Code Form is subject to the terms of the Mozilla Public -// License, v. 2.0. If a copy of the MPL was not distributed with this -// file, You can obtain one at https://mozilla.org/MPL/2.0/. - -//! Sled announcement and discovery. - -use super::multicast; -use slog::Logger; -use std::collections::HashSet; -use std::io; -use std::net::{Ipv6Addr, SocketAddr}; -use std::sync::Arc; -use tokio::net::UdpSocket; -use tokio::sync::{broadcast, Mutex}; -use tokio::task::JoinHandle; - -// NOTE: This is larger than the expected number of sleds per rack, as -// peers may change as new sleds are swapped in for old ones. -// -// See the "TODO" below about removal of sleds from the HashSet -const PEER_CAPACITY_MAXIMUM: usize = 128; - -/// Manages Sled Discovery - both our announcement to other Sleds, -/// as well as our discovery of those sleds. -pub struct PeerMonitor { - // TODO: When can we remove sleds from this HashSet? Presumably, if a sled - // has been detached from the bootstrap network, we should drop it. - // - // Without such removal, the set size will be unbounded (though admittedly, - // growing slowly). - // - // Options: - // - Have some sort of expiration mechanism? This could turn the set of - // sleds here into "the sleds which we know were connected within the past - // hour", for example. - // - Have some other interface to identify the detachment of a peer. - our_address: Ipv6Addr, - sleds: Arc>>, - notification_sender: broadcast::Sender, - _worker: JoinHandle<()>, -} - -async fn monitor_worker( - log: Logger, - sender: UdpSocket, - listener: UdpSocket, - sleds: Arc>>, - notification_sender: broadcast::Sender, -) { - // Let this message be a reminder that this content is *not* - // encrypted, authenticated, or otherwise verified. We're just using - // it as a starting point for swapping addresses. - let message = - b"We've been trying to reach you about your car's extended warranty"; - loop { - let mut buf = vec![0u8; 128]; - tokio::select! { - _ = tokio::time::sleep(tokio::time::Duration::from_millis(5000)) => { - if let Err(e) = sender.try_send_to(message, SocketAddr::V6(multicast::multicast_address())) { - warn!(log, "PeerMonitor failed to broadcast: {}", e); - } - } - result = listener.recv_from(&mut buf) => { - match result { - Ok((_, addr)) => { - match addr { - SocketAddr::V6(addr) => { - let mut sleds = sleds.lock().await; - if sleds.insert(*addr.ip()) { - info!(log, "Bootstrap Peer Monitor: Successfully received an address: {}", addr); - // We don't actually care if no one is listening, so - // drop the error if that's the case. - let _ = notification_sender.send(*addr.ip()); - } - } - _ => continue, - } - }, - Err(e) => warn!(log, "PeerMonitor failed to receive: {}", e), - } - } - } - } -} - -impl PeerMonitor { - /// Creates a new [`PeerMonitor`]. - pub fn new(log: &Logger, address: Ipv6Addr) -> Result { - let loopback = false; - let interface = 0; - let (sender, listener) = - multicast::new_ipv6_udp_pair(&address, loopback, interface)?; - - let sleds = Arc::new(Mutex::new(HashSet::new())); - let sleds_for_worker = sleds.clone(); - let log = log.clone(); - - let (tx, _) = tokio::sync::broadcast::channel(PEER_CAPACITY_MAXIMUM); - - let notification_sender = tx.clone(); - let worker = tokio::task::spawn(async move { - monitor_worker( - log, - sender, - listener, - sleds_for_worker, - notification_sender, - ) - .await - }); - - Ok(PeerMonitor { - our_address: address, - sleds, - notification_sender: tx, - _worker: worker, - }) - } - - /// Returns the addresses of connected sleds. - /// - /// For an interface that allows monitoring the connected sleds, rather - /// than just sampling at a single point-in-time, consider using - /// [`Self::observer`]. - /// - /// Note: These sleds have not yet been verified. - pub async fn peer_addrs(&self) -> Vec { - self.sleds.lock().await.iter().map(|addr| *addr).collect() - } - - /// Returns a [`PeerMonitorObserver`] which can be used to view the results - /// of monitoring for peers. - pub async fn observer(&self) -> PeerMonitorObserver { - PeerMonitorObserver { - our_address: self.our_address, - actual_sleds: self.sleds.clone(), - sender: self.notification_sender.clone(), - } - } -} - -/// Provides a read-only view of monitored peers, with a mechanism for -/// observing the incoming queue of new peers. -pub struct PeerMonitorObserver { - our_address: Ipv6Addr, - // A shared reference to the "true" set of sleds. - // - // This is only used to re-synchronize our set of sleds - // if we get out-of-sync due to long notification queues. - actual_sleds: Arc>>, - sender: broadcast::Sender, -} - -impl PeerMonitorObserver { - /// Returns the address of this sled. - pub fn our_address(&self) -> Ipv6Addr { - self.our_address - } - - /// Returns the current set of sleds and a receiver to hear about - /// new ones. - pub async fn subscribe( - &mut self, - ) -> (HashSet, broadcast::Receiver) { - let sleds = self.actual_sleds.lock().await; - let receiver = self.sender.subscribe(); - (sleds.clone(), receiver) - } -} diff --git a/sled-agent/src/bootstrap/maghemite.rs b/sled-agent/src/bootstrap/maghemite.rs index 45154a6ca0e..94d9d0628c7 100644 --- a/sled-agent/src/bootstrap/maghemite.rs +++ b/sled-agent/src/bootstrap/maghemite.rs @@ -37,6 +37,9 @@ fn enable_mg_ddm_service_blocking( log: Logger, interface: AddrObject, ) -> Result<(), Error> { + // TODO-correctness Should we try to shut down / remove any existing mg-ddm + // service first? This appears to work fine as-is on a restart of the + // sled-agent service. info!(log, "Importing mg-ddm service"; "path" => MANIFEST_PATH); smf::Config::import().run(MANIFEST_PATH)?; diff --git a/sled-agent/src/bootstrap/mod.rs b/sled-agent/src/bootstrap/mod.rs index b20cc06e9b2..3d8cbb2562d 100644 --- a/sled-agent/src/bootstrap/mod.rs +++ b/sled-agent/src/bootstrap/mod.rs @@ -7,9 +7,8 @@ pub mod agent; pub mod client; pub mod config; -pub mod discovery; +pub mod ddm_admin_client; mod maghemite; -pub mod multicast; pub(crate) mod params; pub(crate) mod rss_handle; pub mod server; diff --git a/sled-agent/src/bootstrap/multicast.rs b/sled-agent/src/bootstrap/multicast.rs deleted file mode 100644 index 5bf69d3cc10..00000000000 --- a/sled-agent/src/bootstrap/multicast.rs +++ /dev/null @@ -1,130 +0,0 @@ -// This Source Code Form is subject to the terms of the Mozilla Public -// License, v. 2.0. If a copy of the MPL was not distributed with this -// file, You can obtain one at https://mozilla.org/MPL/2.0/. - -//! Ipv6 Multicast utilities used for Sled discovery. - -use std::io; -use std::net::{Ipv6Addr, SocketAddrV6}; -use tokio::net::UdpSocket; - -/// Scope of an IPv6 Multicast address. -/// -/// Attempts to align with the unstable [`std::net::Ipv6MulticastScope`] enum. -pub enum Ipv6MulticastScope { - #[allow(dead_code)] - InterfaceLocal, - LinkLocal, - #[allow(dead_code)] - RealmLocal, - #[allow(dead_code)] - AdminLocal, - #[allow(dead_code)] - SiteLocal, - #[allow(dead_code)] - OrganizationLocal, - #[allow(dead_code)] - GlobalScope, -} - -impl Ipv6MulticastScope { - /// Returns the first hextet of an Ipv6 multicast IP address. - pub fn first_hextet(&self) -> u16 { - // Reference: https://datatracker.ietf.org/doc/html/rfc4291#section-2.7 - // - // This implementation currently sets all multicast flags to zero; - // this could easily change if needed. - let flags = 0; - let flags_shifted = flags << 4; - match self { - Ipv6MulticastScope::InterfaceLocal => 0xFF01 | flags_shifted, - Ipv6MulticastScope::LinkLocal => 0xFF02 | flags_shifted, - Ipv6MulticastScope::RealmLocal => 0xFF03 | flags_shifted, - Ipv6MulticastScope::AdminLocal => 0xFF04 | flags_shifted, - Ipv6MulticastScope::SiteLocal => 0xFF05 | flags_shifted, - Ipv6MulticastScope::OrganizationLocal => 0xFF08 | flags_shifted, - Ipv6MulticastScope::GlobalScope => 0xFF0E | flags_shifted, - } - } -} - -fn new_ipv6_udp_socket() -> io::Result { - let socket = socket2::Socket::new( - socket2::Domain::IPV6, - socket2::Type::DGRAM, - Some(socket2::Protocol::UDP), - )?; - socket.set_only_v6(true)?; - // From - // https://docs.rs/tokio/1.14.0/tokio/net/struct.UdpSocket.html#method.from_std - // - // "It is left up to the user to set it in non-blocking mode". - // - // We (the user) do that here. - socket.set_nonblocking(true)?; - Ok(socket) -} - -/// Create a new listening socket, capable of receiving IPv6 multicast traffic. -fn new_ipv6_udp_listener( - addr: &SocketAddrV6, - interface: u32, -) -> io::Result { - let socket = new_ipv6_udp_socket()?; - - // From http://www.kohala.com/start/mcast.api.txt - // - // "More than one process may bind to the same SOCK_DGRAM UDP port [if - // SO_REUSEADDR is used]. In this case, every incoming multicast or - // broadcast UDP datagram destined to the shared port is delivered to all - // sockets bound to the port." - socket.set_reuse_address(true)?; - - socket.join_multicast_v6(addr.ip(), interface)?; - let bind_address = SocketAddrV6::new(*addr.ip(), addr.port(), 0, 0); - socket.bind(&(bind_address).into())?; - - // Convert from: socket2 -> std -> tokio - UdpSocket::from_std(std::net::UdpSocket::from(socket)) -} - -/// Create a new sending socket, capable of sending IPv6 multicast traffic. -fn new_ipv6_udp_sender( - addr: &Ipv6Addr, - loopback: bool, - interface: u32, -) -> io::Result { - let socket = new_ipv6_udp_socket()?; - socket.set_multicast_loop_v6(loopback)?; - socket.set_multicast_if_v6(interface)?; - let address = SocketAddrV6::new(*addr, 0, 0, 0); - socket.bind(&address.into())?; - - UdpSocket::from_std(std::net::UdpSocket::from(socket)) -} - -pub fn multicast_address() -> SocketAddrV6 { - let scope = Ipv6MulticastScope::LinkLocal.first_hextet(); - SocketAddrV6::new(Ipv6Addr::new(scope, 0, 0, 0, 0, 0, 0, 0x1), 7645, 0, 0) -} - -/// Returns the (sender, receiver) sockets of an IPv6 UDP multicast group. -/// -/// * `address`: The address to use for sending. -/// * `loopback`: If true, the receiver packet will see multicast packets sent -/// on our sender, in addition to those sent by everyone else in the multicast -/// group. -/// * `interface`: The index of the interface to join (zero indicates "any -/// interface"). -pub fn new_ipv6_udp_pair( - address: &Ipv6Addr, - loopback: bool, - interface: u32, -) -> io::Result<(UdpSocket, UdpSocket)> { - let sender = new_ipv6_udp_sender(&address, loopback, interface)?; - let listener = new_ipv6_udp_listener(&multicast_address(), interface)?; - - Ok((sender, listener)) -} - -// Refer to sled-agent/tests/integration_tests/multicast.rs for tests. diff --git a/sled-agent/src/bootstrap/rss_handle.rs b/sled-agent/src/bootstrap/rss_handle.rs index 9bc7529e88f..c4aac6d5425 100644 --- a/sled-agent/src/bootstrap/rss_handle.rs +++ b/sled-agent/src/bootstrap/rss_handle.rs @@ -5,7 +5,6 @@ //! sled-agent's handle to the Rack Setup Service it spawns use super::client as bootstrap_agent_client; -use super::discovery::PeerMonitorObserver; use super::params::SledAgentRequest; use crate::rack_setup::config::SetupServiceConfig; use crate::rack_setup::service::Service; @@ -17,6 +16,7 @@ use omicron_common::backoff::retry_notify; use omicron_common::backoff::BackoffError; use slog::Logger; use sprockets_host::Ed25519Certificate; +use std::net::Ipv6Addr; use std::net::SocketAddrV6; use tokio::sync::mpsc; use tokio::sync::oneshot; @@ -43,16 +43,15 @@ impl RssHandle { pub(super) fn start_rss( log: &Logger, config: SetupServiceConfig, - peer_monitor: PeerMonitorObserver, + our_bootstrap_address: Ipv6Addr, sp: Option, member_device_id_certs: Vec, ) -> Self { - let (tx, rx) = rss_channel(); + let (tx, rx) = rss_channel(our_bootstrap_address); let rss = Service::new( log.new(o!("component" => "RSS")), config, - peer_monitor, tx, member_device_id_certs, ); @@ -111,10 +110,12 @@ async fn initialize_sled_agent( // communication in the types below to avoid using tokio channels directly and // leave a breadcrumb for where the work will need to be done to switch the // communication mechanism. -fn rss_channel() -> (BootstrapAgentHandle, BootstrapAgentHandleReceiver) { +fn rss_channel( + our_bootstrap_address: Ipv6Addr, +) -> (BootstrapAgentHandle, BootstrapAgentHandleReceiver) { let (tx, rx) = mpsc::channel(32); ( - BootstrapAgentHandle { inner: tx }, + BootstrapAgentHandle { inner: tx, our_bootstrap_address }, BootstrapAgentHandleReceiver { inner: rx }, ) } @@ -126,6 +127,7 @@ type InnerInitRequest = ( pub(crate) struct BootstrapAgentHandle { inner: mpsc::Sender, + our_bootstrap_address: Ipv6Addr, } impl BootstrapAgentHandle { @@ -152,6 +154,10 @@ impl BootstrapAgentHandle { self.inner.send((requests, tx)).await.unwrap(); rx.await.unwrap() } + + pub(crate) fn our_address(&self) -> Ipv6Addr { + self.our_bootstrap_address + } } struct BootstrapAgentHandleReceiver { diff --git a/sled-agent/src/rack_setup/service.rs b/sled-agent/src/rack_setup/service.rs index c48a20cc4bc..fca4f5a9316 100644 --- a/sled-agent/src/rack_setup/service.rs +++ b/sled-agent/src/rack_setup/service.rs @@ -5,14 +5,13 @@ //! Rack Setup Service implementation use super::config::{HardcodedSledRequest, SetupServiceConfig as Config}; -use crate::bootstrap::{ - config::BOOTSTRAP_AGENT_PORT, - discovery::PeerMonitorObserver, - params::SledAgentRequest, - rss_handle::BootstrapAgentHandle, - trust_quorum::{RackSecret, ShareDistribution}, -}; -use crate::params::{ServiceRequest, ServiceType}; +use crate::bootstrap::config::BOOTSTRAP_AGENT_PORT; +use crate::bootstrap::ddm_admin_client::{DdmAdminClient, DdmError}; +use crate::bootstrap::params::SledAgentRequest; +use crate::bootstrap::rss_handle::BootstrapAgentHandle; +use crate::bootstrap::trust_quorum::{RackSecret, ShareDistribution}; +use crate::params::ServiceRequest; +use crate::params::ServiceType; use omicron_common::address::{ get_sled_address, ReservedRackSubnet, DNS_PORT, DNS_SERVER_PORT, }; @@ -23,10 +22,10 @@ use serde::{Deserialize, Serialize}; use slog::Logger; use sprockets_host::Ed25519Certificate; use std::collections::{HashMap, HashSet}; +use std::iter; use std::net::{Ipv6Addr, SocketAddr, SocketAddrV6}; use std::path::PathBuf; use thiserror::Error; -use tokio::sync::Mutex; use uuid::Uuid; /// Describes errors which may occur while operating the setup service. @@ -45,6 +44,9 @@ pub enum SetupServiceError { #[error("Error making HTTP request to Sled Agent: {0}")] SledApi(#[from] sled_agent_client::Error), + #[error("Error contacting ddmd: {0}")] + DdmError(#[from] DdmError), + #[error("Cannot deserialize TOML file at {path}: {err}")] Toml { path: PathBuf, err: toml::de::Error }, @@ -84,7 +86,6 @@ impl Service { pub(crate) fn new( log: Logger, config: Config, - peer_monitor: PeerMonitorObserver, local_bootstrap_agent: BootstrapAgentHandle, // TODO-cleanup: We should be collecting the device ID certs of all // trust quorum members over the management network. Currently we don't @@ -93,7 +94,7 @@ impl Service { member_device_id_certs: Vec, ) -> Self { let handle = tokio::task::spawn(async move { - let svc = ServiceInner::new(log.clone(), peer_monitor); + let svc = ServiceInner::new(log.clone()); if let Err(e) = svc .inject_rack_setup_requests( &config, @@ -149,12 +150,11 @@ enum PeerExpectation { /// The implementation of the Rack Setup Service. struct ServiceInner { log: Logger, - peer_monitor: Mutex, } impl ServiceInner { - fn new(log: Logger, peer_monitor: PeerMonitorObserver) -> Self { - ServiceInner { log, peer_monitor: Mutex::new(peer_monitor) } + fn new(log: Logger) -> Self { + ServiceInner { log } } async fn initialize_datasets( @@ -425,42 +425,67 @@ impl ServiceInner { async fn wait_for_peers( &self, expectation: PeerExpectation, - ) -> Result, SetupServiceError> { - let mut peer_monitor = self.peer_monitor.lock().await; - let (mut all_addrs, mut peer_rx) = peer_monitor.subscribe().await; - all_addrs.insert(peer_monitor.our_address()); + our_bootstrap_address: Ipv6Addr, + ) -> Result, DdmError> { + let ddm_admin_client = DdmAdminClient::new(self.log.clone())?; + let addrs = retry_notify( + // TODO-correctness `internal_service_policy()` has potentially-long + // exponential backoff, which is probably not what we want. See + // https://github.com/oxidecomputer/omicron/issues/1270 + internal_service_policy(), + || async { + let peer_addrs = + ddm_admin_client.peer_addrs().await.map_err(|err| { + BackoffError::transient(format!( + "Failed getting peers from mg-ddm: {err}" + )) + })?; + + let all_addrs = peer_addrs + .chain(iter::once(our_bootstrap_address)) + .collect::>(); - loop { - { match expectation { PeerExpectation::LoadOldPlan(ref expected) => { if all_addrs.is_superset(expected) { - return Ok(all_addrs - .into_iter() - .collect::>()); + Ok(all_addrs.into_iter().collect()) + } else { + Err(BackoffError::transient( + concat!( + "Waiting for a LoadOldPlan set ", + "of peers not found yet." + ) + .to_string(), + )) } - info!(self.log, "Waiting for a LoadOldPlan set of peers; not found yet."); } PeerExpectation::CreateNewPlan(wanted_peer_count) => { if all_addrs.len() >= wanted_peer_count { - return Ok(all_addrs - .into_iter() - .collect::>()); + Ok(all_addrs.into_iter().collect()) + } else { + Err(BackoffError::transient(format!( + "Waiting for {} peers (currently have {})", + wanted_peer_count, + all_addrs.len() + ))) } - info!( - self.log, - "Waiting for {} peers (currently have {})", - wanted_peer_count, - all_addrs.len(), - ); } } - } + }, + |message, duration| { + info!( + self.log, + "{} (will retry after {:?})", message, duration + ); + }, + ) + // `internal_service_policy()` retries indefinitely on transient errors + // (the only kind we produce), allowing us to `.unwrap()` without + // panicking + .await + .unwrap(); - info!(self.log, "Waiting for more peers"); - let new_peer = peer_rx.recv().await?; - all_addrs.insert(new_peer); - } + Ok(addrs) } // In lieu of having an operator send requests to all sleds via an @@ -517,7 +542,9 @@ impl ServiceInner { } else { PeerExpectation::CreateNewPlan(config.requests.len()) }; - let addrs = self.wait_for_peers(expectation).await?; + let addrs = self + .wait_for_peers(expectation, local_bootstrap_agent.our_address()) + .await?; info!(self.log, "Enough peers exist to enact RSS plan"); // If we created a plan, reuse it. Otherwise, create a new plan. diff --git a/sled-agent/tests/integration_tests/mod.rs b/sled-agent/tests/integration_tests/mod.rs index 6c6686f6543..1bf43dc00c8 100644 --- a/sled-agent/tests/integration_tests/mod.rs +++ b/sled-agent/tests/integration_tests/mod.rs @@ -3,5 +3,3 @@ // file, You can obtain one at https://mozilla.org/MPL/2.0/. mod commands; -#[cfg(target_os = "illumos")] -mod multicast; diff --git a/sled-agent/tests/integration_tests/multicast.rs b/sled-agent/tests/integration_tests/multicast.rs deleted file mode 100644 index a66eb8d0be6..00000000000 --- a/sled-agent/tests/integration_tests/multicast.rs +++ /dev/null @@ -1,106 +0,0 @@ -// This Source Code Form is subject to the terms of the Mozilla Public -// License, v. 2.0. If a copy of the MPL was not distributed with this -// file, You can obtain one at https://mozilla.org/MPL/2.0/. - -use omicron_sled_agent::bootstrap; -use omicron_sled_agent::illumos::addrobj::AddrObject; -use omicron_sled_agent::illumos::{dladm, zone}; -use std::io; -use std::net::IpAddr; - -struct AddressCleanup { - addrobj: AddrObject, -} - -impl Drop for AddressCleanup { - fn drop(&mut self) { - let _ = zone::Zones::delete_address(None, &self.addrobj); - } -} - -// TODO(https://github.com/oxidecomputer/omicron/issues/1032): -// -// This test has value when hacking on multicast bootstrap address -// swapping, but is known to be flaky. It is being set to ignored -// for the following reasons: -// -// - It still can provide value when modifying the bootstrap address -// swap locally. -// - It is known to be flaky. -// - According to -// , -// we are planning on performing address swapping via Maghemite, so -// the implementation being tested here will eventually change enough -// to render the test obsolete. -#[tokio::test] -#[ignore] -async fn test_multicast_bootstrap_address() { - // Setup the bootstrap address. - // - // This modifies global state of the target machine, creating - // an address named "testbootstrap6", akin to what the bootstrap - // agent should do. - let etherstub = dladm::Dladm::create_etherstub().unwrap(); - let link = dladm::Dladm::create_etherstub_vnic(ðerstub).unwrap(); - - let phys_link = dladm::Dladm::find_physical().unwrap(); - let address = - bootstrap::agent::bootstrap_address(phys_link.clone()).unwrap(); - let address_name = "testbootstrap6"; - let addrobj = AddrObject::new(&link.0, address_name).unwrap(); - zone::Zones::ensure_has_global_zone_v6_address( - link, - *address.ip(), - address_name, - ) - .unwrap(); - - // Cleanup-on-drop removal of the bootstrap address. - let _cleanup = AddressCleanup { addrobj }; - - // Create the multicast pair. - let loopback = true; - let interface = 0; - let (sender, listener) = bootstrap::multicast::new_ipv6_udp_pair( - address.ip(), - loopback, - interface, - ) - .unwrap(); - - // Create a receiver task which reads for messages that have - // been broadcast, verifies the message, and returns the - // calling address. - let message = b"Hello World!"; - let receiver_task_handle = tokio::task::spawn(async move { - let mut buf = vec![0u8; 32]; - let (len, addr) = listener.recv_from(&mut buf).await?; - assert_eq!(message.len(), len); - assert_eq!(message, &buf[..message.len()]); - assert_eq!(addr.ip(), IpAddr::V6(*address.ip())); - Ok::<_, io::Error>(addr) - }); - - // Send a message repeatedly, and exit successfully if we - // manage to receive the response. - tokio::pin!(receiver_task_handle); - let mut send_count = 0; - loop { - tokio::select! { - result = sender.send_to(message, bootstrap::multicast::multicast_address()) => { - assert_eq!(message.len(), result.unwrap()); - send_count += 1; - if send_count > 10 { - panic!("10 multicast UDP messages sent with no response"); - } - tokio::time::sleep(tokio::time::Duration::from_millis(10)).await; - } - result = &mut receiver_task_handle => { - let addr = result.unwrap().unwrap(); - eprintln!("Receiver received message: {:#?}", addr); - assert_eq!(addr.ip(), IpAddr::V6(*address.ip())); - break; - } - } - } -}