Skip to content

Commit

Permalink
Feature/latency based gateway selection (#3081)
Browse files Browse the repository at this point in the history
* wip

* new option to select gateways based on latency

* further changes for wasm-compatibility

* post rebase fixes + clippy

I know, I should have probably included them properly during rebasing ¯\_(ツ)_/¯

* android change

* wasm: the gift that keeps on giving
  • Loading branch information
jstuczyn committed Feb 23, 2023
1 parent 0b0ec07 commit 128dfa6
Show file tree
Hide file tree
Showing 13 changed files with 264 additions and 33 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions clients/client-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ serde_json = "1.0.89"
tap = "1.0.1"
thiserror = "1.0.34"
url = { version ="2.2", features = ["serde"] }
tungstenite = { version = "0.13.0", default-features = false }
tokio = { version = "1.24.1", features = ["macros"]}
time = "0.3.17"

Expand All @@ -44,6 +45,9 @@ features = ["time"]
version = "1.24.1"
features = ["time"]

[target."cfg(not(target_arch = \"wasm32\"))".dependencies.tokio-tungstenite]
version = "0.14"

[target."cfg(not(target_arch = \"wasm32\"))".dependencies.sqlx]
version = "0.6.2"
features = ["runtime-tokio-rustls", "sqlite", "macros", "migrate"]
Expand All @@ -65,6 +69,7 @@ features = ["futures"]

[target."cfg(target_arch = \"wasm32\")".dependencies.wasm-utils]
path = "../../common/wasm-utils"
features = ["websocket"]

[target."cfg(target_arch = \"wasm32\")".dependencies.time]
version = "0.3.17"
Expand Down
2 changes: 1 addition & 1 deletion clients/client-core/src/client/base_client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,7 @@ where
}
let gateway_address = self.gateway_config.gateway_listener.clone();
if gateway_address.is_empty() {
return Err(ClientCoreError::GatwayAddressUnknown);
return Err(ClientCoreError::GatewayAddressUnknown);
}

let gateway_identity = identity::PublicKey::from_base58_string(gateway_id)
Expand Down
28 changes: 27 additions & 1 deletion clients/client-core/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

use gateway_client::error::GatewayClientError;
use nym_crypto::asymmetric::identity::Ed25519RecoveryError;
use nym_topology::gateway::GatewayConversionError;
use nym_topology::NymTopologyError;
use validator_client::ValidatorClientError;

