Skip to content

Commit

Permalink
[CHANGE] DeliveryInfo now has 12 or more tokens.
Browse files Browse the repository at this point in the history
  • Loading branch information
aricart committed Aug 19, 2021
1 parent 4bd4c5d commit 6123a4d
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 39 deletions.
31 changes: 23 additions & 8 deletions nats-base-client/jsmsg.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,18 +39,33 @@ export function toJsMsg(m: Msg): JsMsg {

export function parseInfo(s: string): DeliveryInfo {
const tokens = s.split(".");
if (tokens.length !== 9 && tokens[0] !== "$JS" && tokens[1] !== "ACK") {
if (tokens.length === 9) {
tokens.splice(2, 0, "_", "");
tokens.push("");
}

if (
(tokens.length < 12) || tokens[0] !== "$JS" || tokens[1] !== "ACK"
) {
throw new Error(`not js message`);
}

// old
// "$JS.ACK.<stream>.<consumer>.<redeliveryCount><streamSeq><deliverySequence>.<timestamp>.<pending>"
// new
// $JS.ACK.<domain>.<accounthash>.<stream>.<consumer>.<redeliveryCount>.<streamSeq>.<deliverySequence>.<timestamp>.<pending>.<random>
const di = {} as DeliveryInfo;
di.stream = tokens[2];
di.consumer = tokens[3];
di.redeliveryCount = parseInt(tokens[4], 10);
di.streamSequence = parseInt(tokens[5], 10);
di.deliverySequence = parseInt(tokens[6], 10);
di.timestampNanos = parseInt(tokens[7], 10);
di.pending = parseInt(tokens[8], 10);
// if domain is "_", replace with blank
di.domain = tokens[2] === "_" ? "" : tokens[2];
di.account_hash = tokens[3];
di.stream = tokens[4];
di.consumer = tokens[5];
di.redeliveryCount = parseInt(tokens[6], 10);
di.streamSequence = parseInt(tokens[7], 10);
di.deliverySequence = parseInt(tokens[8], 10);
di.timestampNanos = parseInt(tokens[9], 10);
di.pending = parseInt(tokens[10], 10);
di.rand = tokens[11];
return di;
}

Expand Down
3 changes: 3 additions & 0 deletions nats-base-client/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -416,6 +416,8 @@ export interface JsMsg {
}

export interface DeliveryInfo {
domain: string;
account_hash?: string;
stream: string;
consumer: string;
redeliveryCount: number;
Expand All @@ -424,6 +426,7 @@ export interface DeliveryInfo {
timestampNanos: number;
pending: number;
redelivered: boolean;
rand?: string;
}

export interface StoredMsg {
Expand Down
55 changes: 24 additions & 31 deletions tests/jsmsg_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,41 +24,34 @@ import {
Msg,
StringCodec,
} from "../src/mod.ts";
import { deferred } from "../nats-base-client/util.ts";
import { nanos } from "../nats-base-client/jsutil.ts";
import { parseInfo, toJsMsg } from "../nats-base-client/jsmsg.ts";

Deno.test("jsmsg - parse", async () => {
const nc = await connect({ servers: "demo.nats.io" });
const subj = createInbox();
const m = deferred<Msg>();
const sub = nc.subscribe(subj, {
max: 1,
callback: (_err, msg) => {
m.resolve(msg);
},
});

// "$JS.ACK.<stream>.<consumer>.<redeliveryCount><streamSeq><deliverySequence>.<timestamp>.<pending>"
const rs = `MY.TEST.streamname.consumername.2.3.4.${nanos(Date.now())}.100`;
const h = headers();
h.set("hello", "world");
nc.publish(subj, Empty, { reply: rs, headers: h });
const msg = await m;
const jm = toJsMsg(msg);

assertEquals(jm.info.stream, "streamname");
assertEquals(jm.info.consumer, "consumername");
assertEquals(jm.info.redeliveryCount, 2);
assertEquals(jm.redelivered, true);
assertEquals(jm.seq, 3);
assertEquals(jm.info.pending, 100);
assertEquals(jm.sid, sub.getID());

const h2 = jm.headers;
assertEquals(h2!.get("hello"), "world");
const rs = `$JS.ACK.streamname.consumername.2.3.4.${nanos(Date.now())}.100`;
const info = parseInfo(rs);
assertEquals(info.stream, "streamname");
assertEquals(info.consumer, "consumername");
assertEquals(info.redeliveryCount, 2);
assertEquals(info.streamSequence, 3);
assertEquals(info.pending, 100);
});

await nc.close();
Deno.test("jsmsg - parse long", async () => {
// $JS.ACK.<domain>.<accounthash>.<stream>.<consumer>.<redeliveryCount>.<streamSeq>.<deliverySequence>.<timestamp>.<pending>.<random>
const rs = `$JS.ACK.domain.account.streamname.consumername.2.3.4.${
nanos(Date.now())
}.100.rand`;
const info = parseInfo(rs);
assertEquals(info.domain, "domain");
assertEquals(info.account_hash, "account");
assertEquals(info.stream, "streamname");
assertEquals(info.consumer, "consumername");
assertEquals(info.redeliveryCount, 2);
assertEquals(info.streamSequence, 3);
assertEquals(info.pending, 100);
assertEquals(info.rand, "rand");
});

Deno.test("jsmsg - parse rejects subject is not 9 tokens", () => {
Expand All @@ -75,9 +68,9 @@ Deno.test("jsmsg - parse rejects subject is not 9 tokens", () => {
}
};

const chunks = `$JS.ACK.stream.consumer.1.2.3.4.5`.split(".");
const chunks = `$JS.ACK.stream.consumer.1.2.3.4.5.6.7.8.9.10`.split(".");
for (let i = 1; i <= chunks.length; i++) {
fn(chunks.slice(0, i).join("."), i === 9);
fn(chunks.slice(0, i).join("."), i === 9 || i >= 12);
}
});

Expand Down

0 comments on commit 6123a4d

Please sign in to comment.