diff --git a/Cargo.lock b/Cargo.lock index 1b526a5f43b..996a24b23a9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1691,6 +1691,7 @@ dependencies = [ "serial_test", "slog", "smf", + "socket2", "structopt", "subprocess", "tar", diff --git a/sled-agent/Cargo.toml b/sled-agent/Cargo.toml index 0173e4cf67f..63991066813 100644 --- a/sled-agent/Cargo.toml +++ b/sled-agent/Cargo.toml @@ -23,6 +23,7 @@ serde = { version = "1.0", features = [ "derive" ] } serde_json = "1.0" slog = { version = "2.5", features = [ "max_level_trace", "release_max_level_debug" ] } smf = "0.2" +socket2 = { version = "0.4", features = [ "all" ] } structopt = "0.3" tar = "0.4" tempfile = "3.2" diff --git a/sled-agent/src/bootstrap/agent.rs b/sled-agent/src/bootstrap/agent.rs index 902a27a5d64..75dea014406 100644 --- a/sled-agent/src/bootstrap/agent.rs +++ b/sled-agent/src/bootstrap/agent.rs @@ -6,19 +6,25 @@ use super::client::types as bootstrap_types; use super::client::Client as BootstrapClient; +use super::discovery; use super::views::ShareResponse; -use omicron_common::api::external::Error; +use omicron_common::api::external::Error as ExternalError; +use omicron_common::backoff::{ + internal_service_policy, retry_notify, BackoffError, +}; use omicron_common::packaging::sha256_digest; use slog::Logger; use std::collections::HashMap; use std::fs::File; use std::io::{Seek, SeekFrom}; -use std::net::SocketAddr; use std::path::Path; use tar::Archive; use thiserror::Error; +const UNLOCK_THRESHOLD: usize = 2; +const BOOTSTRAP_PORT: u16 = 12346; + /// Describes errors which may occur while operating the bootstrap service. #[derive(Error, Debug)] pub enum BootstrapError { @@ -39,24 +45,35 @@ pub enum BootstrapError { #[error("Error making HTTP request")] Api(#[from] anyhow::Error), + + #[error("Not enough peers to unlock storage")] + NotEnoughPeers, +} + +impl From for ExternalError { + fn from(err: BootstrapError) -> Self { + Self::internal_error(&err.to_string()) + } } /// The entity responsible for bootstrapping an Oxide rack. pub(crate) struct Agent { /// Debug log log: Logger, + peer_monitor: discovery::PeerMonitor, } impl Agent { - pub fn new(log: Logger) -> Self { - Agent { log } + pub fn new(log: Logger) -> Result { + let peer_monitor = discovery::PeerMonitor::new(&log)?; + Ok(Agent { log, peer_monitor }) } /// Implements the "request share" API. pub async fn request_share( &self, identity: Vec, - ) -> Result { + ) -> Result { // TODO-correctness: Validate identity, return whatever // information is necessary to establish trust quorum. // @@ -66,42 +83,92 @@ impl Agent { Ok(ShareResponse { shared_secret: vec![] }) } - /// Performs device initialization: + /// Communicates with peers, sharing secrets, until the rack has been + /// sufficiently unlocked. /// - /// - TODO: Communicates with other sled agents to establish a trust quorum. - /// - Verifies, unpacks, and launches other services. - pub async fn initialize( - &self, - other_agents: Vec, - ) -> Result<(), BootstrapError> { - info!(&self.log, "bootstrap service initializing"); - // TODO-correctness: - // - Establish trust quorum. - // - Once this is done, "unlock" local storage - // - // The current implementation sends a stub request to all known - // sled agents, but does not actually create a quorum / unlock - // anything. - let other_agents: Vec = other_agents - .into_iter() - .map(|addr| { - let addr_str = addr.to_string(); - BootstrapClient::new( - &format!("http://{}", addr_str,), - self.log.new(o!( - "Address" => addr_str, - )), + /// - This method retries until [`UNLOCK_THRESHOLD`] other agents are + /// online, and have successfully responded to "share requests". + async fn establish_sled_quorum(&self) -> Result<(), BootstrapError> { + retry_notify( + internal_service_policy(), + || async { + let other_agents = self.peer_monitor.addrs().await; + info!(&self.log, "Bootstrap: Communicating with peers: {:?}", other_agents); + + // "-1" to account for ourselves. + // + // NOTE: Clippy error exists while the compile-time unlock + // threshold is "1", because we basically don't require any + // peers to unlock. + #[allow(clippy::absurd_extreme_comparisons)] + if other_agents.len() < UNLOCK_THRESHOLD - 1 { + warn!(&self.log, "Not enough peers to start establishing quorum"); + return Err(BackoffError::Transient( + BootstrapError::NotEnoughPeers, + )); + } + info!(&self.log, "Bootstrap: Enough peers to start share transfer"); + + // TODO-correctness: + // - Establish trust quorum. + // - Once this is done, "unlock" local storage + // + // The current implementation sends a stub request to all known sled + // agents, but does not actually create a quorum / unlock anything. + let other_agents: Vec = other_agents + .into_iter() + .map(|mut addr| { + addr.set_port(BOOTSTRAP_PORT); + // TODO-correctness: + // + // Many rust crates - such as "URL" - really dislike + // using scopes in IPv6 addresses. Using + // "addr.to_string()" results in an IP address format + // that is rejected when embedded into a URL. + // + // Instead, we merely use IP and port for the moment, + // which loses the scope information. Longer-term, if we + // use ULAs (Unique Local Addresses) the scope shouldn't + // be a factor anyway. + let addr_str = format!("[{}]:{}", addr.ip(), addr.port()); + info!(&self.log, "bootstrap: Connecting to {}", addr_str); + BootstrapClient::new( + &format!("http://{}", addr_str), + self.log.new(o!( + "Address" => addr_str, + )), + ) + }) + .collect(); + for agent in &other_agents { + agent + .api_request_share(&bootstrap_types::ShareRequest { + identity: vec![], + }) + .await + .map_err(|e| { + info!(&self.log, "Bootstrap: Failed to share request with peer: {:?}", e); + BackoffError::Transient(BootstrapError::Api(e)) + })?; + info!(&self.log, "Bootstrap: Shared request with peer"); + } + Ok(()) + }, + |error, duration| { + warn!( + self.log, + "Failed to unlock sleds (will retry after {:?}: {:#}", + duration, + error, ) - }) - .collect(); - for agent in &other_agents { - agent - .api_request_share(&bootstrap_types::ShareRequest { - identity: vec![], - }) - .await?; - } + }, + ) + .await?; + + Ok(()) + } + async fn launch_local_services(&self) -> Result<(), BootstrapError> { let tar_source = Path::new("/opt/oxide"); let destination = Path::new("/opt/oxide"); // TODO-correctness: Validation should come from ROT, not local file. @@ -129,6 +196,19 @@ impl Agent { Ok(()) } + /// Performs device initialization: + /// + /// - TODO: Communicates with other sled agents to establish a trust quorum. + /// - Verifies, unpacks, and launches other services. + pub async fn initialize(&self) -> Result<(), BootstrapError> { + info!(&self.log, "bootstrap service initializing"); + + self.establish_sled_quorum().await?; + self.launch_local_services().await?; + + Ok(()) + } + fn launch( &self, digests: &HashMap>, diff --git a/sled-agent/src/bootstrap/discovery.rs b/sled-agent/src/bootstrap/discovery.rs new file mode 100644 index 00000000000..c1cb9117b89 --- /dev/null +++ b/sled-agent/src/bootstrap/discovery.rs @@ -0,0 +1,93 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. + +//! Sled announcement and discovery. + +use super::multicast; +use slog::Logger; +use std::collections::HashSet; +use std::io; +use std::net::{Ipv6Addr, SocketAddr, SocketAddrV6}; +use std::sync::Arc; +use tokio::net::UdpSocket; +use tokio::sync::Mutex; +use tokio::task::JoinHandle; + +/// Manages Sled Discovery - both our announcement to other Sleds, +/// as well as our discovery of those sleds. +pub struct PeerMonitor { + sleds: Arc>>, + _worker: JoinHandle<()>, +} + +async fn monitor_worker( + log: Logger, + address: SocketAddrV6, + sender: UdpSocket, + listener: UdpSocket, + sleds: Arc>>, +) { + // Let this message be a reminder that this content is *not* + // encrypted, authenticated, or otherwise verified. We're just using + // it as a starting point for swapping addresses. + let message = + b"We've been trying to reach you about your car's extended warranty"; + loop { + let mut buf = vec![0u8; 128]; + tokio::select! { + _ = tokio::time::sleep(tokio::time::Duration::from_millis(5000)) => { + info!(log, "Bootstrap Peer Monitor: Broadcasting our own address: {}", address); + if let Err(e) = sender.try_send_to(message, address.into()) { + warn!(log, "PeerMonitor failed to broadcast: {}", e); + } + } + result = listener.recv_from(&mut buf) => { + match result { + Ok((_, addr)) => { + info!(log, "Bootstrap Peer Monitor: Successfully received an address: {}", addr); + sleds.lock().await.insert(addr); + }, + Err(e) => warn!(log, "PeerMonitor failed to receive: {}", e), + } + } + } + } +} + +impl PeerMonitor { + /// Creates a new [`PeerMonitor`]. + // TODO: Address, port, interface, etc, probably should be + // configuration options. + pub fn new(log: &Logger) -> Result { + let scope = multicast::Ipv6MulticastScope::LinkLocal.first_hextet(); + let address = SocketAddrV6::new( + Ipv6Addr::new(scope, 0, 0, 0, 0, 0, 0, 0x1), + 7645, + 0, + 0, + ); + let loopback = false; + let interface = 0; + let (sender, listener) = + multicast::new_ipv6_udp_pair(&address, loopback, interface)?; + + let sleds = Arc::new(Mutex::new(HashSet::new())); + let sleds_for_worker = sleds.clone(); + let log = log.clone(); + + let worker = tokio::task::spawn(async move { + monitor_worker(log, address, sender, listener, sleds_for_worker) + .await + }); + + Ok(PeerMonitor { sleds, _worker: worker }) + } + + /// Returns the addresses of connected sleds. + /// + /// Note: These sleds have not yet been verified. + pub async fn addrs(&self) -> Vec { + self.sleds.lock().await.iter().map(|addr| *addr).collect() + } +} diff --git a/sled-agent/src/bootstrap/http_entrypoints.rs b/sled-agent/src/bootstrap/http_entrypoints.rs index 0fa05b622b2..c8a6bde01ca 100644 --- a/sled-agent/src/bootstrap/http_entrypoints.rs +++ b/sled-agent/src/bootstrap/http_entrypoints.rs @@ -30,6 +30,7 @@ use dropshot::HttpError; use dropshot::HttpResponseOk; use dropshot::RequestContext; use dropshot::TypedBody; +use omicron_common::api::external::Error as ExternalError; use std::sync::Arc; use super::agent::Agent; @@ -62,5 +63,10 @@ async fn api_request_share( let bootstrap_agent = rqctx.context(); let request = request.into_inner(); - Ok(HttpResponseOk(bootstrap_agent.request_share(request.identity).await?)) + Ok(HttpResponseOk( + bootstrap_agent + .request_share(request.identity) + .await + .map_err(|e| ExternalError::from(e))?, + )) } diff --git a/sled-agent/src/bootstrap/mod.rs b/sled-agent/src/bootstrap/mod.rs index 01df6670a48..9d3e47d21a6 100644 --- a/sled-agent/src/bootstrap/mod.rs +++ b/sled-agent/src/bootstrap/mod.rs @@ -7,7 +7,9 @@ pub mod agent; mod client; pub mod config; +mod discovery; mod http_entrypoints; +mod multicast; mod params; pub mod server; mod views; diff --git a/sled-agent/src/bootstrap/multicast.rs b/sled-agent/src/bootstrap/multicast.rs new file mode 100644 index 00000000000..78a611ff978 --- /dev/null +++ b/sled-agent/src/bootstrap/multicast.rs @@ -0,0 +1,194 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. + +//! Ipv6 Multicast utilities used for Sled discovery. + +use std::io; +use std::net::{Ipv6Addr, SocketAddrV6}; +use tokio::net::UdpSocket; + +/// Scope of an IPv6 Multicast address. +/// +/// Attempts to align with the unstable [`std::net::Ipv6MulticastScope`] enum. +pub enum Ipv6MulticastScope { + #[allow(dead_code)] + InterfaceLocal, + LinkLocal, + #[allow(dead_code)] + RealmLocal, + #[allow(dead_code)] + AdminLocal, + #[allow(dead_code)] + SiteLocal, + #[allow(dead_code)] + OrganizationLocal, + #[allow(dead_code)] + GlobalScope, +} + +impl Ipv6MulticastScope { + /// Returns the first hextet of an Ipv6 multicast IP address. + pub fn first_hextet(&self) -> u16 { + // Reference: https://datatracker.ietf.org/doc/html/rfc4291#section-2.7 + // + // This implementation currently sets all multicast flags to zero; + // this could easily change if needed. + let flags = 0; + let flags_shifted = flags << 4; + match self { + Ipv6MulticastScope::InterfaceLocal => 0xFF01 | flags_shifted, + Ipv6MulticastScope::LinkLocal => 0xFF02 | flags_shifted, + Ipv6MulticastScope::RealmLocal => 0xFF03 | flags_shifted, + Ipv6MulticastScope::AdminLocal => 0xFF04 | flags_shifted, + Ipv6MulticastScope::SiteLocal => 0xFF05 | flags_shifted, + Ipv6MulticastScope::OrganizationLocal => 0xFF08 | flags_shifted, + Ipv6MulticastScope::GlobalScope => 0xFF0E | flags_shifted, + } + } +} + +fn new_ipv6_udp_socket() -> io::Result { + let socket = socket2::Socket::new( + socket2::Domain::IPV6, + socket2::Type::DGRAM, + Some(socket2::Protocol::UDP), + )?; + socket.set_only_v6(true)?; + // From + // https://docs.rs/tokio/1.14.0/tokio/net/struct.UdpSocket.html#method.from_std + // + // "It is left up to the user to set it in non-blocking mode". + // + // We (the user) do that here. + socket.set_nonblocking(true)?; + Ok(socket) +} + +/// Create a new listening socket, capable of receiving IPv6 multicast traffic. +fn new_ipv6_udp_listener( + addr: &SocketAddrV6, + interface: u32, +) -> io::Result { + let socket = new_ipv6_udp_socket()?; + + // From http://www.kohala.com/start/mcast.api.txt + // + // "More than one process may bind to the same SOCK_DGRAM UDP port [if + // SO_REUSEADDR is used]. In this case, every incoming multicast or + // broadcast UDP datagram destined to the shared port is delivered to all + // sockets bound to the port." + socket.set_reuse_address(true)?; + + socket.join_multicast_v6(addr.ip(), interface)?; + + // TODO: I tried binding on the input value of "addr.ip()", but doing so + // returns errno 22 ("Invalid Input"). + // + // This may be binding to a larger address range than we want. + let bind_address = + SocketAddrV6::new(Ipv6Addr::UNSPECIFIED, addr.port(), 0, 0); + socket.bind(&(bind_address).into())?; + + // Convert from: socket2 -> std -> tokio + UdpSocket::from_std(std::net::UdpSocket::from(socket)) +} + +/// Create a new sending socket, capable of sending IPv6 multicast traffic. +fn new_ipv6_udp_sender( + loopback: bool, + interface: u32, +) -> io::Result { + let socket = new_ipv6_udp_socket()?; + socket.set_multicast_loop_v6(loopback)?; + socket.set_multicast_if_v6(interface)?; + let address = SocketAddrV6::new(Ipv6Addr::UNSPECIFIED, 0, 0, 0); + socket.bind(&address.into())?; + + UdpSocket::from_std(std::net::UdpSocket::from(socket)) +} + +/// Returns the (sender, receiver) sockets of an IPv6 UDP multicast group. +/// +/// * `address`: The address to use. Consider a value from: +/// , +/// and the [`Ipv6MulticastScope`] helper to provide the first hextet. +/// * `loopback`: If true, the receiver packet will see multicast packets sent +/// on our sender, in addition to those sent by everyone else in the multicast +/// group. +/// * `interface`: The index of the interface to join (zero indicates "any +/// interface"). +pub fn new_ipv6_udp_pair( + address: &SocketAddrV6, + loopback: bool, + interface: u32, +) -> io::Result<(UdpSocket, UdpSocket)> { + let sender = new_ipv6_udp_sender(loopback, interface)?; + let listener = new_ipv6_udp_listener(address, interface)?; + + Ok((sender, listener)) +} + +#[cfg(test)] +mod test { + use super::*; + + // NOTE: This test is ignored by default - it relies on a networking + // setup that isn't consistent between our automated test infrastructure. + // It can still be run locally with: + // + // $ cargo test -p omicron-sled-agent -- --ignored + #[tokio::test] + #[ignore] + async fn test_multicast_ipv6() { + let message = b"Hello World!"; + let scope = Ipv6MulticastScope::LinkLocal.first_hextet(); + let address = SocketAddrV6::new( + Ipv6Addr::new(scope, 0, 0, 0, 0, 0, 0, 0x1), + 7645, + 0, + 0, + ); + + // For this test, we want to see our own transmission. + // Unlike most usage in the Sled Agent, this means we want + // loopback to be enabled. + let loopback = true; + let interface = 0; + let (sender, listener) = + new_ipv6_udp_pair(&address, loopback, interface).unwrap(); + + // Create a receiver task which reads for messages that have + // been broadcast, verifies the message, and returns the + // calling address. + let receiver_task_handle = tokio::task::spawn(async move { + let mut buf = vec![0u8; 32]; + let (len, addr) = listener.recv_from(&mut buf).await?; + assert_eq!(message.len(), len); + assert_eq!(message, &buf[..message.len()]); + Ok::<_, io::Error>(addr) + }); + + // Send a message repeatedly, and exit successfully if we + // manage to receive the response. + tokio::pin!(receiver_task_handle); + let mut send_count = 0; + loop { + tokio::select! { + result = sender.send_to(message, address) => { + assert_eq!(message.len(), result.unwrap()); + send_count += 1; + if send_count > 10 { + panic!("10 multicast UDP messages sent with no response"); + } + tokio::time::sleep(tokio::time::Duration::from_millis(10)).await; + } + result = &mut receiver_task_handle => { + let addr = result.unwrap().unwrap(); + eprintln!("Receiver received message: {:#?}", addr); + break; + } + } + } + } +} diff --git a/sled-agent/src/bootstrap/server.rs b/sled-agent/src/bootstrap/server.rs index 05ae7f7fafe..8ddb7a479f9 100644 --- a/sled-agent/src/bootstrap/server.rs +++ b/sled-agent/src/bootstrap/server.rs @@ -28,7 +28,8 @@ impl Server { "component" => "Agent", "server" => config.id.clone().to_string() )); - let bootstrap_agent = Arc::new(Agent::new(ba_log)); + let bootstrap_agent = + Arc::new(Agent::new(ba_log).map_err(|e| e.to_string())?); let ba = Arc::clone(&bootstrap_agent); let dropshot_log = log.new(o!("component" => "dropshot")); @@ -47,7 +48,7 @@ impl Server { // This ordering allows the bootstrap agent to communicate with // other bootstrap agents on the rack during the initialization // process. - if let Err(e) = server.bootstrap_agent.initialize(vec![]).await { + if let Err(e) = server.bootstrap_agent.initialize().await { let _ = server.close().await; return Err(e.to_string()); } diff --git a/smf/sled-agent/manifest.xml b/smf/sled-agent/manifest.xml index 3c6810f7366..b15661acf4c 100644 --- a/smf/sled-agent/manifest.xml +++ b/smf/sled-agent/manifest.xml @@ -41,7 +41,7 @@ - +