Skip to content

Commit

Permalink
[FIX] added a status notification if the stream is deleted while a co…
Browse files Browse the repository at this point in the history
…nsumer in a disconnect cycle
  • Loading branch information
aricart committed Nov 22, 2023
1 parent 2082cc6 commit 00b495c
Showing 1 changed file with 18 additions and 3 deletions.
21 changes: 18 additions & 3 deletions jetstream/consumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -161,15 +161,25 @@ export enum ConsumerEvents {
*/
HeartbeatsMissed = "heartbeats_missed",
/**
* Notification that the consumer was not found. Consumers that yielded at least
* one message will be retried for more messages regardless of the not being found
* Notification that the consumer was not found. Consumers that were accessible at
* least once, will be retried for more messages regardless of the not being found
* or timeouts etc. This notification includes a count of consecutive attempts to
* find the consumer. Note that if you get this notification possibly your code should
* attempt to recreate the consumer. Note that this notification is only informational
* for ordered consumers, as the consumer will be created in those cases automatically.
*/
ConsumerNotFound = "consumer_not_found",

/**
* Notification that the stream was not found. Consumers were accessible at least once,
* will be retried for more messages regardless of the not being found
* or timeouts etc. This notification includes a count of consecutive attempts to
* find the consumer. Note that if you get this notification possibly your code should
* attempt to recreate the consumer. Note that this notification is only informational
* for ordered consumers, as the consumer will be created in those cases automatically.
*/
StreamNotFound = "stream_not_found",

/**
* This notification is specific of ordered consumers and will be notified whenever
* the consumer is recreated. The argument is the name of the newly created consumer.
Expand Down Expand Up @@ -499,6 +509,7 @@ export class PullConsumerMessagesImpl extends QueuedIteratorImpl<JsMsg>

async resetPending(): Promise<boolean> {
let notFound = 0;
let streamNotFound = 0;
const bo = backoff();
let attempt = 0;
while (true) {
Expand All @@ -519,7 +530,10 @@ export class PullConsumerMessagesImpl extends QueuedIteratorImpl<JsMsg>
return true;
} catch (err) {
// game over
if (err.message === "consumer not found") {
if (err.message === "stream not found") {
streamNotFound++;
this.notify(ConsumerEvents.StreamNotFound, streamNotFound);
} else if (err.message === "consumer not found") {
notFound++;
this.notify(ConsumerEvents.ConsumerNotFound, notFound);
if (this.resetHandler) {
Expand All @@ -534,6 +548,7 @@ export class PullConsumerMessagesImpl extends QueuedIteratorImpl<JsMsg>
}
} else {
notFound = 0;
streamNotFound = 0;
}
const to = bo.backoff(attempt);
// wait for delay or till the client closes
Expand Down

0 comments on commit 00b495c

Please sign in to comment.