diff --git a/jetstream/consumer.ts b/jetstream/consumer.ts index acf2ebed..aa9ed116 100644 --- a/jetstream/consumer.ts +++ b/jetstream/consumer.ts @@ -161,8 +161,8 @@ export enum ConsumerEvents { */ HeartbeatsMissed = "heartbeats_missed", /** - * Notification that the consumer was not found. Consumers that yielded at least - * one message will be retried for more messages regardless of the not being found + * Notification that the consumer was not found. Consumers that were accessible at + * least once, will be retried for more messages regardless of the not being found * or timeouts etc. This notification includes a count of consecutive attempts to * find the consumer. Note that if you get this notification possibly your code should * attempt to recreate the consumer. Note that this notification is only informational @@ -170,6 +170,16 @@ export enum ConsumerEvents { */ ConsumerNotFound = "consumer_not_found", + /** + * Notification that the stream was not found. Consumers were accessible at least once, + * will be retried for more messages regardless of the not being found + * or timeouts etc. This notification includes a count of consecutive attempts to + * find the consumer. Note that if you get this notification possibly your code should + * attempt to recreate the consumer. Note that this notification is only informational + * for ordered consumers, as the consumer will be created in those cases automatically. + */ + StreamNotFound = "stream_not_found", + /** * This notification is specific of ordered consumers and will be notified whenever * the consumer is recreated. The argument is the name of the newly created consumer. @@ -499,6 +509,7 @@ export class PullConsumerMessagesImpl extends QueuedIteratorImpl async resetPending(): Promise { let notFound = 0; + let streamNotFound = 0; const bo = backoff(); let attempt = 0; while (true) { @@ -519,7 +530,10 @@ export class PullConsumerMessagesImpl extends QueuedIteratorImpl return true; } catch (err) { // game over - if (err.message === "consumer not found") { + if (err.message === "stream not found") { + streamNotFound++; + this.notify(ConsumerEvents.StreamNotFound, streamNotFound); + } else if (err.message === "consumer not found") { notFound++; this.notify(ConsumerEvents.ConsumerNotFound, notFound); if (this.resetHandler) { @@ -534,6 +548,7 @@ export class PullConsumerMessagesImpl extends QueuedIteratorImpl } } else { notFound = 0; + streamNotFound = 0; } const to = bo.backoff(attempt); // wait for delay or till the client closes