Skip to content

Commit

Permalink
Apply review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
Jarema committed Nov 23, 2022
1 parent 006d40e commit a1f302f
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 37 deletions.
71 changes: 35 additions & 36 deletions async-nats/src/jetstream/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -518,8 +518,8 @@ impl Stream {
/// # Ok(())
/// # }
/// ```
pub fn purge(&self) -> PurgeFuture<No, No> {
PurgeFuture::build(self.clone())
pub fn purge(&self) -> Purge<No, No> {
Purge::build(self.clone())
}

/// Purge `Stream` messages for a matching subject.
Expand Down Expand Up @@ -1246,32 +1246,32 @@ impl ToAssign for Yes {}
impl ToAssign for No {}

#[derive(Debug)]
pub struct PurgeFuture<SEQUENCE, KEEP>
pub struct Purge<SEQUENCE, KEEP>
where
SEQUENCE: ToAssign,
KEEP: ToAssign,
{
stream: crate::jetstream::stream::Stream,
stream: Stream,
inner: PurgeRequest,
sequence_set: PhantomData<SEQUENCE>,
keep_set: PhantomData<KEEP>,
}

impl<SEQUENCE, KEEP> PurgeFuture<SEQUENCE, KEEP>
impl<SEQUENCE, KEEP> Purge<SEQUENCE, KEEP>
where
SEQUENCE: ToAssign,
KEEP: ToAssign,
{
/// Adds subject filter to [PurgeRequest]
pub fn filter<T: Into<String>>(mut self, filter: T) -> PurgeFuture<SEQUENCE, KEEP> {
pub fn filter<T: Into<String>>(mut self, filter: T) -> Purge<SEQUENCE, KEEP> {
self.inner.filter = Some(filter.into());
self
}
}

impl PurgeFuture<No, No> {
pub(crate) fn build(stream: Stream) -> PurgeFuture<No, No> {
PurgeFuture {
impl Purge<No, No> {
pub(crate) fn build(stream: Stream) -> Purge<No, No> {
Purge {
stream,
inner: Default::default(),
sequence_set: PhantomData {},
Expand All @@ -1280,14 +1280,14 @@ impl PurgeFuture<No, No> {
}
}

impl<KEEP> PurgeFuture<No, KEEP>
impl<KEEP> Purge<No, KEEP>
where
KEEP: ToAssign,
{
/// Creates a new [PurgeRequest].
/// `keep` and `sequence` are exclusive, enforced compile time by generics.
pub fn keep(self, keep: u64) -> PurgeFuture<No, Yes> {
PurgeFuture {
pub fn keep(self, keep: u64) -> Purge<No, Yes> {
Purge {
stream: self.stream,
sequence_set: PhantomData {},
keep_set: PhantomData {},
Expand All @@ -1298,14 +1298,14 @@ where
}
}
}
impl<SEQUENCE> PurgeFuture<SEQUENCE, No>
impl<SEQUENCE> Purge<SEQUENCE, No>
where
SEQUENCE: ToAssign,
{
/// Creates a new [PurgeRequest].
/// `keep` and `sequence` are exclusive, enforces compile time by generics.
pub fn sequence(self, sequence: u64) -> PurgeFuture<Yes, No> {
PurgeFuture {
pub fn sequence(self, sequence: u64) -> Purge<Yes, No> {
Purge {
stream: self.stream,
sequence_set: PhantomData {},
keep_set: PhantomData {},
Expand All @@ -1317,7 +1317,7 @@ where
}
}

impl<S, K> IntoFuture for PurgeFuture<S, K>
impl<S, K> IntoFuture for Purge<S, K>
where
S: ToAssign + std::marker::Send,
K: ToAssign + std::marker::Send,
Expand All @@ -1327,25 +1327,24 @@ where
type IntoFuture = Pin<Box<dyn Future<Output = Result<PurgeResponse, Error>> + Send>>;

fn into_future(self) -> Self::IntoFuture {
Box::pin(std::future::IntoFuture::into_future(send_purge_request(
self.stream,
self.inner,
)))
}
}
async fn send_purge_request(stream: Stream, request: PurgeRequest) -> Result<PurgeResponse, Error> {
let request_subject = format!("STREAM.PURGE.{}", stream.info.config.name);

let response: Response<PurgeResponse> =
stream.context.request(request_subject, &request).await?;
match response {
Response::Err { error } => Err(Box::new(io::Error::new(
ErrorKind::Other,
format!(
"error while purging stream: {}, {}, {}",
error.code, error.status, error.description
),
))),
Response::Ok(response) => Ok(response),
Box::pin(std::future::IntoFuture::into_future(async move {
let request_subject = format!("STREAM.PURGE.{}", self.stream.info.config.name);

let response: Response<PurgeResponse> = self
.stream
.context
.request(request_subject, &self.inner)
.await?;
match response {
Response::Err { error } => Err(Box::from(io::Error::new(
ErrorKind::Other,
format!(
"error while purging stream: {}, {}, {}",
error.code, error.status, error.description
),
))),
Response::Ok(response) => Ok(response),
}
}))
}
}
2 changes: 1 addition & 1 deletion async-nats/tests/jetstream_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -471,7 +471,7 @@ mod jetstream {
assert_eq!(stream.info().await.unwrap().state.messages, 3);
}
#[tokio::test]
async fn send_purge() {
async fn purge() {
let server = nats_server::run_server("tests/configs/jetstream.conf");
let client = async_nats::connect(server.client_url()).await.unwrap();
let context = async_nats::jetstream::new(client);
Expand Down

0 comments on commit a1f302f

Please sign in to comment.