From 516dafdb4f32910cdd7f30e26fc1c9b4bf6befb1 Mon Sep 17 00:00:00 2001 From: Taylor Baldwin Date: Wed, 15 Nov 2023 15:08:31 -0500 Subject: [PATCH] add unsubscribe and queue group support to TypedNats (#476) --- core/src/nats.rs | 32 ++++++++++++++++++++++++++++++++ 1 file changed, 32 insertions(+) diff --git a/core/src/nats.rs b/core/src/nats.rs index be963b69..07dc2040 100644 --- a/core/src/nats.rs +++ b/core/src/nats.rs @@ -31,6 +31,17 @@ const NATS_WRONG_LAST_SEQUENCE_CODE: &str = "10071"; /// consumer state is minimal and we make efficient use of consumers. const INACTIVE_THRESHOLD_SECONDS: u64 = 60 * 60 * 12; // 12 hours +#[derive(Debug, Clone)] +pub struct QueueGroup(String); +impl QueueGroup { + pub fn new(group: &str) -> Self { + QueueGroup(group.to_string()) + } + pub fn to_string(&self) -> String { + self.0.clone() + } +} + /// Unconstructable type, used as a [TypedMessage::Response] to indicate that /// no response is allowed. #[derive(Serialize, Deserialize)] @@ -215,6 +226,11 @@ where None } + + pub async fn unsubscribe(&mut self) -> Result<()> { + self.subscription.unsubscribe().await?; + Ok(()) + } } impl Stream for TypedSubscription @@ -544,4 +560,20 @@ impl TypedNats { let subscription = self.nc.subscribe(subject.subject).await?; Ok(TypedSubscription::new(subscription, self.nc.clone())) } + + pub async fn queue_subscribe( + &self, + subject: SubscribeSubject, + queue_group: &QueueGroup, + ) -> Result> + where + T: TypedMessage, + { + tracing::info!(stream=%subject.subject, queue_group=%queue_group.to_string(), "Subscribing to NATS queue group."); + let subscription = self + .nc + .queue_subscribe(subject.subject, queue_group.to_string()) + .await?; + Ok(TypedSubscription::new(subscription, self.nc.clone())) + } }