Skip to content

Commit

Permalink
[FIX] removed conditional check on setting KvEntry delta, as the pend…
Browse files Browse the repository at this point in the history
…ing are properly reported
  • Loading branch information
aricart committed Nov 10, 2021
1 parent 8262f50 commit 4818036
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 6 deletions.
8 changes: 2 additions & 6 deletions nats-base-client/kv.ts
Original file line number Diff line number Diff line change
Expand Up @@ -281,19 +281,15 @@ export class Bucket implements KV, KvRemove {

jmToEntry(k: string, jm: JsMsg): KvEntry {
const key = this.decodeKey(jm.subject.substring(this.prefixLen));
const e = {
return {
bucket: this.bucket,
key: key,
value: jm.data,
created: new Date(millis(jm.info.timestampNanos)),
revision: jm.seq,
operation: jm.headers?.get(kvOperationHdr) as OperationType || "PUT",
delta: jm.info.pending,
} as KvEntry;

if (k !== ">") {
e.delta = jm.info.pending;
}
return e;
}

create(k: string, data: Uint8Array): Promise<number> {
Expand Down
74 changes: 74 additions & 0 deletions tests/kv_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -688,3 +688,77 @@ Deno.test("kv - internal consumer", async () => {

await cleanup(ns, nc);
});

Deno.test("kv - is wildcard delete implemented", async () => {
const { ns, nc } = await setup(
jetstreamServerConf({}, true),
);
if (await notCompatible(ns, nc, "2.6.3")) {
return;
}

const name = nuid.next();
const b = await Bucket.create(nc, name, { history: 10 }) as Bucket;
await b.put("a", Empty);
await b.put("a.a", Empty);
await b.put("a.b", Empty);
await b.put("a.b.c", Empty);

let keys = await collect(await b.keys());
assertEquals(keys.length, 4);

await b.delete("a.*");
keys = await collect(await b.keys());
assertEquals(keys.length, 2);

// this was a manual delete, so we should have tombstones
// for all the deleted entries
let deleted = 0;
const w = await b.watch();
await (async () => {
for await (const e of w) {
if (e.operation === "DEL") {
deleted++;
}
if (e.delta === 0) {
break;
}
}
})();
assertEquals(deleted, 2);

await nc.close();
await ns.stop();
});

Deno.test("kv - delta", async () => {
const { ns, nc } = await setup(
jetstreamServerConf({}, true),
);
if (await notCompatible(ns, nc, "2.6.3")) {
return;
}

const name = nuid.next();
const b = await Bucket.create(nc, name) as Bucket;
await b.put("a", Empty);
await b.put("a.a", Empty);
await b.put("a.b", Empty);
await b.put("a.b.c", Empty);

const w = await b.history();
await (async () => {
let i = 0;
let delta = 4;
for await (const e of w) {
assertEquals(e.revision, ++i);
assertEquals(e.delta, --delta);
if (e.delta === 0) {
break;
}
}
})();

await nc.close();
await ns.stop();
});

0 comments on commit 4818036

Please sign in to comment.