From 00b495c31d8f2428842ad17f3ca4205f0f65b1dd Mon Sep 17 00:00:00 2001 From: Alberto Ricart Date: Wed, 22 Nov 2023 11:36:41 -0600 Subject: [PATCH 1/2] [FIX] added a status notification if the stream is deleted while a consumer in a disconnect cycle --- jetstream/consumer.ts | 21 ++++++++++++++++++--- 1 file changed, 18 insertions(+), 3 deletions(-) diff --git a/jetstream/consumer.ts b/jetstream/consumer.ts index acf2ebed..aa9ed116 100644 --- a/jetstream/consumer.ts +++ b/jetstream/consumer.ts @@ -161,8 +161,8 @@ export enum ConsumerEvents { */ HeartbeatsMissed = "heartbeats_missed", /** - * Notification that the consumer was not found. Consumers that yielded at least - * one message will be retried for more messages regardless of the not being found + * Notification that the consumer was not found. Consumers that were accessible at + * least once, will be retried for more messages regardless of the not being found * or timeouts etc. This notification includes a count of consecutive attempts to * find the consumer. Note that if you get this notification possibly your code should * attempt to recreate the consumer. Note that this notification is only informational @@ -170,6 +170,16 @@ export enum ConsumerEvents { */ ConsumerNotFound = "consumer_not_found", + /** + * Notification that the stream was not found. Consumers were accessible at least once, + * will be retried for more messages regardless of the not being found + * or timeouts etc. This notification includes a count of consecutive attempts to + * find the consumer. Note that if you get this notification possibly your code should + * attempt to recreate the consumer. Note that this notification is only informational + * for ordered consumers, as the consumer will be created in those cases automatically. + */ + StreamNotFound = "stream_not_found", + /** * This notification is specific of ordered consumers and will be notified whenever * the consumer is recreated. The argument is the name of the newly created consumer. @@ -499,6 +509,7 @@ export class PullConsumerMessagesImpl extends QueuedIteratorImpl async resetPending(): Promise { let notFound = 0; + let streamNotFound = 0; const bo = backoff(); let attempt = 0; while (true) { @@ -519,7 +530,10 @@ export class PullConsumerMessagesImpl extends QueuedIteratorImpl return true; } catch (err) { // game over - if (err.message === "consumer not found") { + if (err.message === "stream not found") { + streamNotFound++; + this.notify(ConsumerEvents.StreamNotFound, streamNotFound); + } else if (err.message === "consumer not found") { notFound++; this.notify(ConsumerEvents.ConsumerNotFound, notFound); if (this.resetHandler) { @@ -534,6 +548,7 @@ export class PullConsumerMessagesImpl extends QueuedIteratorImpl } } else { notFound = 0; + streamNotFound = 0; } const to = bo.backoff(attempt); // wait for delay or till the client closes From 907b979bb21557eaf7335ee0aeeafd6ef8aa3ab0 Mon Sep 17 00:00:00 2001 From: aricart Date: Sun, 3 Dec 2023 15:02:35 -0400 Subject: [PATCH 2/2] fmt --- jetstream/consumer.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/jetstream/consumer.ts b/jetstream/consumer.ts index c4472b48..842b6d0b 100644 --- a/jetstream/consumer.ts +++ b/jetstream/consumer.ts @@ -179,7 +179,7 @@ export enum ConsumerEvents { * for ordered consumers, as the consumer will be created in those cases automatically. */ StreamNotFound = "stream_not_found", - + /* * Notification that the consumer was deleted. This notification * means the consumer will not get messages unless it is recreated. The client