Skip to content

Commit

Permalink
Make Context in Publish take a reference instead of it being owned
Browse files Browse the repository at this point in the history
  • Loading branch information
n1ghtmare committed Feb 24, 2023
1 parent 20b2c1a commit 6df4657
Showing 1 changed file with 12 additions and 15 deletions.
27 changes: 12 additions & 15 deletions async-nats/src/jetstream/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ impl Context {
/// # }
/// ```
pub fn publish(&self, subject: String, payload: Bytes) -> Publish {
Publish::new(self.clone(), subject, payload)
Publish::new(self, subject, payload)
}

/// Publish a message with headers to a given subject associated with a stream and returns an acknowledgment from
Expand Down Expand Up @@ -1011,16 +1011,16 @@ impl futures::Stream for Streams {
}
/// Used for building customized `publish` message.
#[derive(Clone, Debug)]
pub struct Publish {
context: Context,
pub struct Publish<'a> {
context: &'a Context,
subject: String,
payload: Bytes,
headers: Option<header::HeaderMap>,
}

impl Publish {
impl<'a> Publish<'a> {
/// Creates a new custom Publish struct to be used with.
pub(crate) fn new(context: Context, subject: String, payload: Bytes) -> Self {
pub(crate) fn new(context: &'a Context, subject: String, payload: Bytes) -> Self {
Publish {
context,
subject,
Expand Down Expand Up @@ -1085,26 +1085,23 @@ impl Publish {
}
}

impl IntoFuture for Publish {
impl<'a> IntoFuture for Publish<'a> {
type Output = Result<PublishAckFuture, Error>;
type IntoFuture = Pin<Box<dyn Future<Output = Result<PublishAckFuture, Error>> + Send>>;

fn into_future(self) -> Self::IntoFuture {
let client = self.context.client.clone();
let timeout = self.context.timeout;

Box::pin(std::future::IntoFuture::into_future(async move {
let inbox = self.context.client.new_inbox();
let subscription = self.context.client.subscribe(inbox.clone()).await?;
let mut publish = self
.context
.client
.publish(self.subject, self.payload)
.reply(inbox);
let inbox = client.new_inbox();
let subscription = client.subscribe(inbox.clone()).await?;
let mut publish = client.publish(self.subject, self.payload).reply(inbox);

if let Some(headers) = self.headers {
publish = publish.headers(headers);
}

let timeout = self.context.timeout;

tokio::time::timeout(timeout, publish.into_future())
.map_err(|_| {
std::io::Error::new(ErrorKind::TimedOut, "JetStream publish request timed out")
Expand Down

0 comments on commit 6df4657

Please sign in to comment.