Skip to content

Commit

Permalink
[FEAT] kv store can now put as payloads directly
Browse files Browse the repository at this point in the history
  • Loading branch information
aricart committed Jun 15, 2023
1 parent 3fccb5d commit 98e5a8b
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 6 deletions.
7 changes: 4 additions & 3 deletions jetstream/kv.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import {
MsgHdrs,
NatsConnection,
NatsError,
Payload,
} from "../nats-base-client/core.ts";
import { millis, nanos } from "./jsutil.ts";
import { QueuedIteratorImpl } from "../nats-base-client/queued_iterator.ts";
Expand Down Expand Up @@ -435,7 +436,7 @@ export class Bucket implements KV, KvRemove {
return new KvJsMsgEntryImpl(this.bucket, key, jm);
}

async create(k: string, data: Uint8Array): Promise<number> {
async create(k: string, data: Payload): Promise<number> {
let firstErr;
try {
const n = await this.put(k, data, { previousSeq: 0 });
Expand All @@ -460,7 +461,7 @@ export class Bucket implements KV, KvRemove {
}
}

update(k: string, data: Uint8Array, version: number): Promise<number> {
update(k: string, data: Payload, version: number): Promise<number> {
if (version <= 0) {
throw new Error("version must be greater than 0");
}
Expand All @@ -469,7 +470,7 @@ export class Bucket implements KV, KvRemove {

async put(
k: string,
data: Uint8Array,
data: Payload,
opts: Partial<KvPutOptions> = {},
): Promise<number> {
const ek = this.encodeKey(k);
Expand Down
24 changes: 24 additions & 0 deletions jetstream/tests/kv_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import {
assert,
assertArrayIncludes,
assertEquals,
assertExists,
assertRejects,
assertThrows,
} from "https://deno.land/std@0.190.0/testing/asserts.ts";
Expand Down Expand Up @@ -1743,3 +1744,26 @@ Deno.test("kv - create after delete", async () => {
await kv.create("a", Empty);
await cleanup(ns, nc);
});

Deno.test("kv - string payloads", async () => {
const { ns, nc } = await setup(jetstreamServerConf({}, true));

const js = nc.jetstream();
const kv = await js.views.kv("K");
await kv.create("a", "b");
let entry = await kv.get("a");
assertExists(entry);
assertEquals(entry?.string(), "b");

await kv.put("a", "c");
entry = await kv.get("a");
assertExists(entry);
assertEquals(entry?.string(), "c");

await kv.update("a", "d", entry!.revision);
entry = await kv.get("a");
assertExists(entry);
assertEquals(entry?.string(), "d");

await cleanup(ns, nc);
});
6 changes: 3 additions & 3 deletions jetstream/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1132,7 +1132,7 @@ export interface KV extends RoKV {
* @param k
* @param data
*/
create(k: string, data: Uint8Array): Promise<number>;
create(k: string, data: Payload): Promise<number>;

/**
* Updates the existing entry provided that the previous sequence
Expand All @@ -1142,7 +1142,7 @@ export interface KV extends RoKV {
* @param data
* @param version
*/
update(k: string, data: Uint8Array, version: number): Promise<number>;
update(k: string, data: Payload, version: number): Promise<number>;

/**
* Sets or updates the value stored under the specified key.
Expand All @@ -1152,7 +1152,7 @@ export interface KV extends RoKV {
*/
put(
k: string,
data: Uint8Array,
data: Payload,
opts?: Partial<KvPutOptions>,
): Promise<number>;

Expand Down

0 comments on commit 98e5a8b

Please sign in to comment.