Skip to content

Commit

Permalink
[FEAT] [KV] delete and purge now take an optional sequence. If the cu…
Browse files Browse the repository at this point in the history
…rrent key is not at the specified revision, the delete/purge fails.
  • Loading branch information
aricart committed Mar 1, 2024
1 parent 8db268a commit e395293
Show file tree
Hide file tree
Showing 3 changed files with 79 additions and 9 deletions.
26 changes: 19 additions & 7 deletions jetstream/kv.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import {
KV,
KvCodec,
KvCodecs,
KvDeleteOptions,
KvEntry,
KvOptions,
kvPrefix,
Expand Down Expand Up @@ -545,12 +546,12 @@ export class Bucket implements KV, KvRemove {
}
}

purge(k: string): Promise<void> {
return this._deleteOrPurge(k, "PURGE");
purge(k: string, opts?: Partial<KvDeleteOptions>): Promise<void> {
return this._deleteOrPurge(k, "PURGE", opts);
}

delete(k: string): Promise<void> {
return this._deleteOrPurge(k, "DEL");
delete(k: string, opts?: Partial<KvDeleteOptions>): Promise<void> {
return this._deleteOrPurge(k, "DEL", opts);
}

async purgeDeletes(
Expand Down Expand Up @@ -590,9 +591,13 @@ export class Bucket implements KV, KvRemove {
});
}

async _deleteOrPurge(k: string, op: "DEL" | "PURGE"): Promise<void> {
async _deleteOrPurge(
k: string,
op: "DEL" | "PURGE",
opts?: Partial<KvDeleteOptions>,
): Promise<void> {
if (!this.hasWildcards(k)) {
return this._doDeleteOrPurge(k, op);
return this._doDeleteOrPurge(k, op, opts);
}
const iter = await this.keys(k);
const buf: Promise<void>[] = [];
Expand All @@ -608,14 +613,21 @@ export class Bucket implements KV, KvRemove {
}
}

async _doDeleteOrPurge(k: string, op: "DEL" | "PURGE"): Promise<void> {
async _doDeleteOrPurge(
k: string,
op: "DEL" | "PURGE",
opts?: Partial<KvDeleteOptions>,
): Promise<void> {
const ek = this.encodeKey(k);
this.validateKey(ek);
const h = headers();
h.set(kvOperationHdr, op);
if (op === "PURGE") {
h.set(JsHeaders.RollupHdr, JsHeaders.RollupValueSubject);
}
if (opts?.previousSeq) {
h.set(PubHeaders.ExpectedLastSubjectSequenceHdr, `${opts.previousSeq}`);
}
await this.js.publish(this.subjectForKey(ek, true), Empty, { headers: h });
}

Expand Down
48 changes: 48 additions & 0 deletions jetstream/tests/kv_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1966,3 +1966,51 @@ Deno.test("kv - watch start at", async () => {

await cleanup(ns, nc);
});

Deno.test("kv - delete key if revision", async () => {
const { ns, nc } = await setup(
jetstreamServerConf({}, true),
);
if (await notCompatible(ns, nc, "2.6.3")) {
return;
}
const js = nc.jetstream();
const b = await js.views.kv(nuid.next());
const seq = await b.create("a", Empty);
await assertRejects(
async () => {
await b.delete("a", { previousSeq: 100 });
},
Error,
"wrong last sequence: 1",
undefined,
);

await b.delete("a", { previousSeq: seq });

await cleanup(ns, nc);
});

Deno.test("kv - purge key if revision", async () => {
const { ns, nc } = await setup(
jetstreamServerConf({}, true),
);
if (await notCompatible(ns, nc, "2.6.3")) {
return;
}
const js = nc.jetstream();
const b = await js.views.kv(nuid.next());
const seq = await b.create("a", Empty);

await assertRejects(
async () => {
await b.purge("a", { previousSeq: 2 });
},
Error,
"wrong last sequence: 1",
undefined,
);

await b.purge("a", { previousSeq: seq });
await cleanup(ns, nc);
});
14 changes: 12 additions & 2 deletions jetstream/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1238,15 +1238,17 @@ export interface KV extends RoKV {
* a key or the soft delete marker to be removed without
* additional notification on a watch.
* @param k
* @param opts
*/
delete(k: string): Promise<void>;
delete(k: string, opts?: Partial<KvDeleteOptions>): Promise<void>;

/**
* Deletes and purges the specified key and any value
* history.
* @param k
* @param opts
*/
purge(k: string): Promise<void>;
purge(k: string, opts?: Partial<KvDeleteOptions>): Promise<void>;

/**
* Destroys the underlying stream used by the KV. This
Expand All @@ -1263,6 +1265,14 @@ export interface KvPutOptions {
previousSeq: number;
}

export interface KvDeleteOptions {
/**
* If set the KV must be at the current sequence or the
* put will fail.
*/
previousSeq: number;
}

export type ObjectStoreLink = {
/**
* name of object store storing the data
Expand Down

0 comments on commit e395293

Please sign in to comment.