From 11688656a8198120124137856d26c030fbaffe2b Mon Sep 17 00:00:00 2001 From: Ruediger Klaehn Date: Wed, 16 Apr 2025 19:49:26 +0300 Subject: [PATCH] make the futures returned from the syntax sugar functions self-contained --- src/lib.rs | 192 +++++++++++++++++++++++++++++------------------------ 1 file changed, 107 insertions(+), 85 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 1fb65b8..fb3000e 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -824,73 +824,84 @@ impl Client { } /// Performs a request for which the server returns a oneshot receiver. - pub async fn rpc(&self, msg: Req) -> Result + pub fn rpc( + &self, + msg: Req, + ) -> impl Future> + Send + 'static where S: Service, M: From> + Send + Sync + Unpin + 'static, - R: From + Serialize + 'static, - Req: Channels, Rx = NoReceiver>, + R: From + Serialize + Send + Sync + 'static, + Req: Channels, Rx = NoReceiver> + Send + 'static, Res: RpcMessage, { - let recv: channel::oneshot::Receiver = match self.request().await? { - Request::Local(request) => { - let (tx, rx) = channel::oneshot::channel(); - request.send((msg, tx)).await?; - rx - } - #[cfg(not(feature = "rpc"))] - Request::Remote(_request) => unreachable!(), - #[cfg(feature = "rpc")] - Request::Remote(request) => { - let (_tx, rx) = request.write(msg).await?; - rx.into() - } - }; - let res = recv.await?; - Ok(res) + let request = self.request(); + async move { + let recv: channel::oneshot::Receiver = match request.await? { + Request::Local(request) => { + let (tx, rx) = channel::oneshot::channel(); + request.send((msg, tx)).await?; + rx + } + #[cfg(not(feature = "rpc"))] + Request::Remote(_request) => unreachable!(), + #[cfg(feature = "rpc")] + Request::Remote(request) => { + let (_tx, rx) = request.write(msg).await?; + rx.into() + } + }; + let res = recv.await?; + Ok(res) + } } /// Performs a request for which the server returns a spsc receiver. - pub async fn server_streaming( + pub fn server_streaming( &self, msg: Req, local_response_cap: usize, - ) -> Result, Error> + ) -> impl Future, Error>> + Send + 'static where S: Service, M: From> + Send + Sync + Unpin + 'static, - R: From + Serialize + 'static, - Req: Channels, Rx = NoReceiver>, + R: From + Serialize + Send + Sync + 'static, + Req: Channels, Rx = NoReceiver> + Send + 'static, Res: RpcMessage, { - let recv: channel::spsc::Receiver = match self.request().await? { - Request::Local(request) => { - let (tx, rx) = channel::spsc::channel(local_response_cap); - request.send((msg, tx)).await?; - rx - } - #[cfg(not(feature = "rpc"))] - Request::Remote(_request) => unreachable!(), - #[cfg(feature = "rpc")] - Request::Remote(request) => { - let (_tx, rx) = request.write(msg).await?; - rx.into() - } - }; - Ok(recv) + let request = self.request(); + async move { + let recv: channel::spsc::Receiver = match request.await? { + Request::Local(request) => { + let (tx, rx) = channel::spsc::channel(local_response_cap); + request.send((msg, tx)).await?; + rx + } + #[cfg(not(feature = "rpc"))] + Request::Remote(_request) => unreachable!(), + #[cfg(feature = "rpc")] + Request::Remote(request) => { + let (_tx, rx) = request.write(msg).await?; + rx.into() + } + }; + Ok(recv) + } } /// Performs a request for which the client can send updates. - pub async fn client_streaming( + pub fn client_streaming( &self, msg: Req, local_update_cap: usize, - ) -> Result< - ( - channel::spsc::Sender, - channel::oneshot::Receiver, - ), - Error, + ) -> impl Future< + Output = Result< + ( + channel::spsc::Sender, + channel::oneshot::Receiver, + ), + Error, + >, > where S: Service, @@ -900,59 +911,70 @@ impl Client { Update: RpcMessage, Res: RpcMessage, { - let (update_tx, res_rx): ( - channel::spsc::Sender, - channel::oneshot::Receiver, - ) = match self.request().await? { - Request::Local(request) => { - let (req_tx, req_rx) = channel::spsc::channel(local_update_cap); - let (res_tx, res_rx) = channel::oneshot::channel(); - request.send((msg, res_tx, req_rx)).await?; - (req_tx, res_rx) - } - #[cfg(not(feature = "rpc"))] - Request::Remote(_request) => unreachable!(), - #[cfg(feature = "rpc")] - Request::Remote(request) => { - let (tx, rx) = request.write(msg).await?; - (tx.into(), rx.into()) - } - }; - Ok((update_tx, res_rx)) + let request = self.request(); + async move { + let (update_tx, res_rx): ( + channel::spsc::Sender, + channel::oneshot::Receiver, + ) = match request.await? { + Request::Local(request) => { + let (req_tx, req_rx) = channel::spsc::channel(local_update_cap); + let (res_tx, res_rx) = channel::oneshot::channel(); + request.send((msg, res_tx, req_rx)).await?; + (req_tx, res_rx) + } + #[cfg(not(feature = "rpc"))] + Request::Remote(_request) => unreachable!(), + #[cfg(feature = "rpc")] + Request::Remote(request) => { + let (tx, rx) = request.write(msg).await?; + (tx.into(), rx.into()) + } + }; + Ok((update_tx, res_rx)) + } } /// Performs a request for which the client can send updates, and the server returns a spsc receiver. - pub async fn bidi_streaming( + pub fn bidi_streaming( &self, msg: Req, local_update_cap: usize, local_response_cap: usize, - ) -> Result<(channel::spsc::Sender, channel::spsc::Receiver), Error> + ) -> impl Future< + Output = Result<(channel::spsc::Sender, channel::spsc::Receiver), Error>, + > + Send + + 'static where S: Service, M: From> + Send + Sync + Unpin + 'static, - R: From + Serialize + 'static, - Req: Channels, Rx = channel::spsc::Receiver>, + R: From + Serialize + Send + 'static, + Req: Channels, Rx = channel::spsc::Receiver> + + Send + + 'static, Update: RpcMessage, Res: RpcMessage, { - let (update_tx, res_rx): (channel::spsc::Sender, channel::spsc::Receiver) = - match self.request().await? { - Request::Local(request) => { - let (update_tx, update_rx) = channel::spsc::channel(local_update_cap); - let (res_tx, res_rx) = channel::spsc::channel(local_response_cap); - request.send((msg, res_tx, update_rx)).await?; - (update_tx, res_rx) - } - #[cfg(not(feature = "rpc"))] - Request::Remote(_request) => unreachable!(), - #[cfg(feature = "rpc")] - Request::Remote(request) => { - let (tx, rx) = request.write(msg).await?; - (tx.into(), rx.into()) - } - }; - Ok((update_tx, res_rx)) + let request = self.request(); + async move { + let (update_tx, res_rx): (channel::spsc::Sender, channel::spsc::Receiver) = + match request.await? { + Request::Local(request) => { + let (update_tx, update_rx) = channel::spsc::channel(local_update_cap); + let (res_tx, res_rx) = channel::spsc::channel(local_response_cap); + request.send((msg, res_tx, update_rx)).await?; + (update_tx, res_rx) + } + #[cfg(not(feature = "rpc"))] + Request::Remote(_request) => unreachable!(), + #[cfg(feature = "rpc")] + Request::Remote(request) => { + let (tx, rx) = request.write(msg).await?; + (tx.into(), rx.into()) + } + }; + Ok((update_tx, res_rx)) + } } }