Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 5 additions & 29 deletions irpc-iroh/examples/0rtt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -188,12 +188,9 @@ mod cli {
mod ping {
use anyhow::{Context, Result};
use futures_util::FutureExt;
use iroh::{
endpoint::{Connection, RecvStream, SendStream},
Endpoint,
};
use iroh::Endpoint;
use irpc::{channel::oneshot, rpc::RemoteService, rpc_requests, Client, WithChannels};
use irpc_iroh::{Iroh0RttProtocol, IrohProtocol};
use irpc_iroh::{Iroh0RttProtocol, IrohProtocol, IrohRemoteConnection};
use n0_future::future;
use serde::{Deserialize, Serialize};
use tracing::info;
Expand Down Expand Up @@ -250,7 +247,7 @@ mod ping {
.context("failed to connect to remote service")?;
let fut: future::Boxed<bool> = Box::pin(async { true });
Ok(EchoApi {
inner: Client::boxed(IrohConnection(conn)),
inner: Client::boxed(IrohRemoteConnection::new(conn)),
zero_rtt_accepted: fut.shared(),
})
}
Expand All @@ -268,7 +265,7 @@ mod ping {
info!("0-RTT possible from our side");
let fut: future::Boxed<bool> = Box::pin(zero_rtt_accepted);
Ok(EchoApi {
inner: Client::boxed(IrohConnection(conn)),
inner: Client::boxed(IrohRemoteConnection::new(conn)),
zero_rtt_accepted: fut.shared(),
})
}
Expand All @@ -277,7 +274,7 @@ mod ping {
let fut: future::Boxed<bool> = Box::pin(async { true });
let conn = connecting.await?;
Ok(EchoApi {
inner: Client::boxed(IrohConnection(conn)),
inner: Client::boxed(IrohRemoteConnection::new(conn)),
zero_rtt_accepted: fut.shared(),
})
}
Expand Down Expand Up @@ -321,25 +318,4 @@ mod ping {
}
}
}

#[derive(Debug, Clone)]
struct IrohConnection(Connection);

impl irpc::rpc::RemoteConnection for IrohConnection {
fn clone_boxed(&self) -> Box<dyn irpc::rpc::RemoteConnection> {
Box::new(self.clone())
}

fn open_bi(
&self,
) -> n0_future::future::Boxed<
std::result::Result<(SendStream, RecvStream), irpc::RequestError>,
> {
let conn = self.0.clone();
Box::pin(async move {
let (send, recv) = conn.open_bi().await?;
Ok((send, recv))
})
}
}
}
4 changes: 2 additions & 2 deletions irpc-iroh/examples/auth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ mod storage {
rpc_requests, Client, WithChannels,
};
// Import the macro
use irpc_iroh::{read_request, IrohRemoteConnection};
use irpc_iroh::{read_request, IrohLazyRemoteConnection};
use serde::{Deserialize, Serialize};
use tracing::info;

