Skip to content

Commit

Permalink
Adapt pull consumer termination and error conditions
Browse files Browse the repository at this point in the history
Idle heartbeat should not terminate iterator, while
status of consumer deleted should.

Signed-off-by: Tomasz Pietrek <tomasz@nats.io>
  • Loading branch information
Jarema committed Mar 1, 2023
1 parent 67afcbb commit 68f4b7c
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 10 deletions.
31 changes: 24 additions & 7 deletions async-nats/src/jetstream/consumer/pull.rs
Original file line number Diff line number Diff line change
Expand Up @@ -578,7 +578,6 @@ impl futures::Stream for Stream {
match self.heartbeats_missing.poll_recv(cx) {
Poll::Ready(resp) => match resp {
Some(()) => {
self.terminated = true;
trace!("received missing heartbeats notification");
return Poll::Ready(Some(Err(Box::new(std::io::Error::new(
std::io::ErrorKind::TimedOut,
Expand All @@ -602,6 +601,7 @@ impl futures::Stream for Stream {
Poll::Ready(resp) => match resp {
Some(resp) => match resp {
Ok(reset) => {
trace!("request response: {:?}", reset);
// Got a response, meaning consumer is alive.
// Update last seen.
if !self.batch_config.idle_heartbeat.is_zero() {
Expand Down Expand Up @@ -631,17 +631,32 @@ impl futures::Stream for Stream {
Poll::Ready(maybe_message) => match maybe_message {
Some(message) => match message.status.unwrap_or(StatusCode::OK) {
StatusCode::TIMEOUT | StatusCode::REQUEST_TERMINATED => {
// Got a status message from a consumer, meaning it's alive.
// Update last seen.
if !self.batch_config.idle_heartbeat.is_zero() {
*self.last_seen.lock().unwrap() = Instant::now();
debug!("received status message: {:?}", message);
// If consumer has been deleted, error and shutdown the iterator.
if message.description.as_deref() == Some("Consumer Deleted") {
self.terminated = true;
return Poll::Ready(Some(Err(Box::new(std::io::Error::new(
std::io::ErrorKind::NotFound,
format!("{:?}: {:?}", message.status, message.description),
)))));
}
// If consumer is not pull based, error and shutdown the iterator.
if message.description.as_deref() == Some("Consumer is push based") {
self.terminated = true;
return Poll::Ready(Some(Err(Box::new(std::io::Error::new(
std::io::ErrorKind::Other,
format!("{:?}: {:?}", message.status, message.description),
)))));
}
// All other cases can be handled.

// Got a status message from a consumer, meaning it's alive.
// Update last seen.
if !self.batch_config.idle_heartbeat.is_zero() {
*self.last_seen.lock().unwrap() = Instant::now();
}

// Do accounting for messages left after terminated/completed pull request.
let pending_messages = message
.headers
.as_ref()
Expand All @@ -668,14 +683,15 @@ impl futures::Stream for Stream {
self.pending_bytes = self.pending_bytes.saturating_sub(pending_bytes);
continue;
}

// Idle Hearbeat means we have no messages, but consumer is fine.
StatusCode::IDLE_HEARTBEAT => {
debug!("received idle heartbeat");
if !self.batch_config.idle_heartbeat.is_zero() {
*self.last_seen.lock().unwrap() = Instant::now();
}
continue;
}
// We got an message from a stream.
StatusCode::OK => {
trace!("message received");
if !self.batch_config.idle_heartbeat.is_zero() {
Expand All @@ -690,13 +706,14 @@ impl futures::Stream for Stream {
})));
}
status => {
debug!("received unknown message: {:?}", message);
return Poll::Ready(Some(Err(Box::new(std::io::Error::new(
std::io::ErrorKind::Other,
format!(
"error while processing messages from the stream: {}, {:?}",
status, message.description
),
)))))
)))));
}
},
None => return Poll::Ready(None),
Expand Down
52 changes: 49 additions & 3 deletions async-nats/tests/jetstream_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1926,6 +1926,54 @@ mod jetstream {
.unwrap();
let consumer: PullConsumer = stream.get_consumer("pull").await.unwrap();

let name = &consumer.cached_info().name;
stream.delete_consumer(name).await.unwrap();
let mut messages = consumer.messages().await.unwrap();

assert_eq!(
messages
.next()
.await
.unwrap()
.unwrap_err()
.downcast::<std::io::Error>()
.unwrap()
.kind(),
std::io::ErrorKind::TimedOut
);
}

#[tokio::test]
async fn pull_consumer_stream_deleted() {
tracing_subscriber::fmt::init();
let server = nats_server::run_server("tests/configs/jetstream.conf");
let client = ConnectOptions::new()
.event_callback(|err| async move { println!("error: {err:?}") })
.connect(server.client_url())
.await
.unwrap();

let context = async_nats::jetstream::new(client);

context
.create_stream(stream::Config {
name: "events".to_string(),
subjects: vec!["events".to_string()],
..Default::default()
})
.await
.unwrap();

let stream = context.get_stream("events").await.unwrap();
stream
.create_consumer(consumer::pull::Config {
durable_name: Some("pull".to_string()),
..Default::default()
})
.await
.unwrap();
let consumer: PullConsumer = stream.get_consumer("pull").await.unwrap();

context
.publish("events".to_string(), "dat".into())
.await
Expand All @@ -1936,7 +1984,6 @@ mod jetstream {
messages.next().await.unwrap().unwrap().ack().await.unwrap();
let name = &consumer.cached_info().name;
stream.delete_consumer(name).await.unwrap();
let now = Instant::now();
assert_eq!(
messages
.next()
Expand All @@ -1946,11 +1993,10 @@ mod jetstream {
.downcast::<std::io::Error>()
.unwrap()
.kind(),
std::io::ErrorKind::TimedOut
std::io::ErrorKind::NotFound
);
// after terminal error, consumer should always return none.
assert!(messages.next().await.is_none());
assert!(now.elapsed().le(&Duration::from_secs(50)));
}

#[tokio::test]
Expand Down

0 comments on commit 68f4b7c

Please sign in to comment.