Skip to content

Commit

Permalink
Merge pull request #656 from nats-io/delete-purge-at-seq
Browse files Browse the repository at this point in the history
[FEAT] [KV] delete and purge at seq
  • Loading branch information
aricart committed Mar 11, 2024
2 parents 3f2a117 + e395293 commit 779f01c
Show file tree
Hide file tree
Showing 3 changed files with 79 additions and 9 deletions.
26 changes: 19 additions & 7 deletions jetstream/kv.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import {
KV,
KvCodec,
KvCodecs,
KvDeleteOptions,
KvEntry,
KvOptions,
kvPrefix,
Expand Down Expand Up @@ -545,12 +546,12 @@ export class Bucket implements KV, KvRemove {
}
}

purge(k: string): Promise<void> {
return this._deleteOrPurge(k, "PURGE");
purge(k: string, opts?: Partial<KvDeleteOptions>): Promise<void> {
return this._deleteOrPurge(k, "PURGE", opts);
}

delete(k: string): Promise<void> {
return this._deleteOrPurge(k, "DEL");
delete(k: string, opts?: Partial<KvDeleteOptions>): Promise<void> {
return this._deleteOrPurge(k, "DEL", opts);
}

async purgeDeletes(
Expand Down Expand Up @@ -590,9 +591,13 @@ export class Bucket implements KV, KvRemove {
});
}

async _deleteOrPurge(k: string, op: "DEL" | "PURGE"): Promise<void> {
async _deleteOrPurge(
k: string,
op: "DEL" | "PURGE",
opts?: Partial<KvDeleteOptions>,
): Promise<void> {
if (!this.hasWildcards(k)) {
return this._doDeleteOrPurge(k, op);
return this._doDeleteOrPurge(k, op, opts);
}
const iter = await this.keys(k);
const buf: Promise<void>[] = [];
Expand All @@ -608,14 +613,21 @@ export class Bucket implements KV, KvRemove {
}
}

async _doDeleteOrPurge(k: string, op: "DEL" | "PURGE"): Promise<void> {
async _doDeleteOrPurge(
k: string,
op: "DEL" | "PURGE",
opts?: Partial<KvDeleteOptions>,
): Promise<void> {
const ek = this.encodeKey(k);
this.validateKey(ek);
const h = headers();
h.set(kvOperationHdr, op);
if (op === "PURGE") {
h.set(JsHeaders.RollupHdr, JsHeaders.RollupValueSubject);
}
if (opts?.previousSeq) {
h.set(PubHeaders.ExpectedLastSubjectSequenceHdr, `${opts.previousSeq}`);
}
await this.js.publish(this.subjectForKey(ek, true), Empty, { headers: h });
}

Expand Down
48 changes: 48 additions & 0 deletions jetstream/tests/kv_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1966,3 +1966,51 @@ Deno.test("kv - watch start at", async () => {

await cleanup(ns, nc);
});

Deno.test("kv - delete key if revision", async () => {
const { ns, nc } = await setup(
jetstreamServerConf({}, true),
);
if (await notCompatible(ns, nc, "2.6.3")) {
return;
}
const js = nc.jetstream();
const b = await js.views.kv(nuid.next());
const seq = await b.create("a", Empty);
await assertRejects(
async () => {
await b.delete("a", { previousSeq: 100 });
},
Error,
"wrong last sequence: 1",
undefined,
);

await b.delete("a", { previousSeq: seq });

await cleanup(ns, nc);
});

Deno.test("kv - purge key if revision", async () => {
const { ns, nc } = await setup(
jetstreamServerConf({}, true),
);
if (await notCompatible(ns, nc, "2.6.3")) {
return;
}
const js = nc.jetstream();
const b = await js.views.kv(nuid.next());
const seq = await b.create("a", Empty);

await assertRejects(
async () => {
await b.purge("a", { previousSeq: 2 });
},
Error,
"wrong last sequence: 1",
undefined,
);

await b.purge("a", { previousSeq: seq });
await cleanup(ns, nc);
});
14 changes: 12 additions & 2 deletions jetstream/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1238,15 +1238,17 @@ export interface KV extends RoKV {
* a key or the soft delete marker to be removed without
* additional notification on a watch.
* @param k
* @param opts
*/
delete(k: string): Promise<void>;
delete(k: string, opts?: Partial<KvDeleteOptions>): Promise<void>;

/**
* Deletes and purges the specified key and any value
* history.
* @param k
* @param opts
*/
purge(k: string): Promise<void>;
purge(k: string, opts?: Partial<KvDeleteOptions>): Promise<void>;

/**
* Destroys the underlying stream used by the KV. This
Expand All @@ -1263,6 +1265,14 @@ export interface KvPutOptions {
previousSeq: number;
}

export interface KvDeleteOptions {
/**
* If set the KV must be at the current sequence or the
* put will fail.
*/
previousSeq: number;
}

export type ObjectStoreLink = {
/**
* name of object store storing the data
Expand Down

0 comments on commit 779f01c

Please sign in to comment.