Skip to content

Commit

Permalink
[FEAT] implements the ability in KV to request a specific revision fo…
Browse files Browse the repository at this point in the history
…r a key

FIX #301
  • Loading branch information
aricart committed May 16, 2022
1 parent a148e7e commit 4af5a1b
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 8 deletions.
16 changes: 12 additions & 4 deletions nats-base-client/kv.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import {
KvPutOptions,
KvRemove,
KvStatus,
MsgRequest,
PurgeOpts,
PurgeResponse,
RetentionPolicy,
Expand Down Expand Up @@ -367,13 +368,20 @@ export class Bucket implements KV, KvRemove {
return pa.seq;
}

async get(k: string): Promise<KvEntry | null> {
async get(
k: string,
opts: { revision: number } = { revision: 0 },
): Promise<KvEntry | null> {
const ek = this.encodeKey(k);
this.validateKey(ek);

let arg: MsgRequest = { last_by_subj: this.fullKeyName(ek) };
if (opts.revision !== 0) {
arg = { seq: opts.revision };
}

try {
const sm = await this.jsm.streams.getMessage(this.bucketName(), {
last_by_subj: this.fullKeyName(ek),
});
const sm = await this.jsm.streams.getMessage(this.bucketName(), arg);
return this.smToEntry(k, sm);
} catch (err) {
if (err.message === "no message found") {
Expand Down
2 changes: 1 addition & 1 deletion nats-base-client/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -914,7 +914,7 @@ export interface KvRemove {
}

export interface RoKV {
get(k: string): Promise<KvEntry | null>;
get(k: string, opts?: { revision: number }): Promise<KvEntry | null>;
history(opts?: { key?: string }): Promise<QueuedIterator<KvEntry>>;
watch(
opts?: { key?: string; headers_only?: boolean; initializedFn?: callbackFn },
Expand Down
3 changes: 0 additions & 3 deletions tests/jetstream_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,6 @@ import {
JsMsg,
JsMsgCallback,
JSONCodec,
millis,
Nanos,
nanos,
NatsConnectionImpl,
NatsError,
Expand All @@ -51,7 +49,6 @@ import {
StringCodec,
} from "../nats-base-client/internal_mod.ts";
import {
assertArrayIncludes,
assertEquals,
assertRejects,
assertThrows,
Expand Down
32 changes: 32 additions & 0 deletions tests/kv_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1271,3 +1271,35 @@ Deno.test("kv - watch init callback exceptions terminate the iterator", async ()
assertEquals(err.message, "crash");
await cleanup(ns, nc);
});

Deno.test("kv - get revision", async () => {
const { ns, nc } = await setup(
jetstreamServerConf({}, true),
);
const js = nc.jetstream();
const sc = StringCodec();

const b = await js.views.kv("a", { history: 3 }) as Bucket;
await b.put("A", sc.encode("a"));
await b.put("A", sc.encode("b"));
await b.put("A", sc.encode("c"));

async function check(value: string | null, revision = 0) {
const e = await b.get("A", { revision });
if (value === null) {
assertEquals(e, null);
} else {
assertEquals(sc.decode(e!.value), value);
}
}

await check("c");
await check("a", 1);
await check("b", 2);

await b.put("A", sc.encode("d"));
await check("d");
await check(null, 1);

await cleanup(ns, nc);
});

0 comments on commit 4af5a1b

Please sign in to comment.