From ec48b0d7eaef7f976f8d04e74629a4df07dcf39b Mon Sep 17 00:00:00 2001 From: Floris Bruynooghe Date: Wed, 8 May 2024 20:06:15 +0200 Subject: [PATCH] feat(iroh-net)!: Improve initial connection latency (#2234) ## Description When the magic socket changes the underlying path the congestion controller has all the wrong ideas about the latency. This results in widely varying latencies and only slowly settling of them. This change resets the values of the congestion controller whenever the magic socket path changes. The result is a much faster stable connection. Renames the Pong::src field to ping_observed_addr to try and make this field a bit more intuitive to read. This helps especially around logging pongs and the UDP address of the pong sender next to each other. The mesh_stacks function in the tests has the relay server removed. The point of the function is to hook up the magic endpoints using direct addresses, so the relay is redundant. Furthermore the way the server was set up in the tests made it not functional, so it never did anything. So things are simplified without it. ## Breaking Changes - `MagicEndpoint::accept` now returns `magic_endpoint::Accept` rather than Quinn's `Accept` type. - `magic_endpoint::Connecting` replaces `quinn::Connecting`. This is the type returned by `.await`ing the `Accept` future. - `magic_endpoint::accept_conn` and `magic_endpoint::get_alpn` have been removed. You now accept the connection by directly awaiting the futures returned. To retrieve the ALPN use the new `Connecting::alpn` method. ## Notes & open questions To review, start at the new `rtt_actor.rs`, that's the important bit. Then look at `MagicEndpoint` how it adds it to `.connect()` (simple) and `.accept()` (complex). The latter requires copying a bunch of Quinn structs into magic_endpoint. This also simplifies how the interact with the alpn a little. Everything else is how making the rest of iroh adapt to those changes. ## Change checklist - [x] Self-review. - [x] Documentation updates if relevant. - [x] Tests if relevant. - [x] All breaking changes documented. ## TODO - ~~[ ] Use stream-util instead of futures-concurrency~~ --- Cargo.lock | 342 +++++++++++------- iroh-blobs/Cargo.toml | 6 +- iroh-blobs/examples/provide-bytes.rs | 11 +- iroh-blobs/src/provider.rs | 12 +- iroh-cli/Cargo.toml | 4 +- iroh-docs/Cargo.toml | 2 +- iroh-docs/src/net.rs | 2 +- iroh-gossip/Cargo.toml | 2 +- iroh-gossip/examples/chat.rs | 10 +- iroh-net/Cargo.toml | 8 +- iroh-net/bench/Cargo.toml | 2 +- iroh-net/examples/listen-unreliable.rs | 7 +- iroh-net/examples/listen.rs | 7 +- iroh-net/src/disco.rs | 15 +- iroh-net/src/magic_endpoint.rs | 195 ++++++++-- iroh-net/src/magic_endpoint/rtt_actor.rs | 159 ++++++++ iroh-net/src/magicsock.rs | 104 +++--- iroh-net/src/magicsock/node_map/best_addr.rs | 4 +- iroh-net/src/magicsock/node_map/node_state.rs | 23 +- iroh/Cargo.toml | 4 +- iroh/src/docs_engine.rs | 5 +- iroh/src/docs_engine/live.rs | 4 +- iroh/src/node/builder.rs | 8 +- 23 files changed, 652 insertions(+), 284 deletions(-) create mode 100644 iroh-net/src/magic_endpoint/rtt_actor.rs diff --git a/Cargo.lock b/Cargo.lock index d767585b3a..c760c4a5d2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -78,48 +78,47 @@ checksum = "4b46cbb362ab8752921c97e041f5e366ee6297bd428a31275b9fcf1e380f7299" [[package]] name = "anstream" -version = "0.6.14" +version = "0.6.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "418c75fa768af9c03be99d17643f93f79bbba589895012a80e3452a19ddda15b" +checksum = "d96bd03f33fe50a863e394ee9718a706f988b9079b20c3784fb726e7678b62fb" dependencies = [ "anstyle", "anstyle-parse", "anstyle-query", "anstyle-wincon", "colorchoice", - "is_terminal_polyfill", "utf8parse", ] [[package]] name = "anstyle" -version = "1.0.7" +version = "1.0.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "038dfcf04a5feb68e9c60b21c9625a54c2c0616e79b72b0fd87075a056ae1d1b" +checksum = "8901269c6307e8d93993578286ac0edf7f195079ffff5ebdeea6a59ffb7e36bc" [[package]] name = "anstyle-parse" -version = "0.2.4" +version = "0.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c03a11a9034d92058ceb6ee011ce58af4a9bf61491aa7e1e59ecd24bd40d22d4" +checksum = "c75ac65da39e5fe5ab759307499ddad880d724eed2f6ce5b5e8a26f4f387928c" dependencies = [ "utf8parse", ] [[package]] name = "anstyle-query" -version = "1.0.3" +version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a64c907d4e79225ac72e2a354c9ce84d50ebb4586dee56c82b3ee73004f537f5" +checksum = "e28923312444cdd728e4738b3f9c9cac739500909bb3d3c94b43551b16517648" dependencies = [ "windows-sys 0.52.0", ] [[package]] name = "anstyle-wincon" -version = "3.0.3" +version = "3.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "61a38449feb7068f52bb06c12759005cf459ee52bb4adc1d5a7c4322d716fb19" +checksum = "1cd54b81ec8d6180e24654d0b371ad22fc3dd083b6ff8ba325b72e00c87660a7" dependencies = [ "anstyle", "windows-sys 0.52.0", @@ -388,7 +387,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f1f7a89a8ee5889d2593ae422ce6e1bb03e48a0e8a16e4fa0882dfcbe7e182ef" dependencies = [ "bytes", - "futures-lite", + "futures-lite 2.3.0", "genawaiter", "iroh-blake3", "iroh-io", @@ -412,9 +411,9 @@ checksum = "9d297deb1925b89f2ccc13d7635fa0714f12c87adce1c75356b39ca9b7178567" [[package]] name = "base64" -version = "0.22.1" +version = "0.22.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "72b3254f16251a8381aa12e40e3c4d2f0199f8c6508fbecb9d91f575e0fbb8c6" +checksum = "9475866fec1451be56a3c2400fd081ff546538961565ccb5b7142cbd22bc7a51" [[package]] name = "base64-url" @@ -473,6 +472,18 @@ version = "2.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cf4b9d6a944f767f8e5e0db018570623c85f3d925ac718db4e06d0187adb21c1" +[[package]] +name = "bitvec" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1bc2832c24239b0141d5674bb9174f9d68a8b5b3f2753311927c172ca46f7e9c" +dependencies = [ + "funty", + "radium", + "tap", + "wyz", +] + [[package]] name = "block-buffer" version = "0.10.4" @@ -548,9 +559,9 @@ checksum = "37b2a672a2cb129a2e41c10b1224bb368f9f37a2b16b612598138befd7b37eb5" [[package]] name = "cc" -version = "1.0.96" +version = "1.0.95" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "065a29261d53ba54260972629f9ca6bffa69bac13cd1fed61420f7fa68b9f8bd" +checksum = "d32a725bc159af97c3e629873bb9f88fb8cf8a4867175f76dc987815ea07c83b" [[package]] name = "cfg-if" @@ -681,9 +692,9 @@ checksum = "67ba02a97a2bd10f4b59b25c7973101c79642302776489e030cd13cdab09ed15" [[package]] name = "colorchoice" -version = "1.0.1" +version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0b6a852b24ab71dffc585bcb46eaf7959d175cb865a7152e35b348d1b2960422" +checksum = "acbf1af155f9b9ef647e42cdc158db4b64a1b61f743629225fde6f3e0be2a7c7" [[package]] name = "colored" @@ -709,9 +720,9 @@ dependencies = [ [[package]] name = "concurrent-queue" -version = "2.5.0" +version = "2.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4ca0197aee26d1ae37445ee532fefce43251d24cc7c166799f4d46817f1d3973" +checksum = "d16048cd947b08fa32c24458a22f5dc5e835264f689f4f5653210c69fd107363" dependencies = [ "crossbeam-utils", ] @@ -1030,7 +1041,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "978747c1d849a7d2ee5e8adc0159961c48fb7e5db2f06af6723b80123bb53856" dependencies = [ "cfg-if", - "hashbrown 0.14.5", + "hashbrown 0.14.3", "lock_api", "once_cell", "parking_lot_core", @@ -1038,9 +1049,9 @@ dependencies = [ [[package]] name = "data-encoding" -version = "2.6.0" +version = "2.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e8566979429cf69b49a5c740c60791108e86440e8be149bbea4fe54d2c32d6e2" +checksum = "7e962a19be5cfc3f3bf6dd8f61eb50107f356ad6270fbb3ed41476571db78be5" [[package]] name = "der" @@ -1443,9 +1454,18 @@ checksum = "2acce4a10f12dc2fb14a218589d4f1f62ef011b2d0cc4b3cb1bba8e94da14649" [[package]] name = "fastrand" -version = "2.1.0" +version = "1.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e51093e27b0797c359783294ca4f0a911c270184cb10f85783b118614a1501be" +dependencies = [ + "instant", +] + +[[package]] +name = "fastrand" +version = "2.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9fc0510504f03c51ada170672ac806f1f105a88aa97a5281117e1ddc3368e51a" +checksum = "658bd65b1cf4c852a3cc96f18a8ce7b5640f6b703f905c7d74532294c2a63984" [[package]] name = "fd-lock" @@ -1511,6 +1531,12 @@ dependencies = [ "thiserror", ] +[[package]] +name = "funty" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e6d5a32815ae3f33302d95fdcb2ce17862f8c65363dcfd29360480ba1001fc9c" + [[package]] name = "futures" version = "0.3.30" @@ -1547,6 +1573,20 @@ dependencies = [ "futures-sink", ] +[[package]] +name = "futures-concurrency" +version = "7.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "51ee14e256b9143bfafbf2fddeede6f396650bacf95d06fc1b3f2b503df129a0" +dependencies = [ + "bitvec", + "futures-core", + "futures-lite 1.13.0", + "pin-project", + "slab", + "smallvec", +] + [[package]] name = "futures-core" version = "0.3.30" @@ -1570,13 +1610,28 @@ version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a44623e20b9681a318efdd71c299b6b222ed6f231972bfe2f224ebad6311f0c1" +[[package]] +name = "futures-lite" +version = "1.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "49a9d51ce47660b1e808d3c990b4709f2f415d928835a17dfd16991515c46bce" +dependencies = [ + "fastrand 1.9.0", + "futures-core", + "futures-io", + "memchr", + "parking", + "pin-project-lite", + "waker-fn", +] + [[package]] name = "futures-lite" version = "2.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "52527eb5074e35e9339c6b4e8d12600c7128b68fb25dcb9fa9dec18f7c25f3a5" dependencies = [ - "fastrand", + "fastrand 2.0.2", "futures-core", "futures-io", "parking", @@ -1793,9 +1848,9 @@ checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888" [[package]] name = "hashbrown" -version = "0.14.5" +version = "0.14.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e5274423e17b7c9fc20b6e7e208532f9b19825d82dfd615708b70edd83df41f1" +checksum = "290f1a1d9242c78d09ce40a5e87e7554ee637af1351968159f4952f028f75604" dependencies = [ "ahash", "allocator-api2", @@ -1807,7 +1862,7 @@ version = "0.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "692eaaf7f7607518dd3cef090f1474b61edc5301d8012f09579920df68b725ee" dependencies = [ - "hashbrown 0.14.5", + "hashbrown 0.14.3", ] [[package]] @@ -2233,7 +2288,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "168fb715dda47215e360912c096649d23d58bf392ac62f73919e831745e40f26" dependencies = [ "equivalent", - "hashbrown 0.14.5", + "hashbrown 0.14.3", "serde", ] @@ -2307,7 +2362,7 @@ dependencies = [ "derive_more", "flume", "futures-buffered", - "futures-lite", + "futures-lite 2.3.0", "futures-util", "genawaiter", "hex", @@ -2320,6 +2375,7 @@ dependencies = [ "iroh-io", "iroh-metrics", "iroh-net", + "iroh-quinn", "iroh-test", "num_cpus", "parking_lot", @@ -2327,7 +2383,6 @@ dependencies = [ "postcard", "proptest", "quic-rpc", - "quinn", "rand", "rand_chacha", "regex", @@ -2399,7 +2454,7 @@ dependencies = [ "derive_more", "flume", "futures-buffered", - "futures-lite", + "futures-lite 2.3.0", "futures-util", "genawaiter", "hashlink", @@ -2410,12 +2465,12 @@ dependencies = [ "iroh-io", "iroh-metrics", "iroh-net", + "iroh-quinn", "iroh-test", "num_cpus", "parking_lot", "postcard", "proptest", - "quinn", "rand", "range-collections", "rcgen 0.12.1", @@ -2454,19 +2509,19 @@ dependencies = [ "duct", "flume", "futures-buffered", - "futures-lite", + "futures-lite 2.3.0", "hex", "human-time", "indicatif", "iroh", "iroh-metrics", + "iroh-quinn", "nix 0.27.1", "parking_lot", "pkarr", "portable-atomic", "postcard", "quic-rpc", - "quinn", "rand", "regex", "rustyline", @@ -2500,7 +2555,7 @@ dependencies = [ "clap", "derive_more", "dirs-next", - "futures-lite", + "futures-lite 2.3.0", "governor", "hickory-proto", "hickory-resolver", @@ -2551,12 +2606,12 @@ dependencies = [ "iroh-blake3", "iroh-metrics", "iroh-net", + "iroh-quinn", "iroh-test", "lru", "num_enum", "postcard", "proptest", - "quinn", "rand", "rand_chacha", "rand_core", @@ -2583,16 +2638,16 @@ dependencies = [ "clap", "derive_more", "ed25519-dalek", - "futures-lite", + "futures-lite 2.3.0", "genawaiter", "indexmap 2.2.6", "iroh-base", "iroh-blake3", "iroh-metrics", "iroh-net", + "iroh-quinn", "iroh-test", "postcard", - "quinn", "rand", "rand_chacha", "rand_core", @@ -2611,7 +2666,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "74d1047ad5ca29ab4ff316b6830d86e7ea52cea54325e4d4a849692e1274b498" dependencies = [ "bytes", - "futures-lite", + "futures-lite 2.3.0", "pin-project", "smallvec", "tokio", @@ -2653,7 +2708,8 @@ dependencies = [ "duct", "flume", "futures-buffered", - "futures-lite", + "futures-concurrency", + "futures-lite 2.3.0", "futures-sink", "futures-util", "governor", @@ -2668,6 +2724,9 @@ dependencies = [ "igd-next", "iroh-base", "iroh-metrics", + "iroh-quinn", + "iroh-quinn-proto", + "iroh-quinn-udp", "iroh-test", "libc", "netdev", @@ -2678,13 +2737,11 @@ dependencies = [ "num_enum", "once_cell", "parking_lot", + "pin-project", "pkarr", "postcard", "pretty_assertions", "proptest", - "quinn", - "quinn-proto", - "quinn-udp", "rand", "rand_chacha", "rand_core", @@ -2732,12 +2789,60 @@ dependencies = [ "clap", "hdrhistogram", "iroh-net", - "quinn", + "iroh-quinn", "tokio", "tracing", "tracing-subscriber", ] +[[package]] +name = "iroh-quinn" +version = "0.10.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b934380145fd5d53a583d01ae9500f4807efe6b0f0fe115c7be4afa2b35db99f" +dependencies = [ + "bytes", + "iroh-quinn-proto", + "iroh-quinn-udp", + "pin-project-lite", + "rustc-hash", + "rustls", + "thiserror", + "tokio", + "tracing", +] + +[[package]] +name = "iroh-quinn-proto" +version = "0.10.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "16f2656b322c7f6cf3eb95e632d1c0f2fa546841915b0270da581f918c70c4be" +dependencies = [ + "bytes", + "rand", + "ring 0.16.20", + "rustc-hash", + "rustls", + "rustls-native-certs", + "slab", + "thiserror", + "tinyvec", + "tracing", +] + +[[package]] +name = "iroh-quinn-udp" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6679979a7271c24f9dae9622c0b4a543881508aa3a7396f55dfbaaa56f01c063" +dependencies = [ + "bytes", + "libc", + "socket2", + "tracing", + "windows-sys 0.48.0", +] + [[package]] name = "iroh-test" version = "0.15.0" @@ -2759,12 +2864,6 @@ dependencies = [ "windows-sys 0.52.0", ] -[[package]] -name = "is_terminal_polyfill" -version = "1.70.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f8478577c03552c21db0e2724ffb8986a5ce7af88107e6be5d2ee6e158c12800" - [[package]] name = "itertools" version = "0.10.5" @@ -2800,9 +2899,9 @@ dependencies = [ [[package]] name = "libc" -version = "0.2.154" +version = "0.2.153" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ae743338b92ff9146ce83992f766a31066a91a8c84a45e0e9f21e7cf6de6d346" +checksum = "9c198f91728a82281a64e1f4f9eeb25d82cb32a5de251c6bd1b5154d63a8e7bd" [[package]] name = "libm" @@ -2854,7 +2953,7 @@ version = "0.12.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d3262e75e648fce39813cb56ac41f3c3e3f65217ebf3844d818d1f9398cfb0dc" dependencies = [ - "hashbrown 0.14.5", + "hashbrown 0.14.3", ] [[package]] @@ -3427,7 +3526,7 @@ version = "3.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8e459365e590736a54c3fa561947c84837534b8e9af6fc5bf781307e82658fae" dependencies = [ - "base64 0.22.1", + "base64 0.22.0", "serde", ] @@ -3865,18 +3964,18 @@ dependencies = [ [[package]] name = "quic-rpc" -version = "0.8.0" +version = "0.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b7af964f8ee0a3ff7f03b8bc3ffce8cd604d9c2e5805b45d1446f774c71fb07e" +checksum = "d6f365b77f07281f6dfc300eb5a9913185fa686e9119c787200ace337a53bdbf" dependencies = [ "bincode", "educe", "flume", - "futures-lite", + "futures-lite 2.3.0", "futures-sink", "futures-util", + "iroh-quinn", "pin-project", - "quinn", "serde", "tokio", "tokio-serde", @@ -3890,54 +3989,6 @@ version = "1.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a1d01941d82fa2ab50be1e79e6714289dd7cde78eba4c074bc5a4374f650dfe0" -[[package]] -name = "quinn" -version = "0.10.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8cc2c5017e4b43d5995dcea317bc46c1e09404c0a9664d2908f7f02dfe943d75" -dependencies = [ - "bytes", - "pin-project-lite", - "quinn-proto", - "quinn-udp", - "rustc-hash", - "rustls", - "thiserror", - "tokio", - "tracing", -] - -[[package]] -name = "quinn-proto" -version = "0.10.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "141bf7dfde2fbc246bfd3fe12f2455aa24b0fbd9af535d8c86c7bd1381ff2b1a" -dependencies = [ - "bytes", - "rand", - "ring 0.16.20", - "rustc-hash", - "rustls", - "rustls-native-certs", - "slab", - "thiserror", - "tinyvec", - "tracing", -] - -[[package]] -name = "quinn-udp" -version = "0.4.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "055b4e778e8feb9f93c4e439f71dc2156ef13360b432b799e179a8c4cdf0b1d7" -dependencies = [ - "bytes", - "libc", - "socket2", - "tracing", - "windows-sys 0.48.0", -] - [[package]] name = "quote" version = "1.0.36" @@ -3957,6 +4008,12 @@ dependencies = [ "pest_derive", ] +[[package]] +name = "radium" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dc33ff2d4973d518d823d61aa239014831e521c75da58e3df4840d3f47749d09" + [[package]] name = "radix_trie" version = "0.2.1" @@ -4020,9 +4077,9 @@ dependencies = [ [[package]] name = "raw-cpuid" -version = "11.0.2" +version = "11.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e29830cbb1290e404f24c73af91c5d8d631ce7e128691e9477556b540cd01ecd" +checksum = "9d86a7c4638d42c44551f4791a20e687dbb4c3de1f33c43dd71e355cd429def1" dependencies = [ "bitflags 2.5.0", ] @@ -4374,9 +4431,9 @@ dependencies = [ [[package]] name = "rustls" -version = "0.21.12" +version = "0.21.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3f56a14d1f48b391359b22f731fd4bd7e43c97f3c50eee276f3aa09c94784d3e" +checksum = "7fecbfb7b1444f477b345853b1fce097a2c6fb637b2bfb87e6bc5db0f043fae4" dependencies = [ "log", "ring 0.17.8", @@ -4411,7 +4468,7 @@ version = "2.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "29993a25686778eb88d4189742cd713c9bce943bc54251a33509dc63cbacf73d" dependencies = [ - "base64 0.22.1", + "base64 0.22.0", "rustls-pki-types", ] @@ -4575,9 +4632,9 @@ dependencies = [ [[package]] name = "serde" -version = "1.0.200" +version = "1.0.198" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ddc6f9cc94d67c0e21aaf7eda3a010fd3af78ebf6e096aa6e2e13c79749cce4f" +checksum = "9846a40c979031340571da2545a4e5b7c4163bdae79b301d5f86d03979451fcc" dependencies = [ "serde_derive", ] @@ -4612,9 +4669,9 @@ dependencies = [ [[package]] name = "serde_derive" -version = "1.0.200" +version = "1.0.198" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "856f046b9400cee3c8c94ed572ecdb752444c24528c035cd35882aad6f492bcb" +checksum = "e88edab869b01783ba905e7d0153f9fc1a6505a96e4ad3018011eedb838566d9" dependencies = [ "proc-macro2", "quote", @@ -4674,11 +4731,11 @@ dependencies = [ [[package]] name = "serde_with" -version = "3.8.1" +version = "3.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0ad483d2ab0149d5a5ebcd9972a3852711e0153d863bf5a5d0391d28883c4a20" +checksum = "2c85f8e96d1d6857f13768fcbd895fcb06225510022a2774ed8b5150581847b0" dependencies = [ - "base64 0.22.1", + "base64 0.22.0", "chrono", "hex", "indexmap 1.9.3", @@ -4692,9 +4749,9 @@ dependencies = [ [[package]] name = "serde_with_macros" -version = "3.8.1" +version = "3.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "65569b702f41443e8bc8bbb1c5779bd0450bbe723b56198980e80ec45780bce2" +checksum = "c8b3a576c4eb2924262d5951a3b737ccaf16c931e39a2810c36f9a7e25575557" dependencies = [ "darling", "proc-macro2", @@ -4822,9 +4879,9 @@ dependencies = [ [[package]] name = "socket2" -version = "0.5.7" +version = "0.5.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ce305eb0b4296696835b71df73eb912e0f1ffd2556a501fcede6e0c50349191c" +checksum = "05ffd9c0a93b7543e062e759284fcf5f5e3b098501104bfbdde4d404db792871" dependencies = [ "libc", "windows-sys 0.52.0", @@ -5032,7 +5089,7 @@ version = "0.1.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0adebf9fb8fba5c39ee34092b0383f247e4d1255b98fcffec94b4b797b85b677" dependencies = [ - "base64 0.22.1", + "base64 0.22.0", "bounded-integer", "byteorder", "crc", @@ -5196,6 +5253,12 @@ dependencies = [ "libc", ] +[[package]] +name = "tap" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "55937e1799185b12863d447f42597ed69d9928686b8d88a1df17376a097d8369" + [[package]] name = "tempfile" version = "3.10.1" @@ -5203,7 +5266,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "85b77fafb263dd9d05cbeac119526425676db3784113aa9295c88498cbf8bff1" dependencies = [ "cfg-if", - "fastrand", + "fastrand 2.0.2", "rustix", "windows-sys 0.52.0", ] @@ -5428,7 +5491,7 @@ dependencies = [ "futures-core", "futures-sink", "futures-util", - "hashbrown 0.14.5", + "hashbrown 0.14.3", "pin-project-lite", "slab", "tokio", @@ -5478,7 +5541,7 @@ dependencies = [ "serde", "serde_spanned", "toml_datetime", - "winnow 0.6.7", + "winnow 0.6.6", ] [[package]] @@ -5698,9 +5761,9 @@ checksum = "d4c87d22b6e3f4a18d4d40ef354e97c90fcb14dd91d7dc0aa9d8a1172ebf7202" [[package]] name = "unicode-width" -version = "0.1.12" +version = "0.1.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "68f5e5f3158ecfd4b8ff6fe086db7c8467a2dfdac97fe420f2b7c4aa97af66d6" +checksum = "e51733f11c9c4f72aa0c160008246859e340b00807569a0da0e7a1079b27ba85" [[package]] name = "unicode-xid" @@ -5769,6 +5832,12 @@ dependencies = [ "libc", ] +[[package]] +name = "waker-fn" +version = "1.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f3c4517f54858c779bbcbf228f4fca63d121bf85fbecb2dc578cdf4a39395690" + [[package]] name = "walkdir" version = "2.5.0" @@ -6227,9 +6296,9 @@ dependencies = [ [[package]] name = "winnow" -version = "0.6.7" +version = "0.6.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "14b9415ee827af173ebb3f15f9083df5a122eb93572ec28741fb153356ea2578" +checksum = "f0c976aaaa0e1f90dbb21e9587cdaf1d9679a1cde8875c0d6bd83ab96a208352" dependencies = [ "memchr", ] @@ -6258,6 +6327,15 @@ dependencies = [ "windows 0.52.0", ] +[[package]] +name = "wyz" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "05f360fc0b24296329c78fda852a1e9ae82de9cf7b27dae4b7f62f118f77b9ed" +dependencies = [ + "tap", +] + [[package]] name = "x509-parser" version = "0.15.1" diff --git a/iroh-blobs/Cargo.toml b/iroh-blobs/Cargo.toml index d0dba588ce..aa4fba33c8 100644 --- a/iroh-blobs/Cargo.toml +++ b/iroh-blobs/Cargo.toml @@ -30,11 +30,11 @@ hex = "0.4.3" iroh-base = { version = "0.15.0", features = ["redb"], path = "../iroh-base" } iroh-io = { version = "0.6.0", features = ["stats"] } iroh-metrics = { version = "0.15.0", path = "../iroh-metrics", optional = true } -iroh-net = { version = "0.15.0", path = "../iroh-net", optional = true } +iroh-net = { version = "0.15.0", path = "../iroh-net" } num_cpus = "1.15.0" parking_lot = { version = "0.12.1", optional = true } postcard = { version = "1", default-features = false, features = ["alloc", "use-std", "experimental-derive"] } -quinn = "0.10" +quinn = { package = "iroh-quinn", version = "0.10" } rand = "0.8" range-collections = "0.4.0" redb = { version = "2.0.0", optional = true } @@ -67,7 +67,7 @@ futures-util = "0.3.30" [features] default = ["fs-store"] -downloader = ["dep:iroh-net", "dep:parking_lot", "tokio-util/time", "dep:hashlink"] +downloader = ["dep:parking_lot", "tokio-util/time", "dep:hashlink"] fs-store = ["dep:reflink-copy", "redb", "dep:redb_v1", "dep:tempfile"] metrics = ["dep:iroh-metrics"] redb = ["dep:redb"] diff --git a/iroh-blobs/examples/provide-bytes.rs b/iroh-blobs/examples/provide-bytes.rs index c355e38a91..8668f8fe89 100644 --- a/iroh-blobs/examples/provide-bytes.rs +++ b/iroh-blobs/examples/provide-bytes.rs @@ -11,6 +11,7 @@ //! To provide a collection (multiple blobs) use anyhow::Result; use tokio_util::task::LocalPoolHandle; +use tracing::warn; use tracing_subscriber::{prelude::*, EnvFilter}; use iroh_blobs::{format::collection::Collection, Hash}; @@ -84,7 +85,7 @@ async fn main() -> Result<()> { let lp = LocalPoolHandle::new(1); let accept_task = tokio::spawn(async move { - while let Some(conn) = endpoint.accept().await { + while let Some(incoming) = endpoint.accept().await { println!("connection incoming"); let db = db.clone(); @@ -92,6 +93,14 @@ async fn main() -> Result<()> { // spawn a task to handle the connection tokio::spawn(async move { + let remote_addr = incoming.remote_address(); + let conn = match incoming.await { + Ok(conn) => conn, + Err(err) => { + warn!(%remote_addr, "Error connecting: {err:#}"); + return; + } + }; iroh_blobs::provider::handle_connection(conn, db, MockEventSender, lp).await }); } diff --git a/iroh-blobs/src/provider.rs b/iroh-blobs/src/provider.rs index 52371cba3a..bdba0315d6 100644 --- a/iroh-blobs/src/provider.rs +++ b/iroh-blobs/src/provider.rs @@ -11,6 +11,7 @@ use iroh_io::stats::{ SliceReaderStats, StreamWriterStats, TrackingSliceReader, TrackingStreamWriter, }; use iroh_io::{AsyncSliceReader, AsyncStreamWriter, TokioStreamWriter}; +use iroh_net::magic_endpoint; use serde::{Deserialize, Serialize}; use tokio_util::task::LocalPoolHandle; use tracing::{debug, debug_span, info, trace, warn}; @@ -280,19 +281,12 @@ pub trait EventSender: Clone + Sync + Send + 'static { /// Handle a single connection. pub async fn handle_connection( - connecting: quinn::Connecting, + connection: magic_endpoint::Connection, db: D, events: E, rt: LocalPoolHandle, ) { - let remote_addr = connecting.remote_address(); - let connection = match connecting.await { - Ok(conn) => conn, - Err(err) => { - warn!(%remote_addr, "Error connecting: {err:#}"); - return; - } - }; + let remote_addr = connection.remote_address(); let connection_id = connection.stable_id() as u64; let span = debug_span!("connection", connection_id, %remote_addr); async move { diff --git a/iroh-cli/Cargo.toml b/iroh-cli/Cargo.toml index bb6eaec942..9cec697935 100644 --- a/iroh-cli/Cargo.toml +++ b/iroh-cli/Cargo.toml @@ -44,8 +44,8 @@ parking_lot = "0.12.1" pkarr = { version = "1.1.5", default-features = false } portable-atomic = "1" postcard = "1.0.8" -quic-rpc = { version = "0.8.0", features = ["flume-transport", "quinn-transport"] } -quinn = "0.10.2" +quic-rpc = { version = "0.9.0", features = ["flume-transport", "quinn-transport"] } +quinn = { package = "iroh-quinn", version = "0.10.2"} rand = "0.8.5" rustyline = "12.0.0" serde = { version = "1.0.197", features = ["derive"] } diff --git a/iroh-docs/Cargo.toml b/iroh-docs/Cargo.toml index aa9b9839f8..fb517fb62e 100644 --- a/iroh-docs/Cargo.toml +++ b/iroh-docs/Cargo.toml @@ -43,7 +43,7 @@ tempfile = { version = "3.4" } iroh-net = { version = "0.15.0", optional = true, path = "../iroh-net" } tokio-util = { version = "0.7", optional = true, features = ["codec", "io-util", "io"] } tokio-stream = { version = "0.1", optional = true, features = ["sync"]} -quinn = { version = "0.10", optional = true } +quinn = { package = "iroh-quinn", version = "0.10", optional = true } futures-util = { version = "0.3.25", optional = true } lru = "0.12" self_cell = "1.0.3" diff --git a/iroh-docs/src/net.rs b/iroh-docs/src/net.rs index aff9046548..a71b0921af 100644 --- a/iroh-docs/src/net.rs +++ b/iroh-docs/src/net.rs @@ -106,7 +106,7 @@ pub enum AcceptOutcome { /// Handle an iroh-docs connection and sync all shared documents in the replica store. pub async fn handle_connection( sync: SyncHandle, - connecting: quinn::Connecting, + connecting: iroh_net::magic_endpoint::Connecting, accept_cb: F, ) -> Result where diff --git a/iroh-gossip/Cargo.toml b/iroh-gossip/Cargo.toml index 97d55de4a8..172a6e5e74 100644 --- a/iroh-gossip/Cargo.toml +++ b/iroh-gossip/Cargo.toml @@ -33,7 +33,7 @@ iroh-base = { version = "0.15.0", path = "../iroh-base" } # net dependencies (optional) futures-lite = { version = "2.3", optional = true } iroh-net = { path = "../iroh-net", version = "0.15.0", optional = true, default-features = false } -quinn = { version = "0.10", optional = true } +quinn = { package = "iroh-quinn", version = "0.10", optional = true } tokio = { version = "1", optional = true, features = ["io-util", "sync", "rt", "macros", "net", "fs"] } tokio-util = { version = "0.7.8", optional = true, features = ["codec"] } genawaiter = { version = "0.99.1", default-features = false, features = ["futures03"] } diff --git a/iroh-gossip/examples/chat.rs b/iroh-gossip/examples/chat.rs index 6a596b1bec..4a8eecbc4e 100644 --- a/iroh-gossip/examples/chat.rs +++ b/iroh-gossip/examples/chat.rs @@ -11,7 +11,6 @@ use iroh_gossip::{ }; use iroh_net::{ key::{PublicKey, SecretKey}, - magic_endpoint::accept_conn, relay::{RelayMap, RelayMode, RelayUrl}, MagicEndpoint, NodeAddr, }; @@ -200,8 +199,13 @@ async fn endpoint_loop(endpoint: MagicEndpoint, gossip: Gossip) { }); } } -async fn handle_connection(conn: quinn::Connecting, gossip: Gossip) -> anyhow::Result<()> { - let (peer_id, alpn, conn) = accept_conn(conn).await?; +async fn handle_connection( + mut conn: iroh_net::magic_endpoint::Connecting, + gossip: Gossip, +) -> anyhow::Result<()> { + let alpn = conn.alpn().await?; + let conn = conn.await?; + let peer_id = iroh_net::magic_endpoint::get_remote_node_id(&conn)?; match alpn.as_bytes() { GOSSIP_ALPN => gossip .handle_connection(conn) diff --git a/iroh-net/Cargo.toml b/iroh-net/Cargo.toml index 4e922c12f5..f5ccbcf1eb 100644 --- a/iroh-net/Cargo.toml +++ b/iroh-net/Cargo.toml @@ -26,6 +26,7 @@ der = { version = "0.7", features = ["alloc", "derive"] } derive_more = { version = "1.0.0-beta.6", features = ["debug", "display", "from", "try_into", "deref"] } flume = "0.11" futures-buffered = "0.2.4" +futures-concurrency = "7.6.0" futures-lite = "2.3" futures-sink = "0.3.25" futures-util = "0.3.25" @@ -44,11 +45,12 @@ libc = "0.2.139" num_enum = "0.7" once_cell = "1.18.0" parking_lot = "0.12.1" +pin-project = "1" pkarr = { version = "1.1.4", default-features = false, features = ["async", "relay"] } postcard = { version = "1", default-features = false, features = ["alloc", "use-std", "experimental-derive"] } -quinn = "0.10" -quinn-proto = "0.10.5" -quinn-udp = "0.4" +quinn = { package = "iroh-quinn", version = "0.10.4" } +quinn-proto = { package = "iroh-quinn-proto", version = "0.10.7" } +quinn-udp = { package = "iroh-quinn-udp", version = "0.4" } rand = "0.8" rand_core = "0.6.4" rcgen = "0.11" diff --git a/iroh-net/bench/Cargo.toml b/iroh-net/bench/Cargo.toml index 2c7ac57914..1415a6cbe3 100644 --- a/iroh-net/bench/Cargo.toml +++ b/iroh-net/bench/Cargo.toml @@ -10,7 +10,7 @@ anyhow = "1.0.22" bytes = "1" hdrhistogram = { version = "7.2", default-features = false } iroh-net = { path = ".." } -quinn = "0.10" +quinn = { package = "iroh-quinn", version = "0.10"} clap = { version = "4", features = ["derive"] } tokio = { version = "1.0.1", features = ["rt", "sync"] } tracing = "0.1" diff --git a/iroh-net/examples/listen-unreliable.rs b/iroh-net/examples/listen-unreliable.rs index ce6467cd8a..58a494d2dc 100644 --- a/iroh-net/examples/listen-unreliable.rs +++ b/iroh-net/examples/listen-unreliable.rs @@ -63,9 +63,10 @@ async fn main() -> anyhow::Result<()> { ); // accept incoming connections, returns a normal QUIC connection - while let Some(conn) = endpoint.accept().await { - // accept the connection and extract the `node_id` and ALPN - let (node_id, alpn, conn) = iroh_net::magic_endpoint::accept_conn(conn).await?; + while let Some(mut conn) = endpoint.accept().await { + let alpn = conn.alpn().await?; + let conn = conn.await?; + let node_id = iroh_net::magic_endpoint::get_remote_node_id(&conn)?; info!( "new (unreliable) connection from {node_id} with ALPN {alpn} (coming from {})", conn.remote_address() diff --git a/iroh-net/examples/listen.rs b/iroh-net/examples/listen.rs index a99a3b1f65..ccee87df8a 100644 --- a/iroh-net/examples/listen.rs +++ b/iroh-net/examples/listen.rs @@ -62,9 +62,10 @@ async fn main() -> anyhow::Result<()> { "\tcargo run --example connect -- --node-id {me} --addrs \"{local_addrs}\" --relay-url {relay_url}\n" ); // accept incoming connections, returns a normal QUIC connection - while let Some(conn) = endpoint.accept().await { - // accept the connection and extract the `node_id` and ALPN - let (node_id, alpn, conn) = iroh_net::magic_endpoint::accept_conn(conn).await?; + while let Some(mut conn) = endpoint.accept().await { + let alpn = conn.alpn().await?; + let conn = conn.await?; + let node_id = iroh_net::magic_endpoint::get_remote_node_id(&conn)?; info!( "new connection from {node_id} with ALPN {alpn} (coming from {})", conn.remote_address() diff --git a/iroh-net/src/disco.rs b/iroh-net/src/disco.rs index 668533643a..577c844135 100644 --- a/iroh-net/src/disco.rs +++ b/iroh-net/src/disco.rs @@ -129,8 +129,10 @@ pub struct Ping { #[derive(Debug, Clone, PartialEq, Eq)] pub struct Pong { pub tx_id: stun::TransactionId, + /// The observed address off the ping sender. + /// /// 18 bytes (16+2) on the wire; v4-mapped ipv6 for IPv4. - pub src: SendAddr, + pub ping_observed_addr: SendAddr, } /// Addresses to which we can send. This is either a UDP or a relay address. @@ -280,7 +282,10 @@ impl Pong { let tx_id = stun::TransactionId::from(tx_id); let src = send_addr_from_bytes(&p[TX_LEN..])?; - Ok(Pong { tx_id, src }) + Ok(Pong { + tx_id, + ping_observed_addr: src, + }) } fn as_bytes(&self) -> Vec { @@ -288,7 +293,7 @@ impl Pong { let mut out = header.to_vec(); out.extend_from_slice(&self.tx_id); - let src_bytes = send_addr_to_vec(&self.src); + let src_bytes = send_addr_to_vec(&self.ping_observed_addr); out.extend(src_bytes); out } @@ -412,7 +417,7 @@ mod tests { name: "pong", m: Message::Pong(Pong{ tx_id: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12].into(), - src: SendAddr::Udp("2.3.4.5:1234".parse().unwrap()), + ping_observed_addr: SendAddr::Udp("2.3.4.5:1234".parse().unwrap()), }), want: "02 00 01 02 03 04 05 06 07 08 09 0a 0b 0c 00 00 00 00 00 00 00 00 00 00 00 ff ff 02 03 04 05 d2 04", }, @@ -420,7 +425,7 @@ mod tests { name: "pongv6", m: Message::Pong(Pong { tx_id: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12].into(), - src: SendAddr::Udp("[fed0::12]:6666".parse().unwrap()), + ping_observed_addr: SendAddr::Udp("[fed0::12]:6666".parse().unwrap()), }), want: "02 00 01 02 03 04 05 06 07 08 09 0a 0b 0c 00 fe d0 00 00 00 00 00 00 00 00 00 00 00 00 00 12 0a 1a", }, diff --git a/iroh-net/src/magic_endpoint.rs b/iroh-net/src/magic_endpoint.rs index d0ff7f7d1c..ba4f760672 100644 --- a/iroh-net/src/magic_endpoint.rs +++ b/iroh-net/src/magic_endpoint.rs @@ -2,14 +2,21 @@ //! conenctions or a relay when necessary, optimizing the path to target nodes to ensure maximum //! connectivity. -use std::{net::SocketAddr, path::PathBuf, sync::Arc, time::Duration}; +use std::any::Any; +use std::future::Future; +use std::net::{IpAddr, SocketAddr}; +use std::path::PathBuf; +use std::pin::Pin; +use std::sync::Arc; +use std::task::Poll; +use std::time::Duration; use anyhow::{anyhow, bail, ensure, Context, Result}; use derive_more::Debug; use futures_lite::StreamExt; -use quinn_proto::VarInt; +use quinn::VarInt; use tokio_util::sync::{CancellationToken, WaitForCancellationFuture}; -use tracing::{debug, trace}; +use tracing::{debug, info_span, trace, warn}; use crate::{ config, @@ -22,6 +29,12 @@ use crate::{ tls, NodeId, }; +mod rtt_actor; + +use self::rtt_actor::RttMessage; + +pub use quinn::Connection; + pub use super::magicsock::{ ConnectionInfo, ConnectionType, ConnectionTypeStream, ControlMsg, DirectAddrInfo, LocalEndpointsStream, @@ -246,6 +259,7 @@ pub struct MagicEndpoint { secret_key: Arc, msock: Handle, endpoint: quinn::Endpoint, + rtt_actor: Arc, keylog: bool, cancel_token: CancellationToken, } @@ -266,6 +280,8 @@ impl MagicEndpoint { keylog: bool, ) -> Result { let secret_key = msock_opts.secret_key.clone(); + let span = info_span!("magic_ep", me = %secret_key.public().fmt_short()); + let _guard = span.enter(); let msock = magicsock::MagicSock::spawn(msock_opts).await?; trace!("created magicsock"); @@ -289,14 +305,18 @@ impl MagicEndpoint { secret_key: Arc::new(secret_key), msock, endpoint, + rtt_actor: Arc::new(rtt_actor::RttHandle::new()), keylog, cancel_token: CancellationToken::new(), }) } /// Accept an incoming connection on the socket. - pub fn accept(&self) -> quinn::Accept<'_> { - self.endpoint.accept() + pub fn accept(&self) -> Accept<'_> { + Accept { + inner: self.endpoint.accept(), + magic_ep: self.clone(), + } } /// Get the node id of this endpoint. @@ -506,7 +526,19 @@ impl MagicEndpoint { .endpoint .connect_with(client_config, addr, "localhost")?; - connect.await.context("failed connecting to provider") + let connection = connect.await.context("failed connecting to provider")?; + + let rtt_msg = RttMessage::NewConnection { + connection: connection.weak_handle(), + conn_type_changes: self.conn_type_stream(node_id)?, + node_id: *node_id, + }; + if let Err(err) = self.rtt_actor.msg_tx.send(rtt_msg).await { + // If this actor is dead, that's not great but we can still function. + warn!("rtt-actor not reachable: {err:#}"); + } + + Ok(connection) } /// Return the quic mapped address for this `node_id` and possibly start discovery @@ -649,25 +681,98 @@ impl MagicEndpoint { } } -/// Accept an incoming connection and extract the client-provided [`PublicKey`] and ALPN protocol. -pub async fn accept_conn( - mut conn: quinn::Connecting, -) -> Result<(PublicKey, String, quinn::Connection)> { - let alpn = get_alpn(&mut conn).await?; - let conn = conn.await?; - let peer_id = get_remote_node_id(&conn)?; - Ok((peer_id, alpn, conn)) +/// Future produced by [`MagicEndpoint::accept`]. +#[derive(Debug)] +#[pin_project::pin_project] +pub struct Accept<'a> { + #[pin] + inner: quinn::Accept<'a>, + magic_ep: MagicEndpoint, } -/// Extract the ALPN protocol from the peer's TLS certificate. -pub async fn get_alpn(connecting: &mut quinn::Connecting) -> Result { - let data = connecting.handshake_data().await?; - match data.downcast::() { - Ok(data) => match data.protocol { - Some(protocol) => std::string::String::from_utf8(protocol).map_err(Into::into), - None => bail!("no ALPN protocol available"), - }, - Err(_) => bail!("unknown handshake type"), +impl<'a> Future for Accept<'a> { + type Output = Option; + + fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll { + let this = self.project(); + match this.inner.poll(cx) { + Poll::Pending => Poll::Pending, + Poll::Ready(None) => Poll::Ready(None), + Poll::Ready(Some(inner)) => Poll::Ready(Some(Connecting { + inner, + magic_ep: this.magic_ep.clone(), + })), + } + } +} + +/// In-progress connection attempt future +#[derive(Debug)] +#[pin_project::pin_project] +pub struct Connecting { + #[pin] + inner: quinn::Connecting, + magic_ep: MagicEndpoint, +} + +impl Connecting { + /// Convert into a 0-RTT or 0.5-RTT connection at the cost of weakened security. + pub fn into_0rtt(self) -> Result<(quinn::Connection, quinn::ZeroRttAccepted), Self> { + match self.inner.into_0rtt() { + Ok((conn, zrtt_accepted)) => { + try_send_rtt_msg(&conn, &self.magic_ep); + Ok((conn, zrtt_accepted)) + } + Err(inner) => Err(Self { + inner, + magic_ep: self.magic_ep, + }), + } + } + + /// Parameters negotiated during the handshake + pub async fn handshake_data(&mut self) -> Result, quinn::ConnectionError> { + self.inner.handshake_data().await + } + + /// The local IP address which was used when the peer established the connection. + pub fn local_ip(&self) -> Option { + self.inner.local_ip() + } + + /// The peer's UDP address. + pub fn remote_address(&self) -> SocketAddr { + self.inner.remote_address() + } + + /// Extracts the ALPN protocol from the peer's handshake data. + // Note, we could totally provide this method to be on a Connection as well. But we'd + // need to wrap Connection too. + pub async fn alpn(&mut self) -> Result { + let data = self.handshake_data().await?; + match data.downcast::() { + Ok(data) => match data.protocol { + Some(protocol) => std::string::String::from_utf8(protocol).map_err(Into::into), + None => bail!("no ALPN protocol available"), + }, + Err(_) => bail!("unknown handshake type"), + } + } +} + +impl Future for Connecting { + type Output = Result; + + fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll { + let this = self.project(); + match this.inner.poll(cx) { + Poll::Pending => Poll::Pending, + Poll::Ready(Err(err)) => Poll::Ready(Err(err)), + Poll::Ready(Ok(conn)) => { + try_send_rtt_msg(&conn, this.magic_ep); + Poll::Ready(Ok(conn)) + } + } } } @@ -692,6 +797,30 @@ pub fn get_remote_node_id(connection: &quinn::Connection) -> Result { } } +/// Try send a message to the rtt-actor. +/// +/// If we can't notify the actor that will impact performance a little, but we can still +/// function. +fn try_send_rtt_msg(conn: &quinn::Connection, magic_ep: &MagicEndpoint) { + // If we can't notify the rtt-actor that's not great but not critical. + let Ok(peer_id) = get_remote_node_id(conn) else { + warn!(?conn, "failed to get remote node id"); + return; + }; + let Ok(conn_type_changes) = magic_ep.conn_type_stream(&peer_id) else { + warn!(?conn, "failed to create conn_type_stream"); + return; + }; + let rtt_msg = RttMessage::NewConnection { + connection: conn.weak_handle(), + conn_type_changes, + node_id: peer_id, + }; + if let Err(err) = magic_ep.rtt_actor.msg_tx.try_send(rtt_msg) { + warn!(?conn, "rtt-actor not reachable: {err:#}"); + } +} + // TODO: These tests could still be flaky, lets fix that: // https://github.com/n0-computer/iroh/issues/1183 #[cfg(test)] @@ -763,8 +892,8 @@ mod tests { .await .unwrap(); info!("accepting connection"); - let conn = ep.accept().await.unwrap(); - let (_peer_id, _alpn, conn) = accept_conn(conn).await.unwrap(); + let incoming = ep.accept().await.unwrap(); + let conn = incoming.await.unwrap(); let mut stream = conn.accept_uni().await.unwrap(); let mut buf = [0u8, 5]; stream.read_exact(&mut buf).await.unwrap(); @@ -916,7 +1045,8 @@ mod tests { let now = Instant::now(); println!("[server] round {}", i + 1); let incoming = ep.accept().await.unwrap(); - let (peer_id, _alpn, conn) = accept_conn(incoming).await.unwrap(); + let conn = incoming.await.unwrap(); + let peer_id = get_remote_node_id(&conn).unwrap(); info!(%i, peer = %peer_id.fmt_short(), "accepted connection"); let (mut send, mut recv) = conn.accept_bi().await.unwrap(); let mut buf = vec![0u8; chunk_size]; @@ -1023,8 +1153,10 @@ mod tests { } async fn accept_world(ep: MagicEndpoint, src: NodeId) { - let incoming = ep.accept().await.unwrap(); - let (node_id, alpn, conn) = accept_conn(incoming).await.unwrap(); + let mut incoming = ep.accept().await.unwrap(); + let alpn = incoming.alpn().await.unwrap(); + let conn = incoming.await.unwrap(); + let node_id = get_remote_node_id(&conn).unwrap(); assert_eq!(node_id, src); assert_eq!(alpn.as_bytes(), TEST_ALPN); let (mut send, mut recv) = conn.accept_bi().await.unwrap(); @@ -1118,9 +1250,10 @@ mod tests { let _ep2_guard = CallOnDrop::new(move || { ep2_abort_handle.abort(); }); - async fn accept(ep: MagicEndpoint) -> (PublicKey, String, quinn::Connection) { + async fn accept(ep: MagicEndpoint) -> NodeId { let incoming = ep.accept().await.unwrap(); - accept_conn(incoming).await.unwrap() + let conn = incoming.await.unwrap(); + get_remote_node_id(&conn).unwrap() } // create a node addr with no direct connections @@ -1134,7 +1267,7 @@ mod tests { let _conn_2 = ep2.connect(ep1_nodeaddr, TEST_ALPN).await.unwrap(); - let (got_id, _, _conn) = accept_res.await.unwrap(); + let got_id = accept_res.await.unwrap(); assert_eq!(ep2_nodeid, got_id); res_ep1.await.unwrap().unwrap(); diff --git a/iroh-net/src/magic_endpoint/rtt_actor.rs b/iroh-net/src/magic_endpoint/rtt_actor.rs new file mode 100644 index 0000000000..2403e95ddf --- /dev/null +++ b/iroh-net/src/magic_endpoint/rtt_actor.rs @@ -0,0 +1,159 @@ +//! Actor which coordinates the congestion controller for the magic socket + +use std::collections::HashMap; + +use futures_concurrency::stream::stream_group; +use futures_lite::StreamExt; +use iroh_base::key::NodeId; +use tokio::sync::{mpsc, Notify}; +use tokio::task::JoinHandle; +use tokio::time::Duration; +use tracing::{debug, error, info_span, trace, warn, Instrument}; + +use crate::magicsock::{ConnectionType, ConnectionTypeStream}; + +#[derive(Debug)] +pub(super) struct RttHandle { + // We should and some point use this to propagate panics and errors. + pub(super) _handle: JoinHandle<()>, + pub(super) msg_tx: mpsc::Sender, +} + +impl RttHandle { + pub(super) fn new() -> Self { + let mut actor = RttActor { + connection_events: stream_group::StreamGroup::new().keyed(), + connections: HashMap::new(), + tick: Notify::new(), + }; + let (msg_tx, msg_rx) = mpsc::channel(16); + let _handle = tokio::spawn( + async move { + actor.run(msg_rx).await; + } + .instrument(info_span!("rtt-actor")), + ); + Self { _handle, msg_tx } + } +} + +/// Messages to send to the [`RttActor`]. +#[derive(Debug)] +pub(super) enum RttMessage { + /// Informs the [`RttActor`] of a new connection is should monitor. + NewConnection { + /// The connection. + connection: quinn::WeakConnectionHandle, + /// Path changes for this connection from the magic socket. + conn_type_changes: ConnectionTypeStream, + /// For reporting-only, the Node ID of this connection. + node_id: NodeId, + }, +} + +/// Actor to coordinate congestion controller state with magic socket state. +/// +/// The magic socket can change the underlying network path, between two nodes. If we can +/// inform the QUIC congestion controller of this event it will work much more efficiently. +#[derive(Debug)] +struct RttActor { + /// Stream of connection type changes. + connection_events: stream_group::Keyed, + /// References to the connections. + /// + /// These are weak references so not to keep the connections alive. The key allows + /// removing the corresponding stream from `conn_type_changes`. + connections: HashMap, + /// A way to notify the main actor loop to run over. + /// + /// E.g. when a new stream was added. + tick: Notify, +} + +impl RttActor { + /// Runs the actor main loop. + /// + /// The main loop will finish when the sender is dropped. + async fn run(&mut self, mut msg_rx: mpsc::Receiver) { + let mut cleanup_interval = tokio::time::interval(Duration::from_secs(5)); + cleanup_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); + loop { + tokio::select! { + Some(msg) = msg_rx.recv() => self.handle_msg(msg), + item = self.connection_events.next(), + if !self.connection_events.is_empty() => self.do_reset_rtt(item), + _ = cleanup_interval.tick() => self.do_connections_cleanup(), + () = self.tick.notified() => continue, + else => break, + } + } + debug!("rtt-actor finished"); + } + + /// Handle actor messages. + fn handle_msg(&mut self, msg: RttMessage) { + match msg { + RttMessage::NewConnection { + connection, + conn_type_changes, + node_id, + } => { + self.handle_new_connection(connection, conn_type_changes, node_id); + } + } + } + + /// Handles the new connection message. + fn handle_new_connection( + &mut self, + connection: quinn::WeakConnectionHandle, + conn_type_changes: ConnectionTypeStream, + node_id: NodeId, + ) { + let key = self.connection_events.insert(conn_type_changes); + self.connections.insert(key, (connection, node_id)); + self.tick.notify_one(); + } + + /// Performs the congestion controller reset for a magic socket path change. + /// + /// Regardless of which kind of path we are changed to, the congestion controller needs + /// resetting. Even when switching to mixed we should reset the state as e.g. switching + /// from direct to mixed back to direct should be a rare exception and is a bug if this + /// happens commonly. + fn do_reset_rtt(&mut self, item: Option<(stream_group::Key, ConnectionType)>) { + match item { + Some((key, new_conn_type)) => match self.connections.get(&key) { + Some((handle, node_id)) => { + if handle.reset_congestion_state() { + debug!( + node_id = %node_id.fmt_short(), + new_type = ?new_conn_type, + "Congestion controller state reset", + ); + } else { + debug!( + node_id = %node_id.fmt_short(), + "removing dropped connection", + ); + self.connection_events.remove(key); + } + } + None => error!("No connection found for stream item"), + }, + None => { + warn!("self.conn_type_changes is empty but was polled"); + } + } + } + + /// Performs cleanup for closed connection. + fn do_connections_cleanup(&mut self) { + for (key, (handle, node_id)) in self.connections.iter() { + if !handle.is_alive() { + trace!(node_id = %node_id.fmt_short(), "removing stale connection"); + self.connection_events.remove(*key); + } + } + } +} diff --git a/iroh-net/src/magicsock.rs b/iroh-net/src/magicsock.rs index cb62c8db53..5795f5ac22 100644 --- a/iroh-net/src/magicsock.rs +++ b/iroh-net/src/magicsock.rs @@ -842,7 +842,7 @@ impl MagicSock { "sending pong"); let pong = disco::Message::Pong(disco::Pong { tx_id: dm.tx_id, - src: addr.clone(), + ping_observed_addr: addr.clone(), }); if !self.send_disco_message_queued(addr.clone(), *sender, pong) { @@ -2555,7 +2555,7 @@ pub(crate) mod tests { use iroh_test::CallOnDrop; use rand::RngCore; - use crate::{relay::RelayMode, test_utils::run_relay_server, tls, MagicEndpoint}; + use crate::{relay::RelayMode, tls, MagicEndpoint}; use super::*; @@ -2569,7 +2569,7 @@ pub(crate) mod tests { const ALPN: &[u8] = b"n0/test/1"; impl MagicStack { - async fn new(relay_map: RelayMap) -> Result { + async fn new(relay_mode: RelayMode) -> Result { let secret_key = SecretKey::generate(); let mut transport_config = quinn::TransportConfig::default(); @@ -2578,7 +2578,7 @@ pub(crate) mod tests { let endpoint = MagicEndpoint::builder() .secret_key(secret_key.clone()) .transport_config(transport_config) - .relay_mode(RelayMode::Custom(relay_map)) + .relay_mode(relay_mode) .alpns(vec![ALPN.to_vec()]) .bind(0) .await?; @@ -2605,21 +2605,17 @@ pub(crate) mod tests { /// Monitors endpoint changes and plumbs things together. /// - /// Whenever the local endpoints of a magic endpoint change this address is added to the - /// other magic sockets. This function will await until the endpoints are connected the - /// first time before returning. + /// This is a way of connecting endpoints without a relay server. Whenever the local + /// endpoints of a magic endpoint change this address is added to the other magic + /// sockets. This function will await until the endpoints are connected the first time + /// before returning. /// /// When the returned drop guard is dropped, the tasks doing this updating are stopped. - async fn mesh_stacks(stacks: Vec, relay_url: RelayUrl) -> Result { + #[instrument(skip_all)] + async fn mesh_stacks(stacks: Vec) -> Result { /// Registers endpoint addresses of a node to all other nodes. - fn update_eps( - stacks: &[MagicStack], - my_idx: usize, - new_eps: Vec, - relay_url: RelayUrl, - ) { + fn update_eps(stacks: &[MagicStack], my_idx: usize, new_eps: Vec) { let me = &stacks[my_idx]; - for (i, m) in stacks.iter().enumerate() { if i == my_idx { continue; @@ -2628,7 +2624,7 @@ pub(crate) mod tests { let addr = NodeAddr { node_id: me.public(), info: crate::AddrInfo { - relay_url: Some(relay_url.clone()), + relay_url: None, direct_addresses: new_eps.iter().map(|ep| ep.addr).collect(), }, }; @@ -2642,13 +2638,12 @@ pub(crate) mod tests { for (my_idx, m) in stacks.iter().enumerate() { let m = m.clone(); let stacks = stacks.clone(); - let relay_url = relay_url.clone(); tasks.spawn(async move { let me = m.endpoint.node_id().fmt_short(); let mut stream = m.endpoint.local_endpoints(); while let Some(new_eps) = stream.next().await { info!(%me, "conn{} endpoints update: {:?}", my_idx + 1, new_eps); - update_eps(&stacks, my_idx, new_eps, relay_url.clone()); + update_eps(&stacks, my_idx, new_eps); } }); } @@ -2678,7 +2673,7 @@ pub(crate) mod tests { }) .await .context("failed to connect nodes")?; - + info!("all nodes meshed"); Ok(guard) } @@ -2727,14 +2722,9 @@ pub(crate) mod tests { } #[instrument(skip_all, fields(me = %ep.endpoint.node_id().fmt_short()))] - async fn echo_sender( - ep: MagicStack, - dest_id: PublicKey, - relay_url: RelayUrl, - msg: &[u8], - ) -> Result<()> { + async fn echo_sender(ep: MagicStack, dest_id: PublicKey, msg: &[u8]) -> Result<()> { info!("connecting to {}", dest_id.fmt_short()); - let dest = NodeAddr::new(dest_id).with_relay_url(relay_url); + let dest = NodeAddr::new(dest_id); let conn = ep .endpoint .connect(dest, ALPN) @@ -2775,18 +2765,13 @@ pub(crate) mod tests { } /// Runs a roundtrip between the [`echo_sender`] and [`echo_receiver`]. - async fn run_roundtrip( - sender: MagicStack, - receiver: MagicStack, - relay_url: RelayUrl, - payload: &[u8], - ) { + async fn run_roundtrip(sender: MagicStack, receiver: MagicStack, payload: &[u8]) { let send_node_id = sender.endpoint.node_id(); let recv_node_id = receiver.endpoint.node_id(); info!("\nroundtrip: {send_node_id:#} -> {recv_node_id:#}"); let receiver_task = tokio::spawn(echo_receiver(receiver)); - let sender_res = echo_sender(sender, recv_node_id, relay_url, payload).await; + let sender_res = echo_sender(sender, recv_node_id, payload).await; let sender_is_err = match sender_res { Ok(()) => false, Err(err) => { @@ -2817,23 +2802,22 @@ pub(crate) mod tests { #[tokio::test(flavor = "multi_thread")] async fn test_two_devices_roundtrip_quinn_magic() -> Result<()> { iroh_test::logging::setup_multithreaded(); - let (relay_map, relay_url, _cleanup_guard) = run_relay_server().await?; - let m1 = MagicStack::new(relay_map.clone()).await?; - let m2 = MagicStack::new(relay_map.clone()).await?; + let m1 = MagicStack::new(RelayMode::Disabled).await?; + let m2 = MagicStack::new(RelayMode::Disabled).await?; - let _guard = mesh_stacks(vec![m1.clone(), m2.clone()], relay_url.clone()).await?; + let _guard = mesh_stacks(vec![m1.clone(), m2.clone()]).await?; for i in 0..5 { info!("\n-- round {i}"); - run_roundtrip(m1.clone(), m2.clone(), relay_url.clone(), b"hello m1").await; - run_roundtrip(m2.clone(), m1.clone(), relay_url.clone(), b"hello m2").await; + run_roundtrip(m1.clone(), m2.clone(), b"hello m1").await; + run_roundtrip(m2.clone(), m1.clone(), b"hello m2").await; info!("\n-- larger data"); let mut data = vec![0u8; 10 * 1024]; rand::thread_rng().fill_bytes(&mut data); - run_roundtrip(m1.clone(), m2.clone(), relay_url.clone(), &data).await; - run_roundtrip(m2.clone(), m1.clone(), relay_url.clone(), &data).await; + run_roundtrip(m1.clone(), m2.clone(), &data).await; + run_roundtrip(m2.clone(), m1.clone(), &data).await; } Ok(()) @@ -2853,12 +2837,11 @@ pub(crate) mod tests { /// with (simulated) network changes. async fn test_two_devices_roundtrip_network_change_impl() -> Result<()> { iroh_test::logging::setup_multithreaded(); - let (relay_map, relay_url, _cleanup) = run_relay_server().await?; - let m1 = MagicStack::new(relay_map.clone()).await?; - let m2 = MagicStack::new(relay_map.clone()).await?; + let m1 = MagicStack::new(RelayMode::Disabled).await?; + let m2 = MagicStack::new(RelayMode::Disabled).await?; - let _guard = mesh_stacks(vec![m1.clone(), m2.clone()], relay_url.clone()).await?; + let _guard = mesh_stacks(vec![m1.clone(), m2.clone()]).await?; let offset = || { let delay = rand::thread_rng().gen_range(10..=500); @@ -2883,14 +2866,14 @@ pub(crate) mod tests { for i in 0..rounds { println!("-- [m1 changes] round {}", i + 1); - run_roundtrip(m1.clone(), m2.clone(), relay_url.clone(), b"hello m1").await; - run_roundtrip(m2.clone(), m1.clone(), relay_url.clone(), b"hello m2").await; + run_roundtrip(m1.clone(), m2.clone(), b"hello m1").await; + run_roundtrip(m2.clone(), m1.clone(), b"hello m2").await; println!("-- [m1 changes] larger data"); let mut data = vec![0u8; 10 * 1024]; rand::thread_rng().fill_bytes(&mut data); - run_roundtrip(m1.clone(), m2.clone(), relay_url.clone(), &data).await; - run_roundtrip(m2.clone(), m1.clone(), relay_url.clone(), &data).await; + run_roundtrip(m1.clone(), m2.clone(), &data).await; + run_roundtrip(m2.clone(), m1.clone(), &data).await; } std::mem::drop(m1_network_change_guard); @@ -2912,14 +2895,14 @@ pub(crate) mod tests { for i in 0..rounds { println!("-- [m2 changes] round {}", i + 1); - run_roundtrip(m1.clone(), m2.clone(), relay_url.clone(), b"hello m1").await; - run_roundtrip(m2.clone(), m1.clone(), relay_url.clone(), b"hello m2").await; + run_roundtrip(m1.clone(), m2.clone(), b"hello m1").await; + run_roundtrip(m2.clone(), m1.clone(), b"hello m2").await; println!("-- [m2 changes] larger data"); let mut data = vec![0u8; 10 * 1024]; rand::thread_rng().fill_bytes(&mut data); - run_roundtrip(m1.clone(), m2.clone(), relay_url.clone(), &data).await; - run_roundtrip(m2.clone(), m1.clone(), relay_url.clone(), &data).await; + run_roundtrip(m1.clone(), m2.clone(), &data).await; + run_roundtrip(m2.clone(), m1.clone(), &data).await; } std::mem::drop(m2_network_change_guard); @@ -2942,14 +2925,14 @@ pub(crate) mod tests { for i in 0..rounds { println!("-- [m1 & m2 changes] round {}", i + 1); - run_roundtrip(m1.clone(), m2.clone(), relay_url.clone(), b"hello m1").await; - run_roundtrip(m2.clone(), m1.clone(), relay_url.clone(), b"hello m2").await; + run_roundtrip(m1.clone(), m2.clone(), b"hello m1").await; + run_roundtrip(m2.clone(), m1.clone(), b"hello m2").await; println!("-- [m1 & m2 changes] larger data"); let mut data = vec![0u8; 10 * 1024]; rand::thread_rng().fill_bytes(&mut data); - run_roundtrip(m1.clone(), m2.clone(), relay_url.clone(), &data).await; - run_roundtrip(m2.clone(), m1.clone(), relay_url.clone(), &data).await; + run_roundtrip(m1.clone(), m2.clone(), &data).await; + run_roundtrip(m2.clone(), m1.clone(), &data).await; } std::mem::drop(m1_m2_network_change_guard); @@ -2961,12 +2944,11 @@ pub(crate) mod tests { iroh_test::logging::setup_multithreaded(); for i in 0..10 { println!("-- round {i}"); - let (relay_map, url, _cleanup) = run_relay_server().await?; println!("setting up magic stack"); - let m1 = MagicStack::new(relay_map.clone()).await?; - let m2 = MagicStack::new(relay_map.clone()).await?; + let m1 = MagicStack::new(RelayMode::Disabled).await?; + let m2 = MagicStack::new(RelayMode::Disabled).await?; - let _guard = mesh_stacks(vec![m1.clone(), m2.clone()], url.clone()).await?; + let _guard = mesh_stacks(vec![m1.clone(), m2.clone()]).await?; println!("closing endpoints"); let msock1 = m1.endpoint.magic_sock(); diff --git a/iroh-net/src/magicsock/node_map/best_addr.rs b/iroh-net/src/magicsock/node_map/best_addr.rs index d87aea1950..6378108708 100644 --- a/iroh-net/src/magicsock/node_map/best_addr.rs +++ b/iroh-net/src/magicsock/node_map/best_addr.rs @@ -173,14 +173,14 @@ impl BestAddr { %addr, latency = ?latency, trust_for = ?trust_until.duration_since(Instant::now()), - "re-selecting direct path for endpoint" + "re-selecting direct path for node" ); } else { info!( %addr, latency = ?latency, trust_for = ?trust_until.duration_since(Instant::now()), - "selecting new direct path for endpoint" + "selecting new direct path for node" ); } let was_empty = self.is_empty(); diff --git a/iroh-net/src/magicsock/node_map/node_state.rs b/iroh-net/src/magicsock/node_map/node_state.rs index fa33996f39..dfdf1ebe07 100644 --- a/iroh-net/src/magicsock/node_map/node_state.rs +++ b/iroh-net/src/magicsock/node_map/node_state.rs @@ -799,20 +799,13 @@ impl NodeState { /// Handles a Pong message (a reply to an earlier ping). /// /// It reports the address and key that should be inserted for the endpoint if any. + #[instrument(skip(self))] pub(super) fn handle_pong( &mut self, m: &disco::Pong, src: SendAddr, ) -> Option<(SocketAddr, PublicKey)> { let is_relay = src.is_relay(); - - trace!( - tx = %hex::encode(m.tx_id), - pong_src = %src, - pong_ping_src = %m.src, - is_relay = %src.is_relay(), - "received pong" - ); match self.sent_pings.remove(&m.tx_id) { None => { // This is not a pong for a ping we sent. @@ -830,7 +823,7 @@ impl NodeState { debug!( tx = %hex::encode(m.tx_id), src = %src, - reported_ping_src = %m.src, + reported_ping_src = %m.ping_observed_addr, ping_dst = %sp.to, is_relay = %src.is_relay(), latency = %latency.as_millis(), @@ -851,7 +844,7 @@ impl NodeState { latency, pong_at: now, from: src, - pong_src: m.src.clone(), + pong_src: m.ping_observed_addr.clone(), }); } } @@ -866,7 +859,7 @@ impl NodeState { latency, pong_at: now, from: src, - pong_src: m.src.clone(), + pong_src: m.ping_observed_addr.clone(), }); } other => { @@ -874,7 +867,11 @@ impl NodeState { // waiting for the response. It was either set to None or changed to // another relay. This should either never happen or be extremely // unlikely. Log and ignore for now - warn!(stored=?other, received=?url, "disco: ignoring pong via relay for different relay to the last one stored"); + warn!( + stored=?other, + received=?url, + "ignoring pong via relay for different relay from last one", + ); } }, } @@ -1293,7 +1290,7 @@ impl PathState { write!(w, "active ")?; } if let Some(ref pong) = self.recent_pong { - write!(w, "pong-received({:?} ago)", pong.pong_at.elapsed())?; + write!(w, "pong-received({:?} ago) ", pong.pong_at.elapsed())?; } if let Some(when) = self.last_incoming_ping() { write!(w, "ping-received({:?} ago) ", when.elapsed())?; diff --git a/iroh/Cargo.toml b/iroh/Cargo.toml index 22120e707f..aab85e11f7 100644 --- a/iroh/Cargo.toml +++ b/iroh/Cargo.toml @@ -37,8 +37,8 @@ iroh-docs = { version = "0.15.0", path = "../iroh-docs" } iroh-gossip = { version = "0.15.0", path = "../iroh-gossip" } parking_lot = "0.12.1" postcard = { version = "1", default-features = false, features = ["alloc", "use-std", "experimental-derive"] } -quic-rpc = { version = "0.8.0", default-features = false, features = ["flume-transport", "quinn-transport"] } -quinn = "0.10" +quic-rpc = { version = "0.9.0", default-features = false, features = ["flume-transport", "quinn-transport"] } +quinn = { package = "iroh-quinn", version = "0.10" } rand = "0.8" serde = { version = "1", features = ["derive"] } strum = { version = "0.25", features = ["derive"] } diff --git a/iroh/src/docs_engine.rs b/iroh/src/docs_engine.rs index 85f9ed4470..71b847d042 100644 --- a/iroh/src/docs_engine.rs +++ b/iroh/src/docs_engine.rs @@ -176,7 +176,10 @@ impl Engine { } /// Handle an incoming iroh-docs connection. - pub(super) async fn handle_connection(&self, conn: quinn::Connecting) -> anyhow::Result<()> { + pub(super) async fn handle_connection( + &self, + conn: iroh_net::magic_endpoint::Connecting, + ) -> anyhow::Result<()> { self.to_live_actor .send(ToLiveActor::HandleConnection { conn }) .await?; diff --git a/iroh/src/docs_engine/live.rs b/iroh/src/docs_engine/live.rs index 0299ec03a5..67bd7da1dc 100644 --- a/iroh/src/docs_engine/live.rs +++ b/iroh/src/docs_engine/live.rs @@ -75,7 +75,7 @@ pub enum ToLiveActor { reply: sync::oneshot::Sender>, }, HandleConnection { - conn: quinn::Connecting, + conn: iroh_net::magic_endpoint::Connecting, }, AcceptSyncRequest { namespace: NamespaceId, @@ -715,7 +715,7 @@ impl LiveActor { } #[instrument("accept", skip_all)] - pub async fn handle_connection(&mut self, conn: quinn::Connecting) { + pub async fn handle_connection(&mut self, conn: iroh_net::magic_endpoint::Connecting) { let to_actor_tx = self.sync_actor_tx.clone(); let accept_request_cb = move |namespace, peer| { let to_actor_tx = to_actor_tx.clone(); diff --git a/iroh/src/node/builder.rs b/iroh/src/node/builder.rs index e423328e4e..063f955d51 100644 --- a/iroh/src/node/builder.rs +++ b/iroh/src/node/builder.rs @@ -19,7 +19,6 @@ use iroh_gossip::net::{Gossip, GOSSIP_ALPN}; use iroh_net::{ discovery::{dns::DnsDiscovery, pkarr_publish::PkarrPublisher, ConcurrentDiscovery, Discovery}, dns::DnsResolver, - magic_endpoint::get_alpn, relay::RelayMode, MagicEndpoint, }; @@ -571,7 +570,7 @@ where }, // handle incoming p2p connections Some(mut connecting) = server.accept() => { - let alpn = match get_alpn(&mut connecting).await { + let alpn = match connecting.alpn().await { Ok(alpn) => alpn, Err(err) => { error!("invalid handshake: {:?}", err); @@ -706,7 +705,7 @@ impl Default for GcPolicy { // TODO: Restructure this code to not take all these arguments. #[allow(clippy::too_many_arguments)] async fn handle_connection( - connecting: quinn::Connecting, + connecting: iroh_net::magic_endpoint::Connecting, alpn: String, node: Arc>, gossip: Gossip, @@ -716,8 +715,9 @@ async fn handle_connection( GOSSIP_ALPN => gossip.handle_connection(connecting.await?).await?, DOCS_ALPN => sync.handle_connection(connecting).await?, alpn if alpn == iroh_blobs::protocol::ALPN => { + let connection = connecting.await?; iroh_blobs::provider::handle_connection( - connecting, + connection, node.db.clone(), node.callbacks.clone(), node.rt.clone(),