diff --git a/src/init/init.rs b/src/init/init.rs index 1347fe75..a46c37df 100644 --- a/src/init/init.rs +++ b/src/init/init.rs @@ -70,7 +70,7 @@ async fn main() { ); const START_PORT: u32 = 3; - const INITIAL_POOL_SIZE: u32 = 1; // start at pool size 1, grow based on manifest/args as necessary (see Reaper) + const INITIAL_POOL_SIZE: u8 = 1; // start at pool size 1, grow based on manifest/args as necessary (see Reaper) let core_pool = StreamPool::new( SocketAddress::new_vsock(cid, START_PORT, VMADDR_NO_FLAGS), INITIAL_POOL_SIZE, diff --git a/src/integration/src/bin/pivot_pool_size.rs b/src/integration/src/bin/pivot_pool_size.rs new file mode 100644 index 00000000..ef0a4cb3 --- /dev/null +++ b/src/integration/src/bin/pivot_pool_size.rs @@ -0,0 +1,9 @@ +use integration::PIVOT_POOL_SIZE_SUCCESS_FILE; + +fn main() { + if std::env::var("POOL_SIZE").is_err() { + panic!("invalid pool size specified") + } + + integration::Cli::execute(PIVOT_POOL_SIZE_SUCCESS_FILE); +} diff --git a/src/integration/src/lib.rs b/src/integration/src/lib.rs index 8f21600a..a9964fbe 100644 --- a/src/integration/src/lib.rs +++ b/src/integration/src/lib.rs @@ -9,7 +9,7 @@ use std::time::Duration; use borsh::{BorshDeserialize, BorshSerialize}; use qos_core::{ client::SocketClient, - io::{SocketAddress, StreamPool, TimeVal, TimeValLike}, + io::{SocketAddress, StreamPool}, parser::{GetParserForOptions, OptionsParser, Parser, Token}, }; @@ -19,12 +19,16 @@ pub const PIVOT_OK_SUCCESS_FILE: &str = "./pivot_ok_works"; pub const PIVOT_OK2_SUCCESS_FILE: &str = "./pivot_ok2_works"; /// Path to the file `pivot_ok3` writes on success for tests. pub const PIVOT_OK3_SUCCESS_FILE: &str = "./pivot_ok3_works"; +/// Path to the file `pivot_pool_size` writes on success for tests. +pub const PIVOT_POOL_SIZE_SUCCESS_FILE: &str = "./pivot_pool_size_works"; /// Path to pivot_ok bin for tests. pub const PIVOT_OK_PATH: &str = "../target/debug/pivot_ok"; /// Path to pivot_ok2 bin for tests. 
pub const PIVOT_OK2_PATH: &str = "../target/debug/pivot_ok2"; /// Path to pivot_ok3 bin for tests. pub const PIVOT_OK3_PATH: &str = "../target/debug/pivot_ok3"; +/// Path to pivot_pool_size bin for tests. +pub const PIVOT_POOL_SIZE_PATH: &str = "../target/debug/pivot_pool_size"; /// Path to pivot loop bin for tests. pub const PIVOT_LOOP_PATH: &str = "../target/debug/pivot_loop"; /// Path to pivot_abort bin for tests. @@ -132,7 +136,7 @@ pub struct AdditionProofPayload { pub async fn wait_for_usock(path: &str) { let addr = SocketAddress::new_unix(path); let pool = StreamPool::new(addr, 1).unwrap().shared(); - let client = SocketClient::new(pool, TimeVal::milliseconds(50)); + let client = SocketClient::new(pool, Duration::from_millis(50)); for _ in 0..50 { if std::fs::exists(path).unwrap() && client.try_connect().await.is_ok() diff --git a/src/integration/tests/client.rs b/src/integration/tests/client.rs index 4103350f..b598a359 100644 --- a/src/integration/tests/client.rs +++ b/src/integration/tests/client.rs @@ -1,8 +1,8 @@ -use std::sync::Arc; +use std::{sync::Arc, time::Duration}; use qos_core::{ client::SocketClient, - io::{SocketAddress, StreamPool, TimeVal, TimeValLike}, + io::{SocketAddress, StreamPool}, server::SocketServerError, server::{RequestProcessor, SocketServer}, }; @@ -37,7 +37,7 @@ async fn run_echo_server( async fn direct_connect_works() { let socket_path = "/tmp/async_client_test_direct_connect_works.sock"; let socket = SocketAddress::new_unix(socket_path); - let timeout = TimeVal::milliseconds(500); + let timeout = Duration::from_millis(500); let pool = StreamPool::new(socket, 1) .expect("unable to create async pool") .shared(); @@ -54,7 +54,7 @@ async fn direct_connect_works() { async fn times_out_properly() { let socket_path = "/tmp/async_client_test_times_out_properly.sock"; let socket = SocketAddress::new_unix(socket_path); - let timeout = TimeVal::milliseconds(500); + let timeout = Duration::from_millis(500); let pool = 
StreamPool::new(socket, 1) .expect("unable to create async pool") .shared(); @@ -68,7 +68,7 @@ async fn times_out_properly() { async fn repeat_connect_works() { let socket_path = "/tmp/async_client_test_repeat_connect_works.sock"; let socket = SocketAddress::new_unix(socket_path); - let timeout = TimeVal::milliseconds(500); + let timeout = Duration::from_millis(500); let pool = StreamPool::new(socket, 1) .expect("unable to create async pool") .shared(); diff --git a/src/integration/tests/enclave_app_client_socket_stress.rs b/src/integration/tests/enclave_app_client_socket_stress.rs index e5025610..b07506a1 100644 --- a/src/integration/tests/enclave_app_client_socket_stress.rs +++ b/src/integration/tests/enclave_app_client_socket_stress.rs @@ -7,14 +7,14 @@ use integration::{ use qos_core::{ client::SocketClient, handles::Handles, - io::{SocketAddress, StreamPool, TimeVal, TimeValLike}, + io::{SocketAddress, StreamPool}, protocol::{ msg::ProtocolMsg, services::boot::{ Manifest, ManifestEnvelope, ManifestSet, Namespace, NitroConfig, PivotConfig, RestartPolicy, ShareSet, }, - ProtocolError, ProtocolPhase, ENCLAVE_APP_SOCKET_CLIENT_TIMEOUT_SECS, + ProtocolError, ProtocolPhase, INITIAL_CLIENT_TIMEOUT, }, reaper::{Reaper, REAPER_RESTART_DELAY}, }; @@ -103,7 +103,7 @@ async fn enclave_app_client_socket_stress() { StreamPool::single(SocketAddress::new_unix(ENCLAVE_SOCK)).unwrap(); let enclave_client = SocketClient::new( enclave_client_pool.shared(), - TimeVal::seconds(ENCLAVE_APP_SOCKET_CLIENT_TIMEOUT_SECS + 3), // needs to be bigger than the slow request below + some time for recovery + INITIAL_CLIENT_TIMEOUT + Duration::from_secs(3), // needs to be bigger than the slow request below + some time for recovery ); let app_request = diff --git a/src/integration/tests/interleaving_socket_stress.rs b/src/integration/tests/interleaving_socket_stress.rs index 5b4808f6..7476091c 100644 --- a/src/integration/tests/interleaving_socket_stress.rs +++ 
b/src/integration/tests/interleaving_socket_stress.rs @@ -6,8 +6,8 @@ use integration::{ }; use qos_core::{ client::{ClientError, SocketClient}, - io::{SocketAddress, StreamPool, TimeVal, TimeValLike}, - protocol::ENCLAVE_APP_SOCKET_CLIENT_TIMEOUT_SECS, + io::{SocketAddress, StreamPool}, + protocol::INITIAL_CLIENT_TIMEOUT, }; use qos_test_primitives::ChildWrapper; @@ -27,12 +27,12 @@ async fn interleaving_socket_stress() { wait_for_usock(SOCKET_STRESS_SOCK).await; // needs to be long enough for process exit to register and not cause a timeout - let timeout = TimeVal::seconds(ENCLAVE_APP_SOCKET_CLIENT_TIMEOUT_SECS); let app_pool = StreamPool::new(SocketAddress::new_unix(SOCKET_STRESS_SOCK), pool_size) .unwrap(); - let enclave_client = SocketClient::new(app_pool.shared(), timeout); + let enclave_client = + SocketClient::new(app_pool.shared(), INITIAL_CLIENT_TIMEOUT); let mut tasks = Vec::new(); // wait long enough for app to be running and listening diff --git a/src/integration/tests/proofs.rs b/src/integration/tests/proofs.rs index af08a317..0e626c79 100644 --- a/src/integration/tests/proofs.rs +++ b/src/integration/tests/proofs.rs @@ -4,8 +4,8 @@ use borsh::BorshDeserialize; use integration::{wait_for_usock, PivotProofMsg, PIVOT_PROOF_PATH}; use qos_core::{ client::SocketClient, - io::{SocketAddress, StreamPool, TimeVal, TimeValLike}, - protocol::ENCLAVE_APP_SOCKET_CLIENT_TIMEOUT_SECS, + io::{SocketAddress, StreamPool}, + protocol::INITIAL_CLIENT_TIMEOUT, }; use qos_p256::P256Public; @@ -27,10 +27,8 @@ async fn fetch_and_verify_app_proof() { StreamPool::single(SocketAddress::new_unix(PROOF_TEST_ENCLAVE_SOCKET)) .unwrap(); - let enclave_client = SocketClient::new( - enclave_pool.shared(), - TimeVal::seconds(ENCLAVE_APP_SOCKET_CLIENT_TIMEOUT_SECS), - ); + let enclave_client = + SocketClient::new(enclave_pool.shared(), INITIAL_CLIENT_TIMEOUT); let app_request = borsh::to_vec(&PivotProofMsg::AdditionRequest { a: 2, b: 2 }).unwrap(); diff --git 
a/src/integration/tests/reaper.rs b/src/integration/tests/reaper.rs index a0185ec5..6b2ee69a 100644 --- a/src/integration/tests/reaper.rs +++ b/src/integration/tests/reaper.rs @@ -1,10 +1,17 @@ -use std::fs; +use std::{fs, time::Duration}; -use integration::{PIVOT_ABORT_PATH, PIVOT_OK_PATH, PIVOT_PANIC_PATH}; +use integration::{ + wait_for_usock, PivotSocketStressMsg, PIVOT_ABORT_PATH, PIVOT_OK_PATH, + PIVOT_PANIC_PATH, PIVOT_POOL_SIZE_PATH, PIVOT_SOCKET_STRESS_PATH, +}; use qos_core::{ + client::SocketClient, handles::Handles, io::{SocketAddress, StreamPool}, - protocol::services::boot::ManifestEnvelope, + protocol::{ + msg::ProtocolMsg, services::boot::ManifestEnvelope, ProtocolError, + ProtocolPhase, + }, reaper::{Reaper, REAPER_EXIT_DELAY}, }; use qos_nsm::mock::MockNsm; @@ -13,7 +20,6 @@ use qos_test_primitives::PathWrapper; #[tokio::test] async fn reaper_works() { let secret_path: PathWrapper = "/tmp/reaper_works.secret".into(); - // let eph_path = "reaper_works.eph.key"; let usock: PathWrapper = "/tmp/reaper_works.sock".into(); let manifest_path: PathWrapper = "/tmp/reaper_works.manifest".into(); let msg = "durp-a-durp"; @@ -38,10 +44,10 @@ async fn reaper_works() { assert!(handles.pivot_exists()); let enclave_pool = - StreamPool::new(SocketAddress::new_unix(&usock), 1).unwrap(); + StreamPool::single(SocketAddress::new_unix(&usock)).unwrap(); let app_pool = - StreamPool::new(SocketAddress::new_unix("./never.sock"), 1).unwrap(); + StreamPool::single(SocketAddress::new_unix("./never.sock")).unwrap(); let reaper_handle = tokio::spawn(async move { Reaper::execute( @@ -55,7 +61,7 @@ async fn reaper_works() { }); // Give the enclave server time to bind to the socket - tokio::time::sleep(std::time::Duration::from_secs(1)).await; + wait_for_usock(&usock).await; // Check that the reaper is still running, presumably waiting for // the secret. 
@@ -72,6 +78,94 @@ async fn reaper_works() { assert!(fs::remove_file(integration::PIVOT_OK_SUCCESS_FILE).is_ok()); } +#[tokio::test] +async fn reaper_timeout_works() { + let secret_path: PathWrapper = "/tmp/reaper_timeout_works.secret".into(); + let enclave_sock: PathWrapper = "/tmp/reaper_timeout_works.sock".into(); + let app_sock: PathWrapper = "/tmp/reaper_timeout_works_app.sock".into(); + let manifest_path: PathWrapper = + "/tmp/reaper_timeout_works.manifest".into(); + + // clean up old manifest if it's left from a panic + drop(std::fs::remove_file(&*manifest_path)); + + // For our sanity, ensure the secret does not yet exist + drop(fs::remove_file(&*secret_path)); + + let handles = Handles::new( + "eph_path".to_string(), + (*secret_path).to_string(), + (*manifest_path).to_string(), + PIVOT_SOCKET_STRESS_PATH.to_string(), + ); + + // Make sure we have written everything necessary to pivot, except the + // quorum key + let mut manifest_envelope = ManifestEnvelope::default(); + // Tell pivot where to open up the server app socket + manifest_envelope.manifest.pivot.args = vec![app_sock.to_string()]; + + // we'll be checking if this is set by passing slow and fast requests + manifest_envelope.manifest.client_timeout_ms = Some(2000); + + handles.put_manifest_envelope(&manifest_envelope).unwrap(); + assert!(handles.pivot_exists()); + + let enclave_pool = + StreamPool::single(SocketAddress::new_unix(&enclave_sock)).unwrap(); + + let app_pool = + StreamPool::single(SocketAddress::new_unix(&app_sock)).unwrap(); + + let reaper_handle = tokio::spawn(async move { + Reaper::execute( + &handles, + Box::new(MockNsm), + enclave_pool, + app_pool, + Some(ProtocolPhase::QuorumKeyProvisioned), + ) + .await; + }); + + // Give the enclave server time to bind to the socket + wait_for_usock(&enclave_sock).await; + + // Check that the reaper is still running, presumably waiting for + // the secret. 
+ assert!(!reaper_handle.is_finished()); + + // Create the file with the secret, which should cause the reaper + // to start executable. + fs::write(&*secret_path, b"super dank tank secret tech").unwrap(); + + // Give the app server time to bind to the socket + wait_for_usock(&app_sock).await; + + // create a "slow" app request longer than the client timeout from `Manifest` (2s), but shorter than the 5s timeout on our local client. + let app_request = + borsh::to_vec(&PivotSocketStressMsg::SlowRequest(3000)).unwrap(); + let request = + borsh::to_vec(&ProtocolMsg::ProxyRequest { data: app_request }) + .unwrap(); + + // ensure our client to the enclave has longer timeout than the configured 2s and the slow request 3s + let client = SocketClient::single( + SocketAddress::new_unix(&enclave_sock), + Duration::from_millis(5000), + ) + .unwrap(); + + let response: ProtocolMsg = + borsh::from_slice(&client.call(&request).await.unwrap()).unwrap(); + + // The response should be AppClientRecvTimeout which indicates the enclave short-circuited the timeout + assert_eq!( + response, + ProtocolMsg::ProtocolErrorResponse(ProtocolError::AppClientRecvTimeout) + ); +} + #[tokio::test] + async fn reaper_handles_non_zero_exits() { + let secret_path: PathWrapper = @@ -113,7 +207,7 @@ async fn reaper_handles_non_zero_exits() { }); // Give the enclave server time to bind to the socket - tokio::time::sleep(std::time::Duration::from_secs(1)).await; + wait_for_usock(&usock).await; // Check that the reaper is still running, presumably waiting for // the secret.
@@ -187,6 +281,71 @@ async fn reaper_handles_panic() { assert!(reaper_handle.is_finished()); } +#[tokio::test] +async fn reaper_handles_pool_size() { + let secret_path: PathWrapper = + "/tmp/reaper_handles_pool_size.secret".into(); + let usock: PathWrapper = "/tmp/reaper_handles_pool_size.sock".into(); + let manifest_path: PathWrapper = + "/tmp/reaper_handles_pool_size.manifest".into(); + let msg = "5"; // must match pool-size in manifest below (test thing) + + // For our sanity, ensure the secret does not yet exist + drop(fs::remove_file(&*secret_path)); + + let handles = Handles::new( + "eph_path".to_string(), + (*secret_path).to_string(), + (*manifest_path).to_string(), + PIVOT_POOL_SIZE_PATH.to_string(), + ); + + // Make sure we have written everything necessary to pivot, except the + // quorum key + let mut manifest_envelope = ManifestEnvelope::default(); + manifest_envelope.manifest.pivot.args = + vec!["--msg".to_string(), msg.to_string()]; + // set a pool size > 1 + manifest_envelope.manifest.pool_size = Some(5); + + handles.put_manifest_envelope(&manifest_envelope).unwrap(); + assert!(handles.pivot_exists()); + + let enclave_pool = + StreamPool::single(SocketAddress::new_unix(&usock)).unwrap(); + + let app_pool = + StreamPool::single(SocketAddress::new_unix("/tmp/never.sock")).unwrap(); + + let reaper_handle = tokio::spawn(async move { + Reaper::execute( + &handles, + Box::new(MockNsm), + enclave_pool, + app_pool, + None, + ) + .await; + }); + + // wait for enclave to listen + wait_for_usock(&usock).await; + + // Check that the reaper is still running, presumably waiting for + // the secret. + assert!(!reaper_handle.is_finished()); + + // Create the file with the secret, which should cause the reaper + // to start executable. + fs::write(&*secret_path, b"super dank tank secret tech").unwrap(); + + // Make the sure the reaper executed successfully. 
+ reaper_handle.await.unwrap(); + let contents = fs::read(integration::PIVOT_POOL_SIZE_SUCCESS_FILE).unwrap(); + assert_eq!(std::str::from_utf8(&contents).unwrap(), msg); + assert!(fs::remove_file(integration::PIVOT_POOL_SIZE_SUCCESS_FILE).is_ok()); +} + #[test] fn can_restart_panicking_pivot() { // Create a manifest with restart option diff --git a/src/integration/tests/remote_tls.rs b/src/integration/tests/remote_tls.rs index a89f6f3e..43be13b4 100644 --- a/src/integration/tests/remote_tls.rs +++ b/src/integration/tests/remote_tls.rs @@ -6,8 +6,8 @@ use integration::{ }; use qos_core::{ client::SocketClient, - io::{SocketAddress, StreamPool, TimeVal, TimeValLike}, - protocol::ENCLAVE_APP_SOCKET_CLIENT_TIMEOUT_SECS, + io::{SocketAddress, StreamPool}, + protocol::INITIAL_CLIENT_TIMEOUT, }; use qos_test_primitives::ChildWrapper; @@ -43,10 +43,8 @@ async fn fetch_async_remote_tls_content() { ) .expect("unable to create enclave async pool"); - let enclave_client = SocketClient::new( - enclave_pool.shared(), - TimeVal::seconds(ENCLAVE_APP_SOCKET_CLIENT_TIMEOUT_SECS), - ); + let enclave_client = + SocketClient::new(enclave_pool.shared(), INITIAL_CLIENT_TIMEOUT); let app_request = borsh::to_vec(&PivotRemoteTlsMsg::RemoteTlsRequest { host: "api.turnkey.com".to_string(), diff --git a/src/integration/tests/simple_socket_stress.rs b/src/integration/tests/simple_socket_stress.rs index 42158a8b..ffaf48e3 100644 --- a/src/integration/tests/simple_socket_stress.rs +++ b/src/integration/tests/simple_socket_stress.rs @@ -5,8 +5,8 @@ use integration::{ }; use qos_core::{ client::{ClientError, SocketClient}, - io::{IOError, SocketAddress, StreamPool, TimeVal, TimeValLike}, - protocol::ENCLAVE_APP_SOCKET_CLIENT_TIMEOUT_SECS, + io::{IOError, SocketAddress, StreamPool}, + protocol::INITIAL_CLIENT_TIMEOUT, }; use qos_test_primitives::ChildWrapper; @@ -22,14 +22,12 @@ async fn simple_socket_stress() { wait_for_usock(SOCKET_STRESS_SOCK).await; - // needs to be long enough for process exit 
to register and not cause a timeout - let timeout = TimeVal::seconds(ENCLAVE_APP_SOCKET_CLIENT_TIMEOUT_SECS); - let app_pool = StreamPool::new(SocketAddress::new_unix(SOCKET_STRESS_SOCK), 1) .unwrap(); - let enclave_client = SocketClient::new(app_pool.shared(), timeout); + let enclave_client = + SocketClient::new(app_pool.shared(), INITIAL_CLIENT_TIMEOUT); let app_request = borsh::to_vec(&PivotSocketStressMsg::SlowRequest(5500)).unwrap(); diff --git a/src/qos_client/src/cli/mod.rs b/src/qos_client/src/cli/mod.rs index 0f837291..639d41b4 100644 --- a/src/qos_client/src/cli/mod.rs +++ b/src/qos_client/src/cli/mod.rs @@ -40,6 +40,8 @@ const PATCH_SET_DIR: &str = "patch-set-dir"; const NAMESPACE_DIR: &str = "namespace-dir"; const UNSAFE_AUTO_CONFIRM: &str = "unsafe-auto-confirm"; const PUB_PATH: &str = "pub-path"; +const POOL_SIZE: &str = "pool-size"; +const CLIENT_TIMEOUT: &str = "client-timeout"; const YUBIKEY: &str = "yubikey"; const SECRET_PATH: &str = "secret-path"; const SHARE_PATH: &str = "share-path"; @@ -568,6 +570,21 @@ impl Command { .takes_value(false) } + fn pool_size() -> Token { + Token::new(POOL_SIZE, "Socket pool size for USOCK/VSOCK") + .required(false) + .takes_value(true) + } + + fn client_timeout() -> Token { + Token::new( + CLIENT_TIMEOUT, + "Client timeout for enclave <-> app communication", + ) + .required(false) + .takes_value(true) + } + fn base() -> Parser { Parser::new() .token( @@ -652,6 +669,8 @@ impl Command { .token(Self::patch_set_dir_token()) .token(Self::quorum_key_path_token()) .token(Self::pivot_args_token()) + .token(Self::pool_size()) + .token(Self::client_timeout()) } fn approve_manifest() -> Parser { @@ -992,6 +1011,19 @@ impl ClientOpts { } } + fn pool_size(&self) -> Option { + self.parsed.single(POOL_SIZE).map(|s| { + s.parse().expect("pool-size not valid integer in range <1..255>") + }) + } + + fn client_timeout_ms(&self) -> Option { + self.parsed.single(CLIENT_TIMEOUT).map(|s| { + s.parse() + .expect("client timeout invalid 
integer in range <0..65535>") + }) + } + fn pub_path(&self) -> String { self.parsed.single(PUB_PATH).expect("Missing `--pub-path`").to_string() } @@ -1517,6 +1549,8 @@ mod handlers { manifest_set_dir: opts.manifest_set_dir(), patch_set_dir: opts.patch_set_dir(), quorum_key_path: opts.quorum_key_path(), + pool_size: opts.pool_size(), + client_timeout_ms: opts.client_timeout_ms(), }) { println!("Error: {e:?}"); std::process::exit(1); diff --git a/src/qos_client/src/cli/services.rs b/src/qos_client/src/cli/services.rs index f5dcd10c..c5a0740b 100644 --- a/src/qos_client/src/cli/services.rs +++ b/src/qos_client/src/cli/services.rs @@ -701,6 +701,8 @@ pub(crate) struct GenerateManifestArgs> { pub quorum_key_path: P, pub manifest_path: P, pub pivot_args: Vec, + pub pool_size: Option, + pub client_timeout_ms: Option, } pub(crate) fn generate_manifest>( @@ -719,6 +721,8 @@ pub(crate) fn generate_manifest>( quorum_key_path, manifest_path, pivot_args, + pool_size, + client_timeout_ms, } = args; let nitro_config = @@ -749,6 +753,8 @@ pub(crate) fn generate_manifest>( share_set, patch_set, enclave: nitro_config, + pool_size, + client_timeout_ms, }; write_with_msg( @@ -1642,6 +1648,8 @@ pub(crate) fn dangerous_dev_boot>( members: vec![member.clone()], }, patch_set: PatchSet { threshold: 0, members: vec![] }, + pool_size: None, + client_timeout_ms: None, }; // Create and post the boot standard instruction @@ -2256,6 +2264,8 @@ mod tests { share_set: share_set.clone(), patch_set: patch_set.clone(), enclave: nitro_config.clone(), + pool_size: None, + client_timeout_ms: None, }; let manifest_envelope = ManifestEnvelope { diff --git a/src/qos_core/src/client.rs b/src/qos_core/src/client.rs index b991db4c..66ed4046 100644 --- a/src/qos_core/src/client.rs +++ b/src/qos_core/src/client.rs @@ -3,8 +3,6 @@ use std::time::Duration; -use nix::sys::time::TimeVal; - use crate::io::{IOError, SharedStreamPool, SocketAddress, StreamPool}; /// Enclave client error. 
@@ -37,18 +35,16 @@ pub struct SocketClient { impl SocketClient { /// Create a new client with given `StreamPool`. #[must_use] - pub fn new(pool: SharedStreamPool, timeout: TimeVal) -> Self { - let timeout = timeval_to_duration(timeout); + pub fn new(pool: SharedStreamPool, timeout: Duration) -> Self { Self { pool, timeout } } /// Create a new client from a single `SocketAddress`. This creates an implicit single socket `StreamPool`. pub fn single( addr: SocketAddress, - timeout: TimeVal, + timeout: Duration, ) -> Result { let pool = StreamPool::new(addr, 1)?.shared(); - let timeout = timeval_to_duration(timeout); Ok(Self { pool, timeout }) } @@ -73,10 +69,15 @@ impl SocketClient { Ok(resp) } + /// Sets the client's timeout value + pub fn set_timeout(&mut self, timeout: Duration) { + self.timeout = timeout; + } + /// Expands the underlying `AsyncPool` to given `pool_size` pub async fn expand_to( &mut self, - pool_size: u32, + pool_size: u8, ) -> Result<(), ClientError> { self.pool.write().await.expand_to(pool_size)?; @@ -91,14 +92,3 @@ impl SocketClient { stream.connect().await } } - -// Convers TimeVal to Duration -// # Panics -// -// Panics if timeval values are negative -fn timeval_to_duration(timeval: TimeVal) -> Duration { - let secs: u64 = timeval.tv_sec().try_into().expect("invalid TimeVal value"); - let usecs: u32 = - timeval.tv_usec().try_into().expect("invalid TimeVal value"); - Duration::new(secs, usecs * 1000) -} diff --git a/src/qos_core/src/handles.rs b/src/qos_core/src/handles.rs index ad8f2910..74e7a2e5 100644 --- a/src/qos_core/src/handles.rs +++ b/src/qos_core/src/handles.rs @@ -436,6 +436,8 @@ mod test { manifest_set: ManifestSet { threshold: 2, members: vec![] }, share_set: ShareSet { threshold: 2, members: vec![] }, patch_set: PatchSet::default(), + pool_size: None, + client_timeout_ms: None, }; let manifest_envelope = ManifestEnvelope { diff --git a/src/qos_core/src/io/pool.rs b/src/qos_core/src/io/pool.rs index ad9a44d5..1ba94383 100644 --- 
a/src/qos_core/src/io/pool.rs +++ b/src/qos_core/src/io/pool.rs @@ -65,7 +65,7 @@ impl StreamPool { /// Create a new `StreamPool` with given starting `SocketAddress`, timeout and number of addresses to populate. pub fn new( start_address: SocketAddress, - mut count: u32, + mut count: u8, ) -> Result { eprintln!("StreamPool start address: {start_address}"); @@ -157,7 +157,7 @@ impl StreamPool { } /// Expands the pool with new addresses using `SocketAddress::next_address` - pub fn expand_to(&mut self, size: u32) -> Result<(), IOError> { + pub fn expand_to(&mut self, size: u8) -> Result<(), IOError> { eprintln!("StreamPool: expanding async pool to {size}"); let size = size as usize; @@ -176,7 +176,7 @@ impl StreamPool { } /// Listen to new connections on added sockets on top of existing listeners, returning the list of new `Listener` - pub fn listen_to(&mut self, size: u32) -> Result, IOError> { + pub fn listen_to(&mut self, size: u8) -> Result, IOError> { eprintln!("StreamPool: listening async pool to {size}"); let size = size as usize; let mut listeners = Vec::new(); diff --git a/src/qos_core/src/io/stream.rs b/src/qos_core/src/io/stream.rs index 6c469c2b..6bf73eba 100644 --- a/src/qos_core/src/io/stream.rs +++ b/src/qos_core/src/io/stream.rs @@ -351,13 +351,9 @@ async fn vsock_connect( #[cfg(test)] mod test { - use std::{str::from_utf8, time::Duration}; - - use nix::sys::time::{TimeVal, TimeValLike}; - - use crate::{client::SocketClient, io::StreamPool}; - use super::*; + use crate::{client::SocketClient, io::StreamPool}; + use std::{str::from_utf8, time::Duration}; /// Wait for a given usock file to exist and be connectible with a timeout of 5s. 
/// @@ -366,7 +362,7 @@ mod test { pub async fn wait_for_usock(path: &str) { let addr = SocketAddress::new_unix(path); let pool = StreamPool::new(addr, 1).unwrap().shared(); - let client = SocketClient::new(pool, TimeVal::milliseconds(50)); + let client = SocketClient::new(pool, Duration::from_millis(50)); for _ in 0..50 { if std::fs::exists(path).unwrap() diff --git a/src/qos_core/src/protocol/mod.rs b/src/qos_core/src/protocol/mod.rs index 1e570f48..b3644282 100644 --- a/src/qos_core/src/protocol/mod.rs +++ b/src/qos_core/src/protocol/mod.rs @@ -9,10 +9,11 @@ pub mod services; mod state; pub use error::ProtocolError; +pub use state::ProtocolPhase; pub(crate) use state::ProtocolState; -pub use state::{ProtocolPhase, ENCLAVE_APP_SOCKET_CLIENT_TIMEOUT_SECS}; pub(crate) mod processor; +pub use processor::INITIAL_CLIENT_TIMEOUT; /// 256bit hash pub type Hash256 = [u8; 32]; diff --git a/src/qos_core/src/protocol/processor.rs b/src/qos_core/src/protocol/processor.rs index 87d869a6..407f7502 100644 --- a/src/qos_core/src/protocol/processor.rs +++ b/src/qos_core/src/protocol/processor.rs @@ -1,13 +1,11 @@ //! 
Quorum protocol processor -use std::sync::Arc; +use std::{sync::Arc, time::Duration}; -use crate::io::{TimeVal, TimeValLike}; use borsh::BorshDeserialize; use tokio::sync::RwLock; use super::{ - error::ProtocolError, msg::ProtocolMsg, state::ProtocolState, - ProtocolPhase, ENCLAVE_APP_SOCKET_CLIENT_TIMEOUT_SECS, + error::ProtocolError, msg::ProtocolMsg, state::ProtocolState, ProtocolPhase, }; use crate::{ client::{ClientError, SocketClient}, @@ -18,6 +16,9 @@ use crate::{ const MEGABYTE: usize = 1024 * 1024; const MAX_ENCODED_MSG_LEN: usize = 128 * MEGABYTE; +/// Initial client timeout for the processor until the Manifest says otherwise, see reaper.rs +pub const INITIAL_CLIENT_TIMEOUT: Duration = Duration::from_secs(5); + /// Helper type to keep `ProtocolState` shared using `Arc>` type SharedProtocolState = Arc>; @@ -42,10 +43,7 @@ impl ProtocolProcessor { state: SharedProtocolState, app_pool: SharedStreamPool, ) -> Arc> { - let app_client = SocketClient::new( - app_pool, - TimeVal::seconds(ENCLAVE_APP_SOCKET_CLIENT_TIMEOUT_SECS), - ); + let app_client = SocketClient::new(app_pool, INITIAL_CLIENT_TIMEOUT); Arc::new(RwLock::new(Self { app_client, state })) } @@ -54,10 +52,16 @@ impl ProtocolProcessor { self.state.read().await.get_phase() } + /// Sets the client timeout value for the `app_client`, maximum allowed value is `u16::MAX` milliseconds + pub fn set_client_timeout(&mut self, timeout: Duration) { + assert!(timeout.as_millis() < u16::MAX.into(), "client timeout > 65s"); + self.app_client.set_timeout(timeout); + } + /// Expands the app pool to given pool size pub async fn expand_to( &mut self, - pool_size: u32, + pool_size: u8, ) -> Result<(), ClientError> { self.app_client.expand_to(pool_size).await } diff --git a/src/qos_core/src/protocol/services/boot.rs b/src/qos_core/src/protocol/services/boot.rs index 91646c65..c774accf 100644 --- a/src/qos_core/src/protocol/services/boot.rs +++ b/src/qos_core/src/protocol/services/boot.rs @@ -320,6 +320,10 @@ pub struct 
Manifest { pub enclave: NitroConfig, /// Patch set members and threshold pub patch_set: PatchSet, + /// Client timeout for calls via the VSOCK/USOCK, defaults to 5s if not specified + pub client_timeout_ms: Option, + /// Pool size argument used to set up our socket pipes, defaults to 1 if not specified + pub pool_size: Option, } /// An approval by a Quorum Member. diff --git a/src/qos_core/src/protocol/services/provision.rs b/src/qos_core/src/protocol/services/provision.rs index 80e5fc2a..84f8c606 100644 --- a/src/qos_core/src/protocol/services/provision.rs +++ b/src/qos_core/src/protocol/services/provision.rs @@ -211,6 +211,8 @@ mod test { members: members.clone().into_iter().map(|(m, _)| m).collect(), }, patch_set: PatchSet::default(), + pool_size: None, + client_timeout_ms: None, }; let approvals: Vec<_> = members diff --git a/src/qos_core/src/protocol/state.rs b/src/qos_core/src/protocol/state.rs index d4516fa9..8c52a451 100644 --- a/src/qos_core/src/protocol/state.rs +++ b/src/qos_core/src/protocol/state.rs @@ -6,9 +6,6 @@ use super::{ }; use crate::handles::Handles; -/// The timeout for the qos core when making requests to an enclave app. 
-pub const ENCLAVE_APP_SOCKET_CLIENT_TIMEOUT_SECS: i64 = 5; - /// Enclave phase #[derive( Debug, diff --git a/src/qos_core/src/reaper.rs b/src/qos_core/src/reaper.rs index 4035ab3c..1cae4d0c 100644 --- a/src/qos_core/src/reaper.rs +++ b/src/qos_core/src/reaper.rs @@ -30,6 +30,7 @@ pub const REAPER_RESTART_DELAY: Duration = Duration::from_millis(50); pub const REAPER_EXIT_DELAY: Duration = Duration::from_secs(3); const REAPER_STATE_CHECK_DELAY: Duration = Duration::from_millis(100); +const DEFAULT_POOL_SIZE: u8 = 1; // runs the enclave and app servers, waiting for manifest/pivot // executed as a task from `Reaper::execute` @@ -56,22 +57,26 @@ async fn run_server( return; } - let (manifest_present, pool_size) = - get_pool_size_from_pivot_args(&handles); - - if manifest_present { - let pool_size = pool_size.unwrap_or(1); + if let Ok(envelope) = handles.get_manifest_envelope() { + let pool_size = + envelope.manifest.pool_size.unwrap_or(DEFAULT_POOL_SIZE); // expand server to pool_size server .listen_to(pool_size, &processor) .expect("unable to listen_to on the running server"); - // expand app connections to pool_size - processor - .write() - .await - .expand_to(pool_size) - .await - .expect("unable to expand_to on the processor app pool"); + { + // get the processor writable + let mut p = processor.write().await; + + // expand app connections to pool_size + p.expand_to(pool_size) + .await + .expect("unable to expand_to on the processor app pool"); + if let Some(timeout_ms) = envelope.manifest.client_timeout_ms { + let timeout = Duration::from_millis(timeout_ms.into()); + p.set_client_timeout(timeout); + } + } *server_state.write().unwrap() = InterState::PivotReady; eprintln!("Reaper::server manifest is present, breaking out of server check loop"); @@ -147,13 +152,18 @@ impl Reaper { println!("Reaper::execute about to spawn pivot"); - let PivotConfig { args, restart, .. 
} = handles + let manifest = handles .get_manifest_envelope() .expect("Checked above that the manifest exists.") - .manifest - .pivot; + .manifest; + let PivotConfig { args, restart, .. } = manifest.pivot; let mut pivot = Command::new(handles.pivot_path()); + // set the pool-size env var for pivots that use it + pivot.env( + "POOL_SIZE", + manifest.pool_size.unwrap_or(DEFAULT_POOL_SIZE).to_string(), + ); pivot.args(&args[..]); match restart { RestartPolicy::Always => loop { @@ -206,77 +216,4 @@ enum InterState { Quitting, } -// return if we have manifest and get pool_size args if present from it -fn get_pool_size_from_pivot_args(handles: &Handles) -> (bool, Option) { - if let Ok(envelope) = handles.get_manifest_envelope() { - (true, extract_pool_size_arg(&envelope.manifest.pivot.args)) - } else { - (false, None) - } -} - -// find the u32 value of --pool-size argument passed to the pivot if present -fn extract_pool_size_arg(args: &[String]) -> Option { - if let Some((i, _)) = - args.iter().enumerate().find(|(_, a)| *a == "--pool-size") - { - if let Some(pool_size_str) = args.get(i + 1) { - match pool_size_str.parse::() { - Ok(pool_size) => Some(pool_size), - Err(_) => None, - } - } else { - None - } - } else { - None - } -} - -#[cfg(test)] -mod test { - use super::*; - - #[test] - fn extract_pool_size_arg_works() { - // no arg - assert_eq!( - extract_pool_size_arg(&vec![ - "unrelated".to_owned(), - "--args".to_owned(), - ]), - None - ); - - // should work - assert_eq!( - extract_pool_size_arg(&vec![ - "--pool-size".to_owned(), - "8".to_owned(), - ]), - Some(8) - ); - - // wrong number, expect None - assert_eq!( - extract_pool_size_arg(&vec![ - "--pool-size".to_owned(), - "8a".to_owned(), - ]), - None - ); - - // duplicate arg, use 1st - assert_eq!( - extract_pool_size_arg(&vec![ - "--pool-size".to_owned(), - "8".to_owned(), - "--pool-size".to_owned(), - "9".to_owned(), - ]), - Some(8) - ); - } -} - // See qos_test/tests/async_reaper for more tests diff --git 
a/src/qos_core/src/server.rs b/src/qos_core/src/server.rs index 8ffdb506..5b42e546 100644 --- a/src/qos_core/src/server.rs +++ b/src/qos_core/src/server.rs @@ -85,7 +85,7 @@ impl SocketServer { /// Expand the server with listeners up to pool size. This adds new tasks as needed. pub fn listen_to

( &mut self, - pool_size: u32, + pool_size: u8, processor: &SharedProcessor

, ) -> Result<(), IOError> where diff --git a/src/qos_host/src/cli.rs b/src/qos_host/src/cli.rs index 016b2269..a92fb70f 100644 --- a/src/qos_host/src/cli.rs +++ b/src/qos_host/src/cli.rs @@ -4,11 +4,12 @@ use std::{ env, net::{IpAddr, Ipv4Addr, SocketAddr}, str::FromStr, + time::Duration, }; use qos_core::{ cli::{CID, PORT, USOCK}, - io::{SocketAddress, StreamPool, TimeVal, TimeValLike}, + io::{SocketAddress, StreamPool}, parser::{GetParserForOptions, OptionsParser, Parser, Token}, }; @@ -110,11 +111,11 @@ impl HostOpts { SocketAddr::new(IpAddr::V4(ip), port) } - pub(crate) fn socket_timeout(&self) -> TimeVal { + pub(crate) fn socket_timeout(&self) -> Duration { let default_timeout = &qos_core::DEFAULT_SOCKET_TIMEOUT_MS.to_owned(); let timeout_str = self.parsed.single(SOCKET_TIMEOUT).unwrap_or(default_timeout); - TimeVal::milliseconds( + Duration::from_millis( timeout_str.parse().expect("invalid timeout value"), ) } diff --git a/src/qos_host/src/host.rs b/src/qos_host/src/host.rs index 7d46f6b7..f47e9998 100644 --- a/src/qos_host/src/host.rs +++ b/src/qos_host/src/host.rs @@ -17,7 +17,7 @@ #![warn(missing_docs, clippy::pedantic)] #![allow(clippy::missing_errors_doc)] -use std::{net::SocketAddr, sync::Arc}; +use std::{net::SocketAddr, sync::Arc, time::Duration}; use axum::{ body::Bytes, @@ -30,7 +30,7 @@ use axum::{ use borsh::BorshDeserialize; use qos_core::{ client::SocketClient, - io::{SharedStreamPool, TimeVal}, + io::SharedStreamPool, protocol::{msg::ProtocolMsg, ProtocolError, ProtocolPhase}, }; @@ -49,7 +49,7 @@ struct QosHostState { #[allow(clippy::module_name_repetitions)] pub struct HostServer { enclave_pool: SharedStreamPool, - timeout: TimeVal, + timeout: Duration, addr: SocketAddr, base_path: Option, } @@ -60,7 +60,7 @@ impl HostServer { #[must_use] pub fn new( enclave_pool: SharedStreamPool, - timeout: TimeVal, + timeout: Duration, addr: SocketAddr, base_path: Option, ) -> Self { diff --git a/src/qos_net/src/cli.rs b/src/qos_net/src/cli.rs index 
879c8eaa..795ee71b 100644 --- a/src/qos_net/src/cli.rs +++ b/src/qos_net/src/cli.rs @@ -37,7 +37,7 @@ impl ProxyOpts { pub(crate) fn async_pool( &self, ) -> Result { - let pool_size: u32 = self + let pool_size: u8 = self .parsed .single(POOL_SIZE) .expect("invalid pool options")