Skip to content

Commit

Permalink
heartbeats
Browse files Browse the repository at this point in the history
[FEAT] added optional heartbeat checks to js#fetch() (#327)
[FEAT] added optional heartbeat checks to js#subcribe()
  • Loading branch information
aricart committed Sep 8, 2022
1 parent 9cc8c82 commit 6c3a205
Show file tree
Hide file tree
Showing 8 changed files with 537 additions and 6 deletions.
1 change: 1 addition & 0 deletions nats-base-client/error.ts
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ export enum ErrorCode {
JetStream409MaxAckPendingExceeded = "409",
JetStream409 = "409",
JetStreamNotEnabled = "503",
JetStreamIdleHeartBeat = "IDLE_HEARTBEAT",

// emitted by the server
AuthorizationViolation = "AUTHORIZATION_VIOLATION",
Expand Down
139 changes: 139 additions & 0 deletions nats-base-client/idleheartbeat.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
/*
* Copyright 2022 The NATS Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

/**
* Called with the number of missed heartbeats.
* If the function returns true, the monitor will cancel monitoring.
*/
export type IdleHeartbeatFn = (n: number) => boolean;

/**
* IdleHeartbeatOptions
*/
export type IdleHeartbeatOptions = {
/**
* @field maxOut - optional maximum number of missed heartbeats before notifying (default is 2)
*/
maxOut: number;
/**
* @field cancelAfter - optional timer to auto cancel monitoring in millis
*/
cancelAfter: number;
};

export class IdleHeartbeat {
interval: number;
maxOut: number;
cancelAfter: number;
timer?: number;
autoCancelTimer?: number;
last!: number;
missed: number;
count: number;
callback: IdleHeartbeatFn;

/**
* Constructor
* @param interval in millis to check
* @param cb a callback to report when heartbeats are missed
* @param opts monitor options @see IdleHeartbeatOptions
*/
constructor(
interval: number,
cb: IdleHeartbeatFn,
opts: Partial<IdleHeartbeatOptions> = { maxOut: 2 },
) {
this.interval = interval;
this.maxOut = opts?.maxOut || 2;
this.cancelAfter = opts?.cancelAfter || 0;
this.last = Date.now();
this.missed = 0;
this.count = 0;
this.callback = cb;

this._schedule();
}

/**
* cancel monitoring
*/
cancel() {
if (this.autoCancelTimer) {
clearTimeout(this.autoCancelTimer);
}
if (this.timer) {
clearInterval(this.timer);
}
this.timer = 0;
this.autoCancelTimer = 0;
}

/**
* work signals that there was work performed
*/
work() {
this.last = Date.now();
this.missed = 0;
}

/**
* internal api to change the interval, cancelAfter and maxOut
* @param interval
* @param cancelAfter
* @param maxOut
*/
_change(interval: number, cancelAfter = 0, maxOut = 2) {
this.interval = interval;
this.maxOut = maxOut;
this.cancelAfter = cancelAfter;
this.restart();
}

/**
* cancels and restarts the monitoring
*/
restart() {
this.cancel();
this._schedule();
}

/**
* internal api called to start monitoring
*/
_schedule() {
if (this.cancelAfter > 0) {
// @ts-ignore: in node is not a number - we treat this opaquely
this.autoCancelTimer = setTimeout(() => {
this.cancel();
}, this.cancelAfter);
}
// @ts-ignore: in node is not a number - we treat this opaquely
this.timer = setInterval(() => {
this.count++;
if (Date.now() - this.interval > this.last) {
this.missed++;
}
if (this.missed >= this.maxOut) {
try {
if (this.callback(this.missed) === true) {
this.cancel();
}
} catch (err) {
console.log(err);
}
}
}, this.interval);
}
}
123 changes: 121 additions & 2 deletions nats-base-client/jsclient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,10 @@ import {
isFlowControlMsg,
isHeartbeatMsg,
isTerminal409,
Js409Errors,
millis,
nanos,
newJsErrorMsg,
validateDurableName,
validateStreamName,
} from "./jsutil.ts";
Expand All @@ -80,6 +83,7 @@ import { Bucket } from "./kv.ts";
import { NatsConnectionImpl } from "./nats.ts";
import { Feature } from "./semver.ts";
import { ObjectStoreImpl } from "./objectstore.ts";
import { IdleHeartbeat } from "./idleheartbeat.ts";

export interface JetStreamSubscriptionInfoable {
info: JetStreamSubscriptionInfo | null;
Expand Down Expand Up @@ -243,6 +247,7 @@ export class JetStreamClientImpl extends BaseApiClient
const trackBytes = (opts.max_bytes ?? 0) > 0;
let receivedBytes = 0;
const max_bytes = trackBytes ? opts.max_bytes! : 0;
let monitor: IdleHeartbeat | null = null;

const args: Partial<PullOptions> = {};
args.batch = opts.batch || 1;
Expand All @@ -266,10 +271,27 @@ export class JetStreamClientImpl extends BaseApiClient
if (expires === 0 && args.no_wait === false) {
throw new Error("expires or no_wait is required");
}
const hb = opts.idle_heartbeat || 0;
if (hb) {
args.idle_heartbeat = nanos(hb);
//@ts-ignore: for testing
if (opts.delay_heartbeat === true) {
//@ts-ignore: test option
args.idle_heartbeat = nanos(hb * 4);
}
}

const qi = new QueuedIteratorImpl<JsMsg>();
const wants = args.batch;
let received = 0;
qi.protocolFilterFn = (jm, _ingest = false): boolean => {
const jsmi = jm as JsMsgImpl;
if (isHeartbeatMsg(jsmi.msg)) {
monitor?.work();
return false;
}
return true;
};
// FIXME: this looks weird, we want to stop the iterator
// but doing it from a dispatchedFn...
qi.dispatchedFn = (m: JsMsg | null) => {
Expand Down Expand Up @@ -311,6 +333,8 @@ export class JetStreamClientImpl extends BaseApiClient
qi.stop(err);
}
} else {
// if we are doing heartbeats, message resets
monitor?.work();
qi.received++;
qi.push(toJsMsg(msg));
}
Expand All @@ -327,16 +351,40 @@ export class JetStreamClientImpl extends BaseApiClient
sub.drain();
timer = null;
}
if (monitor) {
monitor.cancel();
}
});
}

