Skip to content

Commit

Permalink
feat(iroh-net)!: Improve initial connection latency (#2234)
Browse files Browse the repository at this point in the history
## Description

When the magic socket changes the underlying path the congestion
controller has all the wrong ideas about the latency. This results in
widely varying latencies and only slowly settling of them. This change
resets the values of the congestion controller whenever the magic socket
path changes. The result is a much faster stable connection.

Renames the Pong::src field to ping_observed_addr to try and make this
field a bit more intuitive to read. This helps especially around logging
pongs and the UDP address of the pong sender next to each other.

The mesh_stacks function in the tests has the relay server removed. The
point of the function is to hook up the magic endpoints using direct
addresses, so the relay is redundant. Furthermore the way the server was
set up in the tests made it not functional, so it never did anything. So
things are simplified without it.

## Breaking Changes

- `MagicEndpoint::accept` now returns `magic_endpoint::Accept` rather
than Quinn's `Accept` type.
- `magic_endpoint::Connecting` replaces `quinn::Connecting`. This is the
type returned by `.await`ing the `Accept` future.
- `magic_endpoint::accept_conn` and `magic_endpoint::get_alpn` have been
removed. You now accept the connection by directly awaiting the futures
returned. To retrieve the ALPN use the new `Connecting::alpn` method.

## Notes & open questions

To review, start at the new `rtt_actor.rs`, that's the important bit.
Then look at `MagicEndpoint` how it adds it to `.connect()` (simple) and
`.accept()` (complex). The latter requires copying a bunch of Quinn
structs into magic_endpoint. This also simplifies how the interact with
the alpn a little. Everything else is how making the rest of iroh adapt
to those changes.

## Change checklist

- [x] Self-review.
- [x] Documentation updates if relevant.
- [x] Tests if relevant.
- [x] All breaking changes documented.

## TODO

- ~~[ ] Use stream-util instead of futures-concurrency~~
  • Loading branch information
flub committed May 8, 2024
1 parent 26d718f commit ec48b0d
Show file tree
Hide file tree
Showing 23 changed files with 652 additions and 284 deletions.
342 changes: 210 additions & 132 deletions Cargo.lock

Large diffs are not rendered by default.

6 changes: 3 additions & 3 deletions iroh-blobs/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,11 @@ hex = "0.4.3"
iroh-base = { version = "0.15.0", features = ["redb"], path = "../iroh-base" }
iroh-io = { version = "0.6.0", features = ["stats"] }
iroh-metrics = { version = "0.15.0", path = "../iroh-metrics", optional = true }
iroh-net = { version = "0.15.0", path = "../iroh-net", optional = true }
iroh-net = { version = "0.15.0", path = "../iroh-net" }
num_cpus = "1.15.0"
parking_lot = { version = "0.12.1", optional = true }
postcard = { version = "1", default-features = false, features = ["alloc", "use-std", "experimental-derive"] }
quinn = "0.10"
quinn = { package = "iroh-quinn", version = "0.10" }
rand = "0.8"
range-collections = "0.4.0"
redb = { version = "2.0.0", optional = true }
Expand Down Expand Up @@ -67,7 +67,7 @@ futures-util = "0.3.30"

[features]
default = ["fs-store"]
downloader = ["dep:iroh-net", "dep:parking_lot", "tokio-util/time", "dep:hashlink"]
downloader = ["dep:parking_lot", "tokio-util/time", "dep:hashlink"]
fs-store = ["dep:reflink-copy", "redb", "dep:redb_v1", "dep:tempfile"]
metrics = ["dep:iroh-metrics"]
redb = ["dep:redb"]
Expand Down
11 changes: 10 additions & 1 deletion iroh-blobs/examples/provide-bytes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
//! To provide a collection (multiple blobs)
use anyhow::Result;
use tokio_util::task::LocalPoolHandle;
use tracing::warn;
use tracing_subscriber::{prelude::*, EnvFilter};

use iroh_blobs::{format::collection::Collection, Hash};
Expand Down Expand Up @@ -84,14 +85,22 @@ async fn main() -> Result<()> {
let lp = LocalPoolHandle::new(1);

let accept_task = tokio::spawn(async move {
while let Some(conn) = endpoint.accept().await {
while let Some(incoming) = endpoint.accept().await {
println!("connection incoming");

let db = db.clone();
let lp = lp.clone();

// spawn a task to handle the connection
tokio::spawn(async move {
let remote_addr = incoming.remote_address();
let conn = match incoming.await {
Ok(conn) => conn,
Err(err) => {
warn!(%remote_addr, "Error connecting: {err:#}");
return;
}
};
iroh_blobs::provider::handle_connection(conn, db, MockEventSender, lp).await
});
}
Expand Down
12 changes: 3 additions & 9 deletions iroh-blobs/src/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use iroh_io::stats::{
SliceReaderStats, StreamWriterStats, TrackingSliceReader, TrackingStreamWriter,
};
use iroh_io::{AsyncSliceReader, AsyncStreamWriter, TokioStreamWriter};
use iroh_net::magic_endpoint;
use serde::{Deserialize, Serialize};
use tokio_util::task::LocalPoolHandle;
use tracing::{debug, debug_span, info, trace, warn};
Expand Down Expand Up @@ -280,19 +281,12 @@ pub trait EventSender: Clone + Sync + Send + 'static {

/// Handle a single connection.
pub async fn handle_connection<D: Map, E: EventSender>(
connecting: quinn::Connecting,
connection: magic_endpoint::Connection,
db: D,
events: E,
rt: LocalPoolHandle,
) {
let remote_addr = connecting.remote_address();
let connection = match connecting.await {
Ok(conn) => conn,
Err(err) => {
warn!(%remote_addr, "Error connecting: {err:#}");
return;
}
};
let remote_addr = connection.remote_address();
let connection_id = connection.stable_id() as u64;
let span = debug_span!("connection", connection_id, %remote_addr);
async move {
Expand Down
4 changes: 2 additions & 2 deletions iroh-cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ parking_lot = "0.12.1"
pkarr = { version = "1.1.5", default-features = false }
portable-atomic = "1"
postcard = "1.0.8"
quic-rpc = { version = "0.8.0", features = ["flume-transport", "quinn-transport"] }
quinn = "0.10.2"
quic-rpc = { version = "0.9.0", features = ["flume-transport", "quinn-transport"] }
quinn = { package = "iroh-quinn", version = "0.10.2"}
rand = "0.8.5"
rustyline = "12.0.0"
serde = { version = "1.0.197", features = ["derive"] }
Expand Down
2 changes: 1 addition & 1 deletion iroh-docs/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ tempfile = { version = "3.4" }
iroh-net = { version = "0.15.0", optional = true, path = "../iroh-net" }
tokio-util = { version = "0.7", optional = true, features = ["codec", "io-util", "io"] }
tokio-stream = { version = "0.1", optional = true, features = ["sync"]}
quinn = { version = "0.10", optional = true }
quinn = { package = "iroh-quinn", version = "0.10", optional = true }
futures-util = { version = "0.3.25", optional = true }
lru = "0.12"
self_cell = "1.0.3"
Expand Down
2 changes: 1 addition & 1 deletion iroh-docs/src/net.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ pub enum AcceptOutcome {
/// Handle an iroh-docs connection and sync all shared documents in the replica store.
pub async fn handle_connection<F, Fut>(
sync: SyncHandle,
connecting: quinn::Connecting,
connecting: iroh_net::magic_endpoint::Connecting,
accept_cb: F,
) -> Result<SyncFinished, AcceptError>
where
Expand Down
2 changes: 1 addition & 1 deletion iroh-gossip/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ iroh-base = { version = "0.15.0", path = "../iroh-base" }
# net dependencies (optional)
futures-lite = { version = "2.3", optional = true }
iroh-net = { path = "../iroh-net", version = "0.15.0", optional = true, default-features = false }
quinn = { version = "0.10", optional = true }
quinn = { package = "iroh-quinn", version = "0.10", optional = true }
tokio = { version = "1", optional = true, features = ["io-util", "sync", "rt", "macros", "net", "fs"] }
tokio-util = { version = "0.7.8", optional = true, features = ["codec"] }
genawaiter = { version = "0.99.1", default-features = false, features = ["futures03"] }
Expand Down
10 changes: 7 additions & 3 deletions iroh-gossip/examples/chat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ use iroh_gossip::{
};
use iroh_net::{
key::{PublicKey, SecretKey},
magic_endpoint::accept_conn,
relay::{RelayMap, RelayMode, RelayUrl},
MagicEndpoint, NodeAddr,
};
Expand Down Expand Up @@ -200,8 +199,13 @@ async fn endpoint_loop(endpoint: MagicEndpoint, gossip: Gossip) {
});
}
}
async fn handle_connection(conn: quinn::Connecting, gossip: Gossip) -> anyhow::Result<()> {
let (peer_id, alpn, conn) = accept_conn(conn).await?;
async fn handle_connection(
mut conn: iroh_net::magic_endpoint::Connecting,
gossip: Gossip,
) -> anyhow::Result<()> {
let alpn = conn.alpn().await?;
let conn = conn.await?;
let peer_id = iroh_net::magic_endpoint::get_remote_node_id(&conn)?;
match alpn.as_bytes() {
GOSSIP_ALPN => gossip
.handle_connection(conn)
Expand Down
8 changes: 5 additions & 3 deletions iroh-net/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ der = { version = "0.7", features = ["alloc", "derive"] }
derive_more = { version = "1.0.0-beta.6", features = ["debug", "display", "from", "try_into", "deref"] }
flume = "0.11"
futures-buffered = "0.2.4"
futures-concurrency = "7.6.0"
futures-lite = "2.3"
futures-sink = "0.3.25"
futures-util = "0.3.25"
Expand All @@ -44,11 +45,12 @@ libc = "0.2.139"
num_enum = "0.7"
once_cell = "1.18.0"
parking_lot = "0.12.1"
pin-project = "1"
pkarr = { version = "1.1.4", default-features = false, features = ["async", "relay"] }
postcard = { version = "1", default-features = false, features = ["alloc", "use-std", "experimental-derive"] }
quinn = "0.10"
quinn-proto = "0.10.5"
quinn-udp = "0.4"
quinn = { package = "iroh-quinn", version = "0.10.4" }
quinn-proto = { package = "iroh-quinn-proto", version = "0.10.7" }
quinn-udp = { package = "iroh-quinn-udp", version = "0.4" }
rand = "0.8"
rand_core = "0.6.4"
rcgen = "0.11"
Expand Down
2 changes: 1 addition & 1 deletion iroh-net/bench/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ anyhow = "1.0.22"
bytes = "1"
hdrhistogram = { version = "7.2", default-features = false }
iroh-net = { path = ".." }
quinn = "0.10"
quinn = { package = "iroh-quinn", version = "0.10"}
clap = { version = "4", features = ["derive"] }
tokio = { version = "1.0.1", features = ["rt", "sync"] }
tracing = "0.1"
Expand Down
7 changes: 4 additions & 3 deletions iroh-net/examples/listen-unreliable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,10 @@ async fn main() -> anyhow::Result<()> {
);
// accept incoming connections, returns a normal QUIC connection

while let Some(conn) = endpoint.accept().await {
// accept the connection and extract the `node_id` and ALPN
let (node_id, alpn, conn) = iroh_net::magic_endpoint::accept_conn(conn).await?;
while let Some(mut conn) = endpoint.accept().await {
let alpn = conn.alpn().await?;
let conn = conn.await?;
let node_id = iroh_net::magic_endpoint::get_remote_node_id(&conn)?;
info!(
"new (unreliable) connection from {node_id} with ALPN {alpn} (coming from {})",
conn.remote_address()
Expand Down
7 changes: 4 additions & 3 deletions iroh-net/examples/listen.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,10 @@ async fn main() -> anyhow::Result<()> {
"\tcargo run --example connect -- --node-id {me} --addrs \"{local_addrs}\" --relay-url {relay_url}\n"
);
// accept incoming connections, returns a normal QUIC connection
while let Some(conn) = endpoint.accept().await {
// accept the connection and extract the `node_id` and ALPN
let (node_id, alpn, conn) = iroh_net::magic_endpoint::accept_conn(conn).await?;
while let Some(mut conn) = endpoint.accept().await {
let alpn = conn.alpn().await?;
let conn = conn.await?;
let node_id = iroh_net::magic_endpoint::get_remote_node_id(&conn)?;
info!(
"new connection from {node_id} with ALPN {alpn} (coming from {})",
conn.remote_address()
Expand Down
15 changes: 10 additions & 5 deletions iroh-net/src/disco.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,8 +129,10 @@ pub struct Ping {
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Pong {
pub tx_id: stun::TransactionId,
/// The observed address off the ping sender.
///
/// 18 bytes (16+2) on the wire; v4-mapped ipv6 for IPv4.
pub src: SendAddr,
pub ping_observed_addr: SendAddr,
}

/// Addresses to which we can send. This is either a UDP or a relay address.
Expand Down Expand Up @@ -280,15 +282,18 @@ impl Pong {
let tx_id = stun::TransactionId::from(tx_id);
let src = send_addr_from_bytes(&p[TX_LEN..])?;

Ok(Pong { tx_id, src })
Ok(Pong {
tx_id,
ping_observed_addr: src,
})
}

fn as_bytes(&self) -> Vec<u8> {
let header = msg_header(MessageType::Pong, V0);
let mut out = header.to_vec();
out.extend_from_slice(&self.tx_id);

let src_bytes = send_addr_to_vec(&self.src);
let src_bytes = send_addr_to_vec(&self.ping_observed_addr);
out.extend(src_bytes);
out
}
Expand Down Expand Up @@ -412,15 +417,15 @@ mod tests {
name: "pong",
m: Message::Pong(Pong{
tx_id: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12].into(),
src: SendAddr::Udp("2.3.4.5:1234".parse().unwrap()),
ping_observed_addr: SendAddr::Udp("2.3.4.5:1234".parse().unwrap()),
}),
want: "02 00 01 02 03 04 05 06 07 08 09 0a 0b 0c 00 00 00 00 00 00 00 00 00 00 00 ff ff 02 03 04 05 d2 04",
},
Test {
name: "pongv6",
m: Message::Pong(Pong {
tx_id: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12].into(),
src: SendAddr::Udp("[fed0::12]:6666".parse().unwrap()),
ping_observed_addr: SendAddr::Udp("[fed0::12]:6666".parse().unwrap()),
}),
want: "02 00 01 02 03 04 05 06 07 08 09 0a 0b 0c 00 fe d0 00 00 00 00 00 00 00 00 00 00 00 00 00 12 0a 1a",
},
Expand Down
Loading

0 comments on commit ec48b0d

Please sign in to comment.