diff --git a/Cargo.lock b/Cargo.lock index 1e36f78..2fdb4e6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1686,6 +1686,7 @@ dependencies = [ "anyhow", "derive_more 2.0.1", "futures-buffered", + "futures-util", "iroh-quinn", "irpc-derive", "n0-future", diff --git a/Cargo.toml b/Cargo.toml index 625923a..3474193 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -40,6 +40,7 @@ anyhow = { workspace = true, optional = true } futures-buffered ={ version = "0.2.9", optional = true } # for AbortOnDropHandle n0-future = { workspace = true, optional = true } +futures-util = { workspace = true, optional = true } [target.'cfg(not(all(target_family = "wasm", target_os = "unknown")))'.dependencies] quinn = { workspace = true, optional = true, features = ["runtime-tokio"] } @@ -64,7 +65,8 @@ rpc = ["dep:quinn", "dep:postcard", "dep:anyhow", "dep:smallvec", "dep:tracing", quinn_endpoint_setup = ["rpc", "dep:rustls", "dep:rcgen", "dep:anyhow", "dep:futures-buffered", "quinn/rustls-ring"] # pick up parent span when creating channel messages message_spans = [] -default = ["rpc", "quinn_endpoint_setup", "message_spans"] +stream = ["dep:futures-util"] +default = ["rpc", "quinn_endpoint_setup", "message_spans", "stream"] [workspace] members = ["irpc-derive", "irpc-iroh"] @@ -86,4 +88,4 @@ n0-future = { version = "0.1.2", default-features = false } tracing-subscriber = { version = "0.3.19" } iroh = { version = "0.34" } quinn = { package = "iroh-quinn", version = "0.13.0", default-features = false } - +futures-util = { version = "0.3", features = ["sink"] } diff --git a/irpc-derive/src/lib.rs b/irpc-derive/src/lib.rs index 08264b5..754889e 100644 --- a/irpc-derive/src/lib.rs +++ b/irpc-derive/src/lib.rs @@ -301,6 +301,7 @@ pub fn rpc_requests(attr: TokenStream, item: TokenStream) -> TokenStream { .iter() .map(|(variant_name, inner_type)| { quote! { + #[allow(missing_docs)] #variant_name(::irpc::WithChannels<#inner_type, #service_name>) } }) @@ -311,6 +312,7 @@ pub fn rpc_requests(attr: TokenStream, item: TokenStream) -> TokenStream { // Create the message enum definition let message_enum = quote! { + #[allow(missing_docs)] #[derive(Debug)] pub enum #message_enum_name { #(#message_variants),* diff --git a/src/lib.rs b/src/lib.rs index 11b948e..1fb65b8 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -362,6 +362,17 @@ pub mod channel { Sender::Boxed(x) => x.is_rpc(), } } + + #[cfg(feature = "stream")] + pub fn into_sink(self) -> impl n0_future::Sink + Send + 'static + where + T: RpcMessage, + { + futures_util::sink::unfold(self, |mut sink, value| async move { + sink.send(value).await?; + Ok(sink) + }) + } } impl From> for Sender { @@ -489,6 +500,16 @@ pub mod channel { Self::Boxed(rx) => Ok(rx.recv().await?), } } + + #[cfg(feature = "stream")] + pub fn into_stream( + self, + ) -> impl n0_future::Stream> + Send + 'static + { + n0_future::stream::unfold(self, |mut recv| async move { + recv.recv().await.transpose().map(|msg| (msg, recv)) + }) + } } impl From> for Receiver {