Skip to content

Commit

Permalink
[FEAT] heartbeat for fetch
Browse files Browse the repository at this point in the history
  • Loading branch information
aricart committed Jun 29, 2022
1 parent dbbd8d5 commit e8600fa
Show file tree
Hide file tree
Showing 8 changed files with 223 additions and 2 deletions.
1 change: 1 addition & 0 deletions nats-base-client/error.ts
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ export enum ErrorCode {
JetStream409MaxAckPendingExceeded = "409",
JetStream409 = "409",
JetStreamNotEnabled = "503",
JetStreamIdleHeartBeat = "IDLE HEARTBEAT",

// emitted by the server
AuthorizationViolation = "AUTHORIZATION_VIOLATION",
Expand Down
48 changes: 48 additions & 0 deletions nats-base-client/idleheartbeat.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
import { Deferred, deferred } from "./util.ts";

export class IdleHeartbeat {
interval: number;
maxOut: number;
timer?: number;
last!: number;
missed: number;
done: Deferred<number>;
count: number;

constructor(interval: number, maxOut: number) {
this.interval = interval;
this.maxOut = maxOut;
this.done = deferred();
this.last = Date.now();
this.missed = 0;
this.count = 0;

this._schedule();
}

cancel() {
if (this.timer) {
clearInterval(this.timer);
}
this.done.resolve(0);
}

ok() {
this.last = Date.now();
this.missed = 0;
}

_schedule() {
// @ts-ignore: node is not a number - we treat this opaquely
this.timer = setInterval(() => {
this.count++;
if (Date.now() - this.interval > this.last) {
this.missed++;
}
if (this.missed >= this.maxOut) {
clearInterval(this.timer);
this.done.resolve(this.missed);
}
}, this.interval);
}
}
53 changes: 53 additions & 0 deletions nats-base-client/jsclient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ import { consumerOpts, isConsumerOptsBuilder } from "./jsconsumeropts.ts";
import { Bucket } from "./kv.ts";
import { NatsConnectionImpl } from "./nats.ts";
import { Feature } from "./semver.ts";
import { IdleHeartbeat } from "./idleheartbeat.ts";

export interface JetStreamSubscriptionInfoable {
info: JetStreamSubscriptionInfo | null;
Expand Down Expand Up @@ -208,6 +209,7 @@ export class JetStreamClientImpl extends BaseApiClient
const trackBytes = (opts.max_bytes ?? 0) > 0;
let receivedBytes = 0;
const max_bytes = trackBytes ? opts.max_bytes! : 0;
let monitor: IdleHeartbeat | null = null;

const args: Partial<PullOptions> = {};
args.batch = opts.batch || 1;
Expand All @@ -231,14 +233,39 @@ export class JetStreamClientImpl extends BaseApiClient
if (expires === 0 && args.no_wait === false) {
throw new Error("expires or no_wait is required");
}
const hb = opts.idle_heartbeat || 0;
if (hb) {
args.idle_heartbeat = nanos(hb);
//@ts-ignore: for testing
if (opts.delay_heartbeat === true) {
//@ts-ignore
args.idle_heartbeat = nanos(hb * 4);
}
}

const qi = new QueuedIteratorImpl<JsMsg>();
const wants = args.batch;
let received = 0;
qi.protocolFilterFn = (jm, ingest = false): boolean => {
const jsmi = jm as JsMsgImpl;
if (isHeartbeatMsg(jsmi.msg)) {
const keys = jsmi.msg.headers?.keys()!;
keys.forEach((k) => {
console.log(k, jsmi.msg.headers?.get(k));
});

monitor?.ok();
return false;
}
return true;
};
// FIXME: this looks weird, we want to stop the iterator
// but doing it from a dispatchedFn...
qi.dispatchedFn = (m: JsMsg | null) => {
if (m) {
// if we are doing heartbeats, message resets
monitor?.ok();

if (trackBytes) {
receivedBytes += m.data.length;
}
Expand Down Expand Up @@ -292,16 +319,42 @@ export class JetStreamClientImpl extends BaseApiClient
sub.drain();
timer = null;
}
if (monitor) {
monitor.cancel();
}
});
}

(async () => {
try {
if (hb) {
monitor = new IdleHeartbeat(hb, 3);
monitor.done.then((v) => {
if (v !== 0) {
//@ts-ignore: pushing a fn
qi.push(() => {
// this will throw
qi.err = new NatsError(
`idle heartbeats missed: ${v}`,
ErrorCode.JetStreamIdleHeartBeat,
);
});
}
});
}
} catch (_err) {
// ignore it
}

// close the iterator if the connection or subscription closes unexpectedly
await (sub as SubscriptionImpl).closed;
if (timer !== null) {
timer.cancel();
timer = null;
}
if (monitor) {
monitor.cancel();
}
qi.stop();
})().catch();

