From 128dfa6d8184b27edeb617ccab2f56d097df3742 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C4=99drzej=20Stuczy=C5=84ski?= Date: Thu, 23 Feb 2023 17:09:22 +0000 Subject: [PATCH] Feature/latency based gateway selection (#3081) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * wip * new option to select gateways based on latency * further changes for wasm-compatibility * post rebase fixes + clippy I know, I should have probably included them properly during rebasing ¯\_(ツ)_/¯ * android change * wasm: the gift that keeps on giving --- Cargo.lock | 2 + clients/client-core/Cargo.toml | 5 + .../client-core/src/client/base_client/mod.rs | 2 +- clients/client-core/src/error.rs | 28 ++- clients/client-core/src/init/helpers.rs | 219 ++++++++++++++++-- clients/client-core/src/init/mod.rs | 14 +- clients/native/src/commands/init.rs | 6 + clients/socks5/src/commands/init.rs | 6 + .../validator-client/src/client.rs | 4 +- .../desktop/src-tauri/src/config/mod.rs | 2 + .../mobile/src-tauri/src/config/mod.rs | 1 + sdk/rust/nym-sdk/src/mixnet/client.rs | 2 + .../network-requester/src/cli/init.rs | 6 + 13 files changed, 264 insertions(+), 33 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 4f8d33aacf..4f0fab00f8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -705,6 +705,8 @@ dependencies = [ "time 0.3.17", "tokio", "tokio-stream", + "tokio-tungstenite 0.14.0", + "tungstenite 0.13.0", "url", "validator-client", "wasm-bindgen", diff --git a/clients/client-core/Cargo.toml b/clients/client-core/Cargo.toml index df20c45d98..a467192fe8 100644 --- a/clients/client-core/Cargo.toml +++ b/clients/client-core/Cargo.toml @@ -20,6 +20,7 @@ serde_json = "1.0.89" tap = "1.0.1" thiserror = "1.0.34" url = { version ="2.2", features = ["serde"] } +tungstenite = { version = "0.13.0", default-features = false } tokio = { version = "1.24.1", features = ["macros"]} time = "0.3.17" @@ -44,6 +45,9 @@ features = ["time"] version = "1.24.1" features = ["time"] +[target."cfg(not(target_arch = \"wasm32\"))".dependencies.tokio-tungstenite] +version = "0.14" + [target."cfg(not(target_arch = \"wasm32\"))".dependencies.sqlx] version = "0.6.2" features = ["runtime-tokio-rustls", "sqlite", "macros", "migrate"] @@ -65,6 +69,7 @@ features = ["futures"] [target."cfg(target_arch = \"wasm32\")".dependencies.wasm-utils] path = "../../common/wasm-utils" +features = ["websocket"] [target."cfg(target_arch = \"wasm32\")".dependencies.time] version = "0.3.17" diff --git a/clients/client-core/src/client/base_client/mod.rs b/clients/client-core/src/client/base_client/mod.rs index f517698bf2..64a292d2b8 100644 --- a/clients/client-core/src/client/base_client/mod.rs +++ b/clients/client-core/src/client/base_client/mod.rs @@ -304,7 +304,7 @@ where } let gateway_address = self.gateway_config.gateway_listener.clone(); if gateway_address.is_empty() { - return Err(ClientCoreError::GatwayAddressUnknown); + return Err(ClientCoreError::GatewayAddressUnknown); } let gateway_identity = identity::PublicKey::from_base58_string(gateway_id) diff --git a/clients/client-core/src/error.rs b/clients/client-core/src/error.rs index 0436187ed8..4c9e9e5b34 100644 --- a/clients/client-core/src/error.rs +++ b/clients/client-core/src/error.rs @@ -3,6 +3,7 @@ use gateway_client::error::GatewayClientError; use nym_crypto::asymmetric::identity::Ed25519RecoveryError; +use nym_topology::gateway::GatewayConversionError; use nym_topology::NymTopologyError; use validator_client::ValidatorClientError; @@ -53,7 +54,32 @@ pub enum ClientCoreError { GatewayOwnerUnknown, #[error("The address of the gateway is unknown - did you run init?")] - GatwayAddressUnknown, + GatewayAddressUnknown, + + #[error("The gateway is malformed: {source}")] + MalformedGateway { + #[from] + source: GatewayConversionError, + }, + + #[error("failed to establish connection to gateway: {source}")] + GatewayConnectionFailure { + #[from] + source: tungstenite::Error, + }, + + #[cfg(target_arch = "wasm32")] + #[error("failed to establish gateway connection (wasm)")] + GatewayJsConnectionFailure, + + #[error("Gateway connection was abruptly closed")] + GatewayConnectionAbruptlyClosed, + + #[error("Timed out while trying to establish gateway connection")] + GatewayConnectionTimeout, + + #[error("No ping measurements for the gateway ({identity}) performed")] + NoGatewayMeasurements { identity: String }, #[error("failed to register receiver for reconstructed mixnet messages")] FailedToRegisterReceiver, diff --git a/clients/client-core/src/init/helpers.rs b/clients/client-core/src/init/helpers.rs index e7d03d585a..b3f5e1c46d 100644 --- a/clients/client-core/src/init/helpers.rs +++ b/clients/client-core/src/init/helpers.rs @@ -6,52 +6,223 @@ use crate::{ config::{persistence::key_pathfinder::ClientKeyPathfinder, Config}, error::ClientCoreError, }; -#[cfg(target_arch = "wasm32")] -use gateway_client::wasm_mockups::SigningNyxdClient; +use futures::{SinkExt, StreamExt}; use gateway_client::GatewayClient; use gateway_requests::registration::handshake::SharedKeys; +use log::{debug, info, trace, warn}; use nym_config::NymConfig; use nym_crypto::asymmetric::identity; use nym_topology::{filter::VersionFilterable, gateway}; -use rand::{seq::SliceRandom, thread_rng}; +use rand::{seq::SliceRandom, thread_rng, Rng}; use std::{sync::Arc, time::Duration}; use tap::TapFallible; +use tungstenite::Message; use url::Url; + +#[cfg(not(target_arch = "wasm32"))] +use tokio::net::TcpStream; +#[cfg(not(target_arch = "wasm32"))] +use tokio::time::Instant; +#[cfg(not(target_arch = "wasm32"))] +use tokio_tungstenite::connect_async; +#[cfg(not(target_arch = "wasm32"))] +use tokio_tungstenite::{MaybeTlsStream, WebSocketStream}; #[cfg(not(target_arch = "wasm32"))] use validator_client::nyxd::SigningNyxdClient; -pub(super) async fn query_gateway_details( - validator_servers: Vec, - chosen_gateway_id: Option, -) -> Result { - let nym_api = validator_servers - .choose(&mut thread_rng()) +#[cfg(not(target_arch = "wasm32"))] +type WsConn = WebSocketStream>; + +#[cfg(target_arch = "wasm32")] +use gateway_client::wasm_mockups::SigningNyxdClient; +#[cfg(target_arch = "wasm32")] +use wasm_timer::Instant; +#[cfg(target_arch = "wasm32")] +use wasm_utils::websocket::JSWebsocket; + +#[cfg(target_arch = "wasm32")] +type WsConn = JSWebsocket; + +const MEASUREMENTS: usize = 3; + +#[cfg(not(target_arch = "wasm32"))] +const CONN_TIMEOUT: Duration = Duration::from_millis(1500); +const PING_TIMEOUT: Duration = Duration::from_millis(1000); + +struct GatewayWithLatency { + gateway: gateway::Node, + latency: Duration, +} + +impl GatewayWithLatency { + fn new(gateway: gateway::Node, latency: Duration) -> Self { + GatewayWithLatency { gateway, latency } + } +} + +async fn current_gateways( + rng: &mut R, + nym_apis: Vec, +) -> Result, ClientCoreError> { + let nym_api = nym_apis + .choose(rng) .ok_or(ClientCoreError::ListOfNymApisIsEmpty)?; - let validator_client = validator_client::client::NymApiClient::new(nym_api.clone()); + let client = validator_client::client::NymApiClient::new(nym_api.clone()); log::trace!("Fetching list of gateways from: {}", nym_api); - let gateways = validator_client.get_cached_gateways().await?; + + let gateways = client.get_cached_gateways().await?; let valid_gateways = gateways .into_iter() .filter_map(|gateway| gateway.try_into().ok()) .collect::>(); + // we were always filtering by version so I'm not removing that 'feature' let filtered_gateways = valid_gateways.filter_by_version(env!("CARGO_PKG_VERSION")); + Ok(filtered_gateways) +} + +#[cfg(not(target_arch = "wasm32"))] +async fn connect(endpoint: &str) -> Result { + match tokio::time::timeout(CONN_TIMEOUT, connect_async(endpoint)).await { + Err(_elapsed) => Err(ClientCoreError::GatewayConnectionTimeout), + Ok(Err(conn_failure)) => Err(conn_failure.into()), + Ok(Ok((stream, _))) => Ok(stream), + } +} + +#[cfg(target_arch = "wasm32")] +async fn connect(endpoint: &str) -> Result { + JSWebsocket::new(endpoint).map_err(|_| ClientCoreError::GatewayJsConnectionFailure) +} + +async fn measure_latency(gateway: gateway::Node) -> Result { + let addr = gateway.clients_address(); + trace!( + "establishing connection to {} ({addr})...", + gateway.identity_key, + ); + let mut stream = connect(&addr).await?; + + let mut results = Vec::new(); + for _ in 0..MEASUREMENTS { + let measurement_future = async { + let ping_content = vec![1, 2, 3]; + let start = Instant::now(); + stream.send(Message::Ping(ping_content.clone())).await?; + + match stream.next().await { + Some(Ok(Message::Pong(content))) => { + if content == ping_content { + let elapsed = Instant::now().duration_since(start); + trace!("current ping time: {elapsed:?}"); + results.push(elapsed); + } else { + warn!("received a pong message with different content? wtf.") + } + } + Some(Ok(_)) => warn!("received a message that's not a pong!"), + Some(Err(err)) => return Err(err.into()), + None => return Err(ClientCoreError::GatewayConnectionAbruptlyClosed), + } + + Ok::<(), ClientCoreError>(()) + }; + + // thanks to wasm we can't use tokio::time::timeout : ( + #[cfg(not(target_arch = "wasm32"))] + let timeout = tokio::time::sleep(PING_TIMEOUT); + #[cfg(not(target_arch = "wasm32"))] + tokio::pin!(timeout); + + #[cfg(target_arch = "wasm32")] + let mut timeout = wasm_timer::Delay::new(PING_TIMEOUT); + + tokio::select! { + _ = &mut timeout => { + warn!("timed out while trying to perform measurement...") + } + res = measurement_future => res?, + } + } + + let count = results.len() as u64; + if count == 0 { + return Err(ClientCoreError::NoGatewayMeasurements { + identity: gateway.identity_key.to_base58_string(), + }); + } + + let sum: Duration = results.into_iter().sum(); + let avg = Duration::from_nanos(sum.as_nanos() as u64 / count); + + Ok(GatewayWithLatency::new(gateway, avg)) +} + +async fn choose_gateway_by_latency( + rng: &mut R, + gateways: Vec, +) -> Result { + info!("choosing gateway by latency..."); + + let mut gateways_with_latency = Vec::new(); + for gateway in gateways { + let id = *gateway.identity(); + trace!("measuring latency to {id}..."); + let with_latency = match measure_latency(gateway).await { + Ok(res) => res, + Err(err) => { + warn!("failed to measure {id}: {err}"); + continue; + } + }; + debug!( + "{id} ({}): {:?}", + with_latency.gateway.location, with_latency.latency + ); + gateways_with_latency.push(with_latency) + } + + let chosen = gateways_with_latency + .choose_weighted(rng, |item| 1. / item.latency.as_secs_f32()) + .expect("invalid selection weight!"); + + info!( + "chose gateway {} (located at {}) with average latency of {:?}", + chosen.gateway.identity_key, chosen.gateway.location, chosen.latency + ); + + Ok(chosen.gateway.clone()) +} + +fn uniformly_random_gateway( + rng: &mut R, + gateways: Vec, +) -> Result { + gateways + .choose(rng) + .ok_or(ClientCoreError::NoGatewaysOnNetwork) + .cloned() +} + +pub(super) async fn query_gateway_details( + validator_servers: Vec, + chosen_gateway_id: Option, + by_latency: bool, +) -> Result { + let mut rng = thread_rng(); + let gateways = current_gateways(&mut rng, validator_servers).await?; - // if we have chosen particular gateway - use it, otherwise choose a random one. - // (remember that in active topology all gateways have at least 100 reputation so should - // be working correctly) - if let Some(gateway_id) = chosen_gateway_id { - filtered_gateways - .iter() - .find(|gateway| gateway.identity_key == gateway_id) - .ok_or_else(|| ClientCoreError::NoGatewayWithId(gateway_id.to_string())) - .cloned() + // if we set an explicit gateway, use that one and nothing else + if let Some(explicitly_chosen) = chosen_gateway_id { + gateways + .into_iter() + .find(|gateway| gateway.identity_key == explicitly_chosen) + .ok_or_else(|| ClientCoreError::NoGatewayWithId(explicitly_chosen.to_string())) + } else if by_latency { + choose_gateway_by_latency(&mut rng, gateways).await } else { - filtered_gateways - .choose(&mut rand::thread_rng()) - .ok_or(ClientCoreError::NoGatewaysOnNetwork) - .cloned() + uniformly_random_gateway(&mut rng, gateways) } } diff --git a/clients/client-core/src/init/mod.rs b/clients/client-core/src/init/mod.rs index f99fa318c2..6e1700d7ae 100644 --- a/clients/client-core/src/init/mod.rs +++ b/clients/client-core/src/init/mod.rs @@ -77,9 +77,11 @@ pub async fn register_with_gateway( key_manager: &mut KeyManager, nym_api_endpoints: Vec, chosen_gateway_id: Option, + by_latency: bool, ) -> Result { // Get the gateway details of the gateway we will use - let gateway = helpers::query_gateway_details(nym_api_endpoints, chosen_gateway_id).await?; + let gateway = + helpers::query_gateway_details(nym_api_endpoints, chosen_gateway_id, by_latency).await?; log::debug!("Querying gateway gives: {}", gateway); let our_identity = key_manager.identity_keypair(); @@ -102,6 +104,7 @@ pub async fn setup_gateway_from_config( register_gateway: bool, user_chosen_gateway_id: Option, config: &Config, + by_latency: bool, ) -> Result where C: NymConfig + ClientCoreConfigTrait, @@ -117,9 +120,12 @@ where } // Else, we preceed by querying the nym-api - let gateway = - helpers::query_gateway_details(config.get_nym_api_endpoints(), user_chosen_gateway_id) - .await?; + let gateway = helpers::query_gateway_details( + config.get_nym_api_endpoints(), + user_chosen_gateway_id, + by_latency, + ) + .await?; log::debug!("Querying gateway gives: {}", gateway); // If we are not registering, just return this and assume the caller has the keys already and diff --git a/clients/native/src/commands/init.rs b/clients/native/src/commands/init.rs index 756adfa493..c41c4796a0 100644 --- a/clients/native/src/commands/init.rs +++ b/clients/native/src/commands/init.rs @@ -25,6 +25,11 @@ pub(crate) struct Init { #[clap(long)] gateway: Option, + /// Specifies whether the new gateway should be determined based by latency as opposed to being chosen + /// uniformly. + #[clap(long, conflicts_with = "gateway")] + latency_based_selection: bool, + /// Force register gateway. WARNING: this will overwrite any existing keys for the given id, /// potentially causing loss of access. #[clap(long)] @@ -143,6 +148,7 @@ pub(crate) async fn execute(args: &Init) -> Result<(), ClientError> { register_gateway, user_chosen_gateway_id, config.get_base(), + args.latency_based_selection, ) .await .tap_err(|err| eprintln!("Failed to setup gateway\nError: {err}"))?; diff --git a/clients/socks5/src/commands/init.rs b/clients/socks5/src/commands/init.rs index 2c6ae23730..c921834ec3 100644 --- a/clients/socks5/src/commands/init.rs +++ b/clients/socks5/src/commands/init.rs @@ -37,6 +37,11 @@ pub(crate) struct Init { #[clap(long)] gateway: Option, + /// Specifies whether the new gateway should be determined based by latency as opposed to being chosen + /// uniformly. + #[clap(long, conflicts_with = "gateway")] + latency_based_selection: bool, + /// Force register gateway. WARNING: this will overwrite any existing keys for the given id, /// potentially causing loss of access. #[clap(long)] @@ -149,6 +154,7 @@ pub(crate) async fn execute(args: &Init) -> Result<(), Socks5ClientError> { register_gateway, user_chosen_gateway_id, config.get_base(), + args.latency_based_selection, ) .await .tap_err(|err| eprintln!("Failed to setup gateway\nError: {err}"))?; diff --git a/common/client-libs/validator-client/src/client.rs b/common/client-libs/validator-client/src/client.rs index d75542e83f..68e3358576 100644 --- a/common/client-libs/validator-client/src/client.rs +++ b/common/client-libs/validator-client/src/client.rs @@ -11,9 +11,7 @@ use nym_api_requests::models::{ GatewayCoreStatusResponse, MixnodeCoreStatusResponse, MixnodeStatusResponse, RewardEstimationResponse, StakeSaturationResponse, }; -use nym_mixnet_contract_common::mixnode::MixNodeDetails; -use nym_mixnet_contract_common::MixId; -use nym_mixnet_contract_common::{GatewayBond, IdentityKeyRef}; +pub use nym_mixnet_contract_common::{mixnode::MixNodeDetails, GatewayBond, IdentityKeyRef, MixId}; #[cfg(feature = "nyxd-client")] use crate::nyxd::traits::{DkgQueryClient, MixnetQueryClient, MultisigQueryClient}; diff --git a/nym-connect/desktop/src-tauri/src/config/mod.rs b/nym-connect/desktop/src-tauri/src/config/mod.rs index 43691fb49c..e1d3379f6b 100644 --- a/nym-connect/desktop/src-tauri/src/config/mod.rs +++ b/nym-connect/desktop/src-tauri/src/config/mod.rs @@ -138,6 +138,8 @@ pub async fn init_socks5_config(provider_address: String, chosen_gateway_id: Str register_gateway, Some(chosen_gateway_id), config.get_base(), + // TODO: another instance where this setting should probably get used + false, ) .await?; diff --git a/nym-connect/mobile/src-tauri/src/config/mod.rs b/nym-connect/mobile/src-tauri/src/config/mod.rs index c6c649ab63..7d1c3f0a99 100644 --- a/nym-connect/mobile/src-tauri/src/config/mod.rs +++ b/nym-connect/mobile/src-tauri/src/config/mod.rs @@ -118,6 +118,7 @@ pub async fn init_socks5_config( &mut key_manager, nym_api_endpoints, Some(chosen_gateway_id), + false, ) .await?; diff --git a/sdk/rust/nym-sdk/src/mixnet/client.rs b/sdk/rust/nym-sdk/src/mixnet/client.rs index d8b1e06b65..36b51a0428 100644 --- a/sdk/rust/nym-sdk/src/mixnet/client.rs +++ b/sdk/rust/nym-sdk/src/mixnet/client.rs @@ -259,6 +259,8 @@ where &mut self.key_manager, self.config.nym_api_endpoints.clone(), user_chosen_gateway, + // TODO: this should probably be configurable with the config + false, ) .await?; diff --git a/service-providers/network-requester/src/cli/init.rs b/service-providers/network-requester/src/cli/init.rs index 0c9907db3a..0c8057b7cd 100644 --- a/service-providers/network-requester/src/cli/init.rs +++ b/service-providers/network-requester/src/cli/init.rs @@ -24,6 +24,11 @@ pub(crate) struct Init { #[clap(long)] gateway: Option, + /// Specifies whether the new gateway should be determined based by latency as opposed to being chosen + /// uniformly. + #[clap(long, conflicts_with = "gateway")] + latency_based_selection: bool, + /// Force register gateway. WARNING: this will overwrite any existing keys for the given id, /// potentially causing loss of access. #[clap(long)] @@ -115,6 +120,7 @@ pub(crate) async fn execute(args: &Init) -> Result<(), NetworkRequesterError> { register_gateway, user_chosen_gateway_id, config.get_base(), + args.latency_based_selection, ) .await .map_err(|source| {