From 66a9566fa7e7c73ad9e95cdb9f2b94c58e869039 Mon Sep 17 00:00:00 2001 From: Sean Klein Date: Wed, 17 Nov 2021 13:09:52 -0500 Subject: [PATCH 01/11] WIP multicast --- Cargo.lock | 1 + sled-agent/Cargo.toml | 1 + sled-agent/src/bootstrap/agent.rs | 10 +- sled-agent/src/bootstrap/http_entrypoints.rs | 9 +- sled-agent/src/bootstrap/mod.rs | 1 + sled-agent/src/bootstrap/multicast.rs | 123 +++++++++++++++++++ 6 files changed, 142 insertions(+), 3 deletions(-) create mode 100644 sled-agent/src/bootstrap/multicast.rs diff --git a/Cargo.lock b/Cargo.lock index bb9576c4b3e..0e726d34d72 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1534,6 +1534,7 @@ dependencies = [ "serial_test", "slog", "smf", + "socket2", "structopt", "subprocess", "tar", diff --git a/sled-agent/Cargo.toml b/sled-agent/Cargo.toml index 756e04a3246..b9bc953116d 100644 --- a/sled-agent/Cargo.toml +++ b/sled-agent/Cargo.toml @@ -23,6 +23,7 @@ serde = { version = "1.0", features = [ "derive" ] } serde_json = "1.0" slog = { version = "2.5", features = [ "max_level_trace", "release_max_level_debug" ] } smf = "0.2" +socket2 = { version = "0.4", features = [ "all" ] } structopt = "0.3" tar = "0.4" tempfile = "3.2" diff --git a/sled-agent/src/bootstrap/agent.rs b/sled-agent/src/bootstrap/agent.rs index 3d3ee745dd7..cb0e529fe97 100644 --- a/sled-agent/src/bootstrap/agent.rs +++ b/sled-agent/src/bootstrap/agent.rs @@ -2,7 +2,7 @@ use super::client::types as bootstrap_types; use super::client::Client as BootstrapClient; -use omicron_common::api::external::Error; +use omicron_common::api::external::Error as ExternalError; use omicron_common::api::internal::bootstrap_agent::ShareResponse; use omicron_common::packaging::sha256_digest; @@ -37,6 +37,12 @@ pub enum BootstrapError { Api(#[from] anyhow::Error), } +impl From for ExternalError { + fn from(err: BootstrapError) -> Self { + Self::internal_error(&err.to_string()) + } +} + /// The entity responsible for bootstrapping an Oxide rack. pub struct Agent { /// Debug log @@ -52,7 +58,7 @@ impl Agent { pub async fn request_share( &self, identity: Vec, - ) -> Result { + ) -> Result { // TODO-correctness: Validate identity, return whatever // information is necessary to establish trust quorum. // diff --git a/sled-agent/src/bootstrap/http_entrypoints.rs b/sled-agent/src/bootstrap/http_entrypoints.rs index 31f36eff9fe..673bec984ca 100644 --- a/sled-agent/src/bootstrap/http_entrypoints.rs +++ b/sled-agent/src/bootstrap/http_entrypoints.rs @@ -6,6 +6,7 @@ use dropshot::HttpError; use dropshot::HttpResponseOk; use dropshot::RequestContext; use dropshot::TypedBody; +use omicron_common::api::external::Error as ExternalError; use omicron_common::api::internal::bootstrap_agent::{ ShareRequest, ShareResponse, }; @@ -40,5 +41,11 @@ async fn api_request_share( let bootstrap_agent = rqctx.context(); let request = request.into_inner(); - Ok(HttpResponseOk(bootstrap_agent.request_share(request.identity).await?)) + Ok( + HttpResponseOk( + bootstrap_agent.request_share(request.identity) + .await + .map_err(|e| ExternalError::from(e))? + ) + ) } diff --git a/sled-agent/src/bootstrap/mod.rs b/sled-agent/src/bootstrap/mod.rs index 217ce039d68..406a3f71917 100644 --- a/sled-agent/src/bootstrap/mod.rs +++ b/sled-agent/src/bootstrap/mod.rs @@ -4,4 +4,5 @@ pub mod agent; mod client; pub mod config; mod http_entrypoints; +mod multicast; pub mod server; diff --git a/sled-agent/src/bootstrap/multicast.rs b/sled-agent/src/bootstrap/multicast.rs new file mode 100644 index 00000000000..0f4a1278257 --- /dev/null +++ b/sled-agent/src/bootstrap/multicast.rs @@ -0,0 +1,123 @@ +use std::io; +use std::net::{IpAddr, Ipv6Addr, SocketAddr, SocketAddrV6}; +use tokio::net::UdpSocket; + +// TODO: 0x +// pub static ref IPV6: IpAddr = Ipv6Addr::new(0xFF04, 0, 0, 0, 0, 0, 0, 0x0123).into(); + +/// Scope of an IPv6 Multicast address. +/// +/// Attempts to align with the unstable [`std::net::Ipv6MulticastScope`] enum. +pub enum Ipv6MulticastScope { + InterfaceLocal, + LinkLocal, + RealmLocal, + AdminLocal, + SiteLocal, + OrganizationLocal, + GlobalScope, +} + +impl Ipv6MulticastScope { + /// Returns the first hextet of an Ipv6 multicast IP address. + pub fn first_hextet(&self) -> u16 { + // Reference: https://datatracker.ietf.org/doc/html/rfc4291#section-2.7 + // + // This implementation currently sets all multicast flags to zero; + // this could easily change if needed. + let flags = 0; + let flags_shifted = flags << 4; + match self { + Ipv6MulticastScope::InterfaceLocal => 0xFF01 | flags_shifted, + Ipv6MulticastScope::LinkLocal => 0xFF02 | flags_shifted, + Ipv6MulticastScope::RealmLocal => 0xFF03 | flags_shifted, + Ipv6MulticastScope::AdminLocal => 0xFF04 | flags_shifted, + Ipv6MulticastScope::SiteLocal => 0xFF05 | flags_shifted, + Ipv6MulticastScope::OrganizationLocal => 0xFF08 | flags_shifted, + Ipv6MulticastScope::GlobalScope => 0xFF0E | flags_shifted, + } + } +} + +fn new_ipv6_udp_socket(addr: &SocketAddrV6) -> io::Result { + eprintln!("Creating new socket"); + let socket = socket2::Socket::new( + socket2::Domain::IPV6, + // From + // https://docs.rs/tokio/1.14.0/tokio/net/struct.UdpSocket.html#method.from_std + // + // "It is left up to the user to set it in non-blocking mode". + // + // We (the user) do that here. + socket2::Type::DGRAM.nonblocking(), + Some(socket2::Protocol::UDP), + )?; + eprintln!("Creating new socket - setting v6 only"); + socket.set_only_v6(true)?; + eprintln!("Creating new socket - OK"); + Ok(socket) +} + +/// Create a new listening socket, capable of receiving IPv6 multicast traffic. +pub fn new_ipv6_multicast_udp_listener(addr: &SocketAddrV6) -> io::Result { + eprintln!("Creating listener"); + let socket = new_ipv6_udp_socket(&addr)?; + + // From http://www.kohala.com/start/mcast.api.txt + // + // "More than one process may bind to the same SOCK_DGRAM UDP port [if + // SO_REUSEADDR is used]. In this case, every incoming multicast or + // broadcast UDP datagram destined to the shared port is delivered to all + // sockets bound to the port." + eprintln!("Creating listener - setting re-use"); + socket.set_reuse_address(true)?; + // TODO: We can specify a more specific interface here. Should we? + eprintln!("Creating listener - joining multi-cast"); + socket.join_multicast_v6(addr.ip(), 0)?; + eprintln!("Creating listener - binding"); + socket.bind(&(*addr).into())?; + eprintln!("Creating listener - OK"); + + // Convert from: socket2 -> std -> tokio + UdpSocket::from_std(std::net::UdpSocket::from(socket)) +} + +/// Create a new sending socket, capable of sending IPv6 multicast traffic. +pub fn new_ipv6_multicast_udp_sender(addr: &SocketAddrV6) -> io::Result { + eprintln!("Creating sender"); + let socket = new_ipv6_udp_socket(&addr)?; + // Avoid seeing our own transmissions. + eprintln!("Creating sender - setting multicast loop to false"); + socket.set_multicast_loop_v6(false)?; + + // TODO: Should we pick a specific interface? + // socket.set_multicast_if_v6(...)?; + let any_interface_address = SocketAddrV6::new(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 0), 0, 0, 0); + eprintln!("Creating sender - binding"); + socket.bind(&any_interface_address.into())?; + eprintln!("Creating sender - OK"); + + // Convert from: socket2 -> std -> tokio + UdpSocket::from_std(std::net::UdpSocket::from(socket)) +} + +#[cfg(test)] +mod test { + use super::*; + + #[tokio::test] + async fn test_multicast_v6() { + let message = b"Hello World!"; + let scope = Ipv6MulticastScope::LinkLocal.first_hextet(); + let address = SocketAddrV6::new( + Ipv6Addr::new(scope, 0, 0, 0, 0, 0, 0, 0x0123), + 7645, 0, 0 + ); + + eprintln!("Address: {}", address); + let listener = new_ipv6_multicast_udp_listener(&address).unwrap(); + let sender = new_ipv6_multicast_udp_sender(&address).unwrap(); + + + } +} From efda1da41d4476398074d810018d5b890c986c28 Mon Sep 17 00:00:00 2001 From: Sean Klein Date: Thu, 18 Nov 2021 09:54:55 -0500 Subject: [PATCH 02/11] it's a mess, but it works on my machine --- sled-agent/src/bootstrap/multicast.rs | 62 +++++++++++++++++++++------ 1 file changed, 50 insertions(+), 12 deletions(-) diff --git a/sled-agent/src/bootstrap/multicast.rs b/sled-agent/src/bootstrap/multicast.rs index 0f4a1278257..ed1819168ed 100644 --- a/sled-agent/src/bootstrap/multicast.rs +++ b/sled-agent/src/bootstrap/multicast.rs @@ -2,9 +2,6 @@ use std::io; use std::net::{IpAddr, Ipv6Addr, SocketAddr, SocketAddrV6}; use tokio::net::UdpSocket; -// TODO: 0x -// pub static ref IPV6: IpAddr = Ipv6Addr::new(0xFF04, 0, 0, 0, 0, 0, 0, 0x0123).into(); - /// Scope of an IPv6 Multicast address. /// /// Attempts to align with the unstable [`std::net::Ipv6MulticastScope`] enum. @@ -71,11 +68,19 @@ pub fn new_ipv6_multicast_udp_listener(addr: &SocketAddrV6) -> io::Result std -> tokio @@ -87,12 +92,13 @@ pub fn new_ipv6_multicast_udp_sender(addr: &SocketAddrV6) -> io::Result(buf) + }); + + tokio::pin!(echo_server_handle); + + let mut send_count = 0; + loop { + tokio::select! { + result = sender.send_to(message, address) => { + assert_eq!(message.len(), result.unwrap()); + send_count += 1; + if send_count > 50 { + panic!("we sent 50 messages with no response"); + } + tokio::time::sleep(tokio::time::Duration::from_millis(10)).await; + } + _ = &mut echo_server_handle => { + eprintln!("Receiver received message"); + break; + } + } + } + } From cba49e3f3f41d778afddcf3aed6ac3c3e0c24f51 Mon Sep 17 00:00:00 2001 From: Sean Klein Date: Fri, 19 Nov 2021 12:12:30 -0500 Subject: [PATCH 03/11] Integrate multicast into bootstrap agent --- sled-agent/src/bootstrap/agent.rs | 122 ++++++++++++----- sled-agent/src/bootstrap/discovery.rs | 89 ++++++++++++ sled-agent/src/bootstrap/http_entrypoints.rs | 13 +- sled-agent/src/bootstrap/mod.rs | 1 + sled-agent/src/bootstrap/multicast.rs | 137 ++++++++++++------- sled-agent/src/bootstrap/server.rs | 5 +- 6 files changed, 269 insertions(+), 98 deletions(-) create mode 100644 sled-agent/src/bootstrap/discovery.rs diff --git a/sled-agent/src/bootstrap/agent.rs b/sled-agent/src/bootstrap/agent.rs index cb0e529fe97..6fce218c8ef 100644 --- a/sled-agent/src/bootstrap/agent.rs +++ b/sled-agent/src/bootstrap/agent.rs @@ -2,19 +2,24 @@ use super::client::types as bootstrap_types; use super::client::Client as BootstrapClient; +use super::discovery; use omicron_common::api::external::Error as ExternalError; use omicron_common::api::internal::bootstrap_agent::ShareResponse; +use omicron_common::backoff::{ + internal_service_policy, retry_notify, BackoffError, +}; use omicron_common::packaging::sha256_digest; use slog::Logger; use std::collections::HashMap; use std::fs::File; use std::io::{Seek, SeekFrom}; -use std::net::SocketAddr; use std::path::Path; use tar::Archive; use thiserror::Error; +const UNLOCK_THRESHOLD: usize = 1; + /// Describes errors which may occur while operating the bootstrap service. #[derive(Error, Debug)] pub enum BootstrapError { @@ -35,6 +40,9 @@ pub enum BootstrapError { #[error("Error making HTTP request")] Api(#[from] anyhow::Error), + + #[error("Not enough peers to unlock storage")] + NotEnoughPeers, } impl From for ExternalError { @@ -47,11 +55,13 @@ impl From for ExternalError { pub struct Agent { /// Debug log log: Logger, + peer_monitor: discovery::PeerMonitor, } impl Agent { - pub fn new(log: Logger) -> Self { - Agent { log } + pub fn new(log: Logger) -> Result { + let peer_monitor = discovery::PeerMonitor::new(&log)?; + Ok(Agent { log, peer_monitor }) } /// Implements the "request share" API. @@ -68,42 +78,69 @@ impl Agent { Ok(ShareResponse { shared_secret: vec![] }) } - /// Performs device initialization: + /// Communicates with peers, sharing secrets, until the rack has been + /// sufficiently unlocked. /// - /// - TODO: Communicates with other sled agents to establish a trust quorum. - /// - Verifies, unpacks, and launches other services. - pub async fn initialize( - &self, - other_agents: Vec, - ) -> Result<(), BootstrapError> { - info!(&self.log, "bootstrap service initializing"); - // TODO-correctness: - // - Establish trust quorum. - // - Once this is done, "unlock" local storage - // - // The current implementation sends a stub request to all known - // sled agents, but does not actually create a quorum / unlock - // anything. - let other_agents: Vec = other_agents - .into_iter() - .map(|addr| { - let addr_str = addr.to_string(); - BootstrapClient::new( - &format!("http://{}", addr_str,), - self.log.new(o!( - "Address" => addr_str, - )), + /// - This method retries until [`UNLOCK_THRESHOLD`] other agents are + /// online, and have successfully responded to "share requests". + async fn establish_sled_quorum(&self) -> Result<(), BootstrapError> { + retry_notify( + internal_service_policy(), + || async { + let other_agents = self.peer_monitor.addrs().await; + + // "-1" to account for ourselves. + if other_agents.len() < UNLOCK_THRESHOLD - 1 { + return Err(BackoffError::Transient( + BootstrapError::NotEnoughPeers, + )); + } + + // TODO-correctness: + // - Establish trust quorum. + // - Once this is done, "unlock" local storage + // + // The current implementation sends a stub request to all known sled + // agents, but does not actually create a quorum / unlock anything. + let other_agents: Vec = other_agents + .into_iter() + .map(|addr| { + let addr_str = addr.to_string(); + BootstrapClient::new( + &format!("http://{}", addr_str,), + self.log.new(o!( + "Address" => addr_str, + )), + ) + }) + .collect(); + for agent in &other_agents { + agent + .api_request_share(&bootstrap_types::ShareRequest { + identity: vec![], + }) + .await + .map_err(|e| { + BackoffError::Transient(BootstrapError::Api(e)) + })?; + } + Ok(()) + }, + |error, duration| { + warn!( + self.log, + "Failed to unlock sleds (will retry after {:?}: {:#}", + duration, + error, ) - }) - .collect(); - for agent in &other_agents { - agent - .api_request_share(&bootstrap_types::ShareRequest { - identity: vec![], - }) - .await?; - } + }, + ) + .await?; + Ok(()) + } + + async fn launch_local_services(&self) -> Result<(), BootstrapError> { let tar_source = Path::new("/opt/oxide"); let destination = Path::new("/opt/oxide"); // TODO-correctness: Validation should come from ROT, not local file. @@ -131,6 +168,19 @@ impl Agent { Ok(()) } + /// Performs device initialization: + /// + /// - TODO: Communicates with other sled agents to establish a trust quorum. + /// - Verifies, unpacks, and launches other services. + pub async fn initialize(&self) -> Result<(), BootstrapError> { + info!(&self.log, "bootstrap service initializing"); + + self.establish_sled_quorum().await?; + self.launch_local_services().await?; + + Ok(()) + } + fn launch( &self, digests: &HashMap>, diff --git a/sled-agent/src/bootstrap/discovery.rs b/sled-agent/src/bootstrap/discovery.rs new file mode 100644 index 00000000000..c348fc48434 --- /dev/null +++ b/sled-agent/src/bootstrap/discovery.rs @@ -0,0 +1,89 @@ +//! Sled announcement and discovery. + +use super::multicast; +use slog::Logger; +use std::collections::HashSet; +use std::io; +use std::net::{Ipv6Addr, SocketAddr, SocketAddrV6}; +use std::sync::Arc; +use tokio::net::UdpSocket; +use tokio::sync::Mutex; +use tokio::task::JoinHandle; + +/// Manages Sled Discovery - both our announcement to other Sleds, +/// as well as our discovery of those sleds. +pub struct PeerMonitor { + sleds: Arc>>, + _worker: JoinHandle<()>, +} + +async fn monitor_worker( + log: Logger, + address: SocketAddrV6, + sender: UdpSocket, + listener: UdpSocket, + sleds: Arc>>, +) { + // Let this message be a reminder that this content is *not* + // encrypted, authenticated, or otherwise verified. We're just using + // it as a starting point for swapping addresses. + let message = + b"We've been trying to reach you about your car's extended warranty"; + loop { + let mut buf = vec![0u8; 128]; + tokio::select! { + _ = tokio::time::sleep(tokio::time::Duration::from_millis(5000)) => { + info!(log, "Bootstrap Peer Monitor: Broadcasting our own address: {}", address); + if let Err(e) = sender.try_send_to(message, address.into()) { + warn!(log, "PeerMonitor failed to broadcast: {}", e); + } + } + result = listener.recv_from(&mut buf) => { + match result { + Ok((_, addr)) => { + info!(log, "Bootstrap Peer Monitor: Successfully received an address: {}", addr); + sleds.lock().await.insert(addr); + }, + Err(e) => warn!(log, "PeerMonitor failed to receive: {}", e), + } + } + } + } +} + +impl PeerMonitor { + /// Creates a new [`PeerMonitor`]. + // TODO: Address, port, interface, etc, probably should be + // configuration options. + pub fn new(log: &Logger) -> Result { + let scope = multicast::Ipv6MulticastScope::LinkLocal.first_hextet(); + let address = SocketAddrV6::new( + Ipv6Addr::new(scope, 0, 0, 0, 0, 0, 0, 0x1), + 7645, + 0, + 0, + ); + let loopback = false; + let interface = 0; + let (sender, listener) = + multicast::new_ipv6_udp_pair(&address, loopback, interface)?; + + let sleds = Arc::new(Mutex::new(HashSet::new())); + let sleds_for_worker = sleds.clone(); + let log = log.clone(); + + let worker = tokio::task::spawn(async move { + monitor_worker(log, address, sender, listener, sleds_for_worker) + .await + }); + + Ok(PeerMonitor { sleds, _worker: worker }) + } + + /// Returns the addresses of connected sleds. + /// + /// Note: These sleds have not yet been verified. + pub async fn addrs(&self) -> Vec { + self.sleds.lock().await.iter().map(|addr| addr.clone()).collect() + } +} diff --git a/sled-agent/src/bootstrap/http_entrypoints.rs b/sled-agent/src/bootstrap/http_entrypoints.rs index 673bec984ca..50e89567d93 100644 --- a/sled-agent/src/bootstrap/http_entrypoints.rs +++ b/sled-agent/src/bootstrap/http_entrypoints.rs @@ -41,11 +41,10 @@ async fn api_request_share( let bootstrap_agent = rqctx.context(); let request = request.into_inner(); - Ok( - HttpResponseOk( - bootstrap_agent.request_share(request.identity) - .await - .map_err(|e| ExternalError::from(e))? - ) - ) + Ok(HttpResponseOk( + bootstrap_agent + .request_share(request.identity) + .await + .map_err(|e| ExternalError::from(e))?, + )) } diff --git a/sled-agent/src/bootstrap/mod.rs b/sled-agent/src/bootstrap/mod.rs index 406a3f71917..c099483da6a 100644 --- a/sled-agent/src/bootstrap/mod.rs +++ b/sled-agent/src/bootstrap/mod.rs @@ -3,6 +3,7 @@ pub mod agent; mod client; pub mod config; +mod discovery; mod http_entrypoints; mod multicast; pub mod server; diff --git a/sled-agent/src/bootstrap/multicast.rs b/sled-agent/src/bootstrap/multicast.rs index ed1819168ed..7b16b83bbc8 100644 --- a/sled-agent/src/bootstrap/multicast.rs +++ b/sled-agent/src/bootstrap/multicast.rs @@ -1,17 +1,34 @@ +//! Ipv6 Multicast utilities used for Sled discovery. + use std::io; -use std::net::{IpAddr, Ipv6Addr, SocketAddr, SocketAddrV6}; +use std::net::{Ipv6Addr, SocketAddrV6}; use tokio::net::UdpSocket; +// NOTE: Use zones? Exclusive netstacks? +// ipdadm can drop packets? +// pstop can stop packets +// NOTE: start w/link local, get address (TCP!) from 'hello' message. +// NOTE: Want to to set up pairs for each link. We'll have +// one for each switch. +// NOTE: "LIF" ioctls to identify devices? +// NOTE: Maghemite uses link-level multicast to discovery neighbors. + /// Scope of an IPv6 Multicast address. /// /// Attempts to align with the unstable [`std::net::Ipv6MulticastScope`] enum. pub enum Ipv6MulticastScope { + #[allow(dead_code)] InterfaceLocal, LinkLocal, + #[allow(dead_code)] RealmLocal, + #[allow(dead_code)] AdminLocal, + #[allow(dead_code)] SiteLocal, + #[allow(dead_code)] OrganizationLocal, + #[allow(dead_code)] GlobalScope, } @@ -36,8 +53,7 @@ impl Ipv6MulticastScope { } } -fn new_ipv6_udp_socket(addr: &SocketAddrV6) -> io::Result { - eprintln!("Creating new socket"); +fn new_ipv6_udp_socket() -> io::Result { let socket = socket2::Socket::new( socket2::Domain::IPV6, // From @@ -49,16 +65,16 @@ fn new_ipv6_udp_socket(addr: &SocketAddrV6) -> io::Result { socket2::Type::DGRAM.nonblocking(), Some(socket2::Protocol::UDP), )?; - eprintln!("Creating new socket - setting v6 only"); socket.set_only_v6(true)?; - eprintln!("Creating new socket - OK"); Ok(socket) } /// Create a new listening socket, capable of receiving IPv6 multicast traffic. -pub fn new_ipv6_multicast_udp_listener(addr: &SocketAddrV6) -> io::Result { - eprintln!("Creating listener"); - let socket = new_ipv6_udp_socket(&addr)?; +fn new_ipv6_udp_listener( + addr: &SocketAddrV6, + interface: u32, +) -> io::Result { + let socket = new_ipv6_udp_socket()?; // From http://www.kohala.com/start/mcast.api.txt // @@ -66,96 +82,111 @@ pub fn new_ipv6_multicast_udp_listener(addr: &SocketAddrV6) -> io::Result std -> tokio UdpSocket::from_std(std::net::UdpSocket::from(socket)) } /// Create a new sending socket, capable of sending IPv6 multicast traffic. -pub fn new_ipv6_multicast_udp_sender(addr: &SocketAddrV6) -> io::Result { - eprintln!("Creating sender"); - let socket = new_ipv6_udp_socket(&addr)?; - // Avoid seeing our own transmissions. - eprintln!("Creating sender - setting multicast loop"); - // XXX set to 'true' for testing - socket.set_multicast_loop_v6(true)?; - - // TODO: Should we pick a specific interface? - socket.set_multicast_if_v6(2)?; - let any_interface_address = SocketAddrV6::new(Ipv6Addr::UNSPECIFIED, 0, 0, 0); - eprintln!("Creating sender - binding"); - socket.bind(&any_interface_address.into())?; - eprintln!("Creating sender - OK"); +fn new_ipv6_udp_sender( + loopback: bool, + interface: u32, +) -> io::Result { + let socket = new_ipv6_udp_socket()?; + socket.set_multicast_loop_v6(loopback)?; + socket.set_multicast_if_v6(interface)?; + let address = SocketAddrV6::new(Ipv6Addr::UNSPECIFIED, 0, 0, 0); + socket.bind(&address.into())?; - // Convert from: socket2 -> std -> tokio UdpSocket::from_std(std::net::UdpSocket::from(socket)) } +/// Returns the (sender, receiver) sockets of an IPv6 UDP multicast group. +/// +/// * `address`: The address to use. Consider a value from: +/// , +/// and the [`Ipv6MulticastScope`] helper to provide the first hextet. +/// * `loopback`: If true, the receiver packet will see multicast packets sent +/// on our sender, in addition to those sent by everyone else in the multicast +/// group. +/// * `interface`: The index of the interface to join (zero indicates "any +/// interface"). +pub fn new_ipv6_udp_pair( + address: &SocketAddrV6, + loopback: bool, + interface: u32, +) -> io::Result<(UdpSocket, UdpSocket)> { + let sender = new_ipv6_udp_sender(loopback, interface)?; + let listener = new_ipv6_udp_listener(address, interface)?; + + Ok((sender, listener)) +} + #[cfg(test)] mod test { use super::*; #[tokio::test] - async fn test_multicast_v6() { + async fn test_multicast_ipv6() { let message = b"Hello World!"; - let scope = Ipv6MulticastScope::InterfaceLocal.first_hextet(); + let scope = Ipv6MulticastScope::LinkLocal.first_hextet(); let address = SocketAddrV6::new( Ipv6Addr::new(scope, 0, 0, 0, 0, 0, 0, 0x1), - 7645, 0, 0 + 7645, + 0, + 0, ); - eprintln!("Address: {}", address); - let sender = new_ipv6_multicast_udp_sender(&address).unwrap(); - let listener = new_ipv6_multicast_udp_listener(&address).unwrap(); - - -// let (len, sender_addr) = listener.recv_from(&mut buf).await.unwrap(); -// assert_eq!(message.len(), sender.send_to(message, address).await.unwrap()); - - let echo_server_handle = tokio::task::spawn(async move { + // For this test, we want to see our own transmission. + // Unlike most usage in the Sled Agent, this means we want + // loopback to be enabled. + let loopback = true; + let interface = 0; + let (sender, listener) = + new_ipv6_udp_pair(&address, loopback, interface).unwrap(); + + // Create a receiver task which reads for messages that have + // been broadcast, verifies the message, and returns the + // calling address. + let receiver_task_handle = tokio::task::spawn(async move { let mut buf = vec![0u8; 32]; let (len, addr) = listener.recv_from(&mut buf).await?; assert_eq!(message.len(), len); - Ok::<_, io::Error>(buf) + assert_eq!(message, &buf[..message.len()]); + Ok::<_, io::Error>(addr) }); - tokio::pin!(echo_server_handle); - + // Send a message repeatedly, and exit successfully if we + // manage to receive the response. + tokio::pin!(receiver_task_handle); let mut send_count = 0; loop { tokio::select! { result = sender.send_to(message, address) => { assert_eq!(message.len(), result.unwrap()); send_count += 1; - if send_count > 50 { - panic!("we sent 50 messages with no response"); + if send_count > 10 { + panic!("10 multicast UDP messages sent with no response"); } tokio::time::sleep(tokio::time::Duration::from_millis(10)).await; } - _ = &mut echo_server_handle => { - eprintln!("Receiver received message"); + result = &mut receiver_task_handle => { + let addr = result.unwrap().unwrap(); + eprintln!("Receiver received message: {:#?}", addr); break; } } } - - - } } diff --git a/sled-agent/src/bootstrap/server.rs b/sled-agent/src/bootstrap/server.rs index abb6403a765..27f48f8f8aa 100644 --- a/sled-agent/src/bootstrap/server.rs +++ b/sled-agent/src/bootstrap/server.rs @@ -24,7 +24,8 @@ impl Server { "component" => "Agent", "server" => config.id.clone().to_string() )); - let bootstrap_agent = Arc::new(Agent::new(ba_log)); + let bootstrap_agent = + Arc::new(Agent::new(ba_log).map_err(|e| e.to_string())?); let ba = Arc::clone(&bootstrap_agent); let dropshot_log = log.new(o!("component" => "dropshot")); @@ -43,7 +44,7 @@ impl Server { // This ordering allows the bootstrap agent to communicate with // other bootstrap agents on the rack during the initialization // process. - if let Err(e) = server.bootstrap_agent.initialize(vec![]).await { + if let Err(e) = server.bootstrap_agent.initialize().await { let _ = server.close().await; return Err(e.to_string()); } From acb8793fbd46a32ee8c9de54f46c3b4f187da5a1 Mon Sep 17 00:00:00 2001 From: Sean Klein Date: Fri, 19 Nov 2021 12:16:42 -0500 Subject: [PATCH 04/11] Clippy --- sled-agent/src/bootstrap/agent.rs | 5 +++++ sled-agent/src/bootstrap/discovery.rs | 2 +- 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/sled-agent/src/bootstrap/agent.rs b/sled-agent/src/bootstrap/agent.rs index 6fce218c8ef..31c1fc8646b 100644 --- a/sled-agent/src/bootstrap/agent.rs +++ b/sled-agent/src/bootstrap/agent.rs @@ -90,6 +90,11 @@ impl Agent { let other_agents = self.peer_monitor.addrs().await; // "-1" to account for ourselves. + // + // NOTE: Clippy error exists while the compile-time unlock + // threshold is "1", because we basically don't require any + // peers to unlock. + #[allow(clippy::absurd_extreme_comparisons)] if other_agents.len() < UNLOCK_THRESHOLD - 1 { return Err(BackoffError::Transient( BootstrapError::NotEnoughPeers, diff --git a/sled-agent/src/bootstrap/discovery.rs b/sled-agent/src/bootstrap/discovery.rs index c348fc48434..df34d12f945 100644 --- a/sled-agent/src/bootstrap/discovery.rs +++ b/sled-agent/src/bootstrap/discovery.rs @@ -84,6 +84,6 @@ impl PeerMonitor { /// /// Note: These sleds have not yet been verified. pub async fn addrs(&self) -> Vec { - self.sleds.lock().await.iter().map(|addr| addr.clone()).collect() + self.sleds.lock().await.iter().map(|addr| *addr).collect() } } From d497b70aec016a0a96daf6f06adff40d6852cc14 Mon Sep 17 00:00:00 2001 From: Sean Klein Date: Fri, 19 Nov 2021 12:18:20 -0500 Subject: [PATCH 05/11] Comment cleanup --- sled-agent/src/bootstrap/multicast.rs | 9 --------- 1 file changed, 9 deletions(-) diff --git a/sled-agent/src/bootstrap/multicast.rs b/sled-agent/src/bootstrap/multicast.rs index 7b16b83bbc8..b44a6ede16e 100644 --- a/sled-agent/src/bootstrap/multicast.rs +++ b/sled-agent/src/bootstrap/multicast.rs @@ -4,15 +4,6 @@ use std::io; use std::net::{Ipv6Addr, SocketAddrV6}; use tokio::net::UdpSocket; -// NOTE: Use zones? Exclusive netstacks? -// ipdadm can drop packets? -// pstop can stop packets -// NOTE: start w/link local, get address (TCP!) from 'hello' message. -// NOTE: Want to to set up pairs for each link. We'll have -// one for each switch. -// NOTE: "LIF" ioctls to identify devices? -// NOTE: Maghemite uses link-level multicast to discovery neighbors. - /// Scope of an IPv6 Multicast address. /// /// Attempts to align with the unstable [`std::net::Ipv6MulticastScope`] enum. From 26d6fa1071a29f17ef3458fda61fd53d7ecd4d65 Mon Sep 17 00:00:00 2001 From: Sean Klein Date: Fri, 19 Nov 2021 12:59:58 -0500 Subject: [PATCH 06/11] mac friendly --- sled-agent/src/bootstrap/multicast.rs | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/sled-agent/src/bootstrap/multicast.rs b/sled-agent/src/bootstrap/multicast.rs index b44a6ede16e..ca24801943b 100644 --- a/sled-agent/src/bootstrap/multicast.rs +++ b/sled-agent/src/bootstrap/multicast.rs @@ -47,16 +47,17 @@ impl Ipv6MulticastScope { fn new_ipv6_udp_socket() -> io::Result { let socket = socket2::Socket::new( socket2::Domain::IPV6, - // From - // https://docs.rs/tokio/1.14.0/tokio/net/struct.UdpSocket.html#method.from_std - // - // "It is left up to the user to set it in non-blocking mode". - // - // We (the user) do that here. - socket2::Type::DGRAM.nonblocking(), + socket2::Type::DGRAM, Some(socket2::Protocol::UDP), )?; socket.set_only_v6(true)?; + // From + // https://docs.rs/tokio/1.14.0/tokio/net/struct.UdpSocket.html#method.from_std + // + // "It is left up to the user to set it in non-blocking mode". + // + // We (the user) do that here. + socket.set_nonblocking(true)?; Ok(socket) } From f55ed5722ed194e8d87036c2fabd9fc3ffff88c2 Mon Sep 17 00:00:00 2001 From: Sean Klein Date: Mon, 29 Nov 2021 10:19:43 -0500 Subject: [PATCH 07/11] Use IPv6 for SMF bootstrap address, format parseable addresses --- sled-agent/src/bootstrap/agent.rs | 27 +++++++++++++++++++++++---- smf/sled-agent/manifest.xml | 2 +- 2 files changed, 24 insertions(+), 5 deletions(-) diff --git a/sled-agent/src/bootstrap/agent.rs b/sled-agent/src/bootstrap/agent.rs index e024c6b99a8..245fb6e4409 100644 --- a/sled-agent/src/bootstrap/agent.rs +++ b/sled-agent/src/bootstrap/agent.rs @@ -18,7 +18,8 @@ use std::path::Path; use tar::Archive; use thiserror::Error; -const UNLOCK_THRESHOLD: usize = 1; +const UNLOCK_THRESHOLD: usize = 2; +const BOOTSTRAP_PORT: u16 = 12346; /// Describes errors which may occur while operating the bootstrap service. #[derive(Error, Debug)] @@ -88,6 +89,7 @@ impl Agent { internal_service_policy(), || async { let other_agents = self.peer_monitor.addrs().await; + info!(&self.log, "Bootstrap: Communicating with peers: {:?}", other_agents); // "-1" to account for ourselves. // @@ -96,10 +98,12 @@ impl Agent { // peers to unlock. #[allow(clippy::absurd_extreme_comparisons)] if other_agents.len() < UNLOCK_THRESHOLD - 1 { + warn!(&self.log, "Not enough peers to start establishing quorum"); return Err(BackoffError::Transient( BootstrapError::NotEnoughPeers, )); } + info!(&self.log, "Bootstrap: Enough peers to start share transfer"); // TODO-correctness: // - Establish trust quorum. @@ -109,10 +113,23 @@ impl Agent { // agents, but does not actually create a quorum / unlock anything. let other_agents: Vec = other_agents .into_iter() - .map(|addr| { - let addr_str = addr.to_string(); + .map(|mut addr| { + addr.set_port(BOOTSTRAP_PORT); + // TODO-correctness: + // + // Many rust crates - such as "URL" - really dislike + // using scopes in IPv6 addresses. Using + // "addr.to_string()" results in an IP address format + // that is rejected when embedded into a URL. + // + // Instead, we merely use IP and port for the moment, + // which loses the scope information. Longer-term, if we + // use ULAs (Unique Local Addresses) the scope shouldn't + // be a factor anyway. + let addr_str = format!("[{}]:{}", addr.ip(), addr.port()); + info!(&self.log, "bootstrap: Connecting to {}", addr_str); BootstrapClient::new( - &format!("http://{}", addr_str,), + &format!("http://{}", addr_str), self.log.new(o!( "Address" => addr_str, )), @@ -126,8 +143,10 @@ impl Agent { }) .await .map_err(|e| { + info!(&self.log, "Bootstrap: Failed to share request with peer: {:?}", e); BackoffError::Transient(BootstrapError::Api(e)) })?; + info!(&self.log, "Bootstrap: Shared request with peer"); } Ok(()) }, diff --git a/smf/sled-agent/manifest.xml b/smf/sled-agent/manifest.xml index 3c6810f7366..b15661acf4c 100644 --- a/smf/sled-agent/manifest.xml +++ b/smf/sled-agent/manifest.xml @@ -41,7 +41,7 @@ - + From 0aa0a035f62cf9c3e8763a29e697450c2dad1847 Mon Sep 17 00:00:00 2001 From: Sean Klein Date: Mon, 29 Nov 2021 10:55:28 -0500 Subject: [PATCH 08/11] Copyright --- sled-agent/src/bootstrap/discovery.rs | 4 ++++ sled-agent/src/bootstrap/multicast.rs | 4 ++++ 2 files changed, 8 insertions(+) diff --git a/sled-agent/src/bootstrap/discovery.rs b/sled-agent/src/bootstrap/discovery.rs index df34d12f945..c1cb9117b89 100644 --- a/sled-agent/src/bootstrap/discovery.rs +++ b/sled-agent/src/bootstrap/discovery.rs @@ -1,3 +1,7 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. + //! Sled announcement and discovery. use super::multicast; diff --git a/sled-agent/src/bootstrap/multicast.rs b/sled-agent/src/bootstrap/multicast.rs index ca24801943b..629dc986a25 100644 --- a/sled-agent/src/bootstrap/multicast.rs +++ b/sled-agent/src/bootstrap/multicast.rs @@ -1,3 +1,7 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. + //! Ipv6 Multicast utilities used for Sled discovery. use std::io; From 8817ce01bca6b4d5f61b3156e843b85bd59453fa Mon Sep 17 00:00:00 2001 From: Sean Klein Date: Mon, 29 Nov 2021 11:00:10 -0500 Subject: [PATCH 09/11] not mac --- sled-agent/src/bootstrap/multicast.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sled-agent/src/bootstrap/multicast.rs b/sled-agent/src/bootstrap/multicast.rs index 629dc986a25..c610e31b7b6 100644 --- a/sled-agent/src/bootstrap/multicast.rs +++ b/sled-agent/src/bootstrap/multicast.rs @@ -129,7 +129,7 @@ pub fn new_ipv6_udp_pair( Ok((sender, listener)) } -#[cfg(test)] +#[cfg(all(test, not(target_os = "macos")))] mod test { use super::*; From e7f6ba9af1e5c67fb6372b92a039cbdc7317bf7f Mon Sep 17 00:00:00 2001 From: Sean Klein Date: Mon, 29 Nov 2021 11:40:39 -0500 Subject: [PATCH 10/11] Ignore IPv6 test by default, with comment --- sled-agent/src/bootstrap/multicast.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sled-agent/src/bootstrap/multicast.rs b/sled-agent/src/bootstrap/multicast.rs index c610e31b7b6..0819fa36cc5 100644 --- a/sled-agent/src/bootstrap/multicast.rs +++ b/sled-agent/src/bootstrap/multicast.rs @@ -129,11 +129,12 @@ pub fn new_ipv6_udp_pair( Ok((sender, listener)) } -#[cfg(all(test, not(target_os = "macos")))] +#[cfg(test)] mod test { use super::*; #[tokio::test] + #[ignore] async fn test_multicast_ipv6() { let message = b"Hello World!"; let scope = Ipv6MulticastScope::LinkLocal.first_hextet(); From 0156df22a4a8ddb9d2bf874ff69e9b012be09c21 Mon Sep 17 00:00:00 2001 From: Sean Klein Date: Mon, 29 Nov 2021 11:48:31 -0500 Subject: [PATCH 11/11] Add comment --- sled-agent/src/bootstrap/multicast.rs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/sled-agent/src/bootstrap/multicast.rs b/sled-agent/src/bootstrap/multicast.rs index 0819fa36cc5..78a611ff978 100644 --- a/sled-agent/src/bootstrap/multicast.rs +++ b/sled-agent/src/bootstrap/multicast.rs @@ -133,6 +133,11 @@ pub fn new_ipv6_udp_pair( mod test { use super::*; + // NOTE: This test is ignored by default - it relies on a networking + // setup that isn't consistent between our automated test infrastructure. + // It can still be run locally with: + // + // $ cargo test -p omicron-sled-agent -- --ignored #[tokio::test] #[ignore] async fn test_multicast_ipv6() {