diff --git a/gateway-sp-comms/src/communicator.rs b/gateway-sp-comms/src/communicator.rs index ade37077d37..b3021e72411 100644 --- a/gateway-sp-comms/src/communicator.rs +++ b/gateway-sp-comms/src/communicator.rs @@ -13,8 +13,10 @@ use crate::management_switch::ManagementSwitchDiscovery; use crate::management_switch::SpSocket; use crate::management_switch::SwitchPort; use crate::recv_handler::RecvHandler; +use crate::Elapsed; use crate::KnownSps; use crate::SpIdentifier; +use crate::Timeout; use futures::stream::FuturesUnordered; use futures::Future; use futures::Stream; @@ -43,7 +45,6 @@ use std::sync::atomic::AtomicU32; use std::sync::atomic::Ordering; use std::sync::Arc; use std::time::Duration; -use tokio::time::Instant; use tokio_tungstenite::tungstenite::handshake; /// Helper trait that allows us to return an `impl FuturesUnordered<_>` where @@ -105,7 +106,7 @@ impl Communicator { pub async fn get_ignition_state( &self, sp: SpIdentifier, - timeout: Instant, + timeout: Timeout, ) -> Result { let controller = self.switch.ignition_controller(); let port = self.id_to_port(sp)?; @@ -124,7 +125,7 @@ impl Communicator { /// Ask the local ignition controller for the ignition state of all SPs. pub async fn get_ignition_state_all( &self, - timeout: Instant, + timeout: Timeout, ) -> Result, Error> { let controller = self.switch.ignition_controller(); let request = RequestKind::BulkIgnitionState; @@ -165,7 +166,7 @@ impl Communicator { &self, target_sp: SpIdentifier, command: IgnitionCommand, - timeout: Instant, + timeout: Timeout, ) -> Result<(), Error> { let controller = self.switch.ignition_controller(); let target = self.id_to_port(target_sp)?.as_ignition_target(); @@ -285,7 +286,7 @@ impl Communicator { &self, port: SwitchPort, packet: SerialConsole, - timeout: Instant, + timeout: Timeout, ) -> Result<(), Error> { // We can only send to an SP's serial console if we've attached to it, // which means we know its address. @@ -310,7 +311,7 @@ impl Communicator { pub async fn get_state( &self, sp: SpIdentifier, - timeout: Instant, + timeout: Timeout, ) -> Result { self.get_state_maybe_timeout(sp, Some(timeout)).await } @@ -318,7 +319,7 @@ impl Communicator { /// Get the state of a given SP without a timeout; it is the caller's /// responsibility to ensure a reasonable timeout is applied higher up in /// the chain. - // TODO we could have one method that takes `Option` for a timeout, + // TODO we could have one method that takes `Option` for a timeout, // and/or apply that to _all_ the methods in this class. I don't want to // make it easy to accidentally call a method without providing a timeout, // though, so went with the current design. @@ -332,7 +333,7 @@ impl Communicator { async fn get_state_maybe_timeout( &self, sp: SpIdentifier, - timeout: Option, + timeout: Option, ) -> Result { let port = self.id_to_port(sp)?; let sp = @@ -366,14 +367,10 @@ impl Communicator { pub fn query_all_online_sps( &self, ignition_state: &[(SpIdentifier, IgnitionState)], - timeout: Instant, + timeout: Timeout, f: F, ) -> impl FuturesUnorderedImpl< - Item = ( - SpIdentifier, - IgnitionState, - Option>, - ), + Item = (SpIdentifier, IgnitionState, Option>), > where F: FnMut(SpIdentifier) -> Fut + Clone, @@ -386,7 +383,7 @@ impl Communicator { let mut f = f.clone(); async move { let val = if state.is_powered_on() { - Some(tokio::time::timeout_at(timeout, f(id)).await) + Some(timeout.timeout_at(f(id)).await) } else { None }; @@ -400,7 +397,7 @@ impl Communicator { &self, sp: &SpSocket<'_>, mut kind: RequestKind, - timeout: Option, + timeout: Option, mut map_response_kind: F, ) -> Result where @@ -408,14 +405,14 @@ impl Communicator { { // helper to wrap a future in a timeout if we have one async fn maybe_with_timeout( - timeout: Option, + timeout: Option, fut: F, - ) -> Result + ) -> Result where F: Future, { match timeout { - Some(t) => tokio::time::timeout_at(t, fut).await, + Some(t) => t.timeout_at(fut).await, None => Ok(fut.await), } } @@ -435,7 +432,12 @@ impl Communicator { let duration = backoff .next_backoff() .expect("internal backoff policy gave up"); - maybe_with_timeout(timeout, tokio::time::sleep(duration)).await?; + maybe_with_timeout(timeout, tokio::time::sleep(duration)) + .await + .map_err(|err| Error::Timeout { + timeout: err.duration(), + sp: self.port_to_id(sp.port()), + })?; // request IDs will eventually roll over; since we enforce timeouts // this should be a non-issue in practice. does this need testing? @@ -461,7 +463,11 @@ impl Communicator { Ok::(response_fut.await?) }) - .await?; + .await + .map_err(|err| Error::Timeout { + timeout: err.duration(), + sp: self.port_to_id(sp.port()), + })?; match result { Ok(response_kind) => { diff --git a/gateway-sp-comms/src/error.rs b/gateway-sp-comms/src/error.rs index 1ff08ca190d..03f70ea4340 100644 --- a/gateway-sp-comms/src/error.rs +++ b/gateway-sp-comms/src/error.rs @@ -8,6 +8,7 @@ use crate::SpIdentifier; use gateway_messages::ResponseError; use std::io; use std::net::SocketAddr; +use std::time::Duration; use thiserror::Error; #[derive(Debug, Error)] @@ -26,8 +27,8 @@ pub enum Error { .0.slot, )] SpAddressUnknown(SpIdentifier), - #[error("timeout elapsed")] - Timeout, + #[error("timeout ({timeout:?}) elapsed communicating with {sp:?}")] + Timeout { timeout: Duration, sp: SpIdentifier }, #[error("error communicating with SP: {0}")] SpCommunicationFailed(#[from] SpCommunicationError), #[error("serial console is already attached")] @@ -54,9 +55,3 @@ pub struct BadResponseType { pub expected: &'static str, pub got: &'static str, } - -impl From for Error { - fn from(_: tokio::time::error::Elapsed) -> Self { - Self::Timeout - } -} diff --git a/gateway-sp-comms/src/lib.rs b/gateway-sp-comms/src/lib.rs index e1142ff4b6c..f102564dbb1 100644 --- a/gateway-sp-comms/src/lib.rs +++ b/gateway-sp-comms/src/lib.rs @@ -16,6 +16,7 @@ mod communicator; mod management_switch; mod recv_handler; +mod timeout; pub use usdt::register_probes; @@ -25,6 +26,8 @@ pub use communicator::Communicator; pub use communicator::FuturesUnorderedImpl; pub use management_switch::SpIdentifier; pub use management_switch::SpType; +pub use timeout::Elapsed; +pub use timeout::Timeout; // TODO these will remain public for a while, but eventually will be removed // altogther; currently these provide a way to hard-code the rack topology, diff --git a/gateway-sp-comms/src/recv_handler/mod.rs b/gateway-sp-comms/src/recv_handler/mod.rs index 3abe091f8bc..2b105cdba47 100644 --- a/gateway-sp-comms/src/recv_handler/mod.rs +++ b/gateway-sp-comms/src/recv_handler/mod.rs @@ -9,6 +9,7 @@ use crate::management_switch::ManagementSwitch; use crate::management_switch::ManagementSwitchDiscovery; use crate::management_switch::SwitchPort; use crate::Communicator; +use crate::Timeout; use futures::future::Fuse; use futures::FutureExt; use futures::SinkExt; @@ -414,7 +415,7 @@ impl SerialConsoleTask { .serial_console_send_packet( self.port, packet, - tokio::time::Instant::now() + self.sp_ack_timeout, + Timeout::from_now(self.sp_ack_timeout), ) .map_ok(move |()| packet_data_len) .fuse(); diff --git a/gateway-sp-comms/src/timeout.rs b/gateway-sp-comms/src/timeout.rs new file mode 100644 index 00000000000..2499d3e59f4 --- /dev/null +++ b/gateway-sp-comms/src/timeout.rs @@ -0,0 +1,58 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. + +// Copyright 2022 Oxide Computer Company + +use futures::Future; +use futures::TryFutureExt; +use std::time::Duration; +use tokio::time::Instant; + +/// Error type returned from [`Timeout::timeout_at()`]. +#[derive(Debug, Clone, Copy)] +pub struct Elapsed(pub Timeout); + +impl Elapsed { + /// Get the duration of the timeout that elapsed. + pub fn duration(&self) -> Duration { + self.0.duration() + } +} + +/// Representation of a timeout as both its starting time and its duration. +#[derive(Debug, Clone, Copy)] +pub struct Timeout { + start: Instant, + duration: Duration, +} + +impl Timeout { + /// Create a new `Timeout` with the given duration starting from + /// [`Instant::now()`]. + pub fn from_now(duration: Duration) -> Self { + Self { start: Instant::now(), duration } + } + + /// Get the [`Instant`] when this timeout expires. + pub fn end(&self) -> Instant { + self.start + self.duration + } + + /// Get the duration of this timeout. + pub fn duration(&self) -> Duration { + self.duration + } + + /// Wrap a future with this timeout. + pub fn timeout_at( + self, + future: T, + ) -> impl Future> + where + T: Future, + { + tokio::time::timeout_at(self.end(), future) + .map_err(move |_| Elapsed(self)) + } +} diff --git a/gateway/src/bulk_state_get.rs b/gateway/src/bulk_state_get.rs index f79bda357c2..8f85aff94ab 100644 --- a/gateway/src/bulk_state_get.rs +++ b/gateway/src/bulk_state_get.rs @@ -61,8 +61,10 @@ use crate::error::InvalidPageToken; use futures::StreamExt; use gateway_messages::IgnitionState; use gateway_sp_comms::Communicator; +use gateway_sp_comms::Elapsed; use gateway_sp_comms::FuturesUnorderedImpl; use gateway_sp_comms::SpIdentifier; +use gateway_sp_comms::Timeout; use serde::Deserialize; use serde::Serialize; use slog::debug; @@ -75,7 +77,6 @@ use std::sync::Mutex; use std::sync::RwLock; use std::time::Duration; use tokio::sync::Notify; -use tokio::time::Instant; use uuid::Uuid; use crate::http_entrypoints::SpState; @@ -144,7 +145,7 @@ impl BulkSpStateRequests { pub(crate) async fn start( &self, - timeout: Instant, + timeout: Timeout, retain_grace_period: Duration, ) -> Result { // set up the receiving end of all SP responses @@ -189,7 +190,7 @@ impl BulkSpStateRequests { &self, id: &SpStateRequestId, last_seen: Option, - timeout: Instant, + timeout: Timeout, limit: usize, ) -> Result { let log = self.log.new(slog::o!( @@ -201,7 +202,7 @@ impl BulkSpStateRequests { // Go ahead and create (and pin) the timeout, but we don't actually // await it until the loop at the bottom of this function. - let timeout = tokio::time::sleep_until(timeout); + let timeout = tokio::time::sleep_until(timeout.end()); tokio::pin!(timeout); let collector = @@ -381,7 +382,7 @@ async fn wait_for_sp_responses( Item = ( SpIdentifier, IgnitionState, - Option>, + Option>, ), >, { diff --git a/gateway/src/error.rs b/gateway/src/error.rs index 93259aef785..3a8cf4649c6 100644 --- a/gateway/src/error.rs +++ b/gateway/src/error.rs @@ -56,7 +56,7 @@ where err.to_string(), ), SpCommsError::SpAddressUnknown(_) - | SpCommsError::Timeout + | SpCommsError::Timeout { .. } | SpCommsError::SpCommunicationFailed(_) => { HttpError::for_internal_error(err.to_string()) } diff --git a/gateway/src/http_entrypoints.rs b/gateway/src/http_entrypoints.rs index 3246f3e3f95..dc3171afc2d 100644 --- a/gateway/src/http_entrypoints.rs +++ b/gateway/src/http_entrypoints.rs @@ -28,11 +28,11 @@ use dropshot::TypedBody; use dropshot::WhichPage; use gateway_messages::IgnitionCommand; use gateway_sp_comms::error::Error as SpCommsError; +use gateway_sp_comms::Timeout as SpTimeout; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; use std::sync::Arc; use std::time::Duration; -use tokio::time::Instant; #[derive( Debug, @@ -268,7 +268,7 @@ async fn sp_list( .unwrap_or(apictx.timeouts.bulk_request_default) // TODO do we also want a floor for the timeout? .min(apictx.timeouts.bulk_request_max); - let timeout = Instant::now() + timeout; + let timeout = SpTimeout::from_now(timeout); let request_id = apictx .bulk_sp_state_requests @@ -290,7 +290,7 @@ async fn sp_list( .get( &request_id, last_seen_target.map(Into::into), - Instant::now() + apictx.timeouts.bulk_request_page, + SpTimeout::from_now(apictx.timeouts.bulk_request_page), page_limit, ) .await?; @@ -316,7 +316,7 @@ async fn sp_list( // TODO Treating "communication failed" and "we don't know // the IP address" as "unresponsive" may not be right. Do we // need more refined errors? - SpCommsError::Timeout + SpCommsError::Timeout { .. } | SpCommsError::SpCommunicationFailed(_) | SpCommsError::SpAddressUnknown(_) => { SpState::Unresponsive @@ -369,12 +369,13 @@ async fn sp_get( // putting it here, the time it takes us to query ignition counts against // the client's timeout; that seems right but puts us in a bind if their // timeout expires while we're still waiting for ignition. - let timeout = Instant::now() - + query + let timeout = SpTimeout::from_now( + query .into_inner() .timeout_millis .map(|n| Duration::from_millis(u64::from(n))) - .unwrap_or(apictx.timeouts.sp_request); + .unwrap_or(apictx.timeouts.sp_request), + ); // ping the ignition controller first; if it says the SP is off or otherwise // unavailable, we're done. @@ -387,7 +388,7 @@ async fn sp_get( // ignition indicates the SP is on; ask it for its state match comms.get_state(sp.into(), timeout).await { Ok(state) => SpState::from(state), - Err(SpCommsError::Timeout) => SpState::Unresponsive, + Err(SpCommsError::Timeout { .. }) => SpState::Unresponsive, Err(other) => return Err(http_err_from_comms_err(other)), } } else { @@ -569,9 +570,9 @@ async fn ignition_list( let sp_comms = &apictx.sp_comms; let all_state = sp_comms - .get_ignition_state_all( - Instant::now() + apictx.timeouts.ignition_controller, - ) + .get_ignition_state_all(SpTimeout::from_now( + apictx.timeouts.ignition_controller, + )) .await .map_err(http_err_from_comms_err)?; @@ -602,7 +603,7 @@ async fn ignition_get( .sp_comms .get_ignition_state( sp.into(), - Instant::now() + apictx.timeouts.ignition_controller, + SpTimeout::from_now(apictx.timeouts.ignition_controller), ) .await .map_err(http_err_from_comms_err)?; @@ -628,7 +629,7 @@ async fn ignition_power_on( .send_ignition_command( sp.into(), IgnitionCommand::PowerOn, - Instant::now() + apictx.timeouts.ignition_controller, + SpTimeout::from_now(apictx.timeouts.ignition_controller), ) .await .map_err(http_err_from_comms_err)?; @@ -653,7 +654,7 @@ async fn ignition_power_off( .send_ignition_command( sp.into(), IgnitionCommand::PowerOff, - Instant::now() + apictx.timeouts.ignition_controller, + SpTimeout::from_now(apictx.timeouts.ignition_controller), ) .await .map_err(http_err_from_comms_err)?; diff --git a/gateway/tests/config.test.toml b/gateway/tests/config.test.toml index cb7fb21c86d..3725c4ecd3a 100644 --- a/gateway/tests/config.test.toml +++ b/gateway/tests/config.test.toml @@ -13,12 +13,12 @@ sleds = [] power_controllers = [] [timeouts] -ignition_controller_millis = 500 -sp_request_millis = 500 -bulk_request_default_millis = 1_000 -bulk_request_max_millis = 2_000 -bulk_request_page_millis = 500 -bulk_request_retain_grace_period_millis = 2_000 +ignition_controller_millis = 10_000 +sp_request_millis = 10_000 +bulk_request_default_millis = 10_000 +bulk_request_max_millis = 40_000 +bulk_request_page_millis = 2_000 +bulk_request_retain_grace_period_millis = 10_000 # # NOTE: for the test suite, the port MUST be 0 (in order to bind to any