Skip to content

Commit

Permalink
[DOC] [JS] heatbeat example was not up-to-date, updated example showi…
Browse files Browse the repository at this point in the history
…ng behaviour of missed heartbeats on JS push subscriber. (#446)
  • Loading branch information
aricart committed Dec 27, 2022
1 parent d8e555b commit 4257f3c
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 79 deletions.
78 changes: 35 additions & 43 deletions examples/jetstream/10_heartbeats.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,4 @@
import {
AckPolicy,
connect,
isHeartbeatMsg,
JsHeaders,
nanos,
nuid,
toJsMsg,
} from "../../src/mod.ts";
import { connect, consumerOpts, nuid } from "../../src/mod.ts";

const nc = await connect();
const jsm = await nc.jetstreamManager();
Expand All @@ -15,41 +7,41 @@ const stream = nuid.next();
const subj = nuid.next();
await jsm.streams.add({ name: stream, subjects: [`${subj}.>`] });

// create a regular subscription (this is an ephemeral consumer, so start the sub)
let missed = 0;
const sub = nc.subscribe("my.messages", {
callback: (_err, msg) => {
missed = 0;
// simply checking if has headers and code === 100, with no reply
// subject set. if it has a reply it would be a flow control message
// which will get acknowledged at the end.
if (isHeartbeatMsg(msg)) {
// the heartbeat has additional information:
const lastSeq = msg.headers?.get(JsHeaders.LastStreamSeqHdr);
const consSeq = msg.headers?.get(JsHeaders.LastConsumerSeqHdr);
console.log(
`alive - last stream seq: ${lastSeq} - last consumer seq: ${consSeq}`,
);
return;
}
// do something with the message
const m = toJsMsg(msg);
const js = nc.jetstream();
let opts = consumerOpts()
.deliverTo("push")
.manualAck()
.ackExplicit()
.idleHeartbeat(500)
.durable("iter-dur");
const iter = await js.subscribe(`${subj}.>`, opts);
// if 2 heartbeats are missed, the iterator will end with an error
// simply re-do the js.subscribe() and attempt again
const done = (async () => {
for await (const m of iter) {
m.ack();
},
});

setInterval(() => {
missed++;
if (missed > 3) {
console.error("JetStream stopped sending heartbeats!");
}
}, 30000);

// create a consumer that delivers to the subscription
await jsm.consumers.add(stream, {
ack_policy: AckPolicy.Explicit,
deliver_subject: "my.messages",
idle_heartbeat: nanos(10000),
})();
done.catch((err) => {
console.log(`iterator closed: ${err}`);
});

await sub.closed;
opts = consumerOpts()
.deliverTo("push")
.manualAck()
.ackExplicit()
.idleHeartbeat(500)
.durable("callback-dur")
.callback((err, m) => {
if (err) {
// the callback will also report a heartbeat error, however because the
// callback can receive errors, it continues active. If the server returns
// the client will automatically resume receiving messages
console.log(err);
} else {
m?.ack();
}
});

const sub = await js.subscribe(`${subj}.>`, opts);
await sub.closed.then(() => console.log("sub closed"));
74 changes: 38 additions & 36 deletions jetstream.md
Original file line number Diff line number Diff line change
Expand Up @@ -426,47 +426,49 @@ By creating a consumer that enables heartbeats, you can request JetStream to
send you heartbeat messages every so often. This way your client can reconcile
if the lack of messages means that you should be restarting your consumer.

Currently, the library doesn't provide a notification for missed heartbeats, but
this is not too difficult to do:

```typescript
let missed = 0;
// this is a plain nats subscription
const sub = nc.subscribe("my.messages", {
callback: (err, msg) => {
// if we got a message, we simply reset
missed = 0;
// simply checking if has headers and code === 100 and a description === "Idle Heartbeat"
if (isHeartbeatMsg(msg)) {
// the heartbeat has additional information:
const lastSeq = msg.headers.get(JsHeaders.LastStreamSeqHdr);
const consSeq = msg.headers.get(JsHeaders.LastConsumerSeqHdr);
console.log(
`alive - last stream seq: ${lastSeq} - last consumer seq: ${consSeq}`,
);
return;
}
// do something with the message
const m = toJsMsg(msg);
m.ack();
},
});
const stream = nuid.next();
const subj = nuid.next();
await jsm.streams.add({ name: stream, subjects: [`${subj}.>`] });

setInterval(() => {
missed++;
if (missed > 3) {
console.error("JetStream stopped sending heartbeats!");
const js = nc.jetstream();
let opts = consumerOpts()
.deliverTo("push")
.manualAck()
.ackExplicit()
.idleHeartbeat(500)
.durable("iter-dur");
const iter = await js.subscribe(`${subj}.>`, opts);
// if 2 heartbeats are missed, the iterator will end with an error
// simply re-do the js.subscribe() and attempt again
const done = (async () => {
for await (const m of iter) {
m.ack();
}
}, 30000);

// create a consumer that delivers to the subscription
await jsm.consumers.add(stream, {
ack_policy: AckPolicy.Explicit,
deliver_subject: "my.messages",
idle_heartbeat: nanos(10000),
})();
done.catch((err) => {
console.log(`iterator closed: ${err}`);
});

await sub.closed;
opts = consumerOpts()
.deliverTo("push")
.manualAck()
.ackExplicit()
.idleHeartbeat(500)
.durable("callback-dur")
.callback((err, m) => {
if (err) {
// the callback will also report a heartbeat error, however because the
// callback can receive errors, it continues active. If the server returns
// the client will automatically resume receiving messages
console.log(err);
} else {
m?.ack();
}
});

const sub = await js.subscribe(`${subj}.>`, opts);
await sub.closed.then(() => console.log("sub closed"));
```

#### JetStream Ordered Consumers
Expand Down

0 comments on commit 4257f3c

Please sign in to comment.