Skip to content

Commit

Permalink
reworked handling of 409
Browse files Browse the repository at this point in the history
  • Loading branch information
aricart committed Jun 8, 2022
1 parent 88ae3b1 commit 51ca41a
Show file tree
Hide file tree
Showing 4 changed files with 323 additions and 150 deletions.
37 changes: 17 additions & 20 deletions nats-base-client/jsclient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ import {
checkJsError,
isFlowControlMsg,
isHeartbeatMsg,
isTerminal409,
nanos,
validateDurableName,
validateStreamName,
Expand Down Expand Up @@ -270,17 +271,7 @@ export class JetStreamClientImpl extends BaseApiClient
timer = null;
}
if (isNatsError(err)) {
switch (err.code) {
case ErrorCode.JetStream404NoMessages:
case ErrorCode.JetStream409:
qi.stop();
return [null, null];
case ErrorCode.JetStream408RequestTimeout:
if ("message size exceeds maxbytes" === err.message) {
qi.stop(err);
}
qi.stop();
}
qi.stop(hideNonTerminalJsErrors(err) === null ? undefined : err);
} else {
qi.stop(err);
}
Expand Down Expand Up @@ -774,26 +765,32 @@ function iterMsgAdapter(
if (err) {
return [err, null];
}

// iterator will close if we have an error
// check for errors that shouldn't close it
const ne = checkJsError(msg);
if (ne !== null) {
return [hideNonTerminalJsErrors(ne), null];
}
// assuming that the protocolFilterFn is set
return [null, toJsMsg(msg)];
}

