Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FIX] added a status notification if the stream is deleted while a consumer in a disconnect cycle #638

Merged
merged 3 commits into from
Dec 4, 2023
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
21 changes: 18 additions & 3 deletions jetstream/consumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -161,15 +161,25 @@ export enum ConsumerEvents {
*/
HeartbeatsMissed = "heartbeats_missed",
/**
* Notification that the consumer was not found. Consumers that yielded at least
* one message will be retried for more messages regardless of the not being found
* Notification that the consumer was not found. Consumers that were accessible at
* least once, will be retried for more messages regardless of the not being found
* or timeouts etc. This notification includes a count of consecutive attempts to
* find the consumer. Note that if you get this notification possibly your code should
* attempt to recreate the consumer. Note that this notification is only informational
* for ordered consumers, as the consumer will be created in those cases automatically.
*/
ConsumerNotFound = "consumer_not_found",

/**
* Notification that the stream was not found. Consumers were accessible at least once,
* will be retried for more messages regardless of the not being found
* or timeouts etc. This notification includes a count of consecutive attempts to
* find the consumer. Note that if you get this notification possibly your code should
* attempt to recreate the consumer. Note that this notification is only informational
* for ordered consumers, as the consumer will be created in those cases automatically.
*/
StreamNotFound = "stream_not_found",

/**
* This notification is specific of ordered consumers and will be notified whenever
* the consumer is recreated. The argument is the name of the newly created consumer.
Expand Down Expand Up @@ -499,6 +509,7 @@ export class PullConsumerMessagesImpl extends QueuedIteratorImpl<JsMsg>

async resetPending(): Promise<boolean> {
let notFound = 0;
let streamNotFound = 0;
const bo = backoff();
let attempt = 0;
while (true) {
Expand All @@ -519,7 +530,10 @@ export class PullConsumerMessagesImpl extends QueuedIteratorImpl<JsMsg>
return true;
} catch (err) {
// game over
if (err.message === "consumer not found") {
if (err.message === "stream not found") {
streamNotFound++;
this.notify(ConsumerEvents.StreamNotFound, streamNotFound);
} else if (err.message === "consumer not found") {
notFound++;
this.notify(ConsumerEvents.ConsumerNotFound, notFound);
if (this.resetHandler) {
Expand All @@ -534,6 +548,7 @@ export class PullConsumerMessagesImpl extends QueuedIteratorImpl<JsMsg>
}
} else {
notFound = 0;
streamNotFound = 0;
}
const to = bo.backoff(attempt);
// wait for delay or till the client closes
Expand Down