diff --git a/Cargo.lock b/Cargo.lock index 2e3406717ac..67fbaa41d50 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4298,6 +4298,7 @@ dependencies = [ "anyhow", "async-trait", "dropshot", + "futures", "gateway-messages", "hex", "omicron-common", diff --git a/gateway-messages/src/lib.rs b/gateway-messages/src/lib.rs index 2ef04e16151..b46722020c6 100644 --- a/gateway-messages/src/lib.rs +++ b/gateway-messages/src/lib.rs @@ -8,8 +8,12 @@ pub mod sp_impl; mod variable_packet; use bitflags::bitflags; -use core::{fmt, str}; -use serde::{Deserialize, Serialize}; +use core::fmt; +use core::str; +use serde::Deserialize; +use serde::Serialize; +use serde_repr::Deserialize_repr; +use serde_repr::Serialize_repr; pub use hubpack::error::Error as HubpackError; pub use hubpack::{deserialize, serialize, SerializedSize}; @@ -41,6 +45,16 @@ pub enum RequestKind { SerialConsoleWrite(SerialConsole), } +/// Identifier for one of of an SP's KSZ8463 management-network-facing ports. +#[derive( + Clone, Copy, Debug, PartialEq, Eq, Hash, Serialize_repr, Deserialize_repr, +)] +#[repr(u8)] +pub enum SpPort { + One = 1, + Two = 2, +} + // TODO: Not all SPs are capable of crafting all these response kinds, but the // way we're using hubpack requires everyone to allocate Response::MAX_SIZE. Is // that okay, or should we break this up more? diff --git a/gateway-messages/src/sp_impl.rs b/gateway-messages/src/sp_impl.rs index 3b91a7c3eb6..7565aeb20d6 100644 --- a/gateway-messages/src/sp_impl.rs +++ b/gateway-messages/src/sp_impl.rs @@ -4,38 +4,64 @@ //! Behavior implemented by both real and simulated SPs. -use crate::{ - version, BulkIgnitionState, IgnitionCommand, IgnitionState, Request, - RequestKind, ResponseError, ResponseKind, SerialConsole, SpComponent, - SpMessage, SpMessageKind, SpState, -}; +use crate::version; +use crate::BulkIgnitionState; +use crate::IgnitionCommand; +use crate::IgnitionState; +use crate::Request; +use crate::RequestKind; +use crate::ResponseError; +use crate::ResponseKind; +use crate::SerialConsole; +use crate::SpComponent; +use crate::SpMessage; +use crate::SpMessageKind; +use crate::SpPort; +use crate::SpState; use hubpack::SerializedSize; +use std::net::SocketAddr; pub trait SpHandler { - fn ping(&mut self) -> Result<(), ResponseError>; + fn ping( + &mut self, + sender: SocketAddr, + port: SpPort, + ) -> Result<(), ResponseError>; fn ignition_state( &mut self, + sender: SocketAddr, + port: SpPort, target: u8, ) -> Result; fn bulk_ignition_state( &mut self, + sender: SocketAddr, + port: SpPort, ) -> Result; fn ignition_command( &mut self, + sender: SocketAddr, + port: SpPort, target: u8, command: IgnitionCommand, ) -> Result<(), ResponseError>; - fn sp_state(&mut self) -> Result; + fn sp_state( + &mut self, + sender: SocketAddr, + port: SpPort, + ) -> Result; // TODO Should we return "number of bytes written" here, or is it sufficient // to say "all or none"? Would be nice for the caller to not have to resend // UDP chunks; can SP ensure it writes all data locally? fn serial_console_write( &mut self, + sender: SocketAddr, + port: SpPort, packet: SerialConsole, ) -> Result<(), ResponseError>; } @@ -148,12 +174,14 @@ impl Default for SpServer { impl SpServer { /// Handler for incoming UDP requests. /// - /// `data` should be a UDP packet that has arrived for the current SP. It - /// will be parsed (into a [`Request`]), the appropriate method will be - /// called on `handler`, and a serialized response will - /// be returned, which the caller should send back to the requester. + /// `data` should be a UDP packet that has arrived from `sender` on `port`. + /// It will be parsed (into a [`Request`]), the appropriate method will be + /// called on `handler`, and a serialized response will be returned, which + /// the caller should send back to the requester. pub fn dispatch( &mut self, + sender: SocketAddr, + port: SpPort, data: &[u8], handler: &mut H, ) -> Result<&[u8], Error> { @@ -174,21 +202,23 @@ impl SpServer { // call out to handler to provide response let result = match request.kind { - RequestKind::Ping => handler.ping().map(|()| ResponseKind::Pong), - RequestKind::IgnitionState { target } => { - handler.ignition_state(target).map(ResponseKind::IgnitionState) + RequestKind::Ping => { + handler.ping(sender, port).map(|()| ResponseKind::Pong) } + RequestKind::IgnitionState { target } => handler + .ignition_state(sender, port, target) + .map(ResponseKind::IgnitionState), RequestKind::BulkIgnitionState => handler - .bulk_ignition_state() + .bulk_ignition_state(sender, port) .map(ResponseKind::BulkIgnitionState), RequestKind::IgnitionCommand { target, command } => handler - .ignition_command(target, command) + .ignition_command(sender, port, target, command) .map(|()| ResponseKind::IgnitionCommandAck), RequestKind::SpState => { - handler.sp_state().map(ResponseKind::SpState) + handler.sp_state(sender, port).map(ResponseKind::SpState) } RequestKind::SerialConsoleWrite(packet) => handler - .serial_console_write(packet) + .serial_console_write(sender, port, packet) .map(|()| ResponseKind::SerialConsoleWriteAck), }; diff --git a/gateway/examples/config.toml b/gateway/examples/config.toml index 2add9d938df..8922a21fa33 100644 --- a/gateway/examples/config.toml +++ b/gateway/examples/config.toml @@ -8,11 +8,11 @@ id = "8afcb12d-f625-4df9-bdf2-f495c3bbd323" [known_sps] switches = [ # first switch is assumed to be the local ignition controller - { sp = "127.0.0.1:23456", switch_port = "127.0.0.1:33456" }, + { sp = "[::1]:33300", switch_port = "[::1]:33200" }, ] sleds = [ - { sp = "127.0.0.1:23457", switch_port = "127.0.0.1:33457" }, - { sp = "127.0.0.1:23458", switch_port = "127.0.0.1:33458" }, + { sp = "[::1]:33310", switch_port = "[::1]:33201" }, + { sp = "[::1]:33320", switch_port = "[::1]:33202" }, ] power_controllers = [ ] diff --git a/gateway/tests/integration_tests/setup.rs b/gateway/tests/integration_tests/setup.rs index ed37bcd41ad..2486ffd1b28 100644 --- a/gateway/tests/integration_tests/setup.rs +++ b/gateway/tests/integration_tests/setup.rs @@ -69,16 +69,16 @@ pub async fn test_setup_with_config( .sidecars .iter() .map(|simsp| KnownSp { - sp: simsp.local_addr(), - switch_port: "127.0.0.1:0".parse().unwrap(), + sp: simsp.local_addr(0), + switch_port: "[::1]:0".parse().unwrap(), }) .collect::>(); let gimlets = simrack .gimlets .iter() .map(|simsp| KnownSp { - sp: simsp.local_addr(), - switch_port: "127.0.0.1:0".parse().unwrap(), + sp: simsp.local_addr(0), + switch_port: "[::1]:0".parse().unwrap(), }) .collect::>(); server_config.known_sps = KnownSps { diff --git a/gateway/tests/sp_sim_config.test.toml b/gateway/tests/sp_sim_config.test.toml index 1096e892bfb..42a089d7d52 100644 --- a/gateway/tests/sp_sim_config.test.toml +++ b/gateway/tests/sp_sim_config.test.toml @@ -8,22 +8,25 @@ # concurrently. # [[simulated_sps.sidecar]] -bind_address = "127.0.0.1:0" +multicast_addr = "::1" +bind_addrs = ["[::1]:0", "[::1]:0"] serial_number = [0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1] [[simulated_sps.gimlet]] -bind_address = "127.0.0.1:0" +multicast_addr = "::1" +bind_addrs = ["[::1]:0", "[::1]:0"] serial_number = [0, 0, 0, 0, 1, 1, 1, 1, 2, 2, 2, 2, 3, 3, 3, 3] [[simulated_sps.gimlet.components]] name = "sp3" -serial_console = "127.0.0.1:0" +serial_console = "[::1]:0" [[simulated_sps.gimlet]] -bind_address = "127.0.0.1:0" +multicast_addr = "::1" +bind_addrs = ["[::1]:0", "[::1]:0"] serial_number = [4, 4, 4, 4, 5, 5, 5, 5, 6, 6, 6, 6, 7, 7, 7, 7] [[simulated_sps.gimlet.components]] name = "sp3" -serial_console = "127.0.0.1:0" +serial_console = "[::1]:0" # # NOTE: for the test suite, the [log] section is ignored; sp-sim logs are rolled diff --git a/sp-sim/Cargo.toml b/sp-sim/Cargo.toml index 7ad4d9d1e5e..8e56f9f5196 100644 --- a/sp-sim/Cargo.toml +++ b/sp-sim/Cargo.toml @@ -8,6 +8,7 @@ license = "MPL-2.0" anyhow = "1.0" async-trait = "0.1.53" dropshot = { git = "https://github.com/oxidecomputer/dropshot", branch = "main", features = [ "usdt-probes" ] } +futures = "0.3" gateway-messages = { path = "../gateway-messages" } hex = "0.4.3" omicron-common = { path = "../common" } diff --git a/sp-sim/examples/config.toml b/sp-sim/examples/config.toml index 3ceea1bf8b5..43cec485e8f 100644 --- a/sp-sim/examples/config.toml +++ b/sp-sim/examples/config.toml @@ -3,22 +3,25 @@ # [[simulated_sps.sidecar]] -bind_address = "127.0.0.1:23456" +multicast_addr = "ff15:0:1de::0" +bind_addrs = ["[::1]:33300", "[::1]:33301"] serial_number = [0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1] [[simulated_sps.gimlet]] -bind_address = "127.0.0.1:23457" +multicast_addr = "ff15:0:1de::1" +bind_addrs = ["[::1]:33310", "[::1]:33311"] serial_number = [0, 0, 0, 0, 1, 1, 1, 1, 2, 2, 2, 2, 3, 3, 3, 3] [[simulated_sps.gimlet.components]] name = "sp3" -serial_console = "127.0.0.1:33457" +serial_console = "[::1]:33312" [[simulated_sps.gimlet]] -bind_address = "127.0.0.1:23458" +multicast_addr = "ff15:0:1de::2" +bind_addrs = ["[::1]:33320", "[::1]:33321"] serial_number = [4, 4, 4, 4, 5, 5, 5, 5, 6, 6, 6, 6, 7, 7, 7, 7] [[simulated_sps.gimlet.components]] name = "sp3" -serial_console = "127.0.0.1:33458" +serial_console = "[::1]:33322" [log] # Show log messages of this level and more severe diff --git a/sp-sim/src/config.rs b/sp-sim/src/config.rs index 1a38a782b3c..30f64272ad0 100644 --- a/sp-sim/src/config.rs +++ b/sp-sim/src/config.rs @@ -7,18 +7,21 @@ use dropshot::ConfigLogging; use gateway_messages::SerialNumber; -use serde::{Deserialize, Serialize}; -use std::{ - net::SocketAddr, - path::{Path, PathBuf}, -}; +use serde::Deserialize; +use serde::Serialize; +use std::net::Ipv6Addr; +use std::net::SocketAddr; +use std::path::Path; +use std::path::PathBuf; use thiserror::Error; /// Configuration of a simulated sidecar SP #[derive(Clone, Debug, Deserialize, PartialEq, Serialize)] pub struct SidecarConfig { - /// UDP address - pub bind_address: SocketAddr, + /// IPv6 multicast address to join. + pub multicast_addr: Ipv6Addr, + /// UDP address of the two (fake) KSZ8463 ports + pub bind_addrs: [SocketAddr; 2], /// Fake serial number pub serial_number: SerialNumber, } @@ -26,8 +29,10 @@ pub struct SidecarConfig { /// Configuration of a simulated gimlet SP #[derive(Clone, Debug, Deserialize, PartialEq, Serialize)] pub struct GimletConfig { - /// UDP address - pub bind_address: SocketAddr, + /// IPv6 multicast address to join. + pub multicast_addr: Ipv6Addr, + /// UDP address of the two (fake) KSZ8463 ports + pub bind_addrs: [SocketAddr; 2], /// Fake serial number pub serial_number: SerialNumber, /// Attached components @@ -60,12 +65,6 @@ pub struct Config { pub simulated_sps: SimulatedSps, /// Server-wide logging configuration. pub log: ConfigLogging, - // Type of SP to simulate. - // pub sp_type: SpType, - // Components to simulate. - // pub components: SpComponents, - // UDP listen address. - // pub bind_address: SocketAddr, } impl Config { diff --git a/sp-sim/src/gimlet.rs b/sp-sim/src/gimlet.rs index 781cc29a132..c6f9a68ab6c 100644 --- a/sp-sim/src/gimlet.rs +++ b/sp-sim/src/gimlet.rs @@ -3,15 +3,23 @@ // file, You can obtain one at https://mozilla.org/MPL/2.0/. use crate::config::GimletConfig; +use crate::server; use crate::server::UdpServer; use crate::{Responsiveness, SimulatedSp}; use anyhow::{anyhow, Context, Result}; use async_trait::async_trait; +use futures::future; use gateway_messages::sp_impl::{SerialConsolePacketizer, SpHandler, SpServer}; -use gateway_messages::{ - version, ResponseError, SerialConsole, SerialNumber, SerializedSize, - SpComponent, SpMessage, SpMessageKind, SpState, -}; +use gateway_messages::version; +use gateway_messages::ResponseError; +use gateway_messages::SerialConsole; +use gateway_messages::SerialNumber; +use gateway_messages::SerializedSize; +use gateway_messages::SpComponent; +use gateway_messages::SpMessage; +use gateway_messages::SpMessageKind; +use gateway_messages::SpPort; +use gateway_messages::SpState; use slog::{debug, error, info, warn, Logger}; use std::collections::HashMap; use std::net::SocketAddr; @@ -24,7 +32,7 @@ use tokio::sync::oneshot; use tokio::task::{self, JoinHandle}; pub struct Gimlet { - local_addr: SocketAddr, + local_addrs: [SocketAddr; 2], serial_number: SerialNumber, serial_console_addrs: HashMap, commands: @@ -47,8 +55,8 @@ impl SimulatedSp for Gimlet { hex::encode(self.serial_number) } - fn local_addr(&self) -> SocketAddr { - self.local_addr + fn local_addr(&self, port: usize) -> SocketAddr { + self.local_addrs[port] } async fn set_responsiveness(&self, r: Responsiveness) { @@ -65,23 +73,29 @@ impl Gimlet { pub async fn spawn(gimlet: &GimletConfig, log: Logger) -> Result { info!(log, "setting up simualted gimlet"); - let server = UdpServer::new(gimlet.bind_address).await?; - let sock = Arc::clone(server.socket()); - let local_addr = sock - .local_addr() - .with_context(|| "could not get local address of bound socket")?; + // bind to our two local "KSZ" ports + assert_eq!(gimlet.bind_addrs.len(), 2); // gimlet SP always has 2 ports + let servers = future::try_join( + UdpServer::new(gimlet.bind_addrs[0], gimlet.multicast_addr, &log), + UdpServer::new(gimlet.bind_addrs[1], gimlet.multicast_addr, &log), + ) + .await?; + let servers = [servers.0, servers.1]; + let local_addrs = [servers[0].local_addr(), servers[1].local_addr()]; let mut incoming_console_tx = HashMap::new(); let mut inner_tasks = Vec::new(); - // We want to be able to start without knowing the gateway's socket - // address, but we're spawning both the primary UDP task (which receives - // messages from the gateway) and a helper TCP task (which emulates a - // serial console and sends messages to the gateway unprompted). We'll - // share a locked `Option` between the tasks, and have the - // UDP task populate it. If the TCP task receives data but doesn't know - // the gateways address, it will just discard it. - let gateway_address: Arc>> = Arc::default(); + // We want to be able to start without knowing the gateways' socket + // addresses, but we're spawning both the primary UDP task (which + // receives messages from the gateway) and a helper TCP task (which + // emulates a serial console and sends messages to the gateway + // unprompted). We'll share a locked `Option` between the + // tasks, and have the UDP task populate it. If the TCP task receives + // data but doesn't know either of the gateways addresses, it will just + // discard it. + let gateway_addresses: Arc; 2]>> = + Arc::default(); let mut serial_console_addrs = HashMap::new(); @@ -94,6 +108,12 @@ impl Gimlet { let listener = TcpListener::bind(addr) .await .with_context(|| format!("failed to bind to {}", addr))?; + debug!( + log, "bound fake serial console to TCP port"; + "addr" => addr, + "component" => ?component, + ); + serial_console_addrs.insert( component.as_str().unwrap().to_string(), listener.local_addr().with_context(|| { @@ -108,8 +128,11 @@ impl Gimlet { component, listener, rx, - Arc::clone(&sock), - Arc::clone(&gateway_address), + [ + Arc::clone(servers[0].socket()), + Arc::clone(servers[1].socket()), + ], + Arc::clone(&gateway_addresses), log.new(slog::o!("serial-console" => name.to_string())), ); inner_tasks.push(task::spawn(async move { @@ -120,8 +143,8 @@ impl Gimlet { let (commands, commands_rx) = mpsc::unbounded_channel(); let inner = UdpTask::new( - server, - gateway_address, + servers, + gateway_addresses, gimlet.serial_number, incoming_console_tx, commands_rx, @@ -131,7 +154,7 @@ impl Gimlet { .push(task::spawn(async move { inner.run().await.unwrap() })); Ok(Self { - local_addr, + local_addrs, serial_number: gimlet.serial_number, serial_console_addrs, commands, @@ -147,8 +170,8 @@ impl Gimlet { struct SerialConsoleTcpTask { listener: TcpListener, incoming_serial_console: UnboundedReceiver, - sock: Arc, - gateway_address: Arc>>, + socks: [Arc; 2], + gateway_addresses: Arc; 2]>>, console_packetizer: SerialConsolePacketizer, log: Logger, } @@ -158,42 +181,61 @@ impl SerialConsoleTcpTask { component: SpComponent, listener: TcpListener, incoming_serial_console: UnboundedReceiver, - sock: Arc, - gateway_address: Arc>>, + socks: [Arc; 2], + gateway_addresses: Arc; 2]>>, log: Logger, ) -> Self { Self { listener, incoming_serial_console, - sock, - gateway_address, + socks, + gateway_addresses, console_packetizer: SerialConsolePacketizer::new(component), log, } } async fn send_serial_console(&mut self, mut data: &[u8]) -> Result<()> { - let gateway_address = self.gateway_address.lock().unwrap().ok_or_else(|| anyhow!("serial console task does not know gateway's UDP address (yet?)"))?; + let gateway_addrs = *self.gateway_addresses.lock().unwrap(); + for (i, (sock, &gateway_addr)) in + self.socks.iter().zip(&gateway_addrs).enumerate() + { + let gateway_addr = match gateway_addr { + Some(addr) => addr, + None => { + info!( + self.log, + concat!( + "MGS address on port {} not known - ", + "not sending it serial console data", + ), + i, + ); + continue; + } + }; - // if we're told to send something starting with "SKIP ", emulate a - // dropped packet spanning 10 bytes before sending the rest of the data. - if let Some(remaining) = data.strip_prefix(b"SKIP ") { - self.console_packetizer.danger_emulate_dropped_packets(10); - data = remaining; - } + // if we're told to send something starting with "SKIP ", emulate a + // dropped packet spanning 10 bytes before sending the rest of the + // data. + if let Some(remaining) = data.strip_prefix(b"SKIP ") { + self.console_packetizer.danger_emulate_dropped_packets(10); + data = remaining; + } - let mut out = [0; SpMessage::MAX_SIZE]; - for packet in self.console_packetizer.packetize(data) { - let message = SpMessage { - version: version::V1, - kind: SpMessageKind::SerialConsole(packet), - }; + let mut out = [0; SpMessage::MAX_SIZE]; + for packet in self.console_packetizer.packetize(data) { + let message = SpMessage { + version: version::V1, + kind: SpMessageKind::SerialConsole(packet), + }; - // We know `out` is big enough for any `SpMessage`, so no need to - // bubble up an error here. - let n = - gateway_messages::serialize(&mut out[..], &message).unwrap(); - self.sock.send_to(&out[..n], gateway_address).await?; + // We know `out` is big enough for any `SpMessage`, so no need + // to bubble up an error here. + let n = gateway_messages::serialize(&mut out[..], &message) + .unwrap(); + sock.send_to(&out[..n], gateway_addr).await?; + } } Ok(()) @@ -287,8 +329,8 @@ enum CommandResponse { } struct UdpTask { - udp: UdpServer, - gateway_address: Arc>>, + udp0: UdpServer, + udp1: UdpServer, handler: Handler, commands: mpsc::UnboundedReceiver<(Command, oneshot::Sender)>, @@ -296,8 +338,8 @@ struct UdpTask { impl UdpTask { fn new( - server: UdpServer, - gateway_address: Arc>>, + servers: [UdpServer; 2], + gateway_addresses: Arc; 2]>>, serial_number: SerialNumber, incoming_serial_console: HashMap< SpComponent, @@ -309,10 +351,16 @@ impl UdpTask { )>, log: Logger, ) -> Self { + let [udp0, udp1] = servers; Self { - udp: server, - gateway_address, - handler: Handler { log, serial_number, incoming_serial_console }, + udp0, + udp1, + handler: Handler { + log, + gateway_addresses, + serial_number, + incoming_serial_console, + }, commands, } } @@ -322,26 +370,28 @@ impl UdpTask { let mut responsiveness = Responsiveness::Responsive; loop { select! { - recv = self.udp.recv_from() => { - if responsiveness != Responsiveness::Responsive { - continue; + recv0 = self.udp0.recv_from() => { + if let Some((resp, addr)) = server::handle_request( + &mut self.handler, + recv0, + &mut server, + responsiveness, + SpPort::One, + ).await? { + self.udp0.send_to(resp, addr).await?; } + } - let (data, addr) = recv?; - *self.gateway_address.lock().unwrap() = Some(addr); - - let resp = match server.dispatch(data, &mut self.handler) { - Ok(resp) => resp, - Err(err) => { - error!( - self.handler.log, - "dispatching message failed: {:?}", err, - ); - continue; - } - }; - - self.udp.send_to(resp, addr).await?; + recv1 = self.udp1.recv_from() => { + if let Some((resp, addr)) = server::handle_request( + &mut self.handler, + recv1, + &mut server, + responsiveness, + SpPort::Two, + ).await? { + self.udp1.send_to(resp, addr).await?; + } } command = self.commands.recv() => { @@ -367,62 +417,102 @@ impl UdpTask { struct Handler { log: Logger, serial_number: SerialNumber, + gateway_addresses: Arc; 2]>>, incoming_serial_console: HashMap>, } +impl Handler { + fn update_gateway_address(&self, addr: SocketAddr, port: SpPort) { + let i = match port { + SpPort::One => 0, + SpPort::Two => 1, + }; + self.gateway_addresses.lock().unwrap()[i] = Some(addr); + } +} + impl SpHandler for Handler { - fn ping(&mut self) -> Result<(), ResponseError> { - debug!(&self.log, "received ping; sending pong"); + fn ping( + &mut self, + sender: SocketAddr, + port: SpPort, + ) -> Result<(), ResponseError> { + self.update_gateway_address(sender, port); + debug!( + &self.log, "received ping; sending pong"; + "sender" => sender, + "port" => ?port, + ); Ok(()) } fn ignition_state( &mut self, + sender: SocketAddr, + port: SpPort, target: u8, ) -> Result { + self.update_gateway_address(sender, port); warn!( &self.log, - "received ignition state request for {}; not supported by gimlet", - target, + "received ignition state request; not supported by gimlet"; + "sender" => sender, + "port" => ?port, + "target" => target, ); Err(ResponseError::RequestUnsupportedForSp) } fn bulk_ignition_state( &mut self, + sender: SocketAddr, + port: SpPort, ) -> Result { + self.update_gateway_address(sender, port); warn!( &self.log, - "received bulk ignition state request; not supported by gimlet", + "received bulk ignition state request; not supported by gimlet"; + "sender" => sender, + "port" => ?port, ); Err(ResponseError::RequestUnsupportedForSp) } fn ignition_command( &mut self, + sender: SocketAddr, + port: SpPort, target: u8, command: gateway_messages::IgnitionCommand, ) -> Result<(), ResponseError> { + self.update_gateway_address(sender, port); warn!( &self.log, - "received ignition command {:?} for target {}; not supported by gimlet", - command, - target + "received ignition command; not supported by gimlet"; + "sender" => sender, + "port" => ?port, + "target" => target, + "command" => ?command, ); Err(ResponseError::RequestUnsupportedForSp) } fn serial_console_write( &mut self, + sender: SocketAddr, + port: SpPort, packet: gateway_messages::SerialConsole, ) -> Result<(), ResponseError> { + self.update_gateway_address(sender, port); debug!( &self.log, - "received serial console packet with {} bytes at offset {} for component {:?}", - packet.len, - packet.offset, - packet.component, + "received serial console packet"; + "sender" => sender, + "port" => ?port, + "len" => packet.len, + "offset" => packet.offset, + "component" => ?packet.component, ); let incoming_serial_console = self @@ -440,9 +530,19 @@ impl SpHandler for Handler { Ok(()) } - fn sp_state(&mut self) -> Result { + fn sp_state( + &mut self, + sender: SocketAddr, + port: SpPort, + ) -> Result { + self.update_gateway_address(sender, port); let state = SpState { serial_number: self.serial_number }; - debug!(&self.log, "received state request; sending {:?}", state); + debug!( + &self.log, "received state request"; + "sender" => sender, + "port" => ?port, + "reply-state" => ?state, + ); Ok(state) } } diff --git a/sp-sim/src/lib.rs b/sp-sim/src/lib.rs index 085ab60f45c..c754f1c3cc2 100644 --- a/sp-sim/src/lib.rs +++ b/sp-sim/src/lib.rs @@ -31,8 +31,8 @@ pub enum Responsiveness { pub trait SimulatedSp { /// Hexlified serial number. fn serial_number(&self) -> String; - /// Listening UDP address of the simulated SP. - fn local_addr(&self) -> SocketAddr; + /// Listening UDP address of the given port of this simulated SP. + fn local_addr(&self, port: usize) -> SocketAddr; /// Simulate the SP being unresponsive, in which it ignores all incoming /// messages. async fn set_responsiveness(&self, r: Responsiveness); diff --git a/sp-sim/src/server.rs b/sp-sim/src/server.rs index ee1dae403f9..b6f64afcefa 100644 --- a/sp-sim/src/server.rs +++ b/sp-sim/src/server.rs @@ -3,32 +3,71 @@ // file, You can obtain one at https://mozilla.org/MPL/2.0/. use crate::config::Config; -use anyhow::{bail, Context, Result}; -use gateway_messages::{Request, SerializedSize}; -use slog::{debug, error, Logger}; -use std::{net::SocketAddr, sync::Arc}; +use crate::Responsiveness; +use anyhow::anyhow; +use anyhow::bail; +use anyhow::Context; +use anyhow::Result; +use gateway_messages::sp_impl::SpHandler; +use gateway_messages::sp_impl::SpServer; +use gateway_messages::Request; +use gateway_messages::SerializedSize; +use gateway_messages::SpPort; +use slog::debug; +use slog::error; +use slog::Logger; +use std::net::Ipv6Addr; +use std::net::SocketAddr; +use std::sync::Arc; use tokio::net::UdpSocket; /// Thin wrapper pairing a [`UdpSocket`] with a buffer sized for [`Request`]s. pub(crate) struct UdpServer { sock: Arc, + local_addr: SocketAddr, buf: [u8; Request::MAX_SIZE], } impl UdpServer { - pub(crate) async fn new(bind_address: SocketAddr) -> Result { + pub(crate) async fn new( + bind_address: SocketAddr, + multicast_addr: Ipv6Addr, + log: &Logger, + ) -> Result { let sock = Arc::new(UdpSocket::bind(bind_address).await.with_context( || format!("failed to bind to {}", bind_address), )?); - Ok(Self { sock, buf: [0; Request::MAX_SIZE] }) + // In some environments where sp-sim runs (e.g., some CI runners), + // we're not able to join ipv6 multicast groups. In those cases, we're + // configured with a "multicast_addr" that isn't actually multicast, so + // don't try to join the group if we have such an address. + if multicast_addr.is_multicast() { + sock.join_multicast_v6(&multicast_addr, 0).with_context(|| { + format!("failed to join multicast group {}", multicast_addr) + })?; + } + + let local_addr = sock + .local_addr() + .with_context(|| "failed to get local address of bound socket")?; + debug!(log, "UDP socket bound"; + "local_addr" => %local_addr, + "multicast_addr" => %multicast_addr, + ); + + Ok(Self { sock, local_addr, buf: [0; Request::MAX_SIZE] }) } pub(crate) fn socket(&self) -> &Arc { &self.sock } + pub(crate) fn local_addr(&self) -> SocketAddr { + self.local_addr + } + pub(crate) async fn recv_from(&mut self) -> Result<(&[u8], SocketAddr)> { let (len, addr) = self .sock @@ -61,3 +100,28 @@ pub fn logger(config: &Config) -> Result { } Ok(log) } + +pub(crate) async fn handle_request<'a, H: SpHandler>( + handler: &mut H, + recv: Result<(&[u8], SocketAddr)>, + server: &'a mut SpServer, + responsiveness: Responsiveness, + port_num: SpPort, +) -> Result> { + match responsiveness { + Responsiveness::Responsive => (), // proceed + Responsiveness::Unresponsive => { + // pretend to be unresponsive - drop this packet + return Ok(None); + } + } + + let (data, addr) = + recv.with_context(|| format!("recv on {:?}", port_num))?; + + let resp = server + .dispatch(addr, port_num, data, handler) + .map_err(|err| anyhow!("dispatching message failed: {:?}", err))?; + + Ok(Some((resp, addr))) +} diff --git a/sp-sim/src/sidecar.rs b/sp-sim/src/sidecar.rs index 3f0b7b2f339..8eb189cfc7d 100644 --- a/sp-sim/src/sidecar.rs +++ b/sp-sim/src/sidecar.rs @@ -4,25 +4,38 @@ use std::net::SocketAddr; -use crate::config::{Config, SidecarConfig}; +use crate::config::Config; +use crate::config::SidecarConfig; +use crate::ignition_id; +use crate::server; use crate::server::UdpServer; -use crate::{ignition_id, Responsiveness, SimulatedSp}; -use anyhow::{Context, Result}; +use crate::Responsiveness; +use crate::SimulatedSp; +use anyhow::Result; use async_trait::async_trait; -use gateway_messages::sp_impl::{SpHandler, SpServer}; -use gateway_messages::{ - BulkIgnitionState, IgnitionCommand, IgnitionFlags, IgnitionState, - ResponseError, SerialNumber, SpState, -}; -use slog::{debug, error, info, warn, Logger}; -use tokio::sync::{mpsc, oneshot}; -use tokio::{ - select, - task::{self, JoinHandle}, -}; +use futures::future; +use gateway_messages::sp_impl::SpHandler; +use gateway_messages::sp_impl::SpServer; +use gateway_messages::BulkIgnitionState; +use gateway_messages::IgnitionCommand; +use gateway_messages::IgnitionFlags; +use gateway_messages::IgnitionState; +use gateway_messages::ResponseError; +use gateway_messages::SerialNumber; +use gateway_messages::SpPort; +use gateway_messages::SpState; +use slog::debug; +use slog::info; +use slog::warn; +use slog::Logger; +use tokio::select; +use tokio::sync::mpsc; +use tokio::sync::oneshot; +use tokio::task; +use tokio::task::JoinHandle; pub struct Sidecar { - local_addr: SocketAddr, + local_addrs: [SocketAddr; 2], serial_number: SerialNumber, commands: mpsc::UnboundedSender<(Command, oneshot::Sender)>, @@ -42,8 +55,8 @@ impl SimulatedSp for Sidecar { hex::encode(self.serial_number) } - fn local_addr(&self) -> SocketAddr { - self.local_addr + fn local_addr(&self, port: usize) -> SocketAddr { + self.local_addrs[port] } async fn set_responsiveness(&self, r: Responsiveness) { @@ -59,15 +72,19 @@ impl SimulatedSp for Sidecar { impl Sidecar { pub async fn spawn( config: &Config, - sidecar_config: &SidecarConfig, + sidecar: &SidecarConfig, log: Logger, ) -> Result { info!(log, "setting up simualted sidecar"); - let server = UdpServer::new(sidecar_config.bind_address).await?; - let local_addr = server - .socket() - .local_addr() - .with_context(|| "could not get local address of bound socket")?; + // bind to our two local "KSZ" ports + assert_eq!(sidecar.bind_addrs.len(), 2); + let servers = future::try_join( + UdpServer::new(sidecar.bind_addrs[0], sidecar.multicast_addr, &log), + UdpServer::new(sidecar.bind_addrs[1], sidecar.multicast_addr, &log), + ) + .await?; + let servers = [servers.0, servers.1]; + let local_addrs = [servers[0].local_addr(), servers[1].local_addr()]; let mut ignition_targets = Vec::new(); for _ in &config.simulated_sps.sidecar { @@ -85,16 +102,16 @@ impl Sidecar { let (commands, commands_rx) = mpsc::unbounded_channel(); let inner = Inner::new( - server, - sidecar_config.serial_number, + servers, + sidecar.serial_number, ignition_targets, commands_rx, log, ); let inner_task = task::spawn(async move { inner.run().await.unwrap() }); Ok(Self { - local_addr, - serial_number: sidecar_config.serial_number, + local_addrs, + serial_number: sidecar.serial_number, commands, inner_task, }) @@ -127,14 +144,15 @@ enum CommandResponse { struct Inner { handler: Handler, - udp: UdpServer, + udp0: UdpServer, + udp1: UdpServer, commands: mpsc::UnboundedReceiver<(Command, oneshot::Sender)>, } impl Inner { fn new( - server: UdpServer, + servers: [UdpServer; 2], serial_number: SerialNumber, ignition_targets: Vec, commands: mpsc::UnboundedReceiver<( @@ -143,9 +161,11 @@ impl Inner { )>, log: Logger, ) -> Self { + let [udp0, udp1] = servers; Self { handler: Handler { log, serial_number, ignition_targets }, - udp: server, + udp0, + udp1, commands, } } @@ -155,25 +175,28 @@ impl Inner { let mut responsiveness = Responsiveness::Responsive; loop { select! { - recv = self.udp.recv_from() => { - if responsiveness != Responsiveness::Responsive { - continue; + recv0 = self.udp0.recv_from() => { + if let Some((resp, addr)) = server::handle_request( + &mut self.handler, + recv0, + &mut server, + responsiveness, + SpPort::One, + ).await? { + self.udp0.send_to(resp, addr).await?; } + } - let (data, addr) = recv?; - - let resp = match server.dispatch(data, &mut self.handler) { - Ok(resp) => resp, - Err(err) => { - error!( - self.handler.log, - "dispatching message failed: {:?}", err, - ); - continue; - } - }; - - self.udp.send_to(resp, addr).await?; + recv1 = self.udp1.recv_from() => { + if let Some((resp, addr)) = server::handle_request( + &mut self.handler, + recv1, + &mut server, + responsiveness, + SpPort::Two, + ).await? { + self.udp1.send_to(resp, addr).await?; + } } command = self.commands.recv() => { @@ -225,27 +248,41 @@ impl Handler { } impl SpHandler for Handler { - fn ping(&mut self) -> Result<(), ResponseError> { - debug!(&self.log, "received ping; sending pong"); + fn ping( + &mut self, + sender: SocketAddr, + port: SpPort, + ) -> Result<(), ResponseError> { + debug!( + &self.log, "received ping; sending pong"; + "sender" => sender, + "port" => ?port, + ); Ok(()) } fn ignition_state( &mut self, + sender: SocketAddr, + port: SpPort, target: u8, ) -> Result { let state = self.get_target(target)?; debug!( &self.log, - "received ignition state request for {}; sending {:?}", - target, - state + "received ignition state request"; + "sender" => sender, + "port" => ?port, + "target" => target, + "reply-state" => ?state, ); Ok(*state) } fn bulk_ignition_state( &mut self, + sender: SocketAddr, + port: SpPort, ) -> Result { let num_targets = self.ignition_targets.len(); assert!( @@ -263,13 +300,17 @@ impl SpHandler for Handler { debug!( &self.log, "received bulk ignition state request; sending state for {} targets", - num_targets, + num_targets; + "sender" => sender, + "port" => ?port, ); Ok(out) } fn ignition_command( &mut self, + sender: SocketAddr, + port: SpPort, target: u8, command: IgnitionCommand, ) -> Result<(), ResponseError> { @@ -285,24 +326,41 @@ impl SpHandler for Handler { debug!( &self.log, - "received ignition command {:?} for target {}; sending ack", - command, - target + "received ignition command; sending ack"; + "sender" => sender, + "port" => ?port, + "target" => target, + "command" => ?command, ); Ok(()) } fn serial_console_write( &mut self, + sender: SocketAddr, + port: SpPort, _packet: gateway_messages::SerialConsole, ) -> Result<(), ResponseError> { - warn!(&self.log, "received request to write to serial console (unsupported on sidecar)"); + warn!( + &self.log, "received serial console write; unsupported by sidecar"; + "sender" => sender, + "port" => ?port, + ); Err(ResponseError::RequestUnsupportedForSp) } - fn sp_state(&mut self) -> Result { + fn sp_state( + &mut self, + sender: SocketAddr, + port: SpPort, + ) -> Result { let state = SpState { serial_number: self.serial_number }; - debug!(&self.log, "received state request; sending {:?}", state); + debug!( + &self.log, "received state request"; + "sender" => sender, + "port" => ?port, + "reply-state" => ?state, + ); Ok(state) } }