Skip to content

Commit

Permalink
Use generics
Browse files Browse the repository at this point in the history
  • Loading branch information
Jarema committed Nov 22, 2022
1 parent a8c4e21 commit 2eb4885
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 21 deletions.
80 changes: 61 additions & 19 deletions async-nats/src/jetstream/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -552,11 +552,14 @@ impl Stream {
/// let jetstream = async_nats::jetstream::new(client);
///
/// let stream = jetstream.get_stream("events").await?;
/// stream.send_purge(async_nats::jetstream::stream::Purge::with_keep(100).filter("events")).await?;
/// stream.send_purge(async_nats::jetstream::stream::Purge::build().keep(100).filter("events")).await?;
/// # Ok(())
/// # }
/// ```
pub async fn send_purge(&self, purge: Purge) -> Result<PurgeResponse, Error> {
pub async fn send_purge<S: ToAssign, K: ToAssign>(
&self,
purge: Purge<S, K>,
) -> Result<PurgeResponse, Error> {
let request_subject = format!("STREAM.PURGE.{}", self.info.config.name);

let response: Response<PurgeResponse> =
Expand Down Expand Up @@ -1262,39 +1265,78 @@ pub struct External {
pub delivery_prefix: Option<String>,
}

use std::marker::PhantomData;

#[derive(Debug, Default)]
pub struct Yes;
#[derive(Debug, Default)]
pub struct No;

pub trait ToAssign: Debug {}

impl ToAssign for Yes {}
impl ToAssign for No {}

#[derive(Debug, Default)]
pub struct Purge {
pub struct Purge<SEQUENCE, KEEP>
where
SEQUENCE: ToAssign,
KEEP: ToAssign,
{
inner: PurgeRequest,
sequence_set: PhantomData<SEQUENCE>,
keep_set: PhantomData<KEEP>,
}

impl<SEQUENCE, KEEP> Purge<SEQUENCE, KEEP>
where
SEQUENCE: ToAssign,
KEEP: ToAssign,
{
/// Adds subject filter to [Purge]
pub fn filter<T: Into<String>>(mut self, filter: T) -> Purge<SEQUENCE, KEEP> {
self.inner.filter = Some(filter.into());
self
}
}

impl Purge {
/// Creates new [Purge].
pub fn build() -> Self {
impl Purge<No, No> {
pub fn build() -> Purge<No, No> {
Default::default()
}
}

impl<KEEP> Purge<No, KEEP>
where
KEEP: ToAssign,
{
/// Creates a new [Purge].
/// `keep` and `sequence` are exclusive.
pub fn with_keep(keep: u64) -> Self {
Self {
/// `keep` and `sequence` are exclusive, enforced compile time by generics.
pub fn keep(self, keep: u64) -> Purge<No, Yes> {
Purge {
sequence_set: PhantomData {},
keep_set: PhantomData {},
inner: PurgeRequest {
keep: Some(keep),
..Default::default()
..self.inner
},
}
}
}
impl<SEQUENCE> Purge<SEQUENCE, No>
where
SEQUENCE: ToAssign,
{
/// Creates a new [Purge].
/// `keep` and `sequence` are exclusive.
pub fn with_sequence(sequence: u64) -> Self {
Self {
/// `keep` and `sequence` are exclusive, enforces compile time by generics.
pub fn sequence(self, sequence: u64) -> Purge<Yes, No> {
Purge {
sequence_set: PhantomData {},
keep_set: PhantomData {},
inner: PurgeRequest {
sequence: Some(sequence),
..Default::default()
..self.inner
},
}
}
/// Adds subject filter to [Purge]
pub fn filter<T: Into<String>>(mut self, filter: T) -> Self {
self.inner.filter = Some(filter.into());
self
}
}
7 changes: 5 additions & 2 deletions async-nats/tests/jetstream_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -493,9 +493,12 @@ mod jetstream {
}
let mut stream = context.get_stream("events").await.unwrap();

stream.send_purge(Purge::with_sequence(90)).await.unwrap();
stream
.send_purge(Purge::build().sequence(90))
.await
.unwrap();
assert_eq!(stream.info().await.unwrap().state.messages, 11);
stream.send_purge(Purge::with_keep(5)).await.unwrap();
stream.send_purge(Purge::build().keep(5)).await.unwrap();
assert_eq!(stream.info().await.unwrap().state.messages, 5);
}

Expand Down

0 comments on commit 2eb4885

Please sign in to comment.