From ea018f9e84c5158f7091f5678541c60ee90f2630 Mon Sep 17 00:00:00 2001 From: Ruediger Klaehn Date: Mon, 6 Oct 2025 12:34:05 +0300 Subject: [PATCH 01/15] Some initial renaming --- irpc-iroh/examples/0rtt.rs | 4 ++-- irpc-iroh/src/lib.rs | 6 +++--- src/lib.rs | 36 ++++++++++++++++++------------------ 3 files changed, 23 insertions(+), 23 deletions(-) diff --git a/irpc-iroh/examples/0rtt.rs b/irpc-iroh/examples/0rtt.rs index ffaa7ae..a3867f6 100644 --- a/irpc-iroh/examples/0rtt.rs +++ b/irpc-iroh/examples/0rtt.rs @@ -327,8 +327,8 @@ mod ping { #[derive(Debug, Clone)] struct IrohConnection(Connection); - impl irpc::rpc::RemoteConnection for IrohConnection { - fn clone_boxed(&self) -> Box { + impl irpc::rpc::BoxedConnection for IrohConnection { + fn clone_boxed(&self) -> Box { Box::new(self.clone()) } diff --git a/irpc-iroh/src/lib.rs b/irpc-iroh/src/lib.rs index 5851ded..242ed78 100644 --- a/irpc-iroh/src/lib.rs +++ b/irpc-iroh/src/lib.rs @@ -10,7 +10,7 @@ use iroh::{ use irpc::{ channel::RecvError, rpc::{ - Handler, RemoteConnection, RemoteService, ERROR_CODE_MAX_MESSAGE_SIZE_EXCEEDED, + BoxedConnection, Handler, RemoteService, ERROR_CODE_MAX_MESSAGE_SIZE_EXCEEDED, MAX_MESSAGE_SIZE, }, util::AsyncReadVarintExt, @@ -55,8 +55,8 @@ impl IrohRemoteConnection { } } -impl RemoteConnection for IrohRemoteConnection { - fn clone_boxed(&self) -> Box { +impl BoxedConnection for IrohRemoteConnection { + fn clone_boxed(&self) -> Box { Box::new(self.clone()) } diff --git a/src/lib.rs b/src/lib.rs index b11b9ff..bdea66a 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1018,7 +1018,7 @@ impl Clone for Client { impl From> for Client { fn from(tx: LocalSender) -> Self { - Self(ClientInner::Local(tx.0), PhantomData) + Self(ClientInner::Direct(tx.0), PhantomData) } } @@ -1040,8 +1040,8 @@ impl Client { /// This is used from crates that want to provide other transports than quinn, /// such as the iroh transport. #[cfg(feature = "rpc")] - pub fn boxed(remote: impl rpc::RemoteConnection) -> Self { - Self(ClientInner::Remote(Box::new(remote)), PhantomData) + pub fn boxed(remote: impl rpc::BoxedConnection) -> Self { + Self(ClientInner::Boxed(Box::new(remote)), PhantomData) } /// Creates a new client from a `tokio::sync::mpsc::Sender`. @@ -1053,8 +1053,8 @@ impl Client { /// requests. pub fn as_local(&self) -> Option> { match &self.0 { - ClientInner::Local(tx) => Some(tx.clone().into()), - ClientInner::Remote(..) => None, + ClientInner::Direct(tx) => Some(tx.clone().into()), + ClientInner::Boxed(..) => None, } } @@ -1078,8 +1078,8 @@ impl Client { #[cfg(feature = "rpc")] { let cloned = match &self.0 { - ClientInner::Local(tx) => Request::Local(tx.clone()), - ClientInner::Remote(connection) => Request::Remote(connection.clone_boxed()), + ClientInner::Direct(tx) => Request::Local(tx.clone()), + ClientInner::Boxed(connection) => Request::Remote(connection.clone_boxed()), }; async move { match cloned { @@ -1093,7 +1093,7 @@ impl Client { } #[cfg(not(feature = "rpc"))] { - let ClientInner::Local(tx) = &self.0 else { + let ClientInner::Direct(tx) = &self.0 else { unreachable!() }; let tx = tx.clone().into(); @@ -1406,24 +1406,24 @@ impl Client { #[derive(Debug)] pub(crate) enum ClientInner { - Local(tokio::sync::mpsc::Sender), + Direct(tokio::sync::mpsc::Sender), #[cfg(feature = "rpc")] #[cfg_attr(quicrpc_docsrs, doc(cfg(feature = "rpc")))] - Remote(Box), + Boxed(Box), #[cfg(not(feature = "rpc"))] #[cfg_attr(quicrpc_docsrs, doc(cfg(feature = "rpc")))] #[allow(dead_code)] - Remote(PhantomData), + Boxed(PhantomData), } impl Clone for ClientInner { fn clone(&self) -> Self { match self { - Self::Local(tx) => Self::Local(tx.clone()), + Self::Direct(tx) => Self::Direct(tx.clone()), #[cfg(feature = "rpc")] - Self::Remote(conn) => Self::Remote(conn.clone_boxed()), + Self::Boxed(conn) => Self::Boxed(conn.clone_boxed()), #[cfg(not(feature = "rpc"))] - Self::Remote(_) => unreachable!(), + Self::Boxed(_) => unreachable!(), } } } @@ -1616,9 +1616,9 @@ pub mod rpc { /// /// This is done as a trait instead of an enum, so we don't need an iroh /// dependency in the main crate. - pub trait RemoteConnection: Send + Sync + Debug + 'static { + pub trait BoxedConnection: Send + Sync + Debug + 'static { /// Boxed clone so the trait is dynable. - fn clone_boxed(&self) -> Box; + fn clone_boxed(&self) -> Box; /// Open a bidirectional stream to the remote service. fn open_bi( @@ -1650,8 +1650,8 @@ pub mod rpc { } } - impl RemoteConnection for QuinnRemoteConnection { - fn clone_boxed(&self) -> Box { + impl BoxedConnection for QuinnRemoteConnection { + fn clone_boxed(&self) -> Box { Box::new(self.clone()) } From 2b62879319d43b3877824349f0d29e7de49389c8 Mon Sep 17 00:00:00 2001 From: Ruediger Klaehn Date: Mon, 6 Oct 2025 13:14:43 +0300 Subject: [PATCH 02/15] Implement with_map, with_filter_map and with_filter for mpsc::Sender and oneshot::Sender Implement mao, filter_map and filter for mpsc::Receiver oneshot::Receiver is a future, so you can just do this yourself using FutureExt. --- src/lib.rs | 217 +++++++++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 209 insertions(+), 8 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index bdea66a..0880882 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -373,7 +373,7 @@ pub mod channel { use n0_future::future::Boxed as BoxFuture; use super::{RecvError, SendError}; - use crate::util::FusedOneshotReceiver; + use crate::{util::FusedOneshotReceiver, RpcMessage}; /// Create a local oneshot sender and receiver pair. /// @@ -462,6 +462,41 @@ pub mod channel { Sender::Boxed(f) => f(value).await, } } + + pub fn with_filter(self, f: impl Fn(&T) -> bool + Send + Sync + 'static) -> Sender + where + T: RpcMessage, + { + self.with_filter_map(move |u| if f(&u) { Some(u) } else { None }) + } + + pub fn with_map(self, f: F) -> Sender + where + F: Fn(U) -> T + Send + Sync + 'static, + U: RpcMessage, + T: RpcMessage, + { + self.with_filter_map(move |u| Some(f(u))) + } + + pub fn with_filter_map(self, f: F) -> Sender + where + F: Fn(U) -> Option + Send + Sync + 'static, + U: RpcMessage, + T: RpcMessage, + { + let inner: BoxedSender = Box::new(move |value| { + let opt = f(value); + Box::pin(async move { + if let Some(v) = opt { + self.send(v).await + } else { + Ok(()) + } + }) + }); + Sender::Boxed(inner) + } } impl Sender { @@ -546,7 +581,7 @@ pub mod channel { /// /// For the rpc case, the send side can not be cloned, hence mpsc instead of mpsc. pub mod mpsc { - use std::{fmt::Debug, future::Future, pin::Pin, sync::Arc}; + use std::{fmt::Debug, future::Future, marker::PhantomData, pin::Pin, sync::Arc}; use super::{RecvError, SendError}; use crate::RpcMessage; @@ -579,6 +614,37 @@ pub mod channel { } } + pub fn with_filter(self, f: F) -> Sender + where + F: Fn(&T) -> bool + Send + Sync + 'static, + T: RpcMessage, + { + self.with_filter_map(move |u| if f(&u) { Some(u) } else { None }) + } + + pub fn with_map(self, f: F) -> Sender + where + F: Fn(U) -> T + Send + Sync + 'static, + U: RpcMessage, + T: RpcMessage, + { + self.with_filter_map(move |u| Some(f(u))) + } + + pub fn with_filter_map(self, f: F) -> Sender + where + F: Fn(U) -> Option + Send + Sync + 'static, + U: RpcMessage, + T: RpcMessage, + { + let inner: Arc> = Arc::new(FilterMapSender { + f, + sender: self, + _p: PhantomData, + }); + Sender::Boxed(inner) + } + pub async fn closed(&self) where T: RpcMessage, @@ -748,6 +814,35 @@ pub mod channel { } } + pub fn map(self, f: impl Fn(T) -> U + Send + Sync + 'static) -> Receiver + where + T: RpcMessage, + U: RpcMessage, + { + self.filter_map(move |u| Some(f(u))) + } + + pub fn filter(self, f: impl Fn(&T) -> bool + Send + Sync + 'static) -> Receiver + where + T: RpcMessage, + { + self.filter_map(move |u| if f(&u) { Some(u) } else { None }) + } + + pub fn filter_map(self, f: F) -> Receiver + where + T: RpcMessage, + U: RpcMessage, + F: Fn(T) -> Option + Send + Sync + 'static, + { + let inner: Box> = Box::new(FilterMapReceiver { + f, + receiver: self, + _p: PhantomData, + }); + Receiver::Boxed(inner) + } + #[cfg(feature = "stream")] pub fn into_stream( self, @@ -789,6 +884,112 @@ pub mod channel { } } + struct FilterMapSender { + f: F, + sender: Sender, + _p: PhantomData, + } + + impl Debug for FilterMapSender + where + F: Fn(U) -> Option + Send + Sync + 'static, + T: RpcMessage, + U: RpcMessage, + { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("FilterMapSender").finish_non_exhaustive() + } + } + + impl DynSender for FilterMapSender + where + F: Fn(U) -> Option + Send + Sync + 'static, + T: RpcMessage, + U: RpcMessage, + { + fn send( + &self, + value: U, + ) -> Pin> + Send + '_>> { + Box::pin(async move { + if let Some(v) = (self.f)(value) { + self.sender.send(v).await + } else { + Ok(()) + } + }) + } + + fn try_send( + &self, + value: U, + ) -> Pin> + Send + '_>> { + Box::pin(async move { + if let Some(v) = (self.f)(value) { + self.sender.try_send(v).await + } else { + Ok(true) + } + }) + } + + fn is_rpc(&self) -> bool { + self.sender.is_rpc() + } + + fn closed(&self) -> Pin + Send + Sync + '_>> { + match self { + FilterMapSender { + sender: Sender::Tokio(tx), + .. + } => Box::pin(tx.closed()), + FilterMapSender { + sender: Sender::Boxed(sink), + .. + } => sink.closed(), + } + } + } + + struct FilterMapReceiver { + f: F, + receiver: Receiver, + _p: PhantomData, + } + + impl Debug for FilterMapReceiver { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("FilterMapReceiver").finish_non_exhaustive() + } + } + + impl DynReceiver for FilterMapReceiver + where + F: Fn(T) -> Option + Send + Sync + 'static, + T: RpcMessage, + U: RpcMessage, + { + fn recv( + &mut self, + ) -> Pin< + Box< + dyn Future, RecvError>> + + Send + + Sync + + '_, + >, + > { + Box::pin(async move { + while let Some(msg) = self.receiver.recv().await? { + if let Some(v) = (self.f)(msg) { + return Ok(Some(v)); + } + } + Ok(None) + }) + } + } + impl crate::sealed::Sealed for Receiver {} impl crate::Receiver for Receiver {} } @@ -1018,7 +1219,7 @@ impl Clone for Client { impl From> for Client { fn from(tx: LocalSender) -> Self { - Self(ClientInner::Direct(tx.0), PhantomData) + Self(ClientInner::Tokio(tx.0), PhantomData) } } @@ -1053,7 +1254,7 @@ impl Client { /// requests. pub fn as_local(&self) -> Option> { match &self.0 { - ClientInner::Direct(tx) => Some(tx.clone().into()), + ClientInner::Tokio(tx) => Some(tx.clone().into()), ClientInner::Boxed(..) => None, } } @@ -1078,7 +1279,7 @@ impl Client { #[cfg(feature = "rpc")] { let cloned = match &self.0 { - ClientInner::Direct(tx) => Request::Local(tx.clone()), + ClientInner::Tokio(tx) => Request::Local(tx.clone()), ClientInner::Boxed(connection) => Request::Remote(connection.clone_boxed()), }; async move { @@ -1093,7 +1294,7 @@ impl Client { } #[cfg(not(feature = "rpc"))] { - let ClientInner::Direct(tx) = &self.0 else { + let ClientInner::Tokio(tx) = &self.0 else { unreachable!() }; let tx = tx.clone().into(); @@ -1406,7 +1607,7 @@ impl Client { #[derive(Debug)] pub(crate) enum ClientInner { - Direct(tokio::sync::mpsc::Sender), + Tokio(tokio::sync::mpsc::Sender), #[cfg(feature = "rpc")] #[cfg_attr(quicrpc_docsrs, doc(cfg(feature = "rpc")))] Boxed(Box), @@ -1419,7 +1620,7 @@ pub(crate) enum ClientInner { impl Clone for ClientInner { fn clone(&self) -> Self { match self { - Self::Direct(tx) => Self::Direct(tx.clone()), + Self::Tokio(tx) => Self::Tokio(tx.clone()), #[cfg(feature = "rpc")] Self::Boxed(conn) => Self::Boxed(conn.clone_boxed()), #[cfg(not(feature = "rpc"))] From 4a253428e70936469a468664da697837f47c3dd9 Mon Sep 17 00:00:00 2001 From: Ruediger Klaehn Date: Mon, 6 Oct 2025 15:12:08 +0300 Subject: [PATCH 03/15] Relax trait constraints. this makes irpc::channel::mpsc::* a generic channel that can also be used for non-serializable msgs. Especially when mapping you might not want intermediate map results ot be serializable! --- src/lib.rs | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 0880882..a4cc5e9 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -373,7 +373,7 @@ pub mod channel { use n0_future::future::Boxed as BoxFuture; use super::{RecvError, SendError}; - use crate::{util::FusedOneshotReceiver, RpcMessage}; + use crate::util::FusedOneshotReceiver; /// Create a local oneshot sender and receiver pair. /// @@ -465,7 +465,7 @@ pub mod channel { pub fn with_filter(self, f: impl Fn(&T) -> bool + Send + Sync + 'static) -> Sender where - T: RpcMessage, + T: Send + Sync + 'static, { self.with_filter_map(move |u| if f(&u) { Some(u) } else { None }) } @@ -473,8 +473,8 @@ pub mod channel { pub fn with_map(self, f: F) -> Sender where F: Fn(U) -> T + Send + Sync + 'static, - U: RpcMessage, - T: RpcMessage, + U: Send + Sync + 'static, + T: Send + Sync + 'static, { self.with_filter_map(move |u| Some(f(u))) } @@ -482,8 +482,8 @@ pub mod channel { pub fn with_filter_map(self, f: F) -> Sender where F: Fn(U) -> Option + Send + Sync + 'static, - U: RpcMessage, - T: RpcMessage, + U: Send + Sync + 'static, + T: Send + Sync + 'static, { let inner: BoxedSender = Box::new(move |value| { let opt = f(value); @@ -800,7 +800,7 @@ pub mod channel { Boxed(Box>), } - impl Receiver { + impl Receiver { /// Receive a message /// /// Returns Ok(None) if the sender has been dropped or the remote end has @@ -831,8 +831,8 @@ pub mod channel { pub fn filter_map(self, f: F) -> Receiver where - T: RpcMessage, - U: RpcMessage, + T: Send + Sync + 'static, + U: Send + Sync + 'static, F: Fn(T) -> Option + Send + Sync + 'static, { let inner: Box> = Box::new(FilterMapReceiver { @@ -966,8 +966,8 @@ pub mod channel { impl DynReceiver for FilterMapReceiver where F: Fn(T) -> Option + Send + Sync + 'static, - T: RpcMessage, - U: RpcMessage, + T: Send + Sync + 'static, + U: Send + Sync + 'static, { fn recv( &mut self, From 251ed2d85351b4c456e073a2d40dffd18f96fe09 Mon Sep 17 00:00:00 2001 From: Ruediger Klaehn Date: Mon, 6 Oct 2025 15:30:13 +0300 Subject: [PATCH 04/15] Make Clone less restritive --- src/lib.rs | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/src/lib.rs b/src/lib.rs index a4cc5e9..0770ea8 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -597,12 +597,20 @@ pub mod channel { /// Single producer, single consumer sender. /// /// For the local case, this wraps a tokio::sync::mpsc::Sender. - #[derive(Clone)] pub enum Sender { Tokio(tokio::sync::mpsc::Sender), Boxed(Arc>), } + impl Clone for Sender { + fn clone(&self) -> Self { + match self { + Self::Tokio(tx) => Self::Tokio(tx.clone()), + Self::Boxed(inner) => Self::Boxed(inner.clone()), + } + } + } + impl Sender { pub fn is_rpc(&self) -> bool where From 8601e3688c05c93fa9b8f7314ca36c4741d7d07a Mon Sep 17 00:00:00 2001 From: Ruediger Klaehn Date: Mon, 6 Oct 2025 15:44:32 +0300 Subject: [PATCH 05/15] Restrict trait constraints to make it possible to call send for non-Serializzable msgs. --- src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/lib.rs b/src/lib.rs index 0770ea8..5bfbfa6 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -747,7 +747,7 @@ pub mod channel { } } - impl Sender { + impl Sender { /// Send a message and yield until either it is sent or an error occurs. /// /// ## Cancellation safety From 296ba016ef572833d0367fa446838d6f9463c45e Mon Sep 17 00:00:00 2001 From: Ruediger Klaehn Date: Mon, 6 Oct 2025 15:44:54 +0300 Subject: [PATCH 06/15] Remove SendFut it was not exported anyway, so the resulting future was anonymous all th etime. --- src/lib.rs | 86 +++++++++++++----------------------------------------- 1 file changed, 20 insertions(+), 66 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 5bfbfa6..8d245af 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -305,6 +305,7 @@ use self::{ }, sealed::Sealed, }; +use crate::channel::SendError; #[cfg(feature = "rpc")] #[cfg_attr(quicrpc_docsrs, doc(cfg(feature = "rpc")))] @@ -1615,7 +1616,7 @@ impl Client { #[derive(Debug)] pub(crate) enum ClientInner { - Tokio(tokio::sync::mpsc::Sender), + Tokio(crate::channel::mpsc::Sender), #[cfg(feature = "rpc")] #[cfg_attr(quicrpc_docsrs, doc(cfg(feature = "rpc")))] Boxed(Box), @@ -1707,7 +1708,7 @@ impl From for io::Error { /// [`WithChannels`]. #[derive(Debug)] #[repr(transparent)] -pub struct LocalSender(tokio::sync::mpsc::Sender); +pub struct LocalSender(crate::channel::mpsc::Sender); impl Clone for LocalSender { fn clone(&self) -> Self { @@ -1717,6 +1718,12 @@ impl Clone for LocalSender { impl From> for LocalSender { fn from(tx: tokio::sync::mpsc::Sender) -> Self { + Self(tx.into()) + } +} + +impl From> for LocalSender { + fn from(tx: crate::channel::mpsc::Sender) -> Self { Self(tx) } } @@ -2377,77 +2384,24 @@ pub enum Request { impl LocalSender { /// Send a message to the service - pub fn send(&self, value: impl Into>) -> SendFut + pub fn send( + &self, + value: impl Into>, + ) -> impl Future> + Send + 'static where T: Channels, S::Message: From>, { let value: S::Message = value.into().into(); - SendFut::new(self.0.clone(), value) + self.send_raw(value) } /// Send a message to the service without the type conversion magic - pub fn send_raw(&self, value: S::Message) -> SendFut { - SendFut::new(self.0.clone(), value) - } -} - -mod send_fut { - use std::{ - future::Future, - pin::Pin, - task::{Context, Poll}, - }; - - use tokio::sync::mpsc::Sender; - use tokio_util::sync::PollSender; - - use crate::channel::SendError; - - pub struct SendFut { - poll_sender: PollSender, - value: Option, - } - - impl SendFut { - pub fn new(sender: Sender, value: T) -> Self { - Self { - poll_sender: PollSender::new(sender), - value: Some(value), - } - } - } - - impl Future for SendFut { - type Output = std::result::Result<(), SendError>; - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let this = self.get_mut(); - - // Safely extract the value - let value = match this.value.take() { - Some(v) => v, - None => return Poll::Ready(Ok(())), // Already completed - }; - - // Try to reserve capacity - match this.poll_sender.poll_reserve(cx) { - Poll::Ready(Ok(())) => { - // Send the item - this.poll_sender.send_item(value).ok(); - Poll::Ready(Ok(())) - } - Poll::Ready(Err(_)) => { - // Channel is closed - Poll::Ready(Err(SendError::ReceiverClosed)) - } - Poll::Pending => { - // Restore the value and wait - this.value = Some(value); - Poll::Pending - } - } - } + pub fn send_raw( + &self, + value: S::Message, + ) -> impl Future> + Send + 'static { + let x = self.0.clone(); + async move { x.send(value).await } } } -use send_fut::SendFut; From d1556e38f7af0231315046f0f7a2a2553c026f2c Mon Sep 17 00:00:00 2001 From: Ruediger Klaehn Date: Mon, 6 Oct 2025 16:48:21 +0300 Subject: [PATCH 07/15] Undo wrong rename, but call tokio local, since it is no longer just tokio --- irpc-iroh/examples/0rtt.rs | 4 ++-- irpc-iroh/src/lib.rs | 6 +++--- src/lib.rs | 36 ++++++++++++++++++------------------ 3 files changed, 23 insertions(+), 23 deletions(-) diff --git a/irpc-iroh/examples/0rtt.rs b/irpc-iroh/examples/0rtt.rs index a3867f6..ffaa7ae 100644 --- a/irpc-iroh/examples/0rtt.rs +++ b/irpc-iroh/examples/0rtt.rs @@ -327,8 +327,8 @@ mod ping { #[derive(Debug, Clone)] struct IrohConnection(Connection); - impl irpc::rpc::BoxedConnection for IrohConnection { - fn clone_boxed(&self) -> Box { + impl irpc::rpc::RemoteConnection for IrohConnection { + fn clone_boxed(&self) -> Box { Box::new(self.clone()) } diff --git a/irpc-iroh/src/lib.rs b/irpc-iroh/src/lib.rs index 242ed78..5851ded 100644 --- a/irpc-iroh/src/lib.rs +++ b/irpc-iroh/src/lib.rs @@ -10,7 +10,7 @@ use iroh::{ use irpc::{ channel::RecvError, rpc::{ - BoxedConnection, Handler, RemoteService, ERROR_CODE_MAX_MESSAGE_SIZE_EXCEEDED, + Handler, RemoteConnection, RemoteService, ERROR_CODE_MAX_MESSAGE_SIZE_EXCEEDED, MAX_MESSAGE_SIZE, }, util::AsyncReadVarintExt, @@ -55,8 +55,8 @@ impl IrohRemoteConnection { } } -impl BoxedConnection for IrohRemoteConnection { - fn clone_boxed(&self) -> Box { +impl RemoteConnection for IrohRemoteConnection { + fn clone_boxed(&self) -> Box { Box::new(self.clone()) } diff --git a/src/lib.rs b/src/lib.rs index 8d245af..23a499b 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1228,7 +1228,7 @@ impl Clone for Client { impl From> for Client { fn from(tx: LocalSender) -> Self { - Self(ClientInner::Tokio(tx.0), PhantomData) + Self(ClientInner::Local(tx.0), PhantomData) } } @@ -1250,8 +1250,8 @@ impl Client { /// This is used from crates that want to provide other transports than quinn, /// such as the iroh transport. #[cfg(feature = "rpc")] - pub fn boxed(remote: impl rpc::BoxedConnection) -> Self { - Self(ClientInner::Boxed(Box::new(remote)), PhantomData) + pub fn boxed(remote: impl rpc::RemoteConnection) -> Self { + Self(ClientInner::Remote(Box::new(remote)), PhantomData) } /// Creates a new client from a `tokio::sync::mpsc::Sender`. @@ -1263,8 +1263,8 @@ impl Client { /// requests. pub fn as_local(&self) -> Option> { match &self.0 { - ClientInner::Tokio(tx) => Some(tx.clone().into()), - ClientInner::Boxed(..) => None, + ClientInner::Local(tx) => Some(tx.clone().into()), + ClientInner::Remote(..) => None, } } @@ -1288,8 +1288,8 @@ impl Client { #[cfg(feature = "rpc")] { let cloned = match &self.0 { - ClientInner::Tokio(tx) => Request::Local(tx.clone()), - ClientInner::Boxed(connection) => Request::Remote(connection.clone_boxed()), + ClientInner::Local(tx) => Request::Local(tx.clone()), + ClientInner::Remote(connection) => Request::Remote(connection.clone_boxed()), }; async move { match cloned { @@ -1303,7 +1303,7 @@ impl Client { } #[cfg(not(feature = "rpc"))] { - let ClientInner::Tokio(tx) = &self.0 else { + let ClientInner::Local(tx) = &self.0 else { unreachable!() }; let tx = tx.clone().into(); @@ -1616,24 +1616,24 @@ impl Client { #[derive(Debug)] pub(crate) enum ClientInner { - Tokio(crate::channel::mpsc::Sender), + Local(crate::channel::mpsc::Sender), #[cfg(feature = "rpc")] #[cfg_attr(quicrpc_docsrs, doc(cfg(feature = "rpc")))] - Boxed(Box), + Remote(Box), #[cfg(not(feature = "rpc"))] #[cfg_attr(quicrpc_docsrs, doc(cfg(feature = "rpc")))] #[allow(dead_code)] - Boxed(PhantomData), + Remote(PhantomData), } impl Clone for ClientInner { fn clone(&self) -> Self { match self { - Self::Tokio(tx) => Self::Tokio(tx.clone()), + Self::Local(tx) => Self::Local(tx.clone()), #[cfg(feature = "rpc")] - Self::Boxed(conn) => Self::Boxed(conn.clone_boxed()), + Self::Remote(conn) => Self::Remote(conn.clone_boxed()), #[cfg(not(feature = "rpc"))] - Self::Boxed(_) => unreachable!(), + Self::Remote(_) => unreachable!(), } } } @@ -1832,9 +1832,9 @@ pub mod rpc { /// /// This is done as a trait instead of an enum, so we don't need an iroh /// dependency in the main crate. - pub trait BoxedConnection: Send + Sync + Debug + 'static { + pub trait RemoteConnection: Send + Sync + Debug + 'static { /// Boxed clone so the trait is dynable. - fn clone_boxed(&self) -> Box; + fn clone_boxed(&self) -> Box; /// Open a bidirectional stream to the remote service. fn open_bi( @@ -1866,8 +1866,8 @@ pub mod rpc { } } - impl BoxedConnection for QuinnRemoteConnection { - fn clone_boxed(&self) -> Box { + impl RemoteConnection for QuinnRemoteConnection { + fn clone_boxed(&self) -> Box { Box::new(self.clone()) } From 43d5c7ca677a63d90e95f00393c22a5d2ffe220c Mon Sep 17 00:00:00 2001 From: Ruediger Klaehn Date: Mon, 6 Oct 2025 16:57:49 +0300 Subject: [PATCH 08/15] reduce constraints even more --- src/lib.rs | 21 ++++++++------------- 1 file changed, 8 insertions(+), 13 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 23a499b..0de63a5 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -626,7 +626,7 @@ pub mod channel { pub fn with_filter(self, f: F) -> Sender where F: Fn(&T) -> bool + Send + Sync + 'static, - T: RpcMessage, + T: Send + Sync + 'static, { self.with_filter_map(move |u| if f(&u) { Some(u) } else { None }) } @@ -634,8 +634,8 @@ pub mod channel { pub fn with_map(self, f: F) -> Sender where F: Fn(U) -> T + Send + Sync + 'static, - U: RpcMessage, - T: RpcMessage, + U: Send + Sync + 'static, + T: Send + Sync + 'static, { self.with_filter_map(move |u| Some(f(u))) } @@ -643,8 +643,8 @@ pub mod channel { pub fn with_filter_map(self, f: F) -> Sender where F: Fn(U) -> Option + Send + Sync + 'static, - U: RpcMessage, - T: RpcMessage, + U: Send + Sync + 'static, + T: Send + Sync + 'static, { let inner: Arc> = Arc::new(FilterMapSender { f, @@ -899,12 +899,7 @@ pub mod channel { _p: PhantomData, } - impl Debug for FilterMapSender - where - F: Fn(U) -> Option + Send + Sync + 'static, - T: RpcMessage, - U: RpcMessage, - { + impl Debug for FilterMapSender { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("FilterMapSender").finish_non_exhaustive() } @@ -913,8 +908,8 @@ pub mod channel { impl DynSender for FilterMapSender where F: Fn(U) -> Option + Send + Sync + 'static, - T: RpcMessage, - U: RpcMessage, + T: Send + Sync + 'static, + U: Send + Sync + 'static, { fn send( &self, From 0b1fe9e498516ac3ed2e1686ad61df8784a5ad21 Mon Sep 17 00:00:00 2001 From: Ruediger Klaehn Date: Mon, 6 Oct 2025 17:07:28 +0300 Subject: [PATCH 09/15] Allow creating a local client from an irpc channel of messages that can also be mapped. --- src/lib.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 0de63a5..aa059b7 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1250,8 +1250,9 @@ impl Client { } /// Creates a new client from a `tokio::sync::mpsc::Sender`. - pub fn local(tx: tokio::sync::mpsc::Sender) -> Self { - tx.into() + pub fn local(tx: impl Into>) -> Self { + let tx: crate::channel::mpsc::Sender = tx.into(); + Self(ClientInner::Local(tx), PhantomData) } /// Get the local sender. This is useful if you don't care about remote From d15f8e22898d0aac3d92d56079a475c15e6ce9a8 Mon Sep 17 00:00:00 2001 From: Ruediger Klaehn Date: Mon, 6 Oct 2025 17:07:58 +0300 Subject: [PATCH 10/15] Change compute example to use a crate mpsc receiver. --- examples/compute.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/examples/compute.rs b/examples/compute.rs index 2b56a77..7908425 100644 --- a/examples/compute.rs +++ b/examples/compute.rs @@ -55,12 +55,12 @@ struct Multiply { // The actor that processes requests struct ComputeActor { - recv: tokio::sync::mpsc::Receiver, + recv: irpc::channel::mpsc::Receiver, } impl ComputeActor { pub fn local() -> ComputeApi { - let (tx, rx) = tokio::sync::mpsc::channel(128); + let (tx, rx) = irpc::channel::mpsc::channel(128); let actor = Self { recv: rx }; n0_future::task::spawn(actor.run()); ComputeApi { @@ -69,7 +69,7 @@ impl ComputeActor { } async fn run(mut self) { - while let Some(msg) = self.recv.recv().await { + while let Ok(Some(msg)) = self.recv.recv().await { n0_future::task::spawn(async move { if let Err(cause) = Self::handle(msg).await { eprintln!("Error: {cause}"); From 422ee0dae06f9f3a885a4fbc64c3578941534c7f Mon Sep 17 00:00:00 2001 From: Ruediger Klaehn Date: Mon, 6 Oct 2025 18:27:28 +0300 Subject: [PATCH 11/15] further reduce bounds, also comments --- src/lib.rs | 22 ++++++++++++++++++---- 1 file changed, 18 insertions(+), 4 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index aa059b7..80f32c3 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -464,6 +464,9 @@ pub mod channel { } } + /// Applies a filter before sending. + /// + /// Messages that don't pass the filter are dropped. pub fn with_filter(self, f: impl Fn(&T) -> bool + Send + Sync + 'static) -> Sender where T: Send + Sync + 'static, @@ -471,6 +474,7 @@ pub mod channel { self.with_filter_map(move |u| if f(&u) { Some(u) } else { None }) } + /// Applies a transform before sending. pub fn with_map(self, f: F) -> Sender where F: Fn(U) -> T + Send + Sync + 'static, @@ -480,6 +484,9 @@ pub mod channel { self.with_filter_map(move |u| Some(f(u))) } + /// Applies a filter and transform before sending. + /// + /// Messages that don't pass the filter are dropped. pub fn with_filter_map(self, f: F) -> Sender where F: Fn(U) -> Option + Send + Sync + 'static, @@ -809,7 +816,7 @@ pub mod channel { Boxed(Box>), } - impl Receiver { + impl Receiver { /// Receive a message /// /// Returns Ok(None) if the sender has been dropped or the remote end has @@ -823,21 +830,28 @@ pub mod channel { } } + /// Map messages, transforming them from type T to type U. pub fn map(self, f: impl Fn(T) -> U + Send + Sync + 'static) -> Receiver where - T: RpcMessage, - U: RpcMessage, + T: Send + Sync + 'static, + U: Send + Sync + 'static, { self.filter_map(move |u| Some(f(u))) } + /// Filter messages, only passing through those for which the predicate returns true. + /// + /// Messages that don't pass the filter are dropped. pub fn filter(self, f: impl Fn(&T) -> bool + Send + Sync + 'static) -> Receiver where - T: RpcMessage, + T: Send + Sync + 'static, { self.filter_map(move |u| if f(&u) { Some(u) } else { None }) } + /// Filter and map messages, only passing through those for which the function returns Some. + /// + /// Messages that don't pass the filter are dropped. pub fn filter_map(self, f: F) -> Receiver where T: Send + Sync + 'static, From 909933258de61e437a37b2cb2fc1a05aa75539ba Mon Sep 17 00:00:00 2001 From: Ruediger Klaehn Date: Tue, 7 Oct 2025 10:48:42 +0300 Subject: [PATCH 12/15] Add transform tests --- src/lib.rs | 8 ++++++-- src/tests.rs | 28 ++++++++++++++++++++++++++++ 2 files changed, 34 insertions(+), 2 deletions(-) create mode 100644 src/tests.rs diff --git a/src/lib.rs b/src/lib.rs index 80f32c3..cf28a33 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -307,6 +307,8 @@ use self::{ }; use crate::channel::SendError; +#[cfg(test)] +mod tests; #[cfg(feature = "rpc")] #[cfg_attr(quicrpc_docsrs, doc(cfg(feature = "rpc")))] pub mod util; @@ -831,8 +833,9 @@ pub mod channel { } /// Map messages, transforming them from type T to type U. - pub fn map(self, f: impl Fn(T) -> U + Send + Sync + 'static) -> Receiver + pub fn map(self, f: F) -> Receiver where + F: Fn(T) -> U + Send + Sync + 'static, T: Send + Sync + 'static, U: Send + Sync + 'static, { @@ -842,8 +845,9 @@ pub mod channel { /// Filter messages, only passing through those for which the predicate returns true. /// /// Messages that don't pass the filter are dropped. - pub fn filter(self, f: impl Fn(&T) -> bool + Send + Sync + 'static) -> Receiver + pub fn filter(self, f: F) -> Receiver where + F: Fn(&T) -> bool + Send + Sync + 'static, T: Send + Sync + 'static, { self.filter_map(move |u| if f(&u) { Some(u) } else { None }) diff --git a/src/tests.rs b/src/tests.rs new file mode 100644 index 0000000..fa57c13 --- /dev/null +++ b/src/tests.rs @@ -0,0 +1,28 @@ +use std::vec; + +#[tokio::test] +async fn test_map_filter() { + use crate::channel::mpsc; + let (tx, rx) = mpsc::channel::(100); + // *2, filter multipes of 4, *3 if multiple of 8 + // + // the transforms are applied in reverse order! + let tx = tx + .with_filter_map(|x: u64| if x % 8 == 0 { Some(x * 3) } else { None }) + .with_filter(|x| x % 4 == 0) + .with_map(|x: u64| x * 2); + for i in 0..100 { + tx.send(i).await.ok(); + } + drop(tx); + // /24, filter multiples of 3, /2 if even + let mut rx = rx + .map(|x: u64| x / 24) + .filter(|x| x % 3 == 0) + .filter_map(|x: u64| if x % 2 == 0 { Some(x / 2) } else { None }); + let mut res = vec![]; + while let Ok(Some(x)) = rx.recv().await { + res.push(x); + } + assert_eq!(res, vec![0, 3, 6, 9, 12]); +} From 683fa7c7feb3f5fc71a0724caef106f980745ef5 Mon Sep 17 00:00:00 2001 From: Ruediger Klaehn Date: Wed, 8 Oct 2025 10:20:40 +0300 Subject: [PATCH 13/15] Reduce some more bounds. --- src/lib.rs | 20 +++++++++++++++++--- 1 file changed, 17 insertions(+), 3 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index cf28a33..23b0225 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -594,7 +594,6 @@ pub mod channel { use std::{fmt::Debug, future::Future, marker::PhantomData, pin::Pin, sync::Arc}; use super::{RecvError, SendError}; - use crate::RpcMessage; /// Create a local mpsc sender and receiver pair, with the given buffer size. /// @@ -632,6 +631,12 @@ pub mod channel { } } + /// Applies a filter before sending. + /// + /// Messages that don't pass the filter are dropped. + /// + /// If you want to combine multiple filters and maps with minimal + /// overhead, use `with_filter_map` directly. pub fn with_filter(self, f: F) -> Sender where F: Fn(&T) -> bool + Send + Sync + 'static, @@ -640,6 +645,10 @@ pub mod channel { self.with_filter_map(move |u| if f(&u) { Some(u) } else { None }) } + /// Applies a transform before sending. + /// + /// If you want to combine multiple filters and maps with minimal + /// overhead, use `with_filter_map` directly. pub fn with_map(self, f: F) -> Sender where F: Fn(U) -> T + Send + Sync + 'static, @@ -649,6 +658,10 @@ pub mod channel { self.with_filter_map(move |u| Some(f(u))) } + /// Applies a filter and transform before sending. + /// + /// Any combination of filters and maps can be expressed using + /// a single filter_map. pub fn with_filter_map(self, f: F) -> Sender where F: Fn(U) -> Option + Send + Sync + 'static, @@ -663,9 +676,10 @@ pub mod channel { Sender::Boxed(inner) } + /// Future that resolves when the sender is closed pub async fn closed(&self) where - T: RpcMessage, + T: Send + Sync + 'static, { match self { Sender::Tokio(tx) => tx.closed().await, @@ -676,7 +690,7 @@ pub mod channel { #[cfg(feature = "stream")] pub fn into_sink(self) -> impl n0_future::Sink + Send + 'static where - T: RpcMessage, + T: Send + Sync + 'static, { futures_util::sink::unfold(self, |sink, value| async move { sink.send(value).await?; From d8b8913b9026c74a57b831e2081809abeb1927d2 Mon Sep 17 00:00:00 2001 From: Ruediger Klaehn Date: Wed, 8 Oct 2025 10:46:46 +0300 Subject: [PATCH 14/15] Have dedicated RecvError for oneshot and mpsc In mpsc error, SenderClosed is just normal termination and should be modeled as returning None. --- irpc-iroh/src/lib.rs | 4 +- src/lib.rs | 162 ++++++++++++++++++++++++--------------- tests/mpsc_channel.rs | 23 +++--- tests/oneshot_channel.rs | 5 +- 4 files changed, 118 insertions(+), 76 deletions(-) diff --git a/irpc-iroh/src/lib.rs b/irpc-iroh/src/lib.rs index 5851ded..bd1702c 100644 --- a/irpc-iroh/src/lib.rs +++ b/irpc-iroh/src/lib.rs @@ -8,7 +8,7 @@ use iroh::{ protocol::{AcceptError, ProtocolHandler}, }; use irpc::{ - channel::RecvError, + channel::oneshot, rpc::{ Handler, RemoteConnection, RemoteService, ERROR_CODE_MAX_MESSAGE_SIZE_EXCEEDED, MAX_MESSAGE_SIZE, @@ -257,7 +257,7 @@ pub async fn read_request_raw( ERROR_CODE_MAX_MESSAGE_SIZE_EXCEEDED.into(), b"request exceeded max message size", ); - return Err(RecvError::MaxMessageSizeExceeded.into()); + return Err(oneshot::RecvError::MaxMessageSizeExceeded.into()); } let mut buf = vec![0; size as usize]; recv.read_exact(&mut buf) diff --git a/src/lib.rs b/src/lib.rs index 23b0225..c4a6dcf 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -371,13 +371,47 @@ pub mod channel { /// Oneshot channel, similar to tokio's oneshot channel pub mod oneshot { - use std::{fmt::Debug, future::Future, pin::Pin, task}; + use std::{fmt::Debug, future::Future, io, pin::Pin, task}; use n0_future::future::Boxed as BoxFuture; - use super::{RecvError, SendError}; + use super::SendError; use crate::util::FusedOneshotReceiver; + /// Error when receiving a oneshot or mpsc message. For local communication, + /// the only thing that can go wrong is that the sender has been closed. + /// + /// For rpc communication, there can be any number of errors, so this is a + /// generic io error. + #[derive(Debug, thiserror::Error)] + pub enum RecvError { + /// The sender has been closed. This is the only error that can occur + /// for local communication. + #[error("sender closed")] + SenderClosed, + /// The message exceeded the maximum allowed message size (see [`MAX_MESSAGE_SIZE`]). + /// + /// [`MAX_MESSAGE_SIZE`]: crate::rpc::MAX_MESSAGE_SIZE + #[error("maximum message size exceeded")] + MaxMessageSizeExceeded, + /// An io error occurred. This can occur for remote communication, + /// due to a network error or deserialization error. + #[error("io error: {0}")] + Io(#[from] io::Error), + } + + impl From for io::Error { + fn from(e: RecvError) -> Self { + match e { + RecvError::Io(e) => e, + RecvError::SenderClosed => io::Error::new(io::ErrorKind::BrokenPipe, e), + RecvError::MaxMessageSizeExceeded => { + io::Error::new(io::ErrorKind::InvalidData, e) + } + } + } + } + /// Create a local oneshot sender and receiver pair. /// /// This is currently using a tokio channel pair internally. @@ -591,9 +625,38 @@ pub mod channel { /// /// For the rpc case, the send side can not be cloned, hence mpsc instead of mpsc. pub mod mpsc { - use std::{fmt::Debug, future::Future, marker::PhantomData, pin::Pin, sync::Arc}; + use std::{fmt::Debug, future::Future, io, marker::PhantomData, pin::Pin, sync::Arc}; + + use super::SendError; - use super::{RecvError, SendError}; + /// Error when receiving a oneshot or mpsc message. For local communication, + /// the only thing that can go wrong is that the sender has been closed. + /// + /// For rpc communication, there can be any number of errors, so this is a + /// generic io error. + #[derive(Debug, thiserror::Error)] + pub enum RecvError { + /// The message exceeded the maximum allowed message size (see [`MAX_MESSAGE_SIZE`]). + /// + /// [`MAX_MESSAGE_SIZE`]: crate::rpc::MAX_MESSAGE_SIZE + #[error("maximum message size exceeded")] + MaxMessageSizeExceeded, + /// An io error occurred. This can occur for remote communication, + /// due to a network error or deserialization error. + #[error("io error: {0}")] + Io(#[from] io::Error), + } + + impl From for io::Error { + fn from(e: RecvError) -> Self { + match e { + RecvError::Io(e) => e, + RecvError::MaxMessageSizeExceeded => { + io::Error::new(io::ErrorKind::InvalidData, e) + } + } + } + } /// Create a local mpsc sender and receiver pair, with the given buffer size. /// @@ -1079,38 +1142,6 @@ pub mod channel { } } } - - /// Error when receiving a oneshot or mpsc message. For local communication, - /// the only thing that can go wrong is that the sender has been closed. - /// - /// For rpc communication, there can be any number of errors, so this is a - /// generic io error. - #[derive(Debug, thiserror::Error)] - pub enum RecvError { - /// The sender has been closed. This is the only error that can occur - /// for local communication. - #[error("sender closed")] - SenderClosed, - /// The message exceeded the maximum allowed message size (see [`MAX_MESSAGE_SIZE`]). - /// - /// [`MAX_MESSAGE_SIZE`]: crate::rpc::MAX_MESSAGE_SIZE - #[error("maximum message size exceeded")] - MaxMessageSizeExceeded, - /// An io error occurred. This can occur for remote communication, - /// due to a network error or deserialization error. - #[error("io error: {0}")] - Io(#[from] io::Error), - } - - impl From for io::Error { - fn from(e: RecvError) -> Self { - match e { - RecvError::Io(e) => e, - RecvError::SenderClosed => io::Error::new(io::ErrorKind::BrokenPipe, e), - RecvError::MaxMessageSizeExceeded => io::Error::new(io::ErrorKind::InvalidData, e), - } - } - } } /// A wrapper for a message with channels to send and receive it. @@ -1694,8 +1725,10 @@ pub enum Error { Request(#[from] RequestError), #[error("send error: {0}")] Send(#[from] channel::SendError), - #[error("recv error: {0}")] - Recv(#[from] channel::RecvError), + #[error("mpsc recv error: {0}")] + MpscRecv(#[from] channel::mpsc::RecvError), + #[error("oneshot recv error: {0}")] + OneshotRecv(#[from] channel::oneshot::RecvError), #[cfg(feature = "rpc")] #[error("recv error: {0}")] Write(#[from] rpc::WriteError), @@ -1709,7 +1742,8 @@ impl From for io::Error { match e { Error::Request(e) => e.into(), Error::Send(e) => e.into(), - Error::Recv(e) => e.into(), + Error::MpscRecv(e) => e.into(), + Error::OneshotRecv(e) => e.into(), #[cfg(feature = "rpc")] Error::Write(e) => e.into(), } @@ -1784,7 +1818,7 @@ pub mod rpc { channel::{ mpsc::{self, DynReceiver, DynSender}, none::NoSender, - oneshot, RecvError, SendError, + oneshot, SendError, }, util::{now_or_never, AsyncReadVarintExt, WriteVarintExt}, LocalSender, RequestError, RpcMessage, Service, @@ -1981,26 +2015,23 @@ pub mod rpc { impl From for oneshot::Receiver { fn from(mut read: quinn::RecvStream) -> Self { - let fut = async move { - let size = read - .read_varint_u64() - .await? - .ok_or(RecvError::Io(io::Error::new( - io::ErrorKind::UnexpectedEof, - "failed to read size", - )))?; - if size > MAX_MESSAGE_SIZE { - read.stop(ERROR_CODE_MAX_MESSAGE_SIZE_EXCEEDED.into()).ok(); - return Err(RecvError::MaxMessageSizeExceeded); - } - let rest = read - .read_to_end(size as usize) - .await - .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?; - let msg: T = postcard::from_bytes(&rest) - .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?; - Ok(msg) - }; + let fut = + async move { + let size = read.read_varint_u64().await?.ok_or(oneshot::RecvError::Io( + io::Error::new(io::ErrorKind::UnexpectedEof, "failed to read size"), + ))?; + if size > MAX_MESSAGE_SIZE { + read.stop(ERROR_CODE_MAX_MESSAGE_SIZE_EXCEEDED.into()).ok(); + return Err(oneshot::RecvError::MaxMessageSizeExceeded); + } + let rest = read + .read_to_end(size as usize) + .await + .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?; + let msg: T = postcard::from_bytes(&rest) + .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?; + Ok(msg) + }; oneshot::Receiver::from(|| fut) } } @@ -2088,7 +2119,12 @@ pub mod rpc { fn recv( &mut self, ) -> Pin< - Box, RecvError>> + Send + Sync + '_>, + Box< + dyn Future, mpsc::RecvError>> + + Send + + Sync + + '_, + >, > { Box::pin(async { let read = &mut self.recv; @@ -2099,7 +2135,7 @@ pub mod rpc { self.recv .stop(ERROR_CODE_MAX_MESSAGE_SIZE_EXCEEDED.into()) .ok(); - return Err(RecvError::MaxMessageSizeExceeded); + return Err(mpsc::RecvError::MaxMessageSizeExceeded); } let mut buf = vec![0; size as usize]; read.read_exact(&mut buf) @@ -2387,7 +2423,7 @@ pub mod rpc { ERROR_CODE_MAX_MESSAGE_SIZE_EXCEEDED.into(), b"request exceeded max message size", ); - return Err(RecvError::MaxMessageSizeExceeded.into()); + return Err(mpsc::RecvError::MaxMessageSizeExceeded.into()); } let mut buf = vec![0; size as usize]; recv.read_exact(&mut buf) diff --git a/tests/mpsc_channel.rs b/tests/mpsc_channel.rs index d3982b6..4717fd1 100644 --- a/tests/mpsc_channel.rs +++ b/tests/mpsc_channel.rs @@ -6,7 +6,10 @@ use std::{ }; use irpc::{ - channel::{mpsc, RecvError, SendError}, + channel::{ + mpsc::{self, Receiver, RecvError}, + SendError, + }, util::AsyncWriteVarintExt, }; use quinn::Endpoint; @@ -122,7 +125,7 @@ async fn vec_receiver(server: Endpoint) -> Result<(), RecvError> { .accept_bi() .await .map_err(|e| RecvError::Io(e.into()))?; - let mut recv = mpsc::Receiver::>::from(recv); + let mut recv = Receiver::>::from(recv); while recv.recv().await?.is_some() {} Err(RecvError::Io(io::ErrorKind::UnexpectedEof.into())) } @@ -145,7 +148,7 @@ async fn mpsc_max_message_size_send() -> TestResult<()> { let Err(cause) = server.await? else { panic!("server should have failed due to max message size"); }; - assert!(matches!(cause, RecvError::Io(e) if e.kind() == ErrorKind::ConnectionReset)); + assert!(matches!(cause, mpsc::RecvError::Io(e) if e.kind() == ErrorKind::ConnectionReset)); Ok(()) } @@ -165,24 +168,24 @@ async fn mpsc_max_message_size_recv() -> TestResult<()> { let Err(cause) = server.await? else { panic!("server should have failed due to max message size"); }; - assert!(matches!(cause, RecvError::MaxMessageSizeExceeded)); + assert!(matches!(cause, mpsc::RecvError::MaxMessageSizeExceeded)); Ok(()) } -async fn noser_receiver(server: Endpoint) -> Result<(), RecvError> { +async fn noser_receiver(server: Endpoint) -> Result<(), mpsc::RecvError> { let conn = server .accept() .await .unwrap() .await - .map_err(|e| RecvError::Io(e.into()))?; + .map_err(|e| mpsc::RecvError::Io(e.into()))?; let (_, recv) = conn .accept_bi() .await - .map_err(|e| RecvError::Io(e.into()))?; + .map_err(|e| mpsc::RecvError::Io(e.into()))?; let mut recv = mpsc::Receiver::::from(recv); while recv.recv().await?.is_some() {} - Err(RecvError::Io(io::ErrorKind::UnexpectedEof.into())) + Err(mpsc::RecvError::Io(io::ErrorKind::UnexpectedEof.into())) } /// Checks that a serialization error is caught and propagated to the receiver. @@ -203,7 +206,7 @@ async fn mpsc_serialize_error_send() -> TestResult<()> { let Err(cause) = server.await? else { panic!("server should have failed due to serialization error"); }; - assert!(matches!(cause, RecvError::Io(e) if e.kind() == ErrorKind::ConnectionReset)); + assert!(matches!(cause, mpsc::RecvError::Io(e) if e.kind() == ErrorKind::ConnectionReset)); Ok(()) } @@ -220,6 +223,6 @@ async fn mpsc_serialize_error_recv() -> TestResult<()> { let Err(cause) = server.await? else { panic!("server should have failed due to serialization error"); }; - assert!(matches!(cause, RecvError::Io(e) if e.kind() == ErrorKind::InvalidData)); + assert!(matches!(cause, mpsc::RecvError::Io(e) if e.kind() == ErrorKind::InvalidData)); Ok(()) } diff --git a/tests/oneshot_channel.rs b/tests/oneshot_channel.rs index 922edbc..72202e9 100644 --- a/tests/oneshot_channel.rs +++ b/tests/oneshot_channel.rs @@ -3,7 +3,10 @@ use std::io::{self, ErrorKind}; use irpc::{ - channel::{oneshot, RecvError, SendError}, + channel::{ + oneshot::{self, RecvError}, + SendError, + }, util::AsyncWriteVarintExt, }; use quinn::Endpoint; From 6dd6f12cc8fd232aa929b614ac8813a5de176a69 Mon Sep 17 00:00:00 2001 From: Ruediger Klaehn Date: Wed, 8 Oct 2025 14:00:26 +0300 Subject: [PATCH 15/15] PR review, remove useless conversion --- src/lib.rs | 34 +++++++++++++++++----------------- 1 file changed, 17 insertions(+), 17 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index c4a6dcf..781b80f 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -2015,23 +2015,23 @@ pub mod rpc { impl From for oneshot::Receiver { fn from(mut read: quinn::RecvStream) -> Self { - let fut = - async move { - let size = read.read_varint_u64().await?.ok_or(oneshot::RecvError::Io( - io::Error::new(io::ErrorKind::UnexpectedEof, "failed to read size"), - ))?; - if size > MAX_MESSAGE_SIZE { - read.stop(ERROR_CODE_MAX_MESSAGE_SIZE_EXCEEDED.into()).ok(); - return Err(oneshot::RecvError::MaxMessageSizeExceeded); - } - let rest = read - .read_to_end(size as usize) - .await - .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?; - let msg: T = postcard::from_bytes(&rest) - .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?; - Ok(msg) - }; + let fut = async move { + let size = read.read_varint_u64().await?.ok_or(io::Error::new( + io::ErrorKind::UnexpectedEof, + "failed to read size", + ))?; + if size > MAX_MESSAGE_SIZE { + read.stop(ERROR_CODE_MAX_MESSAGE_SIZE_EXCEEDED.into()).ok(); + return Err(oneshot::RecvError::MaxMessageSizeExceeded); + } + let rest = read + .read_to_end(size as usize) + .await + .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?; + let msg: T = postcard::from_bytes(&rest) + .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?; + Ok(msg) + }; oneshot::Receiver::from(|| fut) } }