Expand Down Expand Up @@ -53,7 +54,32 @@ pub enum ClientCoreError {
GatewayOwnerUnknown,

#[error("The address of the gateway is unknown - did you run init?")]
GatwayAddressUnknown,
GatewayAddressUnknown,

#[error("The gateway is malformed: {source}")]
MalformedGateway {
#[from]
source: GatewayConversionError,
},

#[error("failed to establish connection to gateway: {source}")]
GatewayConnectionFailure {
#[from]
source: tungstenite::Error,
},

#[cfg(target_arch = "wasm32")]
#[error("failed to establish gateway connection (wasm)")]
GatewayJsConnectionFailure,

#[error("Gateway connection was abruptly closed")]
GatewayConnectionAbruptlyClosed,

#[error("Timed out while trying to establish gateway connection")]
GatewayConnectionTimeout,

#[error("No ping measurements for the gateway ({identity}) performed")]
NoGatewayMeasurements { identity: String },

#[error("failed to register receiver for reconstructed mixnet messages")]
FailedToRegisterReceiver,
Expand Down
219 changes: 195 additions & 24 deletions clients/client-core/src/init/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,52 +6,223 @@ use crate::{
config::{persistence::key_pathfinder::ClientKeyPathfinder, Config},
error::ClientCoreError,
};
#[cfg(target_arch = "wasm32")]
use gateway_client::wasm_mockups::SigningNyxdClient;
use futures::{SinkExt, StreamExt};
use gateway_client::GatewayClient;
use gateway_requests::registration::handshake::SharedKeys;
use log::{debug, info, trace, warn};
use nym_config::NymConfig;
use nym_crypto::asymmetric::identity;
use nym_topology::{filter::VersionFilterable, gateway};
use rand::{seq::SliceRandom, thread_rng};
use rand::{seq::SliceRandom, thread_rng, Rng};
use std::{sync::Arc, time::Duration};
use tap::TapFallible;
use tungstenite::Message;
use url::Url;

#[cfg(not(target_arch = "wasm32"))]
use tokio::net::TcpStream;
#[cfg(not(target_arch = "wasm32"))]
use tokio::time::Instant;
#[cfg(not(target_arch = "wasm32"))]
use tokio_tungstenite::connect_async;
#[cfg(not(target_arch = "wasm32"))]
use tokio_tungstenite::{MaybeTlsStream, WebSocketStream};
#[cfg(not(target_arch = "wasm32"))]
use validator_client::nyxd::SigningNyxdClient;

pub(super) async fn query_gateway_details(
validator_servers: Vec<Url>,
chosen_gateway_id: Option<identity::PublicKey>,
) -> Result<gateway::Node, ClientCoreError> {
let nym_api = validator_servers
.choose(&mut thread_rng())
#[cfg(not(target_arch = "wasm32"))]
type WsConn = WebSocketStream<MaybeTlsStream<TcpStream>>;

#[cfg(target_arch = "wasm32")]
use gateway_client::wasm_mockups::SigningNyxdClient;
#[cfg(target_arch = "wasm32")]
use wasm_timer::Instant;
#[cfg(target_arch = "wasm32")]
use wasm_utils::websocket::JSWebsocket;

#[cfg(target_arch = "wasm32")]
type WsConn = JSWebsocket;

const MEASUREMENTS: usize = 3;

#[cfg(not(target_arch = "wasm32"))]
const CONN_TIMEOUT: Duration = Duration::from_millis(1500);
const PING_TIMEOUT: Duration = Duration::from_millis(1000);

struct GatewayWithLatency {
gateway: gateway::Node,
latency: Duration,
}

impl GatewayWithLatency {
fn new(gateway: gateway::Node, latency: Duration) -> Self {
GatewayWithLatency { gateway, latency }
}
}

async fn current_gateways<R: Rng>(
rng: &mut R,
nym_apis: Vec<Url>,
) -> Result<Vec<gateway::Node>, ClientCoreError> {
let nym_api = nym_apis
.choose(rng)
.ok_or(ClientCoreError::ListOfNymApisIsEmpty)?;
let validator_client = validator_client::client::NymApiClient::new(nym_api.clone());
let client = validator_client::client::NymApiClient::new(nym_api.clone());

log::trace!("Fetching list of gateways from: {}", nym_api);
let gateways = validator_client.get_cached_gateways().await?;

let gateways = client.get_cached_gateways().await?;
let valid_gateways = gateways
.into_iter()
.filter_map(|gateway| gateway.try_into().ok())
.collect::<Vec<gateway::Node>>();

// we were always filtering by version so I'm not removing that 'feature'
let filtered_gateways = valid_gateways.filter_by_version(env!("CARGO_PKG_VERSION"));
Ok(filtered_gateways)
}

#[cfg(not(target_arch = "wasm32"))]
async fn connect(endpoint: &str) -> Result<WsConn, ClientCoreError> {
match tokio::time::timeout(CONN_TIMEOUT, connect_async(endpoint)).await {
Err(_elapsed) => Err(ClientCoreError::GatewayConnectionTimeout),
Ok(Err(conn_failure)) => Err(conn_failure.into()),
Ok(Ok((stream, _))) => Ok(stream),
}
}

#[cfg(target_arch = "wasm32")]
async fn connect(endpoint: &str) -> Result<WsConn, ClientCoreError> {
JSWebsocket::new(endpoint).map_err(|_| ClientCoreError::GatewayJsConnectionFailure)
}

async fn measure_latency(gateway: gateway::Node) -> Result<GatewayWithLatency, ClientCoreError> {
let addr = gateway.clients_address();
trace!(
"establishing connection to {} ({addr})...",
gateway.identity_key,
);
let mut stream = connect(&addr).await?;

let mut results = Vec::new();
for _ in 0..MEASUREMENTS {
let measurement_future = async {
let ping_content = vec![1, 2, 3];
let start = Instant::now();
stream.send(Message::Ping(ping_content.clone())).await?;

match stream.next().await {
Some(Ok(Message::Pong(content))) => {
if content == ping_content {
let elapsed = Instant::now().duration_since(start);
trace!("current ping time: {elapsed:?}");
results.push(elapsed);
} else {
warn!("received a pong message with different content? wtf.")
}
}
Some(Ok(_)) => warn!("received a message that's not a pong!"),
Some(Err(err)) => return Err(err.into()),
None => return Err(ClientCoreError::GatewayConnectionAbruptlyClosed),
}

Ok::<(), ClientCoreError>(())
};

// thanks to wasm we can't use tokio::time::timeout : (
#[cfg(not(target_arch = "wasm32"))]
let timeout = tokio::time::sleep(PING_TIMEOUT);
#[cfg(not(target_arch = "wasm32"))]
tokio::pin!(timeout);

#[cfg(target_arch = "wasm32")]
let mut timeout = wasm_timer::Delay::new(PING_TIMEOUT);

tokio::select! {
_ = &mut timeout => {
warn!("timed out while trying to perform measurement...")
}
res = measurement_future => res?,
}
}

let count = results.len() as u64;
if count == 0 {
return Err(ClientCoreError::NoGatewayMeasurements {
identity: gateway.identity_key.to_base58_string(),
});
}

let sum: Duration = results.into_iter().sum();
let avg = Duration::from_nanos(sum.as_nanos() as u64 / count);

Ok(GatewayWithLatency::new(gateway, avg))
}

async fn choose_gateway_by_latency<R: Rng>(
rng: &mut R,
gateways: Vec<gateway::Node>,
) -> Result<gateway::Node, ClientCoreError> {
info!("choosing gateway by latency...");

let mut gateways_with_latency = Vec::new();
for gateway in gateways {
let id = *gateway.identity();
trace!("measuring latency to {id}...");
let with_latency = match measure_latency(gateway).await {
Ok(res) => res,
Err(err) => {
warn!("failed to measure {id}: {err}");
continue;
}
};
debug!(
"{id} ({}): {:?}",
with_latency.gateway.location, with_latency.latency
);
gateways_with_latency.push(with_latency)
}

let chosen = gateways_with_latency
.choose_weighted(rng, |item| 1. / item.latency.as_secs_f32())
.expect("invalid selection weight!");

info!(
"chose gateway {} (located at {}) with average latency of {:?}",
chosen.gateway.identity_key, chosen.gateway.location, chosen.latency
);

Ok(chosen.gateway.clone())
}

fn uniformly_random_gateway<R: Rng>(
rng: &mut R,
gateways: Vec<gateway::Node>,
) -> Result<gateway::Node, ClientCoreError> {
gateways
.choose(rng)
.ok_or(ClientCoreError::NoGatewaysOnNetwork)
.cloned()
}

pub(super) async fn query_gateway_details(
validator_servers: Vec<Url>,
chosen_gateway_id: Option<identity::PublicKey>,
by_latency: bool,
) -> Result<gateway::Node, ClientCoreError> {
let mut rng = thread_rng();
let gateways = current_gateways(&mut rng, validator_servers).await?;

// if we have chosen particular gateway - use it, otherwise choose a random one.
// (remember that in active topology all gateways have at least 100 reputation so should
// be working correctly)
if let Some(gateway_id) = chosen_gateway_id {
filtered_gateways
.iter()
.find(|gateway| gateway.identity_key == gateway_id)
.ok_or_else(|| ClientCoreError::NoGatewayWithId(gateway_id.to_string()))
.cloned()
// if we set an explicit gateway, use that one and nothing else
if let Some(explicitly_chosen) = chosen_gateway_id {
gateways
.into_iter()
.find(|gateway| gateway.identity_key == explicitly_chosen)
.ok_or_else(|| ClientCoreError::NoGatewayWithId(explicitly_chosen.to_string()))
} else if by_latency {
choose_gateway_by_latency(&mut rng, gateways).await
} else {
filtered_gateways
.choose(&mut rand::thread_rng())
.ok_or(ClientCoreError::NoGatewaysOnNetwork)
.cloned()
uniformly_random_gateway(&mut rng, gateways)
}
}

Expand Down
14 changes: 10 additions & 4 deletions clients/client-core/src/init/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,11 @@ pub async fn register_with_gateway(
key_manager: &mut KeyManager,
nym_api_endpoints: Vec<Url>,
chosen_gateway_id: Option<identity::PublicKey>,
by_latency: bool,
) -> Result<GatewayEndpointConfig, ClientCoreError> {
// Get the gateway details of the gateway we will use
let gateway = helpers::query_gateway_details(nym_api_endpoints, chosen_gateway_id).await?;
let gateway =
helpers::query_gateway_details(nym_api_endpoints, chosen_gateway_id, by_latency).await?;
log::debug!("Querying gateway gives: {}", gateway);

let our_identity = key_manager.identity_keypair();
Expand All @@ -102,6 +104,7 @@ pub async fn setup_gateway_from_config<C, T>(
register_gateway: bool,
user_chosen_gateway_id: Option<identity::PublicKey>,
config: &Config<T>,
by_latency: bool,
) -> Result<GatewayEndpointConfig, ClientCoreError>
where
C: NymConfig + ClientCoreConfigTrait,
Expand All @@ -117,9 +120,12 @@ where
}

// Else, we preceed by querying the nym-api
let gateway =
helpers::query_gateway_details(config.get_nym_api_endpoints(), user_chosen_gateway_id)
.await?;
let gateway = helpers::query_gateway_details(
config.get_nym_api_endpoints(),
user_chosen_gateway_id,
by_latency,
)
.await?;
log::debug!("Querying gateway gives: {}", gateway);

// If we are not registering, just return this and assume the caller has the keys already and
Expand Down
6 changes: 6 additions & 0 deletions clients/native/src/commands/init.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,11 @@ pub(crate) struct Init {
#[clap(long)]
gateway: Option<identity::PublicKey>,

/// Specifies whether the new gateway should be determined based by latency as opposed to being chosen
/// uniformly.
#[clap(long, conflicts_with = "gateway")]
latency_based_selection: bool,

/// Force register gateway. WARNING: this will overwrite any existing keys for the given id,
/// potentially causing loss of access.
#[clap(long)]
Expand Down Expand Up @@ -143,6 +148,7 @@ pub(crate) async fn execute(args: &Init) -> Result<(), ClientError> {
register_gateway,
user_chosen_gateway_id,
config.get_base(),
args.latency_based_selection,
)
.await
.tap_err(|err| eprintln!("Failed to setup gateway\nError: {err}"))?;
Expand Down
Loading

0 comments on commit 128dfa6

Please sign in to comment.