Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update consumer last_seen on status messages and requests #856

Merged
merged 2 commits into from
Mar 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
33 changes: 30 additions & 3 deletions async-nats/src/jetstream/consumer/pull.rs
Original file line number Diff line number Diff line change
Expand Up @@ -578,7 +578,6 @@ impl futures::Stream for Stream {
match self.heartbeats_missing.poll_recv(cx) {
Poll::Ready(resp) => match resp {
Some(()) => {
self.terminated = true;
trace!("received missing heartbeats notification");
return Poll::Ready(Some(Err(Box::new(std::io::Error::new(
std::io::ErrorKind::TimedOut,
Expand All @@ -602,6 +601,12 @@ impl futures::Stream for Stream {
Poll::Ready(resp) => match resp {
Some(resp) => match resp {
Ok(reset) => {
trace!("request response: {:?}", reset);
// Got a response, meaning consumer is alive.
// Update last seen.
if !self.batch_config.idle_heartbeat.is_zero() {
*self.last_seen.lock().unwrap() = Instant::now();
}
debug!("request successful, setting pending messages");
if reset {
self.pending_messages = self.batch_config.batch;
Expand All @@ -626,12 +631,32 @@ impl futures::Stream for Stream {
Poll::Ready(maybe_message) => match maybe_message {
Some(message) => match message.status.unwrap_or(StatusCode::OK) {
StatusCode::TIMEOUT | StatusCode::REQUEST_TERMINATED => {
debug!("received status message: {:?}", message);
// If consumer has been deleted, error and shutdown the iterator.
if message.description.as_deref() == Some("Consumer Deleted") {
self.terminated = true;
return Poll::Ready(Some(Err(Box::new(std::io::Error::new(
std::io::ErrorKind::NotFound,
format!("{:?}: {:?}", message.status, message.description),
)))));
}
// If consumer is not pull based, error and shutdown the iterator.
if message.description.as_deref() == Some("Consumer is push based") {
self.terminated = true;
return Poll::Ready(Some(Err(Box::new(std::io::Error::new(
std::io::ErrorKind::Other,
format!("{:?}: {:?}", message.status, message.description),
)))));
}
// All other cases can be handled.

// Got a status message from a consumer, meaning it's alive.
// Update last seen.
if !self.batch_config.idle_heartbeat.is_zero() {
*self.last_seen.lock().unwrap() = Instant::now();
}

// Do accounting for messages left after terminated/completed pull request.
let pending_messages = message
.headers
.as_ref()
Expand All @@ -658,14 +683,15 @@ impl futures::Stream for Stream {
self.pending_bytes = self.pending_bytes.saturating_sub(pending_bytes);
continue;
}

// Idle Hearbeat means we have no messages, but consumer is fine.
StatusCode::IDLE_HEARTBEAT => {
debug!("received idle heartbeat");
if !self.batch_config.idle_heartbeat.is_zero() {
*self.last_seen.lock().unwrap() = Instant::now();
}
continue;
}
// We got an message from a stream.
StatusCode::OK => {
trace!("message received");
if !self.batch_config.idle_heartbeat.is_zero() {
Expand All @@ -680,13 +706,14 @@ impl futures::Stream for Stream {
})));
}
status => {
debug!("received unknown message: {:?}", message);
return Poll::Ready(Some(Err(Box::new(std::io::Error::new(
std::io::ErrorKind::Other,
format!(
"error while processing messages from the stream: {}, {:?}",
status, message.description
),
)))))
)))));
}
},
None => return Poll::Ready(None),
Expand Down
52 changes: 49 additions & 3 deletions async-nats/tests/jetstream_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1926,6 +1926,54 @@ mod jetstream {
.unwrap();
let consumer: PullConsumer = stream.get_consumer("pull").await.unwrap();

let name = &consumer.cached_info().name;
stream.delete_consumer(name).await.unwrap();
let mut messages = consumer.messages().await.unwrap();

assert_eq!(
messages
.next()
.await
.unwrap()
.unwrap_err()
.downcast::<std::io::Error>()
.unwrap()
.kind(),
std::io::ErrorKind::TimedOut
);
}

#[tokio::test]
async fn pull_consumer_stream_deleted() {
tracing_subscriber::fmt::init();
let server = nats_server::run_server("tests/configs/jetstream.conf");
let client = ConnectOptions::new()
.event_callback(|err| async move { println!("error: {err:?}") })
.connect(server.client_url())
.await
.unwrap();

let context = async_nats::jetstream::new(client);

context
.create_stream(stream::Config {
name: "events".to_string(),
subjects: vec!["events".to_string()],
..Default::default()
})
.await
.unwrap();

let stream = context.get_stream("events").await.unwrap();
stream
.create_consumer(consumer::pull::Config {
durable_name: Some("pull".to_string()),
..Default::default()
})
.await
.unwrap();
let consumer: PullConsumer = stream.get_consumer("pull").await.unwrap();

context
.publish("events".to_string(), "dat".into())
.await
Expand All @@ -1936,7 +1984,6 @@ mod jetstream {
messages.next().await.unwrap().unwrap().ack().await.unwrap();
let name = &consumer.cached_info().name;
stream.delete_consumer(name).await.unwrap();
let now = Instant::now();
assert_eq!(
messages
.next()
Expand All @@ -1946,11 +1993,10 @@ mod jetstream {
.downcast::<std::io::Error>()
.unwrap()
.kind(),
std::io::ErrorKind::TimedOut
std::io::ErrorKind::NotFound
);
// after terminal error, consumer should always return none.
assert!(messages.next().await.is_none());
assert!(now.elapsed().le(&Duration::from_secs(50)));
}

#[tokio::test]
Expand Down