(async () => {
try {
if (hb) {
monitor = new IdleHeartbeat(hb, (v: number): boolean => {
//@ts-ignore: pushing a fn
qi.push(() => {
// this will terminate the iterator
qi.err = new NatsError(
`${Js409Errors.IdleHeartbeatMissed}: ${v}`,
ErrorCode.JetStreamIdleHeartBeat,
);
});
return true;
});
}
} catch (_err) {
// ignore it
}

// close the iterator if the connection or subscription closes unexpectedly
await (sub as SubscriptionImpl).closed;
if (timer !== null) {
timer.cancel();
timer = null;
}
if (monitor) {
monitor.cancel();
}
qi.stop();
})().catch();

Expand Down Expand Up @@ -413,6 +461,9 @@ export class JetStreamClientImpl extends BaseApiClient
sub.unsubscribe();
throw err;
}

sub._maybeSetupHbMonitoring();

return sub;
}

Expand Down Expand Up @@ -594,10 +645,14 @@ export class JetStreamClientImpl extends BaseApiClient
return (jm: JsMsg | null, ctx?: unknown): IngestionFilterFnResult => {
// ctx is expected to be the iterator (the JetstreamSubscriptionImpl)
const jsub = ctx as JetStreamSubscriptionImpl;

// this shouldn't happen
if (!jm) return { ingest: false, protocol: false };

const jmi = jm as JsMsgImpl;
if (!checkJsError(jmi.msg)) {
jsub.monitor?.work();
}
if (isHeartbeatMsg(jmi.msg)) {
const ingest = ordered ? jsub._checkHbOrderConsumer(jmi.msg) : true;
if (!ordered) {
Expand All @@ -614,9 +669,10 @@ export class JetStreamClientImpl extends BaseApiClient
}
}

class JetStreamSubscriptionImpl extends TypedSubscription<JsMsg>
export class JetStreamSubscriptionImpl extends TypedSubscription<JsMsg>
implements JetStreamSubscriptionInfoable, Destroyable, ConsumerInfoable {
js: BaseApiClient;
monitor: IdleHeartbeat | null;

constructor(
js: BaseApiClient,
Expand All @@ -625,6 +681,13 @@ class JetStreamSubscriptionImpl extends TypedSubscription<JsMsg>
) {
super(js.nc, subject, opts);
this.js = js;
this.monitor = null;

this.sub.closed.then(() => {
if (this.monitor) {
this.monitor.cancel();
}
});
}

set info(info: JetStreamSubscriptionInfo | null) {
Expand Down Expand Up @@ -667,6 +730,36 @@ class JetStreamSubscriptionImpl extends TypedSubscription<JsMsg>
});
}

// this is called by push subscriptions, to initialize the monitoring
// if configured on the consumer
_maybeSetupHbMonitoring() {
const ns = this.info?.config?.idle_heartbeat || 0;
if (ns) {
this._setupHbMonitoring(millis(ns));
}
}

_setupHbMonitoring(millis: number, cancelAfter = 0) {
const opts = { cancelAfter: 0, maxOut: 2 };
if (cancelAfter) {
opts.cancelAfter = cancelAfter;
}
const sub = this.sub as SubscriptionImpl;
const handler = (v: number): boolean => {
const msg = newJsErrorMsg(
409,
`${Js409Errors.IdleHeartbeatMissed}: ${v}`,
this.sub.subject,
);
this.sub.callback(null, msg);
// if we are a handler, we'll continue reporting
// iterators will stop
return !sub.noIterator;
};
// this only applies for push subscriptions
this.monitor = new IdleHeartbeat(millis, handler, opts);
}

_checkHbOrderConsumer(msg: Msg): boolean {
const rm = msg.headers!.get(JsHeaders.ConsumerStalledHdr);
if (rm !== "") {
Expand Down Expand Up @@ -743,11 +836,37 @@ class JetStreamPullSubscriptionImpl extends JetStreamSubscriptionImpl
args.max_bytes = opts.max_bytes!;
}

let expires = 0;
if (opts.expires && opts.expires > 0) {
args.expires = nanos(opts.expires);
expires = opts.expires;
args.expires = nanos(expires);
}

let hb = 0;
if (opts.idle_heartbeat && opts.idle_heartbeat > 0) {
hb = opts.idle_heartbeat;
args.idle_heartbeat = nanos(hb);
}

if (hb && expires === 0) {
throw new Error("idle_heartbeat requires expires");
}
if (hb > expires) {
throw new Error("expires must be greater than idle_heartbeat");
}

if (this.info) {
if (this.monitor) {
this.monitor.cancel();
}
if (expires && hb) {
if (!this.monitor) {
this._setupHbMonitoring(hb, expires);
} else {
this.monitor._change(hb, expires);
}
}

const api = (this.info.api as BaseApiClient);
const subj = `${api.prefix}.CONSUMER.MSG.NEXT.${stream}.${consumer}`;
const reply = this.sub.subject;
Expand Down

0 comments on commit 6c3a205

Please sign in to comment.