diff --git a/jetstream/jsclient.ts b/jetstream/jsclient.ts index f5c681f2..728c7f51 100644 --- a/jetstream/jsclient.ts +++ b/jetstream/jsclient.ts @@ -98,6 +98,7 @@ import { PullOptions, ReplayPolicy, } from "./jsapi_types.ts"; +import { nuid } from "../nats-base-client/nuid.ts"; export enum PubHeaders { MsgIdHdr = "Nats-Msg-Id", @@ -775,6 +776,7 @@ export class JetStreamSubscriptionImpl extends TypedSubscription const nci = this.js.nc; nci._resub(this.sub, newDeliver); const info = this.info; + info.config.name = nuid.next(); info.ordered_consumer_sequence.delivery_seq = 0; info.flow_control.heartbeat_count = 0; info.flow_control.fc_count = 0; @@ -842,6 +844,7 @@ export class JetStreamSubscriptionImpl extends TypedSubscription // reset the consumer const seq = this.info?.ordered_consumer_sequence?.stream_seq || 0; this._resetOrderedConsumer(seq + 1); + this.monitor?.restart(); // if we are ordered, we will reset the consumer and keep // feeding the iterator or callback - we are not stopping return false; diff --git a/jetstream/tests/kv_test.ts b/jetstream/tests/kv_test.ts index 4d63daf7..1d655e60 100644 --- a/jetstream/tests/kv_test.ts +++ b/jetstream/tests/kv_test.ts @@ -2134,3 +2134,33 @@ Deno.test("kv - honors checkAPI option", async () => { await cleanup(ns, nc); }); + +Deno.test("kv - watcher on server restart", async () => { + let { ns, nc } = await setup( + jetstreamServerConf({}), + ); + const js = nc.jetstream(); + const kv = await js.views.kv("A"); + const iter = await kv.watch(); + const d = deferred(); + (async () => { + for await (const e of iter) { + d.resolve(e); + break; + } + })().then(); + + ns = await ns.restart(); + console.log("server restarted"); + for (let i = 0; i < 10; i++) { + try { + await kv.put("hello", "world"); + break; + } catch { + await delay(500); + } + } + + await d; + await cleanup(ns, nc); +}); diff --git a/nats-base-client/idleheartbeat_monitor.ts b/nats-base-client/idleheartbeat_monitor.ts index 84d5bc4f..adba34c5 100644 --- a/nats-base-client/idleheartbeat_monitor.ts +++ b/nats-base-client/idleheartbeat_monitor.ts @@ -78,6 +78,7 @@ export class IdleHeartbeatMonitor { } this.timer = 0; this.autoCancelTimer = 0; + this.missed = 0; } /** diff --git a/src/deno_transport.ts b/src/deno_transport.ts index 697bcd5f..560b9494 100644 --- a/src/deno_transport.ts +++ b/src/deno_transport.ts @@ -157,7 +157,7 @@ export class DenoTransport implements Transport { const sto = await (this.loadTlsOptions(hostname)); this.conn = await Deno.startTls( //@ts-ignore: just the conn - this.conn, + this.conn as Deno.TcpConn, sto, ); // this is necessary because the startTls process doesn't