diff --git a/irpc-iroh/examples/0rtt.rs b/irpc-iroh/examples/0rtt.rs index 823311f..985a885 100644 --- a/irpc-iroh/examples/0rtt.rs +++ b/irpc-iroh/examples/0rtt.rs @@ -188,12 +188,9 @@ mod cli { mod ping { use anyhow::{Context, Result}; use futures_util::FutureExt; - use iroh::{ - endpoint::{Connection, RecvStream, SendStream}, - Endpoint, - }; + use iroh::Endpoint; use irpc::{channel::oneshot, rpc::RemoteService, rpc_requests, Client, WithChannels}; - use irpc_iroh::{Iroh0RttProtocol, IrohProtocol}; + use irpc_iroh::{Iroh0RttProtocol, IrohProtocol, IrohRemoteConnection}; use n0_future::future; use serde::{Deserialize, Serialize}; use tracing::info; @@ -250,7 +247,7 @@ mod ping { .context("failed to connect to remote service")?; let fut: future::Boxed = Box::pin(async { true }); Ok(EchoApi { - inner: Client::boxed(IrohConnection(conn)), + inner: Client::boxed(IrohRemoteConnection::new(conn)), zero_rtt_accepted: fut.shared(), }) } @@ -268,7 +265,7 @@ mod ping { info!("0-RTT possible from our side"); let fut: future::Boxed = Box::pin(zero_rtt_accepted); Ok(EchoApi { - inner: Client::boxed(IrohConnection(conn)), + inner: Client::boxed(IrohRemoteConnection::new(conn)), zero_rtt_accepted: fut.shared(), }) } @@ -277,7 +274,7 @@ mod ping { let fut: future::Boxed = Box::pin(async { true }); let conn = connecting.await?; Ok(EchoApi { - inner: Client::boxed(IrohConnection(conn)), + inner: Client::boxed(IrohRemoteConnection::new(conn)), zero_rtt_accepted: fut.shared(), }) } @@ -321,25 +318,4 @@ mod ping { } } } - - #[derive(Debug, Clone)] - struct IrohConnection(Connection); - - impl irpc::rpc::RemoteConnection for IrohConnection { - fn clone_boxed(&self) -> Box { - Box::new(self.clone()) - } - - fn open_bi( - &self, - ) -> n0_future::future::Boxed< - std::result::Result<(SendStream, RecvStream), irpc::RequestError>, - > { - let conn = self.0.clone(); - Box::pin(async move { - let (send, recv) = conn.open_bi().await?; - Ok((send, recv)) - }) - } - } } diff --git a/irpc-iroh/examples/auth.rs b/irpc-iroh/examples/auth.rs index f37e860..13f1af1 100644 --- a/irpc-iroh/examples/auth.rs +++ b/irpc-iroh/examples/auth.rs @@ -76,7 +76,7 @@ mod storage { rpc_requests, Client, WithChannels, }; // Import the macro - use irpc_iroh::{read_request, IrohRemoteConnection}; + use irpc_iroh::{read_request, IrohLazyRemoteConnection}; use serde::{Deserialize, Serialize}; use tracing::info; @@ -225,7 +225,7 @@ mod storage { pub const ALPN: &[u8] = ALPN; pub fn connect(endpoint: Endpoint, addr: impl Into) -> StorageClient { - let conn = IrohRemoteConnection::new(endpoint, addr.into(), Self::ALPN.to_vec()); + let conn = IrohLazyRemoteConnection::new(endpoint, addr.into(), Self::ALPN.to_vec()); StorageClient { inner: Client::boxed(conn), } diff --git a/irpc-iroh/examples/derive.rs b/irpc-iroh/examples/derive.rs index 2b6f78f..9ae06fc 100644 --- a/irpc-iroh/examples/derive.rs +++ b/irpc-iroh/examples/derive.rs @@ -65,7 +65,7 @@ mod storage { rpc_requests, Client, WithChannels, }; // Import the macro - use irpc_iroh::{IrohProtocol, IrohRemoteConnection}; + use irpc_iroh::{IrohLazyRemoteConnection, IrohProtocol}; use serde::{Deserialize, Serialize}; use tracing::info; @@ -161,7 +161,7 @@ mod storage { endpoint: Endpoint, addr: impl Into, ) -> Result { - let conn = IrohRemoteConnection::new(endpoint, addr.into(), Self::ALPN.to_vec()); + let conn = IrohLazyRemoteConnection::new(endpoint, addr.into(), Self::ALPN.to_vec()); Ok(StorageApi { inner: Client::boxed(conn), }) diff --git a/irpc-iroh/src/lib.rs b/irpc-iroh/src/lib.rs index e8fd3ab..6971a1b 100644 --- a/irpc-iroh/src/lib.rs +++ b/irpc-iroh/src/lib.rs @@ -26,15 +26,48 @@ pub fn client( addr: impl Into, alpn: impl AsRef<[u8]>, ) -> irpc::Client { - let conn = IrohRemoteConnection::new(endpoint, addr.into(), alpn.as_ref().to_vec()); + let conn = IrohLazyRemoteConnection::new(endpoint, addr.into(), alpn.as_ref().to_vec()); irpc::Client::boxed(conn) } + +/// Wrap an existing iroh connection as an irpc remote connection. +/// +/// This will stop working as soon as the underlying iroh connection is closed. +/// If you need to support reconnects, use [`IrohLazyRemoteConnection`] instead. +// TODO: remove this and provide a From instance as soon as iroh is 1.0 and +// we can move irpc-iroh into irpc? +#[derive(Debug, Clone)] +pub struct IrohRemoteConnection(Connection); + +impl IrohRemoteConnection { + pub fn new(connection: Connection) -> Self { + Self(connection) + } +} + +impl irpc::rpc::RemoteConnection for IrohRemoteConnection { + fn clone_boxed(&self) -> Box { + Box::new(self.clone()) + } + + fn open_bi( + &self, + ) -> n0_future::future::Boxed> + { + let conn = self.0.clone(); + Box::pin(async move { + let (send, recv) = conn.open_bi().await?; + Ok((send, recv)) + }) + } +} + /// A connection to a remote service. /// /// Initially this does just have the endpoint and the address. Once a /// connection is established, it will be stored. #[derive(Debug, Clone)] -pub struct IrohRemoteConnection(Arc); +pub struct IrohLazyRemoteConnection(Arc); #[derive(Debug)] struct IrohRemoteConnectionInner { @@ -44,7 +77,7 @@ struct IrohRemoteConnectionInner { alpn: Vec, } -impl IrohRemoteConnection { +impl IrohLazyRemoteConnection { pub fn new(endpoint: iroh::Endpoint, addr: iroh::EndpointAddr, alpn: Vec) -> Self { Self(Arc::new(IrohRemoteConnectionInner { endpoint, @@ -55,7 +88,7 @@ impl IrohRemoteConnection { } } -impl RemoteConnection for IrohRemoteConnection { +impl RemoteConnection for IrohLazyRemoteConnection { fn clone_boxed(&self) -> Box { Box::new(self.clone()) } diff --git a/src/lib.rs b/src/lib.rs index 06ba70f..1da5f2c 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1285,7 +1285,7 @@ impl Client { /// and a socket `addr` of the remote service. #[cfg(feature = "rpc")] pub fn quinn(endpoint: quinn::Endpoint, addr: std::net::SocketAddr) -> Self { - Self::boxed(rpc::QuinnRemoteConnection::new(endpoint, addr)) + Self::boxed(rpc::QuinnLazyRemoteConnection::new(endpoint, addr)) } /// Create a new client from a `rpc::RemoteConnection` trait object. @@ -1899,18 +1899,35 @@ pub mod rpc { /// Initially this does just have the endpoint and the address. Once a /// connection is established, it will be stored. #[derive(Debug, Clone)] - pub(crate) struct QuinnRemoteConnection(Arc); + pub(crate) struct QuinnLazyRemoteConnection(Arc); #[derive(Debug)] - struct QuinnRemoteConnectionInner { + struct QuinnLazyRemoteConnectionInner { pub endpoint: quinn::Endpoint, pub addr: std::net::SocketAddr, pub connection: tokio::sync::Mutex>, } - impl QuinnRemoteConnection { + impl RemoteConnection for quinn::Connection { + fn clone_boxed(&self) -> Box { + Box::new(self.clone()) + } + + fn open_bi( + &self, + ) -> BoxFuture> + { + let conn = self.clone(); + Box::pin(async move { + let pair = conn.open_bi().await?; + Ok(pair) + }) + } + } + + impl QuinnLazyRemoteConnection { pub fn new(endpoint: quinn::Endpoint, addr: std::net::SocketAddr) -> Self { - Self(Arc::new(QuinnRemoteConnectionInner { + Self(Arc::new(QuinnLazyRemoteConnectionInner { endpoint, addr, connection: Default::default(), @@ -1918,7 +1935,7 @@ pub mod rpc { } } - impl RemoteConnection for QuinnRemoteConnection { + impl RemoteConnection for QuinnLazyRemoteConnection { fn clone_boxed(&self) -> Box { Box::new(self.clone()) }