Skip to content

Commit

Permalink
v1.18: Make the quic server connection table use an async lock, reduc…
Browse files Browse the repository at this point in the history
…ing thrashing (backport of #293) (#300)

Make the quic server connection table use an async lock, reducing thrashing (#293)
  • Loading branch information
mergify[bot] authored and willhickey committed Mar 23, 2024
1 parent d754789 commit e2d34d3
Showing 1 changed file with 26 additions and 8 deletions.
34 changes: 26 additions & 8 deletions streamer/src/nonblocking/quic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,26 @@ use {
std::{
iter::repeat_with,
net::{IpAddr, SocketAddr, UdpSocket},
// CAUTION: be careful not to introduce any awaits while holding an RwLock.
sync::{
atomic::{AtomicBool, AtomicU64, Ordering},
Arc, Mutex, MutexGuard, RwLock,
Arc, RwLock,
},
time::{Duration, Instant},
},
tokio::{task::JoinHandle, time::timeout},
tokio::{
// CAUTION: It's kind of sketch that we're mixing async and sync locks (see the RwLock above).
// This is done so that sync code can also access the stake table.
// Make sure we don't hold a sync lock across an await - including the await to
// lock an async Mutex. This does not happen now and should not happen as long as we
// don't hold an async Mutex and sync RwLock at the same time (currently true)
// but if we do, the scope of the RwLock must always be a subset of the async Mutex
// (i.e. lock order is always async Mutex -> RwLock). Also, be careful not to
// introduce any other awaits while holding the RwLock.
sync::{Mutex, MutexGuard},
task::JoinHandle,
time::timeout,
},
};

const WAIT_FOR_STREAM_TIMEOUT: Duration = Duration::from_millis(100);
Expand Down Expand Up @@ -384,7 +397,7 @@ fn handle_and_cache_new_connection(
}
}

fn prune_unstaked_connections_and_add_new_connection(
async fn prune_unstaked_connections_and_add_new_connection(
connection: Connection,
connection_table: Arc<Mutex<ConnectionTable>>,
max_connections: usize,
Expand All @@ -395,7 +408,7 @@ fn prune_unstaked_connections_and_add_new_connection(
let stats = params.stats.clone();
if max_connections > 0 {
let connection_table_clone = connection_table.clone();
let mut connection_table = connection_table.lock().unwrap();
let mut connection_table = connection_table.lock().await;
prune_unstaked_connection_table(&mut connection_table, max_connections, stats);
handle_and_cache_new_connection(
connection,
Expand Down Expand Up @@ -505,7 +518,8 @@ async fn setup_connection(

match params.peer_type {
ConnectionPeerType::Staked(stake) => {
let mut connection_table_l = staked_connection_table.lock().unwrap();
let mut connection_table_l = staked_connection_table.lock().await;

if connection_table_l.total_size >= max_staked_connections {
let num_pruned =
connection_table_l.prune_random(PRUNE_RANDOM_SAMPLE_SIZE, stake);
Expand Down Expand Up @@ -536,7 +550,9 @@ async fn setup_connection(
&params,
wait_for_chunk_timeout,
stream_load_ema.clone(),
) {
)
.await
{
stats
.connection_added_from_staked_peer
.fetch_add(1, Ordering::Relaxed);
Expand All @@ -558,7 +574,9 @@ async fn setup_connection(
&params,
wait_for_chunk_timeout,
stream_load_ema.clone(),
) {
)
.await
{
stats
.connection_added_from_unstaked_peer
.fetch_add(1, Ordering::Relaxed);
Expand Down Expand Up @@ -801,7 +819,7 @@ async fn handle_connection(
}
}

let removed_connection_count = connection_table.lock().unwrap().remove_connection(
let removed_connection_count = connection_table.lock().await.remove_connection(
ConnectionTableKey::new(remote_addr.ip(), params.remote_pubkey),
remote_addr.port(),
stable_id,
Expand Down

0 comments on commit e2d34d3

Please sign in to comment.