diff --git a/async-nats/src/jetstream/consumer/pull.rs b/async-nats/src/jetstream/consumer/pull.rs index 6846fca17..67ae9e0e6 100644 --- a/async-nats/src/jetstream/consumer/pull.rs +++ b/async-nats/src/jetstream/consumer/pull.rs @@ -578,7 +578,6 @@ impl futures::Stream for Stream { match self.heartbeats_missing.poll_recv(cx) { Poll::Ready(resp) => match resp { Some(()) => { - self.terminated = true; trace!("received missing heartbeats notification"); return Poll::Ready(Some(Err(Box::new(std::io::Error::new( std::io::ErrorKind::TimedOut, @@ -602,6 +601,12 @@ impl futures::Stream for Stream { Poll::Ready(resp) => match resp { Some(resp) => match resp { Ok(reset) => { + trace!("request response: {:?}", reset); + // Got a response, meaning consumer is alive. + // Update last seen. + if !self.batch_config.idle_heartbeat.is_zero() { + *self.last_seen.lock().unwrap() = Instant::now(); + } debug!("request successful, setting pending messages"); if reset { self.pending_messages = self.batch_config.batch; @@ -626,12 +631,32 @@ impl futures::Stream for Stream { Poll::Ready(maybe_message) => match maybe_message { Some(message) => match message.status.unwrap_or(StatusCode::OK) { StatusCode::TIMEOUT | StatusCode::REQUEST_TERMINATED => { + debug!("received status message: {:?}", message); + // If consumer has been deleted, error and shutdown the iterator. + if message.description.as_deref() == Some("Consumer Deleted") { + self.terminated = true; + return Poll::Ready(Some(Err(Box::new(std::io::Error::new( + std::io::ErrorKind::NotFound, + format!("{:?}: {:?}", message.status, message.description), + ))))); + } + // If consumer is not pull based, error and shutdown the iterator. if message.description.as_deref() == Some("Consumer is push based") { + self.terminated = true; return Poll::Ready(Some(Err(Box::new(std::io::Error::new( std::io::ErrorKind::Other, format!("{:?}: {:?}", message.status, message.description), ))))); } + // All other cases can be handled. + + // Got a status message from a consumer, meaning it's alive. + // Update last seen. + if !self.batch_config.idle_heartbeat.is_zero() { + *self.last_seen.lock().unwrap() = Instant::now(); + } + + // Do accounting for messages left after terminated/completed pull request. let pending_messages = message .headers .as_ref() @@ -658,7 +683,7 @@ impl futures::Stream for Stream { self.pending_bytes = self.pending_bytes.saturating_sub(pending_bytes); continue; } - + // Idle Hearbeat means we have no messages, but consumer is fine. StatusCode::IDLE_HEARTBEAT => { debug!("received idle heartbeat"); if !self.batch_config.idle_heartbeat.is_zero() { @@ -666,6 +691,7 @@ impl futures::Stream for Stream { } continue; } + // We got an message from a stream. StatusCode::OK => { trace!("message received"); if !self.batch_config.idle_heartbeat.is_zero() { @@ -680,13 +706,14 @@ impl futures::Stream for Stream { }))); } status => { + debug!("received unknown message: {:?}", message); return Poll::Ready(Some(Err(Box::new(std::io::Error::new( std::io::ErrorKind::Other, format!( "error while processing messages from the stream: {}, {:?}", status, message.description ), - ))))) + ))))); } }, None => return Poll::Ready(None), diff --git a/async-nats/tests/jetstream_tests.rs b/async-nats/tests/jetstream_tests.rs index c240be643..afe8ca608 100644 --- a/async-nats/tests/jetstream_tests.rs +++ b/async-nats/tests/jetstream_tests.rs @@ -1926,6 +1926,54 @@ mod jetstream { .unwrap(); let consumer: PullConsumer = stream.get_consumer("pull").await.unwrap(); + let name = &consumer.cached_info().name; + stream.delete_consumer(name).await.unwrap(); + let mut messages = consumer.messages().await.unwrap(); + + assert_eq!( + messages + .next() + .await + .unwrap() + .unwrap_err() + .downcast::() + .unwrap() + .kind(), + std::io::ErrorKind::TimedOut + ); + } + + #[tokio::test] + async fn pull_consumer_stream_deleted() { + tracing_subscriber::fmt::init(); + let server = nats_server::run_server("tests/configs/jetstream.conf"); + let client = ConnectOptions::new() + .event_callback(|err| async move { println!("error: {err:?}") }) + .connect(server.client_url()) + .await + .unwrap(); + + let context = async_nats::jetstream::new(client); + + context + .create_stream(stream::Config { + name: "events".to_string(), + subjects: vec!["events".to_string()], + ..Default::default() + }) + .await + .unwrap(); + + let stream = context.get_stream("events").await.unwrap(); + stream + .create_consumer(consumer::pull::Config { + durable_name: Some("pull".to_string()), + ..Default::default() + }) + .await + .unwrap(); + let consumer: PullConsumer = stream.get_consumer("pull").await.unwrap(); + context .publish("events".to_string(), "dat".into()) .await @@ -1936,7 +1984,6 @@ mod jetstream { messages.next().await.unwrap().unwrap().ack().await.unwrap(); let name = &consumer.cached_info().name; stream.delete_consumer(name).await.unwrap(); - let now = Instant::now(); assert_eq!( messages .next() @@ -1946,11 +1993,10 @@ mod jetstream { .downcast::() .unwrap() .kind(), - std::io::ErrorKind::TimedOut + std::io::ErrorKind::NotFound ); // after terminal error, consumer should always return none. assert!(messages.next().await.is_none()); - assert!(now.elapsed().le(&Duration::from_secs(50))); } #[tokio::test]