Skip to content

Commit

Permalink
[FEAT] added check when retrieving a key, but also specifying a seque…
Browse files Browse the repository at this point in the history
…nce (new feature), that the key matched. If not, null is returned as the sequence is not referencing the expected key.

[FIX] conversion from stored message captured the key as provided by the client, and returned an entry with the said key.
  • Loading branch information
aricart committed May 16, 2022
1 parent 4af5a1b commit 6bb7332
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 19 deletions.
20 changes: 12 additions & 8 deletions nats-base-client/kv.ts
Original file line number Diff line number Diff line change
Expand Up @@ -312,10 +312,10 @@ export class Bucket implements KV, KvRemove {
return data.length;
}

smToEntry(key: string, sm: StoredMsg): KvEntry {
smToEntry(sm: StoredMsg): KvEntry {
return {
bucket: this.bucket,
key: key,
key: sm.subject.substring(this.prefixLen),
value: sm.data,
delta: 0,
created: sm.time,
Expand All @@ -325,7 +325,7 @@ export class Bucket implements KV, KvRemove {
};
}

jmToEntry(_k: string, jm: JsMsg): KvEntry {
jmToEntry(jm: JsMsg): KvEntry {
const key = this.decodeKey(jm.subject.substring(this.prefixLen));
return {
bucket: this.bucket,
Expand Down Expand Up @@ -370,19 +370,23 @@ export class Bucket implements KV, KvRemove {

async get(
k: string,
opts: { revision: number } = { revision: 0 },
opts?: { revision: number },
): Promise<KvEntry | null> {
const ek = this.encodeKey(k);
this.validateKey(ek);

let arg: MsgRequest = { last_by_subj: this.fullKeyName(ek) };
if (opts.revision !== 0) {
if (opts && opts.revision > 0) {
arg = { seq: opts.revision };
}

try {
const sm = await this.jsm.streams.getMessage(this.bucketName(), arg);
return this.smToEntry(k, sm);
const ke = this.smToEntry(sm);
if (ke.key !== ek) {
return null;
}
return ke;
} catch (err) {
if (err.message === "no message found") {
return null;
Expand Down Expand Up @@ -476,7 +480,7 @@ export class Bucket implements KV, KvRemove {
return;
}
if (jm) {
const e = this.jmToEntry(k, jm);
const e = this.jmToEntry(jm);
qi.push(e);
qi.received++;
//@ts-ignore - function will be removed
Expand Down Expand Up @@ -553,7 +557,7 @@ export class Bucket implements KV, KvRemove {
return;
}
if (jm) {
const e = this.jmToEntry(k, jm);
const e = this.jmToEntry(jm);
qi.push(e);
qi.received++;

Expand Down
26 changes: 15 additions & 11 deletions tests/kv_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1279,27 +1279,31 @@ Deno.test("kv - get revision", async () => {
const js = nc.jetstream();
const sc = StringCodec();

const b = await js.views.kv("a", { history: 3 }) as Bucket;
await b.put("A", sc.encode("a"));
await b.put("A", sc.encode("b"));
await b.put("A", sc.encode("c"));
const b = await js.views.kv(nuid.next(), { history: 3 }) as Bucket;

async function check(value: string | null, revision = 0) {
const e = await b.get("A", { revision });
async function check(key: string, value: string | null, revision = 0) {
const e = await b.get(key, { revision });
if (value === null) {
assertEquals(e, null);
} else {
assertEquals(sc.decode(e!.value), value);
}
}

await check("c");
await check("a", 1);
await check("b", 2);
await b.put("A", sc.encode("a"));
await b.put("A", sc.encode("b"));
await b.put("A", sc.encode("c"));

// expect null, as sequence 1, holds "A"
await check("B", null, 1);

await check("A", "c");
await check("A", "a", 1);
await check("A", "b", 2);

await b.put("A", sc.encode("d"));
await check("d");
await check(null, 1);
await check("A", "d");
await check("A", null, 1);

await cleanup(ns, nc);
});

0 comments on commit 6bb7332

Please sign in to comment.