Skip to content

Commit

Permalink
client: Use async connection in async TPU client (#25969)
Browse files Browse the repository at this point in the history
* client: Add nonblocking QuicTpuConnection implementation

* Remove integer arithmetic

* client: Support sync and async connections in cache

* client: Use async connection in async TPU client

* Address feedback

* Rename Connection -> BaseTpuConnection
  • Loading branch information
joncinque committed Jun 28, 2022
1 parent d07b079 commit 2436a2b
Show file tree
Hide file tree
Showing 11 changed files with 353 additions and 172 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ anyhow = "1.0.57"
assert_matches = "1.5.0"
jsonrpc-http-server = "18.0.0"
solana-logger = { path = "../logger", version = "=1.11.2" }
solana-perf = { path = "../perf", version = "=1.11.2" }

[package.metadata.docs.rs]
targets = ["x86_64-unknown-linux-gnu"]
163 changes: 105 additions & 58 deletions client/src/connection_cache.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
use {
crate::{
nonblocking::quic_client::QuicLazyInitializedEndpoint,
quic_client::QuicTpuConnection,
tpu_connection::{ClientStats, Connection},
udp_client::UdpTpuConnection,
nonblocking::{
quic_client::{QuicClient, QuicLazyInitializedEndpoint},
tpu_connection::NonblockingConnection,
},
tpu_connection::{BlockingConnection, ClientStats},
},
indexmap::map::{Entry, IndexMap},
rand::{thread_rng, Rng},
Expand Down Expand Up @@ -246,7 +247,7 @@ pub struct ConnectionCache {
/// Models the pool of connections
struct ConnectionPool {
/// The connections in the pool
connections: Vec<Arc<Connection>>,
connections: Vec<Arc<BaseTpuConnection>>,

/// Connections in this pool share the same endpoint
endpoint: Option<Arc<QuicLazyInitializedEndpoint>>,
Expand All @@ -255,7 +256,7 @@ struct ConnectionPool {
impl ConnectionPool {
/// Get a connection from the pool. It must have at least one connection in the pool.
/// This randomly picks a connection in the pool.
fn borrow_connection(&self) -> Arc<Connection> {
fn borrow_connection(&self) -> Arc<BaseTpuConnection> {
let mut rng = thread_rng();
let n = rng.gen_range(0, self.connections.len());
self.connections[n].clone()
Expand Down Expand Up @@ -318,63 +319,57 @@ impl ConnectionCache {
)
});

let (cache_hit, connection_cache_stats, num_evictions, eviction_timing_ms) =
if to_create_connection {
let connection: Connection = match &self.use_quic {
UseQUIC::Yes => QuicTpuConnection::new(
endpoint.as_ref().unwrap().clone(),
*addr,
self.stats.clone(),
)
.into(),
UseQUIC::No(socket) => {
UdpTpuConnection::new(socket.clone(), *addr, self.stats.clone()).into()
}
};

let connection = Arc::new(connection);

// evict a connection if the cache is reaching upper bounds
let mut num_evictions = 0;
let mut get_connection_cache_eviction_measure =
Measure::start("get_connection_cache_eviction_measure");
while map.len() >= MAX_CONNECTIONS {
let mut rng = thread_rng();
let n = rng.gen_range(0, MAX_CONNECTIONS);
map.swap_remove_index(n);
num_evictions += 1;
}
get_connection_cache_eviction_measure.stop();
let (cache_hit, num_evictions, eviction_timing_ms) = if to_create_connection {
let connection = match &self.use_quic {
UseQUIC::Yes => BaseTpuConnection::Quic(Arc::new(QuicClient::new(
endpoint.as_ref().unwrap().clone(),
*addr,
))),
UseQUIC::No(socket) => BaseTpuConnection::Udp(socket.clone()),
};

match map.entry(*addr) {
Entry::Occupied(mut entry) => {
let pool = entry.get_mut();
pool.connections.push(connection);
}
Entry::Vacant(entry) => {
entry.insert(ConnectionPool {
connections: vec![connection],
endpoint,
});
}
let connection = Arc::new(connection);

// evict a connection if the cache is reaching upper bounds
let mut num_evictions = 0;
let mut get_connection_cache_eviction_measure =
Measure::start("get_connection_cache_eviction_measure");
while map.len() >= MAX_CONNECTIONS {
let mut rng = thread_rng();
let n = rng.gen_range(0, MAX_CONNECTIONS);
map.swap_remove_index(n);
num_evictions += 1;
}
get_connection_cache_eviction_measure.stop();

match map.entry(*addr) {
Entry::Occupied(mut entry) => {
let pool = entry.get_mut();
pool.connections.push(connection);
}
(
false,
self.stats.clone(),
num_evictions,
get_connection_cache_eviction_measure.as_ms(),
)
} else {
(true, self.stats.clone(), 0, 0)
};
Entry::Vacant(entry) => {
entry.insert(ConnectionPool {
connections: vec![connection],
endpoint,
});
}
}
(
false,
num_evictions,
get_connection_cache_eviction_measure.as_ms(),
)
} else {
(true, 0, 0)
};

let pool = map.get(addr).unwrap();
let connection = pool.borrow_connection();

CreateConnectionResult {
connection,
cache_hit,
connection_cache_stats,
connection_cache_stats: self.stats.clone(),
num_evictions,
eviction_timing_ms,
}
Expand Down Expand Up @@ -443,7 +438,10 @@ impl ConnectionCache {
}
}

pub fn get_connection(&self, addr: &SocketAddr) -> Arc<Connection> {
fn get_connection_and_log_stats(
&self,
addr: &SocketAddr,
) -> (Arc<BaseTpuConnection>, Arc<ConnectionCacheStats>) {
let mut get_connection_measure = Measure::start("get_connection_measure");
let GetConnectionResult {
connection,
Expand Down Expand Up @@ -490,7 +488,17 @@ impl ConnectionCache {
.get_connection_ms
.fetch_add(get_connection_measure.as_ms(), Ordering::Relaxed);

connection
(connection, connection_cache_stats)
}

/// Returns a blocking (synchronous) TPU connection to `addr`, reusing a
/// cached transport when one exists and recording cache-hit/latency stats
/// via `get_connection_and_log_stats`.
pub fn get_connection(&self, addr: &SocketAddr) -> BlockingConnection {
    let (connection, connection_cache_stats) = self.get_connection_and_log_stats(addr);
    // Wrap the shared base transport (UDP socket or QUIC client) in a
    // blocking connection bound to this address.
    connection.new_blocking_connection(*addr, connection_cache_stats)
}

/// Returns a nonblocking (async) TPU connection to `addr`, reusing a cached
/// transport when one exists and recording cache-hit/latency stats via
/// `get_connection_and_log_stats`.
pub fn get_nonblocking_connection(&self, addr: &SocketAddr) -> NonblockingConnection {
    let (connection, connection_cache_stats) = self.get_connection_and_log_stats(addr);
    // Wrap the shared base transport (UDP socket or QUIC client) in an
    // async connection bound to this address.
    connection.new_nonblocking_connection(*addr, connection_cache_stats)
}
}

Expand All @@ -507,8 +515,46 @@ impl Default for ConnectionCache {
}
}

/// The raw transport stored in the connection cache. A single cached
/// `BaseTpuConnection` can be wrapped as either a blocking or a nonblocking
/// `TpuConnection`, so the expensive part (the QUIC client / UDP socket) is
/// shared between both APIs.
enum BaseTpuConnection {
    /// Shared UDP socket used by every UDP connection in the pool.
    Udp(Arc<UdpSocket>),
    /// Shared QUIC client (endpoint + connection state) reused across wrappers.
    Quic(Arc<QuicClient>),
}
impl BaseTpuConnection {
    /// Wraps this transport in a blocking `TpuConnection` targeting `addr`.
    /// `stats` is forwarded to the QUIC wrapper so send outcomes are folded
    /// into the cache-wide counters; the UDP wrapper does not take it.
    fn new_blocking_connection(
        &self,
        addr: SocketAddr,
        stats: Arc<ConnectionCacheStats>,
    ) -> BlockingConnection {
        use crate::{quic_client::QuicTpuConnection, udp_client::UdpTpuConnection};
        match self {
            BaseTpuConnection::Udp(udp_socket) => {
                UdpTpuConnection::new_from_addr(udp_socket.clone(), addr).into()
            }
            BaseTpuConnection::Quic(quic_client) => {
                QuicTpuConnection::new_with_client(quic_client.clone(), stats).into()
            }
        }
    }

    /// Wraps this transport in a nonblocking `TpuConnection` targeting `addr`.
    /// Mirrors `new_blocking_connection`; `stats` is again only used by the
    /// QUIC variant.
    fn new_nonblocking_connection(
        &self,
        addr: SocketAddr,
        stats: Arc<ConnectionCacheStats>,
    ) -> NonblockingConnection {
        use crate::nonblocking::{quic_client::QuicTpuConnection, udp_client::UdpTpuConnection};
        match self {
            BaseTpuConnection::Udp(udp_socket) => {
                // The nonblocking UDP wrapper takes an owned socket, so we
                // duplicate the cached one. `try_clone` only fails on
                // OS-level trouble (e.g. fd exhaustion), which is not
                // recoverable here — state the invariant instead of a bare
                // unwrap so the panic message explains itself.
                UdpTpuConnection::new_from_addr(
                    udp_socket
                        .try_clone()
                        .expect("duplicating the cached UDP socket should not fail"),
                    addr,
                )
                .into()
            }
            BaseTpuConnection::Quic(quic_client) => {
                QuicTpuConnection::new_with_client(quic_client.clone(), stats).into()
            }
        }
    }
}

struct GetConnectionResult {
connection: Arc<Connection>,
connection: Arc<BaseTpuConnection>,
cache_hit: bool,
report_stats: bool,
map_timing_ms: u64,
Expand All @@ -519,7 +565,7 @@ struct GetConnectionResult {
}

struct CreateConnectionResult {
connection: Arc<Connection>,
connection: Arc<BaseTpuConnection>,
cache_hit: bool,
connection_cache_stats: Arc<ConnectionCacheStats>,
num_evictions: u64,
Expand Down Expand Up @@ -578,6 +624,7 @@ mod tests {
assert!(map.len() == MAX_CONNECTIONS);
addrs.iter().for_each(|a| {
let conn = &map.get(a).expect("Address not found").connections[0];
let conn = conn.new_blocking_connection(*a, connection_cache.stats.clone());
assert!(a.ip() == conn.tpu_addr().ip());
});
}
Expand Down
83 changes: 80 additions & 3 deletions client/src/nonblocking/quic_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,10 @@
use {
crate::{
client_error::ClientErrorKind, connection_cache::ConnectionCacheStats,
tpu_connection::ClientStats,
nonblocking::tpu_connection::TpuConnection, tpu_connection::ClientStats,
},
async_mutex::Mutex,
async_trait::async_trait,
futures::future::join_all,
itertools::Itertools,
log::*,
Expand All @@ -15,8 +16,9 @@ use {
},
solana_measure::measure::Measure,
solana_net_utils::VALIDATOR_PORT_RANGE,
solana_sdk::quic::{
QUIC_KEEP_ALIVE_MS, QUIC_MAX_TIMEOUT_MS, QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS,
solana_sdk::{
quic::{QUIC_KEEP_ALIVE_MS, QUIC_MAX_TIMEOUT_MS, QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS},
transport::Result as TransportResult,
},
std::{
net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket},
Expand Down Expand Up @@ -424,3 +426,78 @@ impl QuicClient {
self.stats.clone()
}
}

/// An async QUIC-based TPU connection: a shared `QuicClient` transport plus
/// the cache-wide statistics that send outcomes are recorded into.
pub struct QuicTpuConnection {
    client: Arc<QuicClient>,
    connection_stats: Arc<ConnectionCacheStats>,
}

impl QuicTpuConnection {
    /// Returns the per-client transfer stats tracked by the underlying
    /// `QuicClient`.
    pub fn base_stats(&self) -> Arc<ClientStats> {
        self.client.stats()
    }

    /// Creates a connection to `addr`, building a fresh `QuicClient` on top
    /// of the given lazily-initialized endpoint.
    pub fn new(
        endpoint: Arc<QuicLazyInitializedEndpoint>,
        addr: SocketAddr,
        connection_stats: Arc<ConnectionCacheStats>,
    ) -> Self {
        let client = Arc::new(QuicClient::new(endpoint, addr));
        Self::new_with_client(client, connection_stats)
    }

    /// Creates a connection that reuses an existing (possibly shared)
    /// `QuicClient` — this is how the connection cache hands out wrappers
    /// over one pooled transport.
    pub fn new_with_client(
        client: Arc<QuicClient>,
        connection_stats: Arc<ConnectionCacheStats>,
    ) -> Self {
        Self {
            client,
            connection_stats,
        }
    }
}

#[async_trait]
impl TpuConnection for QuicTpuConnection {
    /// The TPU address this connection targets (delegated to the client).
    fn tpu_addr(&self) -> &SocketAddr {
        self.client.tpu_addr()
    }

    /// Sends a batch of serialized transactions over QUIC. Per-batch client
    /// stats are merged into the cache-wide counters whether or not the send
    /// succeeded; a transport error is then propagated to the caller.
    async fn send_wire_transaction_batch<T>(&self, buffers: &[T]) -> TransportResult<()>
    where
        T: AsRef<[u8]> + Send + Sync,
    {
        let stats = ClientStats::default();
        let len = buffers.len();
        let res = self
            .client
            .send_batch(buffers, &stats, self.connection_stats.clone())
            .await;
        // Record stats before propagating any error so failed batches are
        // still counted.
        self.connection_stats
            .add_client_stats(&stats, len, res.is_ok());
        res?;
        Ok(())
    }

    /// Sends a single serialized transaction over QUIC.
    ///
    /// NOTE(review): a failed send is logged, reported via datapoint, and
    /// counted as a failure in the stats, but the error is NOT propagated —
    /// this method always returns `Ok(())`. Presumably intentional
    /// fire-and-forget semantics; confirm against callers.
    async fn send_wire_transaction<T>(&self, wire_transaction: T) -> TransportResult<()>
    where
        T: AsRef<[u8]> + Send + Sync,
    {
        let stats = Arc::new(ClientStats::default());
        let send_buffer =
            self.client
                .send_buffer(wire_transaction, &stats, self.connection_stats.clone());
        if let Err(e) = send_buffer.await {
            warn!(
                "Failed to send transaction async to {}, error: {:?} ",
                self.tpu_addr(),
                e
            );
            datapoint_warn!("send-wire-async", ("failure", 1, i64),);
            self.connection_stats.add_client_stats(&stats, 1, false);
        } else {
            self.connection_stats.add_client_stats(&stats, 1, true);
        }
        Ok(())
    }
}

0 comments on commit 2436a2b

Please sign in to comment.