Skip to content

Commit

Permalink
[CHANGE] [JS] ordered consumers now default to memory storage and no …
Browse files Browse the repository at this point in the history
…replicas

[CHANGE] [JS] ordered consumers now recreate the consumer if heartbeats are missed. Note that the client will not get a notification since the client will try to recover. This is also a change for iterators, since the ordered consumer iterator, will now resume when the consumer
  is recreated. For non-ordered consumers, any error will be notified on the callback until the subcription ends. Iterator based consumers will stop with the missed heartbeat error.
[FIX] [JS] publishing retries now properly handle timeouts
  • Loading branch information
aricart committed Sep 29, 2022
1 parent 8426af1 commit c3a0373
Show file tree
Hide file tree
Showing 4 changed files with 87 additions and 8 deletions.
41 changes: 36 additions & 5 deletions nats-base-client/jsclient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import {
ConsumerInfo,
ConsumerInfoable,
ConsumerOpts,
CreateConsumerRequest,
DeliverPolicy,
Destroyable,
Empty,
Expand Down Expand Up @@ -182,7 +183,10 @@ export class JetStreamClientImpl extends BaseApiClient
break;
} catch (err) {
const ne = err as NatsError;
if (ne.code === "503" && i + 1 < retries) {
if (
(ne.code === "503" || ne.code === ErrorCode.Timeout) &&
i + 1 < retries
) {
await delay(retry_delay);
} else {
throw err;
Expand Down Expand Up @@ -524,6 +528,8 @@ export class JetStreamClientImpl extends BaseApiClient
jsi.config.flow_control = true;
jsi.config.idle_heartbeat = jsi.config.idle_heartbeat || nanos(5000);
jsi.config.ack_wait = nanos(22 * 60 * 60 * 1000);
jsi.config.mem_storage = true;
jsi.config.num_replicas = 1;
}

if (jsi.config.ack_policy === AckPolicy.NotSet) {
Expand Down Expand Up @@ -714,10 +720,19 @@ export class JetStreamSubscriptionImpl extends TypedSubscription<JsMsg>
info.config.deliver_subject = newDeliver;
info.config.deliver_policy = DeliverPolicy.StartSequence;
info.config.opt_start_seq = sseq;
// put the stream name
const req = {} as CreateConsumerRequest;
req.stream_name = this.info.stream;
req.config = info.config;

const subj = `${info.api.prefix}.CONSUMER.CREATE.${info.stream}`;

this.js._request(subj, this.info.config)
this.js._request(subj, req)
.then((v) => {
const ci = v as ConsumerInfo;
this.info!.config = ci.config;
this.info!.name = ci.name;
})
.catch((err) => {
// to inform the subscription we inject an error this will
// be at after the last message if using an iterator.
Expand Down Expand Up @@ -751,9 +766,25 @@ export class JetStreamSubscriptionImpl extends TypedSubscription<JsMsg>
`${Js409Errors.IdleHeartbeatMissed}: ${v}`,
this.sub.subject,
);
this.sub.callback(null, msg);
// if we are a handler, we'll continue reporting
// iterators will stop
const ordered = this.info?.ordered;
// non-ordered consumers are always notified of the condition
// as they need to try and recover
if (!ordered) {
this.sub.callback(null, msg);
} else {
if (!this.js.nc.protocol.connected) {
// we are not connected don't do anything
return false;
}
// reset the consumer
const seq = this.info?.ordered_consumer_sequence?.delivery_seq;
this._resetOrderedConsumer(seq || 1);
// if we are ordered, we will reset the consumer and keep
// feeding the iterator or callback - we are not stopping
return false;
}
// let the hb monitor know if we are stopping for callbacks
// we don't as we deliver the errors via the cb.
return !sub.noIterator;
};
// this only applies for push subscriptions
Expand Down
13 changes: 13 additions & 0 deletions nats-base-client/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -849,6 +849,19 @@ export interface JetStreamPublishOptions {
*/
lastSubjectSequence: number;
}>;

/**
* Number of publish retries. When set, any publishing failure due to a timeout
* or a no responders error (jetstream not available), will continue to
* retry the publish operation upto the number of retries specified.
*/
retries?: number;

/**
* Amount of millis to wait between republishing {@see retries}.
* Default is 250 millis.
*/
retry_delay?: number;
}

/**
Expand Down
40 changes: 38 additions & 2 deletions tests/jetstream_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3592,7 +3592,6 @@ Deno.test("jetstream - repub on 503", async () => {
await delay(1000);

await js.publish(subj, Empty, {
//@ts-ignore: testing
retries: 15,
retry_delay: 1000,
timeout: 15000,
Expand All @@ -3612,7 +3611,6 @@ Deno.test("jetstream - duplicate message pub", async () => {

ack = await js.publish(subj, Empty, { msgID: "x" });
assertEquals(ack.duplicate, true);
console.log(ack);

await cleanup(ns, nc);
});
Expand Down Expand Up @@ -3784,3 +3782,41 @@ Deno.test("jetstream - kv and object store views reject in older servers", async

await cleanup(ns, nc);
});

Deno.test("jetstream - ordered consumer reset", async () => {
let { ns, nc } = await setup(jetstreamServerConf({}));
const { subj } = await initStream(nc, "A");
const d = deferred<JsMsg>();
const js = nc.jetstream();
const opts = consumerOpts();
opts.orderedConsumer();
opts.callback((err, m) => {
if (err) {
fail(err.message);
}
c.unsubscribe();
d.resolve(m!);
});
const c = await js.subscribe(subj, opts);

// stop the server and wait until hbs are missed
await ns.stop();
while (true) {
const missed = (c as JetStreamSubscriptionImpl).monitor?.missed || 0;
const connected = (nc as NatsConnectionImpl).protocol.connected;
// we want to wait until after 2 because we want to have a cycle
// where we try to recreate the consumer, but skip it because we are
// not connected
if (!connected && missed >= 3) {
break;
}
await delay(1000);
}
ns = await ns.restart();

const ack = await js.publish(subj, Empty, { retries: 20 });
await c.closed;

assertEquals((await d).seq, ack.seq);
await cleanup(ns, nc);
});
1 change: 0 additions & 1 deletion tests/jsm_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1758,7 +1758,6 @@ Deno.test("jsm - discard_new_per_subject option", async () => {
await kv.put("B", Empty);
await assertRejects(
async () => {
(nc as NatsConnectionImpl).options.debug = true;
await kv.put("B", Empty);
},
Error,
Expand Down

0 comments on commit c3a0373

Please sign in to comment.