Setup inbound quic connections in separate futures
Since the full setup of an inbound quic connection requires waiting for the
client to send data, move this setup into a separate future collection which
gets polled next to the main accept future. Also limit the amount of time
each setup future is allowed to take by wrapping it in a timeout.

Fixes #309

Signed-off-by: Lee Smet <lee.smet@hotmail.com>
LeeSmet committed Jul 1, 2024
1 parent da19bd9 commit 42f87b5
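
The shape of the change is easy to see in isolation. Below is a minimal, hypothetical sketch of the same pattern, assuming the tokio and futures crates; accept_connection, setup_connection, and the two constants are illustrative stand-ins for the quic accept and handshake steps in peer_manager.rs, not mycelium APIs.

    use std::time::Duration;

    use futures::stream::{FuturesUnordered, StreamExt};
    use tokio::time::{sleep, timeout};

    const HANDSHAKE_TIMEOUT: Duration = Duration::from_secs(10);
    const MAX_CONCURRENT_HANDSHAKES: usize = 10;

    // Stand-in for the listener accept call; yields a dummy connection id.
    async fn accept_connection() -> u64 {
        sleep(Duration::from_millis(100)).await;
        1
    }

    // Stand-in for the slow setup step that waits on client data.
    async fn setup_connection(_conn: u64) {
        sleep(Duration::from_secs(1)).await;
    }

    #[tokio::main]
    async fn main() {
        // Setup futures live in their own collection, polled next to accept.
        let mut pending = FuturesUnordered::new();
        let mut pending_count = 0usize;

        loop {
            tokio::select! {
                // A finished (or timed-out) setup frees a concurrency slot.
                Some(result) = pending.next() => {
                    pending_count -= 1;
                    if result.is_err() {
                        // The timeout elapsed first; the connection is dropped.
                        eprintln!("handshake timed out");
                    }
                }
                // Only accept new connections while below the cap, so one slow
                // peer cannot stall the accept loop or exhaust resources.
                conn = accept_connection(), if pending_count < MAX_CONCURRENT_HANDSHAKES => {
                    pending_count += 1;
                    pending.push(timeout(HANDSHAKE_TIMEOUT, setup_connection(conn)));
                }
            }
        }
    }

With this shape, a single slow (or malicious) client occupies one of the bounded slots for at most the timeout duration, instead of blocking the accept loop itself.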
Showing 2 changed files with 76 additions and 41 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
@@ -16,6 +16,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

- When running `mycelium` with a command, a keyfile was loaded (or created, if not
yet present). This was not necessary in that context.
+- Limit the amount of time allowed for inbound quic connections to be set up, and
+  process multiple of them in parallel. This fixes a DoS vector against the quic
+  listener.

## [0.5.3] - 2024-06-07

114 changes: 73 additions & 41 deletions mycelium/src/peer_manager.rs
@@ -12,7 +12,6 @@ use quinn::crypto::rustls::QuicClientConfig;
use quinn::{MtuDiscoveryConfig, ServerConfig, TransportConfig};
use rustls::pki_types::{CertificateDer, PrivatePkcs8KeyDer, ServerName, UnixTime};
use serde::{Deserialize, Serialize};
-use std::collections::hash_map::Entry;
use std::collections::{HashMap, HashSet};
use std::fmt;
use std::io;
@@ -24,6 +23,7 @@ use std::pin::Pin;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
use std::time::Duration;
+use std::{collections::hash_map::Entry, future::IntoFuture};
use tokio::net::TcpStream;
use tokio::net::{TcpListener, UdpSocket};
use tokio::task::AbortHandle;
@@ -43,6 +43,16 @@ const PEER_CONNECT_INTERVAL: Duration = Duration::from_secs(5);
/// The maximum amount of successive failures allowed when connecting to a local discovered peer,
/// before it is forgotten.
const MAX_FAILED_LOCAL_PEER_CONNECTION_ATTEMPTS: usize = 3;
+/// The amount of time allowed for a peer to finish the quic handshake when it connects to us. This
+/// prevents a (malicious) peer from hogging server resources. 10 seconds should be a reasonable
+/// default for this, though it can certainly be made more strict if required.
+const INBOUND_QUIC_HANDSHAKE_TIMEOUT: Duration = Duration::from_secs(10);
+/// The maximum number of concurrent quic handshakes to process from peers connecting to us. We
+/// want to find a middle ground where we don't stall valid peers from connecting because a single
+/// misbehaving peer stalls the connection task, but we also don't want to accept thousands of
+/// these in parallel. For now, 10 in parallel should be sufficient, though this can be
+/// increased/decreased based on observations.
+const MAX_INBOUND_CONCURRENT_QUIC_HANDSHAKES: usize = 10;

/// The PeerManager creates new peers by connecting to configured addresses, and setting up the
/// connection. Once a connection is established, the created [`Peer`] is handed over to the
Expand Down Expand Up @@ -800,53 +810,75 @@ where
        let router_control_tx = self.router.lock().unwrap().router_control_tx();
        let dead_peer_sink = self.router.lock().unwrap().dead_peer_sink().clone();

+        let mut quic_con_futures = FuturesUnordered::new();
+
+        let mut pending_quic_handshakes = 0;
+
        loop {
-            let con = if let Some(con) = quic_socket.accept().await {
-                match con.await {
-                    Ok(con) => con,
-                    Err(e) => {
-                        debug!("Failed to accept quic connection: {e}");
-                        continue;
-                    }
-                }
-            } else {
-                // Con is closed
-                info!("Shutting down closed quic listener");
-                return;
-            };
-
-            let q = match con.accept_bi().await {
-                Ok((tx, rx)) => Quic::new(tx, rx, con.remote_address()),
-                Err(e) => {
-                    error!("Failed to accept bidirectional quic stream: {e}");
-                    continue;
-                }
-            };
-
-            let tx_bytes = Arc::new(AtomicU64::new(0));
-            let rx_bytes = Arc::new(AtomicU64::new(0));
-            let new_peer = match Peer::new(
-                router_data_tx.clone(),
-                router_control_tx.clone(),
-                q,
-                dead_peer_sink.clone(),
-                tx_bytes.clone(),
-                rx_bytes.clone(),
-            ) {
-                Ok(peer) => peer,
-                Err(e) => {
-                    error!("Failed to spawn peer: {e}");
-                    continue;
-                }
-            };
-            info!("Accepted new inbound quic peer {}", con.remote_address());
-            self.add_peer(
-                Endpoint::new(Protocol::Quic, con.remote_address()),
-                PeerType::Inbound,
-                ConnectionTraffic { tx_bytes, rx_bytes },
-                Some(new_peer),
-            )
+            tokio::select! {
+                // FuturesUnordered always returns Some(...).
+                Some(handshake_result) = quic_con_futures.next() => {
+                    pending_quic_handshakes -= 1;
+                    // Since type inference fails here, use a fully qualified function call.
+                    if Result::<(), tokio::time::error::Elapsed>::is_err(&handshake_result) {
+                        debug!("Dropping connection to peer whose handshake timed out");
+                    }
+                }
+                maybe_con = quic_socket.accept(), if pending_quic_handshakes < MAX_INBOUND_CONCURRENT_QUIC_HANDSHAKES => {
+                    let Some(con) = maybe_con else {
+                        break
+                    };
+
+                    let con_future = async {
+                        let con = match con.into_future().await {
+                            Ok(con) => con,
+                            Err(e) => {
+                                debug!("Failed to accept quic connection: {e}");
+                                return;
+                            }
+                        };
+
+                        let quic_peer = match con.accept_bi().await {
+                            Ok((tx, rx)) => Quic::new(tx, rx, con.remote_address()),
+                            Err(e) => {
+                                debug!("Failed to accept bidirectional quic stream: {e}");
+                                return;
+                            }
+                        };
+
+                        let tx_bytes = Arc::new(AtomicU64::new(0));
+                        let rx_bytes = Arc::new(AtomicU64::new(0));
+                        let new_peer = match Peer::new(
+                            router_data_tx.clone(),
+                            router_control_tx.clone(),
+                            quic_peer,
+                            dead_peer_sink.clone(),
+                            tx_bytes.clone(),
+                            rx_bytes.clone(),
+                        ) {
+                            Ok(peer) => peer,
+                            Err(e) => {
+                                error!("Failed to spawn peer: {e}");
+                                return;
+                            }
+                        };
+                        info!("Accepted new inbound quic peer {}", con.remote_address());
+                        self.add_peer(
+                            Endpoint::new(Protocol::Quic, con.remote_address()),
+                            PeerType::Inbound,
+                            ConnectionTraffic { tx_bytes, rx_bytes },
+                            Some(new_peer),
+                        );
+                    };
+
+                    pending_quic_handshakes += 1;
+                    quic_con_futures.push(tokio::time::timeout(INBOUND_QUIC_HANDSHAKE_TIMEOUT, con_future));
+                }
+            };
        }
+
+        info!("Shutting down closed quic listener");
    }

/// Add a new peer identifier we discovered.
