From 3e1e04449f7dc9acaf21a5ac9313516e270d0c3c Mon Sep 17 00:00:00 2001 From: Frando Date: Fri, 26 Sep 2025 10:14:01 +0200 Subject: [PATCH 1/4] fix: set keepalive in client transport config --- src/util.rs | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/src/util.rs b/src/util.rs index 91374d5..d5d0b6b 100644 --- a/src/util.rs +++ b/src/util.rs @@ -5,7 +5,7 @@ #[cfg(feature = "quinn_endpoint_setup")] #[cfg_attr(quicrpc_docsrs, doc(cfg(feature = "quinn_endpoint_setup")))] mod quinn_setup_utils { - use std::sync::Arc; + use std::{sync::Arc, time::Duration}; use anyhow::Result; use quinn::{crypto::rustls::QuicClientConfig, ClientConfig, ServerConfig}; @@ -31,7 +31,11 @@ mod quinn_setup_utils { let quic_client_config = quinn::crypto::rustls::QuicClientConfig::try_from(crypto_client_config)?; - Ok(ClientConfig::new(Arc::new(quic_client_config))) + let mut transport_config = quinn::TransportConfig::default(); + transport_config.keep_alive_interval(Some(Duration::from_secs(1))); + let mut client_config = ClientConfig::new(Arc::new(quic_client_config)); + client_config.transport_config(Arc::new(transport_config)); + Ok(client_config) } /// Create a quinn server config with a self-signed certificate From 64ad5964ea5ecea4adbcf29edfe4c6d0fa5e9066 Mon Sep 17 00:00:00 2001 From: Frando Date: Fri, 26 Sep 2025 10:14:23 +0200 Subject: [PATCH 2/4] fix(examples): fix tracing subscriber init --- examples/compute.rs | 2 +- examples/storage.rs | 2 +- irpc-iroh/examples/derive.rs | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/examples/compute.rs b/examples/compute.rs index 40dfaf6..2b56a77 100644 --- a/examples/compute.rs +++ b/examples/compute.rs @@ -448,7 +448,7 @@ pub async fn reference_bench(n: u64) -> anyhow::Result<()> { #[tokio::main] async fn main() -> anyhow::Result<()> { - tracing_subscriber::fmt().init(); + tracing_subscriber::fmt::init(); println!("Local use"); local().await?; println!("Remote use"); diff --git a/examples/storage.rs b/examples/storage.rs index 100a16a..e4e939f 100644 --- a/examples/storage.rs +++ b/examples/storage.rs @@ -228,7 +228,7 @@ async fn remote() -> anyhow::Result<()> { #[tokio::main] async fn main() -> anyhow::Result<()> { - tracing_subscriber::fmt().init(); + tracing_subscriber::fmt::init(); println!("Local use"); local().await?; println!("Remote use"); diff --git a/irpc-iroh/examples/derive.rs b/irpc-iroh/examples/derive.rs index 0307cee..565f51d 100644 --- a/irpc-iroh/examples/derive.rs +++ b/irpc-iroh/examples/derive.rs @@ -5,7 +5,7 @@ use self::storage::StorageApi; #[tokio::main] async fn main() -> Result<()> { - tracing_subscriber::fmt().init(); + tracing_subscriber::fmt::init(); println!("Local use"); local().await?; println!("Remote use"); From e8cc85a809918c6e7dd52affba94e4b964ff4901 Mon Sep 17 00:00:00 2001 From: Frando Date: Fri, 26 Sep 2025 10:14:36 +0200 Subject: [PATCH 3/4] fix: harden listen loops --- irpc-iroh/src/lib.rs | 33 +++++++++++++++++++++++++-------- src/lib.rs | 34 ++++++++++++++++++++++++++-------- 2 files changed, 51 insertions(+), 16 deletions(-) diff --git a/irpc-iroh/src/lib.rs b/irpc-iroh/src/lib.rs index e139e6f..5851ded 100644 --- a/irpc-iroh/src/lib.rs +++ b/irpc-iroh/src/lib.rs @@ -18,7 +18,7 @@ use irpc::{ }; use n0_future::{future::Boxed as BoxFuture, TryFutureExt}; use serde::de::DeserializeOwned; -use tracing::{trace, trace_span, warn, Instrument}; +use tracing::{debug, error_span, trace, trace_span, warn, Instrument}; /// Returns a client that connects to a irpc service using an [`iroh::Endpoint`]. pub fn client( @@ -207,6 +207,10 @@ pub async fn handle_connection( connection: Connection, handler: Handler, ) -> io::Result<()> { + if let Ok(remote) = connection.remote_node_id() { + tracing::Span::current().record("remote", tracing::field::display(remote.fmt_short())); + } + debug!("connection accepted"); loop { let Some((msg, rx, tx)) = read_request_raw(&connection).await? else { return Ok(()); @@ -270,19 +274,32 @@ pub async fn read_request_raw( pub async fn listen(endpoint: iroh::Endpoint, handler: Handler) { let mut request_id = 0u64; let mut tasks = n0_future::task::JoinSet::new(); - while let Some(incoming) = endpoint.accept().await { + loop { + let incoming = tokio::select! { + Some(res) = tasks.join_next(), if !tasks.is_empty() => { + res.expect("irpc connection task panicked"); + continue; + } + incoming = endpoint.accept() => { + match incoming { + None => break, + Some(incoming) => incoming + } + } + }; let handler = handler.clone(); let fut = async move { - let connection = match incoming.await { - Ok(connection) => connection, + match incoming.await { + Ok(connection) => match handle_connection(connection, handler).await { + Err(err) => warn!("connection closed with error: {err:?}"), + Ok(()) => debug!("connection closed"), + }, Err(cause) => { - warn!("failed to accept connection {cause:?}"); - return io::Result::Ok(()); + warn!("failed to accept connection: {cause:?}"); } }; - handle_connection(connection, handler).await }; - let span = trace_span!("rpc", id = request_id); + let span = error_span!("rpc", id = request_id, remote = tracing::field::Empty); tasks.spawn(fut.instrument(span)); request_id += 1; } diff --git a/src/lib.rs b/src/lib.rs index 9491472..b11b9ff 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1534,7 +1534,7 @@ pub mod rpc { use quinn::ConnectionError; use serde::de::DeserializeOwned; use smallvec::SmallVec; - use tracing::{trace, trace_span, warn, Instrument}; + use tracing::{debug, error_span, trace, warn, Instrument}; use crate::{ channel::{ @@ -2054,19 +2054,32 @@ pub mod rpc { ) { let mut request_id = 0u64; let mut tasks = JoinSet::new(); - while let Some(incoming) = endpoint.accept().await { + loop { + let incoming = tokio::select! { + Some(res) = tasks.join_next(), if !tasks.is_empty() => { + res.expect("irpc connection task panicked"); + continue; + } + incoming = endpoint.accept() => { + match incoming { + None => break, + Some(incoming) => incoming + } + } + }; let handler = handler.clone(); let fut = async move { - let connection = match incoming.await { - Ok(connection) => connection, + match incoming.await { + Ok(connection) => match handle_connection(connection, handler).await { + Err(err) => warn!("connection closed with error: {err:?}"), + Ok(()) => debug!("connection closed"), + }, Err(cause) => { - warn!("failed to accept connection {cause:?}"); - return io::Result::Ok(()); + warn!("failed to accept connection: {cause:?}"); } }; - handle_connection(connection, handler).await }; - let span = trace_span!("rpc", id = request_id); + let span = error_span!("rpc", id = request_id, remote = tracing::field::Empty); tasks.spawn(fut.instrument(span)); request_id += 1; } @@ -2077,6 +2090,11 @@ pub mod rpc { connection: quinn::Connection, handler: Handler, ) -> io::Result<()> { + tracing::Span::current().record( + "remote", + tracing::field::display(connection.remote_address()), + ); + debug!("connection accepted"); loop { let Some((msg, rx, tx)) = read_request_raw(&connection).await? else { return Ok(()); From 1853f125a1a77f84b78868f3016a91edc057e2af Mon Sep 17 00:00:00 2001 From: Frando Date: Fri, 26 Sep 2025 10:21:25 +0200 Subject: [PATCH 4/4] fixup: CI and tokio feature flag --- .github/workflows/ci.yml | 2 ++ Cargo.toml | 4 ++-- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 9ce9a82..302666c 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -24,6 +24,8 @@ jobs: steps: - uses: actions/checkout@v2 - uses: dtolnay/rust-toolchain@stable + with: + components: rustfmt, clippy - uses: swatinem/rust-cache@v2 - name: cargo fmt run: cargo fmt --all -- --check diff --git a/Cargo.toml b/Cargo.toml index 3f6b9f2..9f22489 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,8 +18,8 @@ crate-type = ["cdylib", "rlib"] [dependencies] # we require serde even in non-rpc mode serde = { workspace = true } -# just for the oneshot and mpsc queues -tokio = { workspace = true, features = ["sync"] } +# just for the oneshot and mpsc queues, and tokio::select! +tokio = { workspace = true, features = ["sync", "macros"] } # for PollSender (which for some reason is not available in the main tokio api) tokio-util = { version = "0.7.14", default-features = false } # errors