Skip to content

Commit

Permalink
[FEAT] added (internal for now) ability to purge deletes (#304)
Browse files Browse the repository at this point in the history
FIX #303
  • Loading branch information
aricart authored May 17, 2022
1 parent f6ea320 commit 3cbd24d
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 1 deletion.
39 changes: 38 additions & 1 deletion nats-base-client/kv.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ import {
import { millis, nanos } from "./jsutil.ts";
import { QueuedIterator, QueuedIteratorImpl } from "./queued_iterator.ts";
import { headers, MsgHdrs } from "./headers.ts";
import { consumerOpts } from "./mod.ts";
import { consumerOpts, deferred } from "./mod.ts";
import { compare, parseSemVer } from "./semver.ts";

export function Base64KeyCodec(): KvCodec<string> {
Expand Down Expand Up @@ -403,6 +403,43 @@ export class Bucket implements KV, KvRemove {
return this._deleteOrPurge(k, "DEL");
}

async purgeDeletes(
olderMillis: number = 30 * 60 * 1000,
): Promise<PurgeResponse> {
const done = deferred();
const buf: KvEntry[] = [];
const i = await this.watch({
key: ">",
initializedFn: () => {
done.resolve();
},
});
(async () => {
for await (const e of i) {
if (e.operation === "DEL" || e.operation === "PURGE") {
buf.push(e);
}
}
})().then();
await done;
i.stop();
const min = Date.now() - olderMillis;
const proms = buf.map((e) => {
const subj = this.subjectForKey(e.key);
if (e.created.getTime() >= min) {
return this.jsm.streams.purge(this.stream, { filter: subj, keep: 1 });
} else {
return this.jsm.streams.purge(this.stream, { filter: subj, keep: 0 });
}
});
const purged = await Promise.all(proms);
purged.unshift({ success: true, purged: 0 });
return purged.reduce((pv: PurgeResponse, cv: PurgeResponse) => {
pv.purged += cv.purged;
return pv;
});
}

async _deleteOrPurge(k: string, op: "DEL" | "PURGE"): Promise<void> {
if (!this.hasWildcards(k)) {
return this._doDeleteOrPurge(k, op);
Expand Down
28 changes: 28 additions & 0 deletions tests/kv_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1307,3 +1307,31 @@ Deno.test("kv - get revision", async () => {

await cleanup(ns, nc);
});

Deno.test("kv - purge deletes", async () => {
const { ns, nc } = await setup(
jetstreamServerConf({}, true),
);
const js = nc.jetstream();

const b = await js.views.kv("a") as Bucket;

// keep the marker if delete is younger
await b.put("a", Empty);
await b.put("b", Empty);
await b.put("c", Empty);
await b.delete("a");
await b.delete("c");
await delay(1000);
await b.delete("b");

const pr = await b.purgeDeletes(700);
assertEquals(pr.purged, 2);
assertEquals(await b.get("a"), null);
assertEquals(await b.get("c"), null);

const e = await b.get("b");
assertEquals(e?.operation, "DEL");

await cleanup(ns, nc);
});

0 comments on commit 3cbd24d

Please sign in to comment.