diff --git a/nats-base-client/kv.ts b/nats-base-client/kv.ts index 404dda34..4312f518 100644 --- a/nats-base-client/kv.ts +++ b/nats-base-client/kv.ts @@ -35,6 +35,7 @@ import { KvPutOptions, KvRemove, KvStatus, + MsgRequest, PurgeOpts, PurgeResponse, RetentionPolicy, @@ -367,13 +368,20 @@ export class Bucket implements KV, KvRemove { return pa.seq; } - async get(k: string): Promise { + async get( + k: string, + opts: { revision: number } = { revision: 0 }, + ): Promise { const ek = this.encodeKey(k); this.validateKey(ek); + + let arg: MsgRequest = { last_by_subj: this.fullKeyName(ek) }; + if (opts.revision !== 0) { + arg = { seq: opts.revision }; + } + try { - const sm = await this.jsm.streams.getMessage(this.bucketName(), { - last_by_subj: this.fullKeyName(ek), - }); + const sm = await this.jsm.streams.getMessage(this.bucketName(), arg); return this.smToEntry(k, sm); } catch (err) { if (err.message === "no message found") { diff --git a/nats-base-client/types.ts b/nats-base-client/types.ts index 004b83bc..e2dd2e63 100644 --- a/nats-base-client/types.ts +++ b/nats-base-client/types.ts @@ -914,7 +914,7 @@ export interface KvRemove { } export interface RoKV { - get(k: string): Promise; + get(k: string, opts?: { revision: number }): Promise; history(opts?: { key?: string }): Promise>; watch( opts?: { key?: string; headers_only?: boolean; initializedFn?: callbackFn }, diff --git a/tests/jetstream_test.ts b/tests/jetstream_test.ts index 5632cba8..b84e3e0c 100644 --- a/tests/jetstream_test.ts +++ b/tests/jetstream_test.ts @@ -39,8 +39,6 @@ import { JsMsg, JsMsgCallback, JSONCodec, - millis, - Nanos, nanos, NatsConnectionImpl, NatsError, @@ -51,7 +49,6 @@ import { StringCodec, } from "../nats-base-client/internal_mod.ts"; import { - assertArrayIncludes, assertEquals, assertRejects, assertThrows, diff --git a/tests/kv_test.ts b/tests/kv_test.ts index cc8d8385..469a14db 100644 --- a/tests/kv_test.ts +++ b/tests/kv_test.ts @@ -1271,3 +1271,35 @@ Deno.test("kv - watch init callback exceptions terminate the iterator", async () assertEquals(err.message, "crash"); await cleanup(ns, nc); }); + +Deno.test("kv - get revision", async () => { + const { ns, nc } = await setup( + jetstreamServerConf({}, true), + ); + const js = nc.jetstream(); + const sc = StringCodec(); + + const b = await js.views.kv("a", { history: 3 }) as Bucket; + await b.put("A", sc.encode("a")); + await b.put("A", sc.encode("b")); + await b.put("A", sc.encode("c")); + + async function check(value: string | null, revision = 0) { + const e = await b.get("A", { revision }); + if (value === null) { + assertEquals(e, null); + } else { + assertEquals(sc.decode(e!.value), value); + } + } + + await check("c"); + await check("a", 1); + await check("b", 2); + + await b.put("A", sc.encode("d")); + await check("d"); + await check(null, 1); + + await cleanup(ns, nc); +});