diff --git a/Cargo.lock b/Cargo.lock index 67fbaa41d50..0ef80cb68d1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1444,6 +1444,7 @@ dependencies = [ "slog", "thiserror", "tokio", + "tokio-stream", "tokio-tungstenite", "usdt", "uuid", @@ -4773,6 +4774,17 @@ dependencies = [ "webpki", ] +[[package]] +name = "tokio-stream" +version = "0.1.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "50145484efff8818b5ccd256697f36863f587da82cf8b409c53adf1e840798e3" +dependencies = [ + "futures-core", + "pin-project-lite", + "tokio", +] + [[package]] name = "tokio-tungstenite" version = "0.17.1" diff --git a/gateway-messages/src/lib.rs b/gateway-messages/src/lib.rs index b46722020c6..1229e68220c 100644 --- a/gateway-messages/src/lib.rs +++ b/gateway-messages/src/lib.rs @@ -35,7 +35,7 @@ pub struct Request { #[derive(Debug, Clone, SerializedSize, Serialize, Deserialize)] pub enum RequestKind { - Ping, + Discover, // TODO do we want to be able to request IgnitionState for all targets in // one message? IgnitionState { target: u8 }, @@ -47,7 +47,15 @@ pub enum RequestKind { /// Identifier for one of of an SP's KSZ8463 management-network-facing ports. #[derive( - Clone, Copy, Debug, PartialEq, Eq, Hash, Serialize_repr, Deserialize_repr, + Clone, + Copy, + Debug, + PartialEq, + Eq, + Hash, + Serialize_repr, + Deserialize_repr, + SerializedSize, )] #[repr(u8)] pub enum SpPort { @@ -60,7 +68,7 @@ pub enum SpPort { // that okay, or should we break this up more? #[derive(Debug, Clone, SerializedSize, Serialize, Deserialize)] pub enum ResponseKind { - Pong, + Discover(DiscoverResponse), IgnitionState(IgnitionState), BulkIgnitionState(BulkIgnitionState), IgnitionCommandAck, @@ -68,6 +76,12 @@ pub enum ResponseKind { SerialConsoleWriteAck, } +#[derive(Debug, Clone, Copy, SerializedSize, Serialize, Deserialize)] +pub struct DiscoverResponse { + /// Which SP port received the `Discover` request. + pub sp_port: SpPort, +} + // TODO how is this reported? 
Same/different for components? pub type SerialNumber = [u8; 16]; diff --git a/gateway-messages/src/sp_impl.rs b/gateway-messages/src/sp_impl.rs index 7565aeb20d6..2c2cbc49956 100644 --- a/gateway-messages/src/sp_impl.rs +++ b/gateway-messages/src/sp_impl.rs @@ -6,6 +6,7 @@ use crate::version; use crate::BulkIgnitionState; +use crate::DiscoverResponse; use crate::IgnitionCommand; use crate::IgnitionState; use crate::Request; @@ -22,11 +23,11 @@ use hubpack::SerializedSize; use std::net::SocketAddr; pub trait SpHandler { - fn ping( + fn discover( &mut self, sender: SocketAddr, port: SpPort, - ) -> Result<(), ResponseError>; + ) -> Result; fn ignition_state( &mut self, @@ -202,8 +203,8 @@ impl SpServer { // call out to handler to provide response let result = match request.kind { - RequestKind::Ping => { - handler.ping(sender, port).map(|()| ResponseKind::Pong) + RequestKind::Discover => { + handler.discover(sender, port).map(ResponseKind::Discover) } RequestKind::IgnitionState { target } => handler .ignition_state(sender, port, target) diff --git a/gateway-sp-comms/Cargo.toml b/gateway-sp-comms/Cargo.toml index 7fca3f51fd9..899a31c544b 100644 --- a/gateway-sp-comms/Cargo.toml +++ b/gateway-sp-comms/Cargo.toml @@ -12,6 +12,7 @@ ringbuffer = "0.8" serde = { version = "1.0", features = ["derive"] } thiserror = "1.0.30" tokio-tungstenite = "0.17" +tokio-stream = "0.1.8" usdt = "0.3.1" uuid = "0.8" diff --git a/gateway-sp-comms/src/communicator.rs b/gateway-sp-comms/src/communicator.rs index b3021e72411..07932e3058b 100644 --- a/gateway-sp-comms/src/communicator.rs +++ b/gateway-sp-comms/src/communicator.rs @@ -22,6 +22,7 @@ use futures::Future; use futures::Stream; use gateway_messages::version; use gateway_messages::BulkIgnitionState; +use gateway_messages::DiscoverResponse; use gateway_messages::IgnitionCommand; use gateway_messages::IgnitionState; use gateway_messages::Request; @@ -113,13 +114,14 @@ impl Communicator { let request = RequestKind::IgnitionState { 
target: port.as_ignition_target() }; - self.request_response( - &controller, - request, - Some(timeout), - ResponseKindExt::try_into_ignition_state, - ) - .await + Ok(self + .request_response( + &controller, + request, + ResponseKindExt::try_into_ignition_state, + Some(timeout), + ) + .await?) } /// Ask the local ignition controller for the ignition state of all SPs. @@ -134,8 +136,8 @@ impl Communicator { .request_response( &controller, request, - Some(timeout), ResponseKindExt::try_into_bulk_ignition_state, + Some(timeout), ) .await?; @@ -172,13 +174,14 @@ impl Communicator { let target = self.id_to_port(target_sp)?.as_ignition_target(); let request = RequestKind::IgnitionCommand { target, command }; - self.request_response( - &controller, - request, - Some(timeout), - ResponseKindExt::try_into_ignition_command_ack, - ) - .await + Ok(self + .request_response( + &controller, + request, + ResponseKindExt::try_into_ignition_command_ack, + Some(timeout), + ) + .await?) } /// Set up a websocket connection that forwards data to and from the given @@ -298,13 +301,14 @@ impl Communicator { let sp = self.switch.sp_socket(port).expect("lost address of attached SP"); - self.request_response( - &sp, - RequestKind::SerialConsoleWrite(packet), - Some(timeout), - ResponseKindExt::try_into_serial_console_write_ack, - ) - .await + Ok(self + .request_response( + &sp, + RequestKind::SerialConsoleWrite(packet), + ResponseKindExt::try_into_serial_console_write_ack, + Some(timeout), + ) + .await?) } /// Get the state of a given SP. @@ -340,13 +344,14 @@ impl Communicator { self.switch.sp_socket(port).ok_or(Error::SpAddressUnknown(sp))?; let request = RequestKind::SpState; - self.request_response( - &sp, - request, - timeout, - ResponseKindExt::try_into_sp_state, - ) - .await + Ok(self + .request_response( + &sp, + request, + ResponseKindExt::try_into_sp_state, + timeout, + ) + .await?) } /// Query all online SPs. 
@@ -393,12 +398,12 @@ impl Communicator { .collect::>() } - async fn request_response( + pub(crate) async fn request_response( &self, sp: &SpSocket<'_>, mut kind: RequestKind, - timeout: Option, mut map_response_kind: F, + timeout: Option, ) -> Result where F: FnMut(ResponseKind) -> Result, @@ -494,9 +499,11 @@ impl Communicator { // When we send a request we expect a specific kind of response; the boilerplate // for confirming that is a little noisy, so it lives in this extension trait. -trait ResponseKindExt { +pub(crate) trait ResponseKindExt { fn name(&self) -> &'static str; + fn try_into_discover(self) -> Result; + fn try_into_ignition_state(self) -> Result; fn try_into_bulk_ignition_state( @@ -513,7 +520,7 @@ trait ResponseKindExt { impl ResponseKindExt for ResponseKind { fn name(&self) -> &'static str { match self { - ResponseKind::Pong => response_kind_names::PONG, + ResponseKind::Discover(_) => response_kind_names::DISCOVER, ResponseKind::IgnitionState(_) => { response_kind_names::IGNITION_STATE } @@ -530,6 +537,16 @@ impl ResponseKindExt for ResponseKind { } } + fn try_into_discover(self) -> Result { + match self { + ResponseKind::Discover(discover) => Ok(discover), + other => Err(BadResponseType { + expected: response_kind_names::DISCOVER, + got: other.name(), + }), + } + } + fn try_into_ignition_state(self) -> Result { match self { ResponseKind::IgnitionState(state) => Ok(state), @@ -584,7 +601,7 @@ impl ResponseKindExt for ResponseKind { } mod response_kind_names { - pub(super) const PONG: &str = "pong"; + pub(super) const DISCOVER: &str = "discover"; pub(super) const IGNITION_STATE: &str = "ignition_state"; pub(super) const BULK_IGNITION_STATE: &str = "bulk_ignition_state"; pub(super) const IGNITION_COMMAND_ACK: &str = "ignition_command_ack"; diff --git a/gateway-sp-comms/src/error.rs b/gateway-sp-comms/src/error.rs index 03f70ea4340..ff6c8ad7b78 100644 --- a/gateway-sp-comms/src/error.rs +++ b/gateway-sp-comms/src/error.rs @@ -15,12 +15,20 @@ use 
thiserror::Error; pub enum StartupError { #[error("error binding to UDP address {addr}: {err}")] UdpBind { addr: SocketAddr, err: io::Error }, + #[error("invalid configuration file: {}", .reasons.join(", "))] + InvalidConfig { reasons: Vec }, + #[error("error communicating with SP: {0}")] + SpCommunicationFailed(#[from] SpCommunicationError), + #[error("location discovery failed: {reason}")] + DiscoveryFailed { reason: String }, } #[derive(Debug, Error)] pub enum Error { #[error("nonexistent SP (type {:?}, slot {})", .0.typ, .0.slot)] SpDoesNotExist(SpIdentifier), + #[error("unknown socket address for local ignition controller")] + LocalIgnitionControllerAddressUnknown, #[error( "unknown socket address for SP (type {:?}, slot {})", .0.typ, diff --git a/gateway-sp-comms/src/management_switch.rs b/gateway-sp-comms/src/management_switch.rs index 3fb07fa71cb..474c3358ed2 100644 --- a/gateway-sp-comms/src/management_switch.rs +++ b/gateway-sp-comms/src/management_switch.rs @@ -11,6 +11,9 @@ //! See RFD 250 for details. //! 
+#[allow(dead_code)] // we don't use this yet, but will shortly +mod location_map; + use crate::error::StartupError; use futures::stream::FuturesUnordered; use futures::StreamExt; @@ -48,13 +51,20 @@ pub struct KnownSps { pub power_controllers: Vec, } -#[derive(Debug, Clone, Copy, PartialEq, Eq)] +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Deserialize, Serialize)] pub struct SpIdentifier { pub typ: SpType, pub slot: usize, } -#[derive(Debug, Clone, Copy, PartialEq, Eq)] +impl SpIdentifier { + pub fn new(typ: SpType, slot: usize) -> Self { + Self { typ, slot } + } +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)] +#[serde(rename_all = "lowercase")] pub enum SpType { Switch, Sled, diff --git a/gateway-sp-comms/src/management_switch/location_map.rs b/gateway-sp-comms/src/management_switch/location_map.rs new file mode 100644 index 00000000000..11977d16e12 --- /dev/null +++ b/gateway-sp-comms/src/management_switch/location_map.rs @@ -0,0 +1,731 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. 
+ +// Copyright 2022 Oxide Computer Company + +use super::SpIdentifier; +use super::SpSocket; +use super::SwitchPort; +use crate::communicator::ResponseKindExt; +use crate::error::StartupError; +use crate::recv_handler::RecvHandler; +use crate::Timeout; +use futures::stream::FuturesUnordered; +use futures::Stream; +use futures::StreamExt; +use gateway_messages::RequestKind; +use gateway_messages::SpPort; +use omicron_common::backoff; +use omicron_common::backoff::Backoff; +use serde::Deserialize; +use serde::Serialize; +use slog::debug; +use slog::info; +use slog::Logger; +use std::collections::HashMap; +use std::collections::HashSet; +use std::convert::TryFrom; +use std::net::SocketAddr; +use std::sync::Arc; +use std::time::Duration; +use tokio::net::UdpSocket; +use tokio::sync::mpsc; +use tokio::time::Instant; +use tokio_stream::wrappers::ReceiverStream; + +/// Configuration of a single port of the management network switch. +#[derive(Clone, Debug, PartialEq, Deserialize, Serialize)] +pub struct SwitchPortConfig { + /// Data link addresses; this is the address on which we should bind a + /// socket, which will be tagged with the appropriate VLAN for this switch + /// port (see RFD 250). + pub data_link_addr: SocketAddr, + + /// Multicast address used to find the SP connected to this port. + // TODO: The multicast address used should be a single address, not a + // per-port address. For now we configure it per-port to make dev/test on a + // single system easier; we can run multiple simulated SPs that all listen + // to different multicast addresses on one host. + pub multicast_addr: SocketAddr, + + /// Map defining the logical identifier of the SP connected to this port for + /// each of the possible locations where MGS is running (see + /// [`LocationConfig::names`]). + pub location: HashMap, +} + +/// Configure the topology of the rack where MGS is running, and describe how we +/// can determine our own location. 
+#[derive(Clone, Debug, PartialEq, Deserialize, Serialize)] +pub struct LocationConfig { + /// List of human-readable location names; the actual strings don't matter, + /// but they're used in log messages and to sync with the refined locations + /// contained in `determination`. For "rack v1" see RFD 250 § 7.2. + pub names: Vec, + + /// A list of switch ports that can be used to determine which location (of + /// those listed in `names`) we are. + pub determination: Vec, +} + +#[derive(Clone, Debug, PartialEq, Deserialize, Serialize)] +pub struct LocationDeterminationConfig { + /// Which port to contact. + pub switch_port: usize, + + /// If the SP on `switch_port` communicates with us on its port 1, we must + /// be one of this set of locations (which should be a subset of our parent + /// [`LocationConfig`]'s `names`). + pub sp_port_1: Vec, + + /// If the SP on `switch_port` communicates with us on its port 2, we must + /// be one of this set of locations (which should be a subset of our parent + /// [`LocationConfig`]'s `names`). + pub sp_port_2: Vec, +} + +#[derive(Debug)] +pub(super) struct LocationMap { + location_name: String, + port_to_id: HashMap, + id_to_port: HashMap, +} + +impl LocationMap { + // For unit tests we don't want to have to run discovery, so allow + // construction of a canned `LocationMap`. + #[cfg(test)] + pub(super) fn new_raw( + location_name: String, + port_to_id: HashMap, + ) -> Self { + let mut id_to_port = HashMap::with_capacity(port_to_id.len()); + for (&port, &id) in port_to_id.iter() { + id_to_port.insert(id, port); + } + Self { location_name, port_to_id, id_to_port } + } + + pub(super) async fn run_discovery( + communicator: Arc, + config: LocationConfig, + ports: HashMap, + sockets: Arc>, + recv_handler: Arc, + deadline: Instant, + log: &Logger, + ) -> Result { + // Validate that `config` is valid. 
+ let config = ValidatedLocationConfig::try_from((&ports, config))?; + + // Spawn a task that will send discovery packets on every switch port + // until it hears back from all SPs with exponential backoff to avoid + // slamming the network; we expect some ports to never resolve (e.g., if + // the cubby at the other end is not populated and therefore there is no + // SP to respond). + let location_determination = config.determination; + let (refined_locations_tx, refined_locations) = mpsc::channel(8); + { + let log = log.clone(); + let ports = ports.clone(); + tokio::spawn(async move { + discover_sps( + &communicator, + &sockets, + ports, + &recv_handler, + location_determination, + refined_locations_tx, + &log, + ) + .await; + }); + } + + // Collect responses and solve for a single location + let location = match tokio::time::timeout_at( + deadline, + resolve_location( + config.names, + ReceiverStream::new(refined_locations), + log, + ), + ) + .await + { + Ok(Ok(location)) => location, + Ok(Err(err)) => return Err(err), + Err(_) => { + return Err(StartupError::DiscoveryFailed { + reason: String::from("timeout"), + }) + } + }; + + // based on the resolved location and the input configuration, build the + // map of port <-> logical ID + let mut port_to_id = HashMap::with_capacity(ports.len()); + let mut id_to_port = HashMap::with_capacity(ports.len()); + for (port, mut port_config) in ports { + // construction of `ValidatedLocationConfig` checked that all port + // configs have entries for all locations, allowing us to unwrap + // this `remove()`. + let id = port_config.location.remove(&location).unwrap(); + port_to_id.insert(port, id); + id_to_port.insert(id, port); + } + + Ok(Self { location_name: location, port_to_id, id_to_port }) + } + + /// Get the name of our location. + /// + /// This matches one of the names specified as a possible location in the + /// configuration we were given. 
+ pub(super) fn location_name(&self) -> &str { + &self.location_name + } + + /// Get the ID of a given port. + /// + /// Panics if `port` was not present in the list of ports provided when + /// `self` was created. + pub(super) fn port_to_id(&self, port: SwitchPort) -> SpIdentifier { + self.port_to_id.get(&port).copied().unwrap() + } + + /// Get the port associated with the given ID, if it exists. + pub(super) fn id_to_port(&self, id: SpIdentifier) -> Option { + self.id_to_port.get(&id).copied() + } +} + +// This repeats the fields of `LocationConfig` but +// +// a) converts `Vec<_>` to `HashSet<_>` (we care about ordering for TOML +// serialization, but really want set operations) +// b) validates that all the fields that reference each other are self +// consistent; e.g., there isn't a LocationDeterminationConfig that refers to +// a nonexistent switch port. +#[derive(Debug, PartialEq)] +struct ValidatedLocationConfig { + names: HashSet, + determination: Vec, +} + +#[derive(Debug, PartialEq)] +struct ValidatedLocationDeterminationConfig { + switch_port: SwitchPort, + sp_port_1: HashSet, + sp_port_2: HashSet, +} + +impl TryFrom<(&'_ HashMap, LocationConfig)> + for ValidatedLocationConfig +{ + type Error = StartupError; + + fn try_from( + (ports, config): ( + &HashMap, + LocationConfig, + ), + ) -> Result { + // Helper function to convert a vec into a hashset, recording an error + // string in `reasons` if the lengths don't match (i.e., the vec + // contained at least one duplicate) + fn vec_to_hashset( + v: Vec, + reasons: &mut Vec, + msg: F, + ) -> HashSet + where + F: FnOnce() -> String, + { + let n = v.len(); + let hs = v.into_iter().collect::>(); + if hs.len() != n { + reasons.push(msg()); + } + hs + } + + // collection of reasons the config is invalid (if any) + let mut reasons = Vec::new(); + + let names = vec_to_hashset(config.names, &mut reasons, || { + String::from("location names contains at least one duplicate entry") + }); + + // make sure every port 
has a defined ID for any element of `names`, and + // no extras + for (port, port_config) in ports { + for name in &names { + if !port_config.location.contains_key(name) { + reasons.push(format!( + "port {} is missing an ID for location {:?}", + port.0, name + )); + } + } + for name in port_config.location.keys() { + if !names.contains(name) { + reasons.push(format!( + "port {} contains unknown name {:?}", + port.0, name + )); + } + } + } + + let determination = config + .determination + .into_iter() + .enumerate() + .map(|(i, det)| { + // make sure this determination's switch port exists + let switch_port = SwitchPort(det.switch_port); + if !ports.contains_key(&switch_port) { + reasons.push(format!( + "determination {} references a nonexistent switch port ({})", + i, det.switch_port + )); + } + + // convert names into hash sets + let sp_port_1 = vec_to_hashset(det.sp_port_1, &mut reasons, || + format!( + "determination `{}.sp_port_1` contains duplicate names", + i + ) + ); + let sp_port_2 = vec_to_hashset(det.sp_port_2, &mut reasons, || + format!( + "determination `{}.sp_port_2` contains duplicate names", + i + ) + ); + + // make sure these hash sets only reference known names + if !sp_port_1.is_subset(&names) { + reasons.push(format!( + "determination `{}.sp_port_1` contains unknown names", + i + )); + } + if !sp_port_2.is_subset(&names) { + reasons.push(format!( + "determination `{}.sp_port_2` contains unknown names", + i + )); + } + + // determinations should not be empty; that would result in + // immediate failure of our location resolution + if sp_port_1.is_empty() { + reasons.push(format!( + "determination `{}.sp_port_1` is empty", + i + )); + } + if sp_port_2.is_empty() { + reasons.push(format!( + "determination `{}.sp_port_2` is empty", + i + )); + } + + ValidatedLocationDeterminationConfig { + switch_port, + sp_port_1, + sp_port_2, + } + }) + .collect::>(); + + if reasons.is_empty() { + Ok(Self { names, determination }) + } else { + 
Err(StartupError::InvalidConfig { reasons }) + } + } +} + +/// `discover_sps()` is spawned as a tokio task. It's responsible for sending +/// discovery packets on all switch ports until it hears back from the SPs at +/// the other end (if they exist and are able to respond). Any responses we hear +/// back that correspond to a port used in `location_determination` will result +/// in a message being sent on the `refined_locations` channel with that port +/// and the list of locations we could be in based on the SP's response on that +/// port. Our spawner is responsible for collecting/using those messages. +async fn discover_sps( + communicator: &crate::Communicator, + sockets: &HashMap, + port_config: HashMap, + _recv_handler: &RecvHandler, + mut location_determination: Vec, + refined_locations: mpsc::Sender<(SwitchPort, HashSet)>, + log: &Logger, +) { + // Build a collection of futures that sends discovery packets on every port; + // each future runs until it hears back from an SP (possibly running forever + // if there is no SP listening on the other end of that port's connection). + let mut futs = FuturesUnordered::new(); + for (port, config) in port_config { + futs.push(async move { + // construct a socket pointed to a multicast addr instead of a + // specific, known addr + let socket = SpSocket { + port, + addr: config.multicast_addr, + // all ports in `port_config` also get sockets bound to them; + // unwrapping this lookup is fine + socket: sockets.get(&port).unwrap(), + }; + + let mut backoff = backoff::internal_service_policy(); + loop { + let duration = backoff + .next_backoff() + .expect("internal backoff policy gave up"); + tokio::time::sleep(duration).await; + + let result = communicator + .request_response( + &socket, + RequestKind::Discover, + ResponseKindExt::try_into_discover, + // TODO should this timeout be configurable or itself + // have some kind of backoff? 
we're inside a + // `backoff::retry()` loop, but if an SP is alive but + // slow (i.e., taking longer than this timeout to reply) + // we'll never hear it - the response will show up late + // and we'll ignore it. For now just leave it at some + // reasonably large number; this may solve itself when + // we move to some kind of authenticated comms channel. + Some(Timeout::from_now(Duration::from_secs(5))), + ) + .await; + + match result { + Ok(response) => return (port, response), + Err(err) => { + debug!( + log, "discovery failed; will retry"; + "port" => ?port, + "err" => %err, + ); + } + } + } + }); + } + + // Wait for responses. + while let Some((port, response)) = futs.next().await { + // See if this port can participate in location determination. + let pos = match location_determination + .iter() + .position(|d| d.switch_port == port) + { + Some(pos) => pos, + None => { + info!( + log, "received discovery response (not used for location)"; + "port" => ?port, + "response" => ?response, + ); + continue; + } + }; + let determination = location_determination.remove(pos); + + let refined = match response.sp_port { + SpPort::One => determination.sp_port_1, + SpPort::Two => determination.sp_port_2, + }; + + // the only failure possible here is that the receiver is gone; that's + // harmless for us (e.g., maybe it's already fully determined the + // location and doesn't care about more messages) + let _ = refined_locations.send((port, refined)).await; + } + + // TODO If we're exiting, we've now heard from an SP on every port. Is there + // any reason to continue pinging ports with discovery packets? This is TBD + // on how we handle sleds being power cycled, replaced, etc. +} + +/// Given a list of possible location names (`locations`) and a stream of +/// `determinations` (coming from `discover_sps()` above), resolve which element +/// of `locations` we must be. 
For example, if `locations` contains `["switch0", +/// "switch1"]` and we receive a determination of `(SwitchPort(1), +/// ["switch0"])`, we'll return `Ok("switch0")`. This process can fail if we get +/// bogus/conflicting determinations or if we exhaust `determinations` without +/// refining to a single location. +/// +/// Note that not all bogus/conflicting results will be detected, because this +/// function will short circuit once it has resolved to a single location. For +/// example, if `locations` contains `["a", "b"]` and `determinations` will +/// yield `[(SwitchPort(0), ["a"]), (SwitchPort(1), ["b"])]`, we will return +/// `Ok("a")` upon seeing the first determination without noticing the second. +/// On the other hand, if `locations` contains `["a", "b", "c"]` and +/// `determinations` will yield `[(SwitchPort(0), ["a", "b"]), (SwitchPort(1), +/// ["c"])]`, we would notice the empty intersection and fail accordingly. +async fn resolve_location( + mut locations: HashSet, + determinations: S, + log: &Logger, +) -> Result +where + S: Stream)>, +{ + tokio::pin!(determinations); + while let Some((port, refined_locations)) = determinations.next().await { + // we got a successful response from an SP; restrict `locations` to only + // locations that could be possible given that response. + debug!( + log, "received location deterimination response"; + "port" => ?port, + "refined_locations" => ?refined_locations, + ); + locations.retain(|name| refined_locations.contains(name)); + + // If we're down to 1 location (or 0 if something has gone horribly + // wrong), we don't need to wait for the remaining answers; we've + // already fully determined our location. 
+ if locations.len() <= 1 { + break; + } + } + + match locations.len() { + 1 => Ok(locations.into_iter().next().unwrap()), + 0 => Err(StartupError::DiscoveryFailed { + reason: String::from(concat!( + "could not determine unique location ", + "(all possible locations eliminated)", + )), + }), + _ => { + let mut remaining = locations.into_iter().collect::>(); + remaining.sort_unstable(); + Err(StartupError::DiscoveryFailed { + reason: format!( + "could not determine unique location (remaining set `{:?}`)", + remaining, + ), + }) + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::SpType; + use futures::stream; + use std::vec; + + #[test] + fn test_config_validation() { + let bad_ports = HashMap::from([( + SwitchPort(0), + SwitchPortConfig { + data_link_addr: "127.0.0.1:0".parse().unwrap(), + multicast_addr: "127.0.0.1:0".parse().unwrap(), + location: HashMap::from([ + (String::from("a"), SpIdentifier::new(SpType::Sled, 0)), + // missing "b", has extraneous "c" + (String::from("c"), SpIdentifier::new(SpType::Sled, 1)), + ]), + }, + )]); + let bad_config = LocationConfig { + names: vec![ + String::from("a"), + String::from("b"), + String::from("a"), // dupe + ], + determination: vec![ + LocationDeterminationConfig { + switch_port: 7, // nonexistent port + sp_port_1: vec![ + String::from("a"), + String::from("b"), + String::from("a"), // dupe + String::from("c"), // not listed in `names` + ], + sp_port_2: vec![], // empty + }, + LocationDeterminationConfig { + switch_port: 0, + sp_port_1: vec![], // empty + sp_port_2: vec![ + String::from("a"), + String::from("b"), + String::from("b"), // dupe + String::from("d"), // not listed in `names` + ], + }, + ], + }; + + let err = ValidatedLocationConfig::try_from((&bad_ports, bad_config)) + .unwrap_err(); + let reasons = match err { + StartupError::InvalidConfig { reasons } => reasons, + other => panic!("unexpected error {}", other), + }; + + assert_eq!( + reasons, + &[ + "location names contains at least one 
duplicate entry", + "port 0 is missing an ID for location \"b\"", + "port 0 contains unknown name \"c\"", + "determination 0 references a nonexistent switch port (7)", + "determination `0.sp_port_1` contains duplicate names", + "determination `0.sp_port_1` contains unknown names", + "determination `0.sp_port_2` is empty", + "determination `1.sp_port_2` contains duplicate names", + "determination `1.sp_port_2` contains unknown names", + "determination `1.sp_port_1` is empty", + ] + ); + + // repeat the config from above but with all errors fixed; note that + // this config is still logically nonsense, but it doesn't contain any + // of the errors we check for (mismatched / typo'd names, etc.). + let good_ports = HashMap::from([( + SwitchPort(0), + SwitchPortConfig { + data_link_addr: "127.0.0.1:0".parse().unwrap(), + multicast_addr: "127.0.0.1:0".parse().unwrap(), + location: HashMap::from([ + (String::from("a"), SpIdentifier::new(SpType::Sled, 0)), + (String::from("b"), SpIdentifier::new(SpType::Sled, 1)), + ]), + }, + )]); + let good_config = LocationConfig { + names: vec![String::from("a"), String::from("b")], + determination: vec![ + LocationDeterminationConfig { + switch_port: 0, + sp_port_1: vec![String::from("a")], + sp_port_2: vec![String::from("b")], + }, + LocationDeterminationConfig { + switch_port: 0, + sp_port_1: vec![String::from("b")], + sp_port_2: vec![String::from("a")], + }, + ], + }; + + let config = + ValidatedLocationConfig::try_from((&good_ports, good_config)) + .unwrap(); + + assert_eq!( + config, + ValidatedLocationConfig { + names: HashSet::from([String::from("a"), String::from("b")]), + determination: vec![ + ValidatedLocationDeterminationConfig { + switch_port: SwitchPort(0), + sp_port_1: HashSet::from([String::from("a")]), + sp_port_2: HashSet::from([String::from("b")]), + }, + ValidatedLocationDeterminationConfig { + switch_port: SwitchPort(0), + sp_port_1: HashSet::from([String::from("b")]), + sp_port_2: 
HashSet::from([String::from("a")]), + }, + ], + } + ); + } + + struct Harness { + names: HashSet, + determinations: + stream::Iter)>>, + } + + impl Harness { + fn new(names: &[&str], determinations: &[&[&str]]) -> Self { + let determinations: Vec<(SwitchPort, HashSet)> = + determinations + .iter() + .enumerate() + .map(|(i, names)| { + ( + SwitchPort(i), + names.iter().copied().map(String::from).collect(), + ) + }) + .collect(); + Self { + names: names.iter().copied().map(String::from).collect(), + determinations: stream::iter(determinations), + } + } + + async fn resolve(self, log: &Logger) -> Result { + resolve_location(self.names, self.determinations, log).await + } + } + + #[tokio::test] + async fn test_resolve_location() { + let log = Logger::root(slog::Discard, slog::o!()); + + // basic test - of a/b/c, all determinations report "a" + let harness = Harness::new(&["a", "b", "c"], &[&["a"], &["a"], &["a"]]); + assert_eq!(harness.resolve(&log).await.unwrap(), "a"); + + // slightly more interesting - of a/b/c, all determinations report a + // subset, all of which intersect on "b" + let harness = Harness::new( + &["a", "b", "c"], + &[&["a", "b"], &["a", "b", "c"], &["b", "c"]], + ); + assert_eq!(harness.resolve(&log).await.unwrap(), "b"); + + // failing to resolve to a single location should give us an error + let harness = + Harness::new(&["a", "b", "c"], &[&["a", "b"], &["b", "a"]]); + let err = harness.resolve(&log).await.unwrap_err(); + match err { + StartupError::DiscoveryFailed { reason } => { + assert_eq!( + reason, + concat!( + "could not determine unique location ", + "(remaining set `[\"a\", \"b\"]`)", + ) + ); + } + _ => panic!("unexpected error {}", err), + } + + // determinations that have no name in common should give us an error + let harness = Harness::new(&["a", "b", "c"], &[&["a", "b"], &["c"]]); + let err = harness.resolve(&log).await.unwrap_err(); + match err { + StartupError::DiscoveryFailed { reason } => { + assert_eq!( + reason, + concat!( + 
"could not determine unique location ", + "(all possible locations eliminated)", + ) + ); + } + _ => panic!("unexpected error {}", err), + } + } +} diff --git a/gateway/src/error.rs b/gateway/src/error.rs index 3a8cf4649c6..201c000f1f1 100644 --- a/gateway/src/error.rs +++ b/gateway/src/error.rs @@ -57,6 +57,7 @@ where ), SpCommsError::SpAddressUnknown(_) | SpCommsError::Timeout { .. } + | SpCommsError::LocalIgnitionControllerAddressUnknown | SpCommsError::SpCommunicationFailed(_) => { HttpError::for_internal_error(err.to_string()) } diff --git a/gateway/src/http_entrypoints.rs b/gateway/src/http_entrypoints.rs index dc3171afc2d..eaa949d4712 100644 --- a/gateway/src/http_entrypoints.rs +++ b/gateway/src/http_entrypoints.rs @@ -318,6 +318,7 @@ async fn sp_list( // need more refined errors? SpCommsError::Timeout { .. } | SpCommsError::SpCommunicationFailed(_) + | SpCommsError::LocalIgnitionControllerAddressUnknown | SpCommsError::SpAddressUnknown(_) => { SpState::Unresponsive } diff --git a/sp-sim/src/gimlet.rs b/sp-sim/src/gimlet.rs index c6f9a68ab6c..6ad5f2b06b6 100644 --- a/sp-sim/src/gimlet.rs +++ b/sp-sim/src/gimlet.rs @@ -11,6 +11,7 @@ use async_trait::async_trait; use futures::future; use gateway_messages::sp_impl::{SerialConsolePacketizer, SpHandler, SpServer}; use gateway_messages::version; +use gateway_messages::DiscoverResponse; use gateway_messages::ResponseError; use gateway_messages::SerialConsole; use gateway_messages::SerialNumber; @@ -433,18 +434,18 @@ impl Handler { } impl SpHandler for Handler { - fn ping( + fn discover( &mut self, sender: SocketAddr, port: SpPort, - ) -> Result<(), ResponseError> { - self.update_gateway_address(sender, port); + ) -> Result { debug!( - &self.log, "received ping; sending pong"; + &self.log, + "received discover; sending response"; "sender" => sender, "port" => ?port, ); - Ok(()) + Ok(DiscoverResponse { sp_port: port }) } fn ignition_state( diff --git a/sp-sim/src/sidecar.rs b/sp-sim/src/sidecar.rs index 
8eb189cfc7d..83d2cd89aa8 100644 --- a/sp-sim/src/sidecar.rs +++ b/sp-sim/src/sidecar.rs @@ -17,6 +17,7 @@ use futures::future; use gateway_messages::sp_impl::SpHandler; use gateway_messages::sp_impl::SpServer; use gateway_messages::BulkIgnitionState; +use gateway_messages::DiscoverResponse; use gateway_messages::IgnitionCommand; use gateway_messages::IgnitionFlags; use gateway_messages::IgnitionState; @@ -248,17 +249,18 @@ impl Handler { } impl SpHandler for Handler { - fn ping( + fn discover( &mut self, sender: SocketAddr, port: SpPort, - ) -> Result<(), ResponseError> { + ) -> Result { debug!( - &self.log, "received ping; sending pong"; + &self.log, + "received discover; sending response"; "sender" => sender, "port" => ?port, ); - Ok(()) + Ok(DiscoverResponse { sp_port: port }) } fn ignition_state(