Skip to content

Commit

Permalink
kv enhancements
Browse files Browse the repository at this point in the history
- [feat] added support for key/value codecs, on JavaScript stack the values are always byte arrays, so different codecs are required for keys and values. (#184)
- [feat] added validation to keys and buckets
  • Loading branch information
aricart committed Aug 18, 2021
1 parent a995bb6 commit 4bd4c5d
Show file tree
Hide file tree
Showing 2 changed files with 244 additions and 11 deletions.
114 changes: 104 additions & 10 deletions nats-base-client/kv.ts
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,48 @@ export interface Entry {
operation: "PUT" | "DEL";
}

export interface KvCodec<T> {
encode(k: T): T;
decode(k: T): T;
}

export interface KvCodecs {
key: KvCodec<string>;
value: KvCodec<Uint8Array>;
}

export function Base64KeyCodec(): KvCodec<string> {
return {
encode(key: string): string {
return btoa(key);
},
decode(bkey: string): string {
return atob(bkey);
},
};
}

export function NoopKvCodecs(): KvCodecs {
return {
key: {
encode(k: string): string {
return k;
},
decode(k: string): string {
return k;
},
},
value: {
encode(v: Uint8Array): Uint8Array {
return v;
},
decode(v: Uint8Array): Uint8Array {
return v;
},
},
};
}

export interface KvStatus {
bucket: string;
values: number;
Expand All @@ -79,6 +121,7 @@ export interface BucketOpts {
mirrorBucket: string;
ttl: number;
streamName: string;
codec: KvCodecs;
}

export function defaultBucketOpts(): Partial<BucketOpts> {
Expand All @@ -88,6 +131,7 @@ export function defaultBucketOpts(): Partial<BucketOpts> {
timeout: 2000,
maxBucketSize: -1,
maxValueSize: -1,
codec: NoopKvCodecs(),
};
}

Expand All @@ -100,6 +144,9 @@ export const kvOperationHdr = "KV-Operation";
const kvPrefix = "KV_";
const kvSubjectPrefix = "$KV";

const validKeyRe = /^[-/=.\w]+$/;
const validBucketRe = /^[-\w]+$/;

export interface RoKV {
get(k: string): Promise<Entry | null>;
history(k: string): Promise<QueuedIterator<Entry>>;
Expand All @@ -116,13 +163,29 @@ export interface KV extends RoKV {
destroy(): Promise<boolean>;
}

// this exported for tests
export function validateKey(k: string) {
if (k.startsWith(".") || k.endsWith(".") || !validKeyRe.test(k)) {
throw new Error(`invalid key: ${k}`);
}
}

// this exported for tests
export function validateBucket(name: string) {
if (!validBucketRe.test(name)) {
throw new Error(`invalid bucket name: ${name}`);
}
}

export class Bucket implements KV {
jsm: JetStreamManager;
js: JetStreamClient;
stream!: string;
bucket: string;
codec!: KvCodecs;

constructor(bucket: string, jsm: JetStreamManager, js: JetStreamClient) {
validateBucket(bucket);
this.jsm = jsm;
this.js = js;
this.bucket = bucket;
Expand All @@ -133,6 +196,7 @@ export class Bucket implements KV {
name: string,
opts: Partial<BucketOpts> = {},
): Promise<KV> {
validateBucket(name);
const to = opts.timeout || 2000;
const jsm = await nc.jetstreamManager({ timeout: to });
const bucket = new Bucket(name, jsm, nc.jetstream({ timeout: to }));
Expand All @@ -142,6 +206,7 @@ export class Bucket implements KV {

async init(opts: Partial<BucketOpts> = {}): Promise<void> {
const bo = Object.assign(defaultBucketOpts(), opts) as BucketOpts;
this.codec = bo.codec;
const sc = {} as StreamConfig;
this.stream = sc.name = opts.streamName ?? this.bucketName();
sc.subjects = [this.subjectForBucket()];
Expand Down Expand Up @@ -174,15 +239,32 @@ export class Bucket implements KV {
return `${kvSubjectPrefix}.${this.bucket}.${k}`;
}

encodeKey(key: string): string {
const chunks: string[] = [];
for (const t of key.split(".")) {
switch (t) {
case ">":
case "*":
chunks.push(t);
break;
default:
chunks.push(this.codec.key.encode(t));
break;
}
}
return chunks.join(".");
}

validateKey = validateKey;

close(): Promise<void> {
return Promise.resolve();
}

smToEntry(sm: StoredMsg): Entry {
const chunks = sm.subject.split(".");
smToEntry(key: string, sm: StoredMsg): Entry {
return {
bucket: this.bucket,
key: chunks[chunks.length - 1],
key: key,
value: sm.data,
delta: 0,
created: sm.time,
Expand All @@ -194,9 +276,10 @@ export class Bucket implements KV {

jmToEntry(k: string, jm: JsMsg): Entry {
const chunks = jm.subject.split(".");
const key = this.codec.key.decode(chunks[chunks.length - 1]);
const e = {
bucket: this.bucket,
key: chunks[chunks.length - 1],
key: key,
value: jm.data,
created: new Date(millis(jm.info.timestampNanos)),
seq: jm.seq,
Expand All @@ -215,6 +298,9 @@ export class Bucket implements KV {
data: Uint8Array,
opts: Partial<PutOptions> = {},
): Promise<number> {
const ek = this.encodeKey(k);
this.validateKey(ek);

const ji = this.js as JetStreamClientImpl;
const cluster = ji.nc.info?.cluster ?? "";
const h = headers();
Expand All @@ -224,16 +310,18 @@ export class Bucket implements KV {
o.expect = {};
o.expect.lastSubjectSequence = opts.previousSeq;
}
const pa = await this.js.publish(this.subjectForKey(k), data, o);
const pa = await this.js.publish(this.subjectForKey(ek), data, o);
return pa.seq;
}

async get(k: string): Promise<Entry | null> {
const ek = this.encodeKey(k);
this.validateKey(ek);
try {
const sm = await this.jsm.streams.getMessage(this.bucketName(), {
last_by_subj: this.subjectForKey(k),
last_by_subj: this.subjectForKey(ek),
});
return this.smToEntry(sm);
return this.smToEntry(k, sm);
} catch (err) {
if (err.message === "no message found") {
return null;
Expand All @@ -243,15 +331,21 @@ export class Bucket implements KV {
}

async delete(k: string): Promise<void> {
const ek = this.encodeKey(k);
this.validateKey(ek);
const ji = this.js as JetStreamClientImpl;
const cluster = ji.nc.info?.cluster ?? "";
const h = headers();
h.set(kvOriginClusterHdr, cluster);
h.set(kvOperationHdr, "DEL");
await this.js.publish(this.subjectForKey(k), Empty, { headers: h });
await this.js.publish(this.subjectForKey(ek), Empty, { headers: h });
}

consumerOn(k: string, lastOnly = false): Promise<ConsumerInfo> {
const ek = this.encodeKey(k);
if (k !== "*") {
this.validateKey(ek);
}
const ji = this.js as JetStreamClientImpl;
const nc = ji.nc;
const inbox = createInbox(nc.options.inboxPrefix);
Expand All @@ -261,7 +355,7 @@ export class Bucket implements KV {
? DeliverPolicy.LastPerSubject
: DeliverPolicy.All,
"ack_policy": AckPolicy.Explicit,
"filter_subject": this.subjectForKey(k),
"filter_subject": this.subjectForKey(ek),
"flow_control": k === "*",
};
return this.jsm.consumers.add(this.stream, opts);
Expand Down Expand Up @@ -367,7 +461,7 @@ export class Bucket implements KV {
m.respond();
} else {
const chunks = m.subject.split(".");
s.push(chunks[chunks.length - 1]);
s.push(this.codec.key.decode(chunks[chunks.length - 1]));
m.respond();
const info = parseInfo(m.reply!);
if (info.pending === 0) {
Expand Down

0 comments on commit 4bd4c5d

Please sign in to comment.