Expand Down
2 changes: 1 addition & 1 deletion nats-base-client/jsmstream_api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ export class StreamAPIImpl extends BaseApiClient implements StreamAPI {
): Promise<StoredMsg> {
validateStreamName(stream);
const r = await this.nc.request(
`$JS.DS.GET.${stream}`,
`$JS.API.DIRECT.GET.${stream}`,
this.jc.encode(query),
);

Expand Down
2 changes: 1 addition & 1 deletion nats-base-client/queued_iterator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ export class QueuedIteratorImpl<T> implements QueuedIterator<T> {
dispatchedFn?: DispatchedFn<T>;
ctx?: unknown;
_data?: unknown; //data is for use by extenders in any way they like
private err?: Error;
err?: Error;

constructor() {
this.inflight = 0;
Expand Down
1 change: 1 addition & 0 deletions nats-base-client/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,7 @@ export interface PullOptions {
"no_wait": boolean;
expires: number;
"max_bytes": number;
"idle_heartbeat": number;
}

export interface PubAck {
Expand Down
62 changes: 62 additions & 0 deletions tests/idleheartbeats_test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
import { IdleHeartbeat } from "../nats-base-client/idleheartbeat.ts";
import {
assert,
assertEquals,
fail,
} from "https://deno.land/std@0.138.0/testing/asserts.ts";

Deno.test("idleheartbeat - basic", async () => {
const h = new IdleHeartbeat(250, 1);
let count = 0;
const timer = setInterval(() => {
count++;
h.ok();
if (count === 8) {
clearInterval(timer);
h.cancel();
}
}, 100);
const v = await h.done;
assertEquals(v, 0);
assert(h.count > 0);
});

Deno.test("idleheartbeat - timeout", async () => {
const h = new IdleHeartbeat(250, 1);
h.done.catch((err) => {
fail(`promise failed ${err.message}`);
});

const v = await h.done;
assertEquals(v, 1);
});

Deno.test("idleheartbeat - timeout maxOut", async () => {
const h = new IdleHeartbeat(250, 5);
h.done.catch((err) => {
fail(`promise failed ${err.message}`);
});

const v = await h.done;
assertEquals(v, 5);
});

Deno.test("idleheartbeat - timeout recover", async () => {
const h = new IdleHeartbeat(250, 5);
h.done.catch((err) => {
fail(`promise failed ${err.message}`);
});

const interval = setInterval(() => {
h.ok();
}, 1000);

setTimeout(() => {
h.cancel();
clearInterval(interval);
}, 1650);

const v = await h.done;
assertEquals(v, 0);
assertEquals(h.missed, 2);
});
56 changes: 56 additions & 0 deletions tests/jetstream_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3345,3 +3345,59 @@ Deno.test("jetstream - pull consumer max_bytes rejected on old servers", async (

await cleanup(ns, nc);
});

Deno.test("jetstream - missed idleheartbeat on fetch", async () => {
const { ns, nc } = await setup(jetstreamServerConf({}, true));
const { stream } = await initStream(nc);
const jsm = await nc.jetstreamManager();

await jsm.consumers.add(stream, {
durable_name: "me",
ack_policy: AckPolicy.Explicit,
});

const js = nc.jetstream();
const iter = js.fetch(stream, "me", {
expires: 2000,
idle_heartbeat: 250,
//@ts-ignore: testing
delay_heartbeat: true,
});

await assertRejects(
async () => {
for await (const _m of iter) {
// no message expected
}
},
NatsError,
"idle heartbeats missed",
);

await cleanup(ns, nc);
});

Deno.test("jetstream - idleheartbeat on fetch", async () => {
const { ns, nc } = await setup(jetstreamServerConf({}, true));
const { stream } = await initStream(nc);
const jsm = await nc.jetstreamManager();

await jsm.consumers.add(stream, {
durable_name: "me",
ack_policy: AckPolicy.Explicit,
});

const js = nc.jetstream();
const iter = js.fetch(stream, "me", {
expires: 2000,
idle_heartbeat: 250,
});

await (async () => {
for await (const _m of iter) {
// no message expected
}
})();

await cleanup(ns, nc);
});

0 comments on commit e8600fa

Please sign in to comment.