From 9acf53d5f9e6ae26ced92bbd2f2b5262520726c7 Mon Sep 17 00:00:00 2001 From: Frando Date: Tue, 15 Apr 2025 15:59:07 +0200 Subject: [PATCH 1/4] feat(irpc-iroh): add iroh ProtocolHandler, restructure example --- irpc-iroh/examples/derive.rs | 368 ++++++++++++++++------------------- irpc-iroh/src/lib.rs | 109 ++++++++--- 2 files changed, 250 insertions(+), 227 deletions(-) diff --git a/irpc-iroh/examples/derive.rs b/irpc-iroh/examples/derive.rs index c29b3f3..213b6a7 100644 --- a/irpc-iroh/examples/derive.rs +++ b/irpc-iroh/examples/derive.rs @@ -1,228 +1,204 @@ -use std::{collections::BTreeMap, sync::Arc}; - -use anyhow::Context; -use irpc::{ - channel::{oneshot, spsc}, - rpc::Handler, - Client, LocalSender, Request, Service, WithChannels, -}; -// Import the macro -use irpc_derive::rpc_requests; -use irpc_iroh::{listen, IrohRemoteConnection}; -use n0_future::task::{self, AbortOnDropHandle}; -use serde::{Deserialize, Serialize}; -use tracing::info; - -/// A simple storage service, just to try it out -#[derive(Debug, Clone, Copy)] -struct StorageService; - -impl Service for StorageService {} - -#[derive(Debug, Serialize, Deserialize)] -struct Get { - key: String, -} +use anyhow::Result; +use iroh::{protocol::Router, Endpoint}; -#[derive(Debug, Serialize, Deserialize)] -struct List; +use self::storage::StorageApi; -#[derive(Debug, Serialize, Deserialize)] -struct Set { - key: String, - value: String, +#[tokio::main] +async fn main() -> Result<()> { + tracing_subscriber::fmt().init(); + println!("Local use"); + local().await?; + println!("Remote use"); + remote().await?; + Ok(()) } -// Use the macro to generate both the StorageProtocol and StorageMessage enums -// plus implement Channels for each type -#[rpc_requests(StorageService, message = StorageMessage)] -#[derive(Serialize, Deserialize)] -enum StorageProtocol { - #[rpc(tx=oneshot::Sender>)] - Get(Get), - #[rpc(tx=oneshot::Sender<()>)] - Set(Set), - #[rpc(tx=spsc::Sender)] - List(List), +async fn local() -> Result<()> { + let api = StorageApi::spawn(); + api.set("hello".to_string(), "world".to_string()).await?; + let value = api.get("hello".to_string()).await?; + let mut list = api.list().await?; + while let Some(value) = list.recv().await? { + println!("list value = {:?}", value); + } + println!("value = {:?}", value); + Ok(()) } -struct StorageActor { - recv: tokio::sync::mpsc::Receiver, - state: BTreeMap, +async fn remote() -> Result<()> { + let (server_router, server_addr) = { + let endpoint = Endpoint::builder().discovery_n0().bind().await?; + let api = StorageApi::spawn(); + let router = Router::builder(endpoint.clone()) + .accept(StorageApi::ALPN.to_vec(), api.expose()?) + .spawn() + .await?; + let addr = endpoint.node_addr().await?; + (router, addr) + }; + + let client_endpoint = Endpoint::builder().bind().await?; + let api = StorageApi::connect(client_endpoint, server_addr)?; + api.set("hello".to_string(), "world".to_string()).await?; + api.set("goodbye".to_string(), "world".to_string()).await?; + let value = api.get("hello".to_string()).await?; + println!("value = {:?}", value); + let mut list = api.list().await?; + while let Some(value) = list.recv().await? { + println!("list value = {:?}", value); + } + drop(server_router); + Ok(()) } -impl StorageActor { - pub fn local() -> StorageApi { - let (tx, rx) = tokio::sync::mpsc::channel(1); - let actor = Self { - recv: rx, - state: BTreeMap::new(), - }; - n0_future::task::spawn(actor.run()); - let local = LocalSender::::from(tx); - StorageApi { - inner: local.into(), - } +mod storage { + //! Implementation of our storage service. + //! + //! The only `pub` item is [`StorageApi`], everything else is private. + + use std::{collections::BTreeMap, sync::Arc}; + + use anyhow::{Context, Result}; + use iroh::{protocol::ProtocolHandler, Endpoint}; + use irpc::{ + channel::{oneshot, spsc}, + rpc::Handler, + Client, LocalSender, Service, WithChannels, + }; + // Import the macro + use irpc_derive::rpc_requests; + use irpc_iroh::{IrohProtocol, IrohRemoteConnection}; + use serde::{Deserialize, Serialize}; + use tracing::info; + /// A simple storage service, just to try it out + #[derive(Debug, Clone, Copy)] + struct StorageService; + + impl Service for StorageService {} + + #[derive(Debug, Serialize, Deserialize)] + struct Get { + key: String, } - async fn run(mut self) { - while let Some(msg) = self.recv.recv().await { - self.handle(msg).await; - } + #[derive(Debug, Serialize, Deserialize)] + struct List; + + #[derive(Debug, Serialize, Deserialize)] + struct Set { + key: String, + value: String, } - async fn handle(&mut self, msg: StorageMessage) { - match msg { - StorageMessage::Get(get) => { - info!("get {:?}", get); - let WithChannels { tx, inner, .. } = get; - tx.send(self.state.get(&inner.key).cloned()).await.ok(); + // Use the macro to generate both the StorageProtocol and StorageMessage enums + // plus implement Channels for each type + #[rpc_requests(StorageService, message = StorageMessage)] + #[derive(Serialize, Deserialize)] + enum StorageProtocol { + #[rpc(tx=oneshot::Sender>)] + Get(Get), + #[rpc(tx=oneshot::Sender<()>)] + Set(Set), + #[rpc(tx=spsc::Sender)] + List(List), + } + + struct StorageActor { + recv: tokio::sync::mpsc::Receiver, + state: BTreeMap, + } + + impl StorageActor { + pub fn spawn() -> StorageApi { + let (tx, rx) = tokio::sync::mpsc::channel(1); + let actor = Self { + recv: rx, + state: BTreeMap::new(), + }; + n0_future::task::spawn(actor.run()); + let local = LocalSender::::from(tx); + StorageApi { + inner: local.into(), } - StorageMessage::Set(set) => { - info!("set {:?}", set); - let WithChannels { tx, inner, .. } = set; - self.state.insert(inner.key, inner.value); - tx.send(()).await.ok(); + } + + async fn run(mut self) { + while let Some(msg) = self.recv.recv().await { + self.handle(msg).await; } - StorageMessage::List(list) => { - info!("list {:?}", list); - let WithChannels { mut tx, .. } = list; - for (key, value) in &self.state { - if tx.send(format!("{key}={value}")).await.is_err() { - break; + } + + async fn handle(&mut self, msg: StorageMessage) { + match msg { + StorageMessage::Get(get) => { + info!("get {:?}", get); + let WithChannels { tx, inner, .. } = get; + tx.send(self.state.get(&inner.key).cloned()).await.ok(); + } + StorageMessage::Set(set) => { + info!("set {:?}", set); + let WithChannels { tx, inner, .. } = set; + self.state.insert(inner.key, inner.value); + tx.send(()).await.ok(); + } + StorageMessage::List(list) => { + info!("list {:?}", list); + let WithChannels { mut tx, .. } = list; + for (key, value) in &self.state { + if tx.send(format!("{key}={value}")).await.is_err() { + break; + } } } } } } -} - -struct StorageApi { - inner: Client, -} -impl StorageApi { - pub fn connect(endpoint: iroh::Endpoint, addr: iroh::NodeAddr) -> anyhow::Result { - Ok(StorageApi { - inner: Client::boxed(IrohRemoteConnection::new( - endpoint, - addr, - b"RPC-Storage".to_vec(), - )), - }) + pub struct StorageApi { + inner: Client, } - pub fn listen(&self, endpoint: iroh::Endpoint) -> anyhow::Result> { - let local = self - .inner - .local() - .context("can not listen on remote service")?; - let handler: Handler = Arc::new(move |msg, _, tx| { - let local = local.clone(); - Box::pin(match msg { - StorageProtocol::Get(msg) => local.send((msg, tx)), - StorageProtocol::Set(msg) => local.send((msg, tx)), - StorageProtocol::List(msg) => local.send((msg, tx)), - }) - }); - Ok(AbortOnDropHandle::new(task::spawn(listen( - endpoint, handler, - )))) - } + impl StorageApi { + pub const ALPN: &[u8] = b"irpc-iroh/derive-demo/0"; - pub async fn get(&self, key: String) -> anyhow::Result>> { - let msg = Get { key }; - match self.inner.request().await? { - Request::Local(request) => { - let (tx, rx) = oneshot::channel(); - request.send((msg, tx)).await?; - Ok(rx) - } - Request::Remote(request) => { - let (_tx, rx) = request.write(msg).await?; - Ok(rx.into()) - } + pub fn spawn() -> Self { + StorageActor::spawn() } - } - pub async fn list(&self) -> anyhow::Result> { - let msg = List; - match self.inner.request().await? { - Request::Local(request) => { - let (tx, rx) = spsc::channel(10); - request.send((msg, tx)).await?; - Ok(rx) - } - Request::Remote(request) => { - let (_tx, rx) = request.write(msg).await?; - Ok(rx.into()) - } + pub fn connect(endpoint: Endpoint, addr: impl Into) -> Result { + let conn = IrohRemoteConnection::new(endpoint, addr.into(), Self::ALPN.to_vec()); + Ok(StorageApi { + inner: Client::boxed(conn), + }) } - } - pub async fn set(&self, key: String, value: String) -> anyhow::Result> { - let msg = Set { key, value }; - match self.inner.request().await? { - Request::Local(request) => { - let (tx, rx) = oneshot::channel(); - request.send((msg, tx)).await?; - Ok(rx) - } - Request::Remote(request) => { - let (_tx, rx) = request.write(msg).await?; - Ok(rx.into()) - } + pub fn expose(&self) -> Result { + let local = self + .inner + .local() + .context("can not listen on remote service")?; + let handler: Handler = Arc::new(move |msg, _rx, tx| { + let local = local.clone(); + Box::pin(match msg { + StorageProtocol::Get(msg) => local.send((msg, tx)), + StorageProtocol::Set(msg) => local.send((msg, tx)), + StorageProtocol::List(msg) => local.send((msg, tx)), + }) + }); + Ok(IrohProtocol::new(handler)) } - } -} -async fn local() -> anyhow::Result<()> { - let api = StorageActor::local(); - api.set("hello".to_string(), "world".to_string()) - .await? - .await?; - let value = api.get("hello".to_string()).await?.await?; - let mut list = api.list().await?; - while let Some(value) = list.recv().await? { - println!("list value = {:?}", value); - } - println!("value = {:?}", value); - Ok(()) -} + pub async fn get(&self, key: String) -> Result, irpc::Error> { + self.inner.rpc(Get { key }).await + } -async fn remote() -> anyhow::Result<()> { - let server = iroh::Endpoint::builder() - .discovery_n0() - .alpns(vec![b"RPC-Storage".to_vec()]) - .bind() - .await?; - let client = iroh::Endpoint::builder().bind().await?; - let addr = server.node_addr().await?; - let store = StorageActor::local(); - let handle = store.listen(server)?; - let api = StorageApi::connect(client, addr)?; - api.set("hello".to_string(), "world".to_string()) - .await? - .await?; - api.set("goodbye".to_string(), "world".to_string()) - .await? - .await?; - let value = api.get("hello".to_string()).await?.await?; - println!("value = {:?}", value); - let mut list = api.list().await?; - while let Some(value) = list.recv().await? { - println!("list value = {:?}", value); - } - drop(handle); - Ok(()) -} + pub async fn list(&self) -> Result, irpc::Error> { + self.inner.server_streaming(List, 10).await + } -#[tokio::main] -async fn main() -> anyhow::Result<()> { - tracing_subscriber::fmt().init(); - println!("Local use"); - local().await?; - println!("Remote use"); - remote().await?; - Ok(()) + pub async fn set(&self, key: String, value: String) -> Result<(), irpc::Error> { + let msg = Set { key, value }; + self.inner.rpc(msg).await + } + } } diff --git a/irpc-iroh/src/lib.rs b/irpc-iroh/src/lib.rs index 77fee69..88223ac 100644 --- a/irpc-iroh/src/lib.rs +++ b/irpc-iroh/src/lib.rs @@ -1,6 +1,12 @@ -use std::{io, sync::Arc}; +use std::{ + fmt, io, + sync::{atomic::AtomicU64, Arc}, +}; -use iroh::endpoint::{ConnectionError, RecvStream, SendStream}; +use iroh::{ + endpoint::{Connection, ConnectionError, RecvStream, SendStream}, + protocol::ProtocolHandler, +}; use irpc::{ rpc::{Handler, RemoteConnection}, util::AsyncReadVarintExt, @@ -18,7 +24,7 @@ pub struct IrohRemoteConnection(Arc); struct IrohRemoteConnectionInner { endpoint: iroh::Endpoint, addr: iroh::NodeAddr, - connection: tokio::sync::Mutex>, + connection: tokio::sync::Mutex>, alpn: Vec, } @@ -69,7 +75,7 @@ async fn connect_and_open_bi( endpoint: &iroh::Endpoint, addr: &iroh::NodeAddr, alpn: &[u8], - mut guard: tokio::sync::MutexGuard<'_, Option>, + mut guard: tokio::sync::MutexGuard<'_, Option>, ) -> anyhow::Result<(SendStream, RecvStream)> { let conn = endpoint.connect(addr.clone(), alpn).await?; let (send, recv) = conn.open_bi().await?; @@ -89,11 +95,78 @@ mod multithreaded { } #[cfg(not(all(target_family = "wasm", target_os = "unknown")))] use multithreaded::*; +use n0_future::TryFutureExt; use serde::de::DeserializeOwned; use tracing::{trace, trace_span, warn, Instrument}; #[cfg(all(target_family = "wasm", target_os = "unknown"))] use wasm_browser::*; +pub struct IrohProtocol { + handler: Handler, + request_id: AtomicU64, +} + +impl fmt::Debug for IrohProtocol { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "RpcProtocol") + } +} + +impl IrohProtocol { + pub fn new(handler: Handler) -> Self { + Self { + handler, + request_id: Default::default(), + } + } +} + +impl ProtocolHandler for IrohProtocol { + fn accept(&self, connection: Connection) -> n0_future::future::Boxed> { + let handler = self.handler.clone(); + let request_id = self + .request_id + .fetch_add(1, std::sync::atomic::Ordering::AcqRel); + let fut = handle_connection(connection, handler).map_err(anyhow::Error::from); + let span = trace_span!("rpc", id = request_id); + Box::pin(fut.instrument(span)) + } +} + +pub async fn handle_connection( + connection: Connection, + handler: Handler, +) -> io::Result<()> { + loop { + let (send, mut recv) = match connection.accept_bi().await { + Ok((s, r)) => (s, r), + Err(ConnectionError::ApplicationClosed(cause)) + if cause.error_code.into_inner() == 0 => + { + trace!("remote side closed connection {cause:?}"); + return Ok(()); + } + Err(cause) => { + warn!("failed to accept bi stream {cause:?}"); + return Err(cause.into()); + } + }; + let size = recv + .read_varint_u64() + .await? + .ok_or_else(|| io::Error::new(io::ErrorKind::UnexpectedEof, "failed to read size"))?; + let mut buf = vec![0; size as usize]; + recv.read_exact(&mut buf) + .await + .map_err(|e| io::Error::new(io::ErrorKind::UnexpectedEof, e))?; + let msg: R = postcard::from_bytes(&buf) + .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?; + let rx = recv; + let tx = send; + handler(msg, rx, tx).await?; + } +} + /// Utility function to listen for incoming connections and handle them with the provided handler pub async fn listen(endpoint: iroh::Endpoint, handler: Handler) { let mut request_id = 0u64; @@ -108,33 +181,7 @@ pub async fn listen(endpoint: iroh::Endpoint, han return io::Result::Ok(()); } }; - loop { - let (send, mut recv) = match connection.accept_bi().await { - Ok((s, r)) => (s, r), - Err(ConnectionError::ApplicationClosed(cause)) - if cause.error_code.into_inner() == 0 => - { - trace!("remote side closed connection {cause:?}"); - return Ok(()); - } - Err(cause) => { - warn!("failed to accept bi stream {cause:?}"); - return Err(cause.into()); - } - }; - let size = recv.read_varint_u64().await?.ok_or_else(|| { - io::Error::new(io::ErrorKind::UnexpectedEof, "failed to read size") - })?; - let mut buf = vec![0; size as usize]; - recv.read_exact(&mut buf) - .await - .map_err(|e| io::Error::new(io::ErrorKind::UnexpectedEof, e))?; - let msg: R = postcard::from_bytes(&buf) - .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?; - let rx = recv; - let tx = send; - handler(msg, rx, tx).await?; - } + handle_connection(connection, handler).await }; let span = trace_span!("rpc", id = request_id); tasks.spawn(fut.instrument(span)); From 983db0a2f3df16d262e7a78832c30ce392542f09 Mon Sep 17 00:00:00 2001 From: Frando Date: Tue, 15 Apr 2025 16:03:40 +0200 Subject: [PATCH 2/4] docs: add docs --- irpc-iroh/src/lib.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/irpc-iroh/src/lib.rs b/irpc-iroh/src/lib.rs index 88223ac..51dd6f0 100644 --- a/irpc-iroh/src/lib.rs +++ b/irpc-iroh/src/lib.rs @@ -101,6 +101,9 @@ use tracing::{trace, trace_span, warn, Instrument}; #[cfg(all(target_family = "wasm", target_os = "unknown"))] use wasm_browser::*; +/// A [`ProtocolHandler`] for an irpc protocol. +/// +/// Can be added to an [`iroh::router::Router`] to handle incoming connections for an ALPN string. pub struct IrohProtocol { handler: Handler, request_id: AtomicU64, @@ -133,6 +136,7 @@ impl ProtocolHandler for IrohProtocol { } } +/// Handles a single iroh connection with the provided `handler`. pub async fn handle_connection( connection: Connection, handler: Handler, From f9d7c384c5b2da38c5289987d0a70323a386b2dd Mon Sep 17 00:00:00 2001 From: Frando Date: Wed, 16 Apr 2025 22:26:10 +0200 Subject: [PATCH 3/4] docs: add --- irpc-iroh/src/lib.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/irpc-iroh/src/lib.rs b/irpc-iroh/src/lib.rs index 51dd6f0..40557ad 100644 --- a/irpc-iroh/src/lib.rs +++ b/irpc-iroh/src/lib.rs @@ -116,6 +116,7 @@ impl fmt::Debug for IrohProtocol { } impl IrohProtocol { + /// Creates a new [`IrohProtocol`] for the `handler`. pub fn new(handler: Handler) -> Self { Self { handler, From 5b95bbee6e0c9746c8692c3ac43e6f26f4682318 Mon Sep 17 00:00:00 2001 From: Frando Date: Wed, 16 Apr 2025 22:29:47 +0200 Subject: [PATCH 4/4] chore: clippy --- irpc-iroh/examples/derive.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/irpc-iroh/examples/derive.rs b/irpc-iroh/examples/derive.rs index 213b6a7..3ae5ae6 100644 --- a/irpc-iroh/examples/derive.rs +++ b/irpc-iroh/examples/derive.rs @@ -30,7 +30,7 @@ async fn remote() -> Result<()> { let endpoint = Endpoint::builder().discovery_n0().bind().await?; let api = StorageApi::spawn(); let router = Router::builder(endpoint.clone()) - .accept(StorageApi::ALPN.to_vec(), api.expose()?) + .accept(StorageApi::ALPN, api.expose()?) .spawn() .await?; let addr = endpoint.node_addr().await?;