function hideNonTerminalJsErrors(ne: NatsError): NatsError | null {
if (ne !== null) {
switch (ne.code) {
case ErrorCode.JetStream404NoMessages:
case ErrorCode.JetStream409:
return [null, null];
case ErrorCode.JetStream408RequestTimeout:
if ("message size exceeds maxbytes" === ne.message) {
return [ne, null];
return null;
case ErrorCode.JetStream409:
if (isTerminal409(ne)) {
return ne;
}
return [null, null];
return null;
default:
return [ne, null];
return ne;
}
}
// assuming that the protocolFilterFn is set
return [null, toJsMsg(msg)];
return null;
}

function autoAckJsMsg(data: JsMsg | null) {
Expand Down
38 changes: 37 additions & 1 deletion nats-base-client/jsutil.ts
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,40 @@ export function checkJsError(msg: Msg): NatsError | null {
return checkJsErrorCode(h.code, h.description);
}

export enum Js409Errors {
MaxBatchExceeded = "exceeded maxrequestbatch of",
MaxExpiresExceeded = "exceeded maxrequestexpires of",
MaxBytesExceeded = "exceeded maxrequestmaxbytes of",
MaxMessageSizeExceeded = "message size exceeds maxbytes",
PushConsumer = "consumer is push based",
MaxWaitingExceeded = "exceeded maxwaiting", // not terminal
}

let MAX_WAITING_FAIL = false;
export function setMaxWaitingToFail(tf: boolean) {
MAX_WAITING_FAIL = tf;
}

export function isTerminal409(err: NatsError): boolean {
if (err.code !== ErrorCode.JetStream409) {
return false;
}
const fatal = [
Js409Errors.MaxBatchExceeded,
Js409Errors.MaxExpiresExceeded,
Js409Errors.MaxBytesExceeded,
Js409Errors.MaxMessageSizeExceeded,
Js409Errors.PushConsumer,
];
if (MAX_WAITING_FAIL) {
fatal.push(Js409Errors.MaxWaitingExceeded);
}

return fatal.find((s) => {
return err.message.indexOf(s) !== -1;
}) !== undefined;
}

export function checkJsErrorCode(
code: number,
description = "",
Expand All @@ -108,7 +142,9 @@ export function checkJsErrorCode(
case 408:
return new NatsError(description, ErrorCode.JetStream408RequestTimeout);
case 409:
// the description can be exceeded max waiting or max ack pending
// the description can be exceeded max waiting or max ack pending, which are
// recoverable, but can also be terminal errors where the request exceeds
// some value in the consumer configuration
return new NatsError(
description,
ErrorCode.JetStream409,
Expand Down
263 changes: 263 additions & 0 deletions tests/jetream409_test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,263 @@
import {
AckPolicy,
JetStreamClient,
PullOptions,
} from "../nats-base-client/types.ts";
import {
Js409Errors,
nanos,
setMaxWaitingToFail,
} from "../nats-base-client/jsutil.ts";
import {
consumerOpts,
deferred,
NatsError,
StringCodec,
} from "../nats-base-client/mod.ts";
import { assertRejects } from "https://deno.land/std@0.125.0/testing/asserts.ts";
import { assertStringIncludes } from "https://deno.land/std@0.75.0/testing/asserts.ts";
import {
cleanup,
initStream,
jetstreamServerConf,
setup,
} from "./jstest_util.ts";
import { notCompatible } from "./helpers/mod.ts";

type testArgs = {
js: JetStreamClient;
stream: string;
durable: string;
opts: PullOptions;
expected: Js409Errors;
};

async function expectFetchError(args: testArgs) {
const { js, stream, durable, opts, expected } = args;
const i = js.fetch(stream, durable, opts);
await assertRejects(
async () => {
for await (const _m of i) {
//nothing
}
},
Error,
expected,
);
}

async function expectPullSubscribeIteratorError(args: testArgs) {
const { js, stream, durable, opts, expected } = args;
const co = consumerOpts();
co.bind(stream, durable);
const sub = await js.pullSubscribe(">", co);
sub.pull(opts);

await assertRejects(
async () => {
for await (const _m of sub) {
// nothing
}
},
Error,
expected,
);
}

async function expectPullSubscribeCallbackError(
args: testArgs,
) {
const { js, stream, durable, opts, expected } = args;

const d = deferred<NatsError | null>();
const co = consumerOpts();
co.bind(stream, durable);
co.callback((err) => {
d.resolve(err);
});
const sub = await js.pullSubscribe(">", co);
sub.pull(opts);
const ne = await d;
assertStringIncludes(ne?.message || "", expected);
}

Deno.test("409 - max_batch", async () => {
const { ns, nc } = await setup(jetstreamServerConf({}, true));
const { stream, subj } = await initStream(nc);

const jsm = await nc.jetstreamManager();

const sc = StringCodec();
const js = nc.jetstream();
for (let i = 0; i < 10; i++) {
await js.publish(subj, sc.encode("hello"));
}

await jsm.consumers.add(stream, {
durable_name: "a",
ack_policy: AckPolicy.Explicit,
max_batch: 1,
});

const opts = { batch: 10, expires: 1000 } as PullOptions;
const to = {
js,
stream,
durable: "a",
opts,
expected: Js409Errors.MaxBatchExceeded,
};

await expectFetchError(to);
await expectPullSubscribeIteratorError(to);
await expectPullSubscribeCallbackError(to);

await cleanup(ns, nc);
});

Deno.test("409 - max_expires", async () => {
const { ns, nc } = await setup(jetstreamServerConf({}, true));
const { stream, subj } = await initStream(nc);

const jsm = await nc.jetstreamManager();

const sc = StringCodec();
const js = nc.jetstream();
for (let i = 0; i < 10; i++) {
await js.publish(subj, sc.encode("hello"));
}

await jsm.consumers.add(stream, {
durable_name: "a",
ack_policy: AckPolicy.Explicit,
max_expires: nanos(1000),
});

const opts = { batch: 1, expires: 5000 } as PullOptions;
const to = {
js,
stream,
durable: "a",
opts,
expected: Js409Errors.MaxExpiresExceeded,
};

await expectFetchError(to);
await expectPullSubscribeIteratorError(to);
await expectPullSubscribeCallbackError(to);

await cleanup(ns, nc);
});

Deno.test("409 - max_bytes", async () => {
const { ns, nc } = await setup(jetstreamServerConf({}, true));
if (await notCompatible(ns, nc, "2.8.3")) {
return;
}
const { stream, subj } = await initStream(nc);

const jsm = await nc.jetstreamManager();

const sc = StringCodec();
const js = nc.jetstream();
for (let i = 0; i < 10; i++) {
await js.publish(subj, sc.encode("hello"));
}

await jsm.consumers.add(stream, {
durable_name: "a",
ack_policy: AckPolicy.Explicit,
max_bytes: 10,
});

const opts = { max_bytes: 1024, expires: 5000 } as PullOptions;
const to = {
js,
stream,
durable: "a",
opts,
expected: Js409Errors.MaxBytesExceeded,
};

await expectFetchError(to);
await expectPullSubscribeIteratorError(to);
await expectPullSubscribeCallbackError(to);

await cleanup(ns, nc);
});

Deno.test("409 - max msg size", async () => {
const { ns, nc } = await setup(jetstreamServerConf({}, true));
const { stream, subj } = await initStream(nc);

const jsm = await nc.jetstreamManager();

const sc = StringCodec();
const js = nc.jetstream();
for (let i = 0; i < 10; i++) {
await js.publish(subj, sc.encode("hello"));
}

await jsm.consumers.add(stream, {
durable_name: "a",
ack_policy: AckPolicy.Explicit,
});

const opts = { max_bytes: 2, expires: 5000 } as PullOptions;
const to = {
js,
stream,
durable: "a",
opts,
expected: Js409Errors.MaxMessageSizeExceeded,
};

await expectFetchError(to);
await expectPullSubscribeIteratorError(to);
await expectPullSubscribeCallbackError(to);

await cleanup(ns, nc);
});

Deno.test("409 - max waiting", async () => {
const { ns, nc } = await setup(jetstreamServerConf({}, true));
const { stream, subj } = await initStream(nc);

const jsm = await nc.jetstreamManager();

const sc = StringCodec();
const js = nc.jetstream();
for (let i = 0; i < 10; i++) {
await js.publish(subj, sc.encode("hello"));
}

await jsm.consumers.add(stream, {
durable_name: "a",
ack_policy: AckPolicy.Explicit,
max_waiting: 1,
});

const opts = { expires: 1000 } as PullOptions;
const to = {
js,
stream,
durable: "a",
opts,
expected: Js409Errors.MaxWaitingExceeded,
};

const iter = js.fetch(stream, "a", { batch: 1000, expires: 5000 });
(async () => {
for await (const _m of iter) {
// nothing
}
})().then();

setMaxWaitingToFail(true);

await expectFetchError(to);
await expectPullSubscribeIteratorError(to);
await expectPullSubscribeCallbackError(to);

await cleanup(ns, nc);
});
Loading

0 comments on commit 51ca41a

Please sign in to comment.