From e1a30bccffff6bff275663f0ec29c9db4c238789 Mon Sep 17 00:00:00 2001 From: Casper Beyer Date: Fri, 23 Dec 2022 10:44:18 +0100 Subject: [PATCH] Make `client::request` return an `into_future` based builder --- async-nats/src/client.rs | 152 +++++++++++++++-------------- async-nats/src/jetstream/stream.rs | 4 + async-nats/tests/client_tests.rs | 15 ++- 3 files changed, 94 insertions(+), 77 deletions(-) diff --git a/async-nats/src/client.rs b/async-nats/src/client.rs index 4e7059ab2..db67c85fb 100644 --- a/async-nats/src/client.rs +++ b/async-nats/src/client.rs @@ -29,7 +29,6 @@ use std::sync::Arc; use std::time::Duration; use tokio::io::{self, ErrorKind}; use tokio::sync::mpsc; -use tracing::trace; lazy_static! { static ref VERSION_RE: Regex = Regex::new(r#"\Av?([0-9]+)\.?([0-9]+)?\.?([0-9]+)?"#).unwrap(); @@ -310,10 +309,8 @@ impl Client { /// # Ok(()) /// # } /// ``` - pub async fn request(&self, subject: String, payload: Bytes) -> Result { - trace!("request sent to subject: {} ({})", subject, payload.len()); - let request = Request::new().payload(payload); - self.send_request(subject, request).await + pub fn request(&self, subject: String, payload: Bytes) -> Request { + Request::new(self.clone(), subject, payload) } /// Sends the request with headers. @@ -336,59 +333,11 @@ impl Client { headers: HeaderMap, payload: Bytes, ) -> Result { - let request = Request::new().headers(headers).payload(payload); - self.send_request(subject, request).await - } + let message = Request::new(self.clone(), subject, payload) + .headers(headers) + .await?; - /// Sends the request created by the [Request]. - /// - /// # Examples - /// - /// ```no_run - /// # #[tokio::main] - /// # async fn main() -> Result<(), async_nats::Error> { - /// let client = async_nats::connect("demo.nats.io").await?; - /// let request = async_nats::Request::new().payload("data".into()); - /// let response = client.send_request("service".into(), request).await?; - /// # Ok(()) - /// # } - /// ``` - pub async fn send_request(&self, subject: String, request: Request) -> Result { - let inbox = request.inbox.unwrap_or_else(|| self.new_inbox()); - let timeout = request.timeout.unwrap_or(self.request_timeout); - let mut sub = self.subscribe(inbox.clone()).await?; - let payload: Bytes = request.payload.unwrap_or_else(Bytes::new); - match request.headers { - Some(headers) => { - self.publish_with_reply_and_headers(subject, inbox, headers, payload) - .await? - } - None => self.publish_with_reply(subject, inbox, payload).await?, - } - self.flush().await?; - let request = match timeout { - Some(timeout) => { - tokio::time::timeout(timeout, sub.next()) - .map_err(|_| std::io::Error::new(ErrorKind::TimedOut, "request timed out")) - .await? - } - None => sub.next().await, - }; - match request { - Some(message) => { - if message.status == Some(StatusCode::NO_RESPONDERS) { - return Err(Box::new(std::io::Error::new( - ErrorKind::NotFound, - "nats: no responders", - ))); - } - Ok(message) - } - None => Err(Box::new(io::Error::new( - ErrorKind::BrokenPipe, - "did not receive any message", - ))), - } + Ok(message) } /// Create a new globally unique inbox which can be used for replies. @@ -512,9 +461,11 @@ impl Client { } } -/// Used for building customized requests. -#[derive(Default)] +/// Used for building and sending requests. +#[derive(Debug)] pub struct Request { + client: Client, + subject: String, payload: Option, headers: Option, timeout: Option>, @@ -522,8 +473,15 @@ pub struct Request { } impl Request { - pub fn new() -> Request { - Default::default() + pub fn new(client: Client, subject: String, payload: Bytes) -> Request { + Request { + client, + subject, + payload: Some(payload), + headers: None, + timeout: None, + inbox: None, + } } /// Sets the payload of the request. If not used, empty payload will be sent. @@ -533,8 +491,7 @@ impl Request { /// # #[tokio::main] /// # async fn main() -> Result<(), async_nats::Error> { /// let client = async_nats::connect("demo.nats.io").await?; - /// let request = async_nats::Request::new().payload("data".into()); - /// client.send_request("service".into(), request).await?; + /// client.request("subject".into(), "data".into()).await?; /// # Ok(()) /// # } /// ``` @@ -553,10 +510,11 @@ impl Request { /// let client = async_nats::connect("demo.nats.io").await?; /// let mut headers = async_nats::HeaderMap::new(); /// headers.insert("X-Example", async_nats::HeaderValue::from_str("Value").unwrap()); - /// let request = async_nats::Request::new() + /// + /// client.request("subject".into(), "payload".into()) /// .headers(headers) - /// .payload("data".into()); - /// client.send_request("service".into(), request).await?; + /// .await?; + /// /// # Ok(()) /// # } /// ``` @@ -574,10 +532,10 @@ impl Request { /// # #[tokio::main] /// # async fn main() -> Result<(), async_nats::Error> { /// let client = async_nats::connect("demo.nats.io").await?; - /// let request = async_nats::Request::new() + /// client.request("service".into(), "data".into()) /// .timeout(Some(std::time::Duration::from_secs(15))) - /// .payload("data".into()); - /// client.send_request("service".into(), request).await?; + /// .await?; + /// /// # Ok(()) /// # } /// ``` @@ -594,10 +552,9 @@ impl Request { /// # async fn main() -> Result<(), async_nats::Error> { /// use std::str::FromStr; /// let client = async_nats::connect("demo.nats.io").await?; - /// let request = async_nats::Request::new() + /// client.request("subject".into(), "payload".into()) /// .inbox("custom_inbox".into()) - /// .payload("data".into()); - /// client.send_request("service".into(), request).await?; + /// .await?; /// # Ok(()) /// # } /// ``` @@ -605,4 +562,55 @@ impl Request { self.inbox = Some(inbox); self } + + async fn send(self) -> Result { + let inbox = self.inbox.unwrap_or_else(|| self.client.new_inbox()); + let mut subscriber = self.client.subscribe(inbox.clone()).await?; + let mut publish = self + .client + .publish(self.subject, self.payload.unwrap_or_else(Bytes::new)); + if let Some(headers) = self.headers { + publish = publish.headers(headers); + } + + publish = publish.reply(inbox); + publish.into_future().await?; + + self.client.flush().await?; + + let period = self.timeout.unwrap_or(self.client.request_timeout); + let message = match period { + Some(period) => { + tokio::time::timeout(period, subscriber.next()) + .map_err(|_| std::io::Error::new(ErrorKind::TimedOut, "request timed out")) + .await? + } + None => subscriber.next().await, + }; + + match message { + Some(message) => { + if message.status == Some(StatusCode::NO_RESPONDERS) { + return Err(Box::new(std::io::Error::new( + ErrorKind::NotFound, + "nats: no responders", + ))); + } + Ok(message) + } + None => Err(Box::new(io::Error::new( + ErrorKind::BrokenPipe, + "did not receive any message", + ))), + } + } +} + +impl IntoFuture for Request { + type Output = Result; + type IntoFuture = Pin> + Send>>; + + fn into_future(self) -> Self::IntoFuture { + Box::pin(self.send()) + } } diff --git a/async-nats/src/jetstream/stream.rs b/async-nats/src/jetstream/stream.rs index 6109679d2..af1955513 100644 --- a/async-nats/src/jetstream/stream.rs +++ b/async-nats/src/jetstream/stream.rs @@ -167,6 +167,7 @@ impl Stream { message, context: self.context.clone(), })?; + if let Some(status) = response.status { if let Some(ref description) = response.description { return Err(Box::from(std::io::Error::new( @@ -226,11 +227,13 @@ impl Stream { request_subject, serde_json::to_vec(&payload).map(Bytes::from)?, ) + .into_future() .await .map(|message| Message { message, context: self.context.clone(), })?; + if let Some(status) = response.status { if let Some(ref description) = response.description { return Err(Box::from(std::io::Error::new( @@ -284,6 +287,7 @@ impl Stream { .context .client .request(subject, serde_json::to_vec(&payload).map(Bytes::from)?) + .into_future() .await .map(|message| Message { context: self.context.clone(), diff --git a/async-nats/tests/client_tests.rs b/async-nats/tests/client_tests.rs index f17a56ca2..005df959c 100644 --- a/async-nats/tests/client_tests.rs +++ b/async-nats/tests/client_tests.rs @@ -14,10 +14,11 @@ mod client { use async_nats::connection::State; use async_nats::header::HeaderValue; - use async_nats::{ConnectOptions, Event, Request}; + use async_nats::{ConnectOptions, Event}; use bytes::Bytes; use futures::future::join_all; use futures::stream::StreamExt; + use std::future::IntoFuture; use std::io::ErrorKind; use std::str::FromStr; use std::time::Duration; @@ -238,7 +239,9 @@ mod client { let resp = tokio::time::timeout( tokio::time::Duration::from_millis(500), - client.request("test".into(), "request".into()), + client + .request("test".into(), "request".into()) + .into_future(), ) .await .unwrap(); @@ -271,7 +274,9 @@ mod client { tokio::time::timeout( tokio::time::Duration::from_millis(300), - client.request("test".into(), "request".into()), + client + .request("test".into(), "request".into()) + .into_future(), ) .await .unwrap() @@ -298,9 +303,9 @@ mod client { } }); - let request = Request::new().inbox(inbox.clone()); client - .send_request("service".into(), request) + .request("service".into(), "".into()) + .inbox(inbox) .await .unwrap(); }