Skip to content

Commit

Permalink
Merge pull request #615 from nats-io/consumer_changes
Browse files Browse the repository at this point in the history
Consumer changes
  • Loading branch information
aricart committed Oct 21, 2023
2 parents bb1bd03 + db9c7c8 commit 3e87f30
Show file tree
Hide file tree
Showing 10 changed files with 208 additions and 72 deletions.
6 changes: 3 additions & 3 deletions bin/dependencies.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import {

// resolve the specified directories to fq
// let dirs = ["src", "nats-base-client", "jetstream", "bin"].map((n) => {
let dirs = ["."].map((n) => {
const dirs = ["."].map((n) => {
return resolve(n);
});

Expand All @@ -47,9 +47,9 @@ const m = new Map<string, string[]>();
// process each file - remove extensions from requires/import
for (const fn of files) {
const data = await Deno.readFile(fn);
let txt = new TextDecoder().decode(data);
const txt = new TextDecoder().decode(data);
const iter = txt.matchAll(/from\s+"(\S+.[t|j]s)"/gim);
for (let s of iter) {
for (const s of iter) {
let dep = s[1];
if (dep.startsWith(`./`) || dep.startsWith(`../`)) {
// this is local code
Expand Down
181 changes: 130 additions & 51 deletions jetstream/consumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,13 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import { deferred, Timeout, timeout } from "../nats-base-client/util.ts";
import {
backoff,
deferred,
delay,
Timeout,
timeout,
} from "../nats-base-client/util.ts";
import { ConsumerAPI, ConsumerAPIImpl } from "./jsmconsumer_api.ts";
import { nuid } from "../nats-base-client/nuid.ts";
import { isHeartbeatMsg, nanos } from "./jsutil.ts";
Expand All @@ -26,7 +32,7 @@ import {
Status,
Subscription,
} from "../nats-base-client/core.ts";
import * as core from "../nats-base-client/idleheartbeat.ts";
import { IdleHeartbeatMonitor } from "../nats-base-client/idleheartbeat_monitor.ts";
import { JsMsg, toJsMsg } from "./jsmsg.ts";
import { MsgImpl } from "../nats-base-client/msg.ts";
import {
Expand All @@ -53,13 +59,13 @@ export type NextOptions = Expires;
export type ConsumeBytes =
& MaxBytes
& Partial<MaxMessages>
& ThresholdBytes
& Partial<ThresholdBytes>
& Expires
& IdleHeartbeat
& ConsumeCallback;
export type ConsumeMessages =
& Partial<MaxMessages>
& ThresholdMessages
& Partial<ThresholdMessages>
& Expires
& IdleHeartbeat
& ConsumeCallback;
Expand Down Expand Up @@ -102,15 +108,15 @@ export type ThresholdMessages = {
* from the server. This is only applicable to `consume`.
* @default 75% of {@link MaxMessages}.
*/
threshold_messages?: number;
threshold_messages: number;
};
export type ThresholdBytes = {
/**
* Threshold bytes on which the client wil auto-trigger additional message requests
* from the server. This is only applicable to `consume`.
* @default 75% of {@link MaxBytes}.
*/
threshold_bytes?: number;
threshold_bytes: number;
};
export type Expires = {
/**
Expand Down Expand Up @@ -154,6 +160,21 @@ export enum ConsumerEvents {
* the client is disconnected.
*/
HeartbeatsMissed = "heartbeats_missed",
/**
* Notification that the consumer was not found. Consumers that yielded at least
* one message will be retried for more messages regardless of the not being found
* or timeouts etc. This notification includes a count of consecutive attempts to
* find the consumer. Note that if you get this notification possibly your code should
* attempt to recreate the consumer. Note that this notification is only informational
* for ordered consumers, as the consumer will be created in those cases automatically.
*/
ConsumerNotFound = "consumer_not_found",

/**
* This notification is specific of ordered consumers and will be notified whenever
* the consumer is recreated. The argument is the name of the newly created consumer.
*/
OrderedConsumerRecreated = "ordered_consumer_recreated",
}

/**
Expand Down Expand Up @@ -219,8 +240,8 @@ export class PullConsumerMessagesImpl extends QueuedIteratorImpl<JsMsg>
implements ConsumerMessages {
consumer: PullConsumerImpl;
opts: Record<string, number>;
sub: Subscription;
monitor: core.IdleHeartbeat | null;
sub!: Subscription;
monitor: IdleHeartbeatMonitor | null;
pending: { msgs: number; bytes: number; requests: number };
inbox: string;
refilling: boolean;
Expand All @@ -231,6 +252,8 @@ export class PullConsumerMessagesImpl extends QueuedIteratorImpl<JsMsg>
cleanupHandler?: () => void;
listeners: QueuedIterator<ConsumerStatus>[];
statusIterator?: QueuedIteratorImpl<Status>;
forOrderedConsumer: boolean;
resetHandler?: () => void;

// callback: ConsumerCallbackFn;
constructor(
Expand All @@ -252,7 +275,12 @@ export class PullConsumerMessagesImpl extends QueuedIteratorImpl<JsMsg>
this.timeout = null;
this.inbox = createInbox(c.api.nc.options.inboxPrefix);
this.listeners = [];
this.forOrderedConsumer = false;

this.start();
}

start(): void {
const {
max_messages,
max_bytes,
Expand Down Expand Up @@ -281,7 +309,7 @@ export class PullConsumerMessagesImpl extends QueuedIteratorImpl<JsMsg>
sub.unsubscribe();
}

this.sub = c.api.nc.subscribe(this.inbox, {
this.sub = this.consumer.api.nc.subscribe(this.inbox, {
callback: (err, msg) => {
if (err) {
// this is possibly only a permissions error which means
Expand Down Expand Up @@ -386,24 +414,28 @@ export class PullConsumerMessagesImpl extends QueuedIteratorImpl<JsMsg>
});

if (idle_heartbeat) {
this.monitor = new core.IdleHeartbeat(idle_heartbeat, (data): boolean => {
// for the pull consumer - missing heartbeats may be corrected
// on the next pull etc - the only assumption here is we should
// reset and check if the consumer was deleted from under us
this.notify(ConsumerEvents.HeartbeatsMissed, data);
this.resetPending()
.then(() => {
})
.catch(() => {
});
return false;
}, { maxOut: 2 });
this.monitor = new IdleHeartbeatMonitor(
idle_heartbeat,
(data): boolean => {
// for the pull consumer - missing heartbeats may be corrected
// on the next pull etc - the only assumption here is we should
// reset and check if the consumer was deleted from under us
this.notify(ConsumerEvents.HeartbeatsMissed, data);
this.resetPending()
.then(() => {
})
.catch(() => {
});
return false;
},
{ maxOut: 2 },
);
}

// now if we disconnect, the consumer could be gone
// or we were slow consumer'ed by the server
(async () => {
const status = c.api.nc.status();
const status = this.consumer.api.nc.status();
this.statusIterator = status as QueuedIteratorImpl<Status>;
for await (const s of status) {
switch (s.type) {
Expand Down Expand Up @@ -465,25 +497,50 @@ export class PullConsumerMessagesImpl extends QueuedIteratorImpl<JsMsg>
}
}

resetPending(): Promise<boolean> {
// check we exist
return this.consumer.info()
.then((_ci) => {
async resetPending(): Promise<boolean> {
let notFound = 0;
const bo = backoff();
let attempt = 0;
while (true) {
if (this.consumer.api.nc.isClosed()) {
console.error("aborting resetPending - connection is closed");
return false;
}
try {
// check we exist
await this.consumer.info();
notFound = 0;
// we exist, so effectively any pending state is gone
// so reset and re-pull
this.pending.msgs = 0;
this.pending.bytes = 0;
this.pending.requests = 0;
this.pull(this.pullOptions());
return true;
})
.catch((err) => {
} catch (err) {
// game over
if (err.message === "consumer not found") {
this.stop(err);
notFound++;
this.notify(ConsumerEvents.ConsumerNotFound, notFound);
if (this.resetHandler) {
try {
this.resetHandler();
} catch (_) {
// ignored
}
}
if (this.forOrderedConsumer) {
return false;
}
} else {
notFound = 0;
}
return false;
});
const to = bo.backoff(attempt);
// wait for delay or till the client closes
await Promise.race([delay(to), this.consumer.api.nc.closed()]);
attempt++;
}
}
}

pull(opts: Partial<PullOptions>) {
Expand Down Expand Up @@ -626,25 +683,49 @@ export class PullConsumerMessagesImpl extends QueuedIteratorImpl<JsMsg>
export class OrderedConsumerMessages extends QueuedIteratorImpl<JsMsg>
implements ConsumerMessages {
src!: PullConsumerMessagesImpl;
listeners: QueuedIterator<ConsumerStatus>[];

constructor() {
super();
this.listeners = [];
}

setSource(src: PullConsumerMessagesImpl) {
if (this.src) {
this.src.resetHandler = undefined;
this.src.setCleanupHandler();
this.src.stop();
}
this.src = src;
this.src.setCleanupHandler(() => {
this.close().catch();
});
(async () => {
const status = await this.src.status();
for await (const s of status) {
this.notify(s.type, s.data);
}
})().catch(() => {});
}

notify(type: ConsumerEvents | ConsumerDebugEvents, data: unknown) {
if (this.listeners.length > 0) {
(() => {
this.listeners.forEach((l) => {
if (!(l as QueuedIteratorImpl<ConsumerStatus>).done) {
l.push({ type, data });
}
});
})();
}
}

stop(err?: Error): void {
this.src?.stop(err);
super.stop(err);
this.listeners.forEach((n) => {
n.stop();
});
}

close(): Promise<void> {
Expand All @@ -653,9 +734,9 @@ export class OrderedConsumerMessages extends QueuedIteratorImpl<JsMsg>
}

status(): Promise<AsyncIterable<ConsumerStatus>> {
return Promise.reject(
new Error("ordered consumers don't report consumer status"),
);
const iter = new QueuedIteratorImpl<ConsumerStatus>();
this.listeners.push(iter);
return Promise.resolve(iter);
}
}

Expand Down Expand Up @@ -880,34 +961,26 @@ export class OrderedPullConsumerImpl implements Consumer {

async resetConsumer(seq = 0): Promise<ConsumerInfo> {
// try to delete the consumer
if (this.consumer) {
// FIXME: this needs to be a standard request option to retry
while (true) {
try {
await this.delete();
break;
} catch (err) {
if (err.message !== "TIMEOUT") {
throw err;
}
}
}
}
this.consumer?.delete().catch(() => {});
seq = seq === 0 ? 1 : seq;
// reset the consumer sequence as JetStream will renumber from 1
this.cursor.deliver_seq = 0;
const config = this.getConsumerOpts(seq);
config.max_deliver = 1;
config.mem_storage = true;
const bo = backoff();
let ci;
// FIXME: replace with general jetstream retry logic
while (true) {
for (let i = 0;; i++) {
try {
ci = await this.api.add(this.stream, config);
this.iter?.notify(ConsumerEvents.OrderedConsumerRecreated, ci.name);
break;
} catch (err) {
if (err.message !== "TIMEOUT") {
if (seq === 0 && i >= 30) {
// consumer was never created, so we can fail this
throw err;
} else {
await delay(bo.backoff(i + 1));
}
}
}
Expand Down Expand Up @@ -947,6 +1020,7 @@ export class OrderedPullConsumerImpl implements Consumer {
this.iter = new OrderedConsumerMessages();
}
this.consumer = new PullConsumerImpl(this.api, this.currentConsumer);

const copts = opts as ConsumeOptions;
copts.callback = this.internalHandler(this.serial);
let msgs: ConsumerMessages | null = null;
Expand All @@ -958,7 +1032,12 @@ export class OrderedPullConsumerImpl implements Consumer {
} else {
return Promise.reject("reset called with unset consumer type");
}
this.iter.setSource(msgs as PullConsumerMessagesImpl);
const msgsImpl = msgs as PullConsumerMessagesImpl;
msgsImpl.forOrderedConsumer = true;
msgsImpl.resetHandler = () => {
this.reset(this.opts);
};
this.iter.setSource(msgsImpl);
return this.iter;
}

Expand Down
Loading

0 comments on commit 3e87f30

Please sign in to comment.