Skip to content

Commit

Permalink
add unsubscribe and queue group support to TypedNats (#476)
Browse files Browse the repository at this point in the history
  • Loading branch information
rolyatmax committed Nov 15, 2023
1 parent 87da5d2 commit 516dafd
Showing 1 changed file with 32 additions and 0 deletions.
32 changes: 32 additions & 0 deletions core/src/nats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,17 @@ const NATS_WRONG_LAST_SEQUENCE_CODE: &str = "10071";
/// consumer state is minimal and we make efficient use of consumers.
const INACTIVE_THRESHOLD_SECONDS: u64 = 60 * 60 * 12; // 12 hours

#[derive(Debug, Clone)]
pub struct QueueGroup(String);
impl QueueGroup {
pub fn new(group: &str) -> Self {
QueueGroup(group.to_string())
}
pub fn to_string(&self) -> String {
self.0.clone()
}
}

/// Unconstructable type, used as a [TypedMessage::Response] to indicate that
/// no response is allowed.
#[derive(Serialize, Deserialize)]
Expand Down Expand Up @@ -215,6 +226,11 @@ where

None
}

pub async fn unsubscribe(&mut self) -> Result<()> {
self.subscription.unsubscribe().await?;
Ok(())
}
}

impl<T> Stream for TypedSubscription<T>
Expand Down Expand Up @@ -544,4 +560,20 @@ impl TypedNats {
let subscription = self.nc.subscribe(subject.subject).await?;
Ok(TypedSubscription::new(subscription, self.nc.clone()))
}

pub async fn queue_subscribe<T>(
&self,
subject: SubscribeSubject<T>,
queue_group: &QueueGroup,
) -> Result<TypedSubscription<T>>
where
T: TypedMessage,
{
tracing::info!(stream=%subject.subject, queue_group=%queue_group.to_string(), "Subscribing to NATS queue group.");
let subscription = self
.nc
.queue_subscribe(subject.subject, queue_group.to_string())
.await?;
Ok(TypedSubscription::new(subscription, self.nc.clone()))
}
}

0 comments on commit 516dafd

Please sign in to comment.