Expand Down Expand Up @@ -225,7 +225,7 @@ mod storage {
pub const ALPN: &[u8] = ALPN;

pub fn connect(endpoint: Endpoint, addr: impl Into<iroh::EndpointAddr>) -> StorageClient {
let conn = IrohRemoteConnection::new(endpoint, addr.into(), Self::ALPN.to_vec());
let conn = IrohLazyRemoteConnection::new(endpoint, addr.into(), Self::ALPN.to_vec());
StorageClient {
inner: Client::boxed(conn),
}
Expand Down
4 changes: 2 additions & 2 deletions irpc-iroh/examples/derive.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ mod storage {
rpc_requests, Client, WithChannels,
};
// Import the macro
use irpc_iroh::{IrohProtocol, IrohRemoteConnection};
use irpc_iroh::{IrohLazyRemoteConnection, IrohProtocol};
use serde::{Deserialize, Serialize};
use tracing::info;

Expand Down Expand Up @@ -161,7 +161,7 @@ mod storage {
endpoint: Endpoint,
addr: impl Into<iroh::EndpointAddr>,
) -> Result<StorageApi> {
let conn = IrohRemoteConnection::new(endpoint, addr.into(), Self::ALPN.to_vec());
let conn = IrohLazyRemoteConnection::new(endpoint, addr.into(), Self::ALPN.to_vec());
Ok(StorageApi {
inner: Client::boxed(conn),
})
Expand Down
41 changes: 37 additions & 4 deletions irpc-iroh/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,48 @@ pub fn client<S: irpc::Service>(
addr: impl Into<iroh::EndpointAddr>,
alpn: impl AsRef<[u8]>,
) -> irpc::Client<S> {
let conn = IrohRemoteConnection::new(endpoint, addr.into(), alpn.as_ref().to_vec());
let conn = IrohLazyRemoteConnection::new(endpoint, addr.into(), alpn.as_ref().to_vec());
irpc::Client::boxed(conn)
}

/// Wrap an existing iroh connection as an irpc remote connection.
///
/// This will stop working as soon as the underlying iroh connection is closed.
/// If you need to support reconnects, use [`IrohLazyRemoteConnection`] instead.
// TODO: remove this and provide a From instance as soon as iroh is 1.0 and
// we can move irpc-iroh into irpc?
#[derive(Debug, Clone)]
pub struct IrohRemoteConnection(Connection);

impl IrohRemoteConnection {
pub fn new(connection: Connection) -> Self {
Self(connection)
}
}

impl irpc::rpc::RemoteConnection for IrohRemoteConnection {
fn clone_boxed(&self) -> Box<dyn irpc::rpc::RemoteConnection> {
Box::new(self.clone())
}

fn open_bi(
&self,
) -> n0_future::future::Boxed<std::result::Result<(SendStream, RecvStream), irpc::RequestError>>
{
let conn = self.0.clone();
Box::pin(async move {
let (send, recv) = conn.open_bi().await?;
Ok((send, recv))
})
}
}

/// A connection to a remote service.
///
/// Initially this does just have the endpoint and the address. Once a
/// connection is established, it will be stored.
#[derive(Debug, Clone)]
pub struct IrohRemoteConnection(Arc<IrohRemoteConnectionInner>);
pub struct IrohLazyRemoteConnection(Arc<IrohRemoteConnectionInner>);

#[derive(Debug)]
struct IrohRemoteConnectionInner {
Expand All @@ -44,7 +77,7 @@ struct IrohRemoteConnectionInner {
alpn: Vec<u8>,
}

impl IrohRemoteConnection {
impl IrohLazyRemoteConnection {
pub fn new(endpoint: iroh::Endpoint, addr: iroh::EndpointAddr, alpn: Vec<u8>) -> Self {
Self(Arc::new(IrohRemoteConnectionInner {
endpoint,
Expand All @@ -55,7 +88,7 @@ impl IrohRemoteConnection {
}
}

impl RemoteConnection for IrohRemoteConnection {
impl RemoteConnection for IrohLazyRemoteConnection {
fn clone_boxed(&self) -> Box<dyn RemoteConnection> {
Box::new(self.clone())
}
Expand Down
29 changes: 23 additions & 6 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1285,7 +1285,7 @@ impl<S: Service> Client<S> {
/// and a socket `addr` of the remote service.
#[cfg(feature = "rpc")]
pub fn quinn(endpoint: quinn::Endpoint, addr: std::net::SocketAddr) -> Self {
Self::boxed(rpc::QuinnRemoteConnection::new(endpoint, addr))
Self::boxed(rpc::QuinnLazyRemoteConnection::new(endpoint, addr))
}

/// Create a new client from a `rpc::RemoteConnection` trait object.
Expand Down Expand Up @@ -1899,26 +1899,43 @@ pub mod rpc {
/// Initially this does just have the endpoint and the address. Once a
/// connection is established, it will be stored.
#[derive(Debug, Clone)]
pub(crate) struct QuinnRemoteConnection(Arc<QuinnRemoteConnectionInner>);
pub(crate) struct QuinnLazyRemoteConnection(Arc<QuinnLazyRemoteConnectionInner>);

#[derive(Debug)]
struct QuinnRemoteConnectionInner {
struct QuinnLazyRemoteConnectionInner {
pub endpoint: quinn::Endpoint,
pub addr: std::net::SocketAddr,
pub connection: tokio::sync::Mutex<Option<quinn::Connection>>,
}

impl QuinnRemoteConnection {
impl RemoteConnection for quinn::Connection {
fn clone_boxed(&self) -> Box<dyn RemoteConnection> {
Box::new(self.clone())
}

fn open_bi(
&self,
) -> BoxFuture<std::result::Result<(quinn::SendStream, quinn::RecvStream), RequestError>>
{
let conn = self.clone();
Box::pin(async move {
let pair = conn.open_bi().await?;
Ok(pair)
})
}
}

impl QuinnLazyRemoteConnection {
pub fn new(endpoint: quinn::Endpoint, addr: std::net::SocketAddr) -> Self {
Self(Arc::new(QuinnRemoteConnectionInner {
Self(Arc::new(QuinnLazyRemoteConnectionInner {
endpoint,
addr,
connection: Default::default(),
}))
}
}

impl RemoteConnection for QuinnRemoteConnection {
impl RemoteConnection for QuinnLazyRemoteConnection {
fn clone_boxed(&self) -> Box<dyn RemoteConnection> {
Box::new(self.clone())
}
Expand Down
Loading