Skip to content

Commit

Permalink
[FEAT] added jsm.streams.listKvs() to provide a list of all the KvS…
Browse files Browse the repository at this point in the history
…tatus for all streams that use the kv naming strategy.

[FEAT] added `jsm.streams.listObjectStores()` to provide a list of all the ObjectStoreStatus for all streams that use the objectstore naming strategy.

[FEAT] [KV] `KvStatus` now has the property `streamInfo` with the `StreamInfo` for the KV.

[CHANGE] [BREAKING] [KV] `KvStatus.bucket` is now the user specified name of the KV used when created - not the stream backing it. This is consistent with the Go client. Previously it returned `KV_` + name. For the actual stream name see `KvStatus.streamInfo.config.name`

[CHANGE] `ObjectStoreInfo` is deprecated use `ObjectStoreStatus` instead.
  • Loading branch information
aricart committed Sep 15, 2022
1 parent 6164d01 commit 90d6df6
Show file tree
Hide file tree
Showing 7 changed files with 228 additions and 37 deletions.
48 changes: 48 additions & 0 deletions nats-base-client/jsmstream_api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,12 @@
import {
Empty,
JetStreamOptions,
KvStatus,
Lister,
MsgDeleteRequest,
MsgRequest,
NatsConnection,
ObjectStoreStatus,
PurgeBySeq,
PurgeOpts,
PurgeResponse,
Expand All @@ -38,6 +40,8 @@ import { BaseApiClient } from "./jsbaseclient_api.ts";
import { ListerFieldFilter, ListerImpl } from "./jslister.ts";
import { validateStreamName } from "./jsutil.ts";
import { headers, MsgHdrs, MsgHdrsImpl } from "./headers.ts";
import { kvPrefix, KvStatusImpl } from "./kv.ts";
import { ObjectStoreStatusImpl, osPrefix } from "./objectstore.ts";

export class StreamAPIImpl extends BaseApiClient implements StreamAPI {
constructor(nc: NatsConnection, opts?: JetStreamOptions) {
Expand Down Expand Up @@ -165,6 +169,50 @@ export class StreamAPIImpl extends BaseApiClient implements StreamAPI {
find(subject: string): Promise<string> {
return this.findStream(subject);
}

listKvs(): Lister<KvStatus> {
const filter: ListerFieldFilter<KvStatus> = (
v: unknown,
): KvStatus[] => {
const slr = v as StreamListResponse;
const kvStreams = slr.streams.filter((v) => {
return v.config.name.startsWith(kvPrefix);
});
kvStreams.forEach((si) => {
this._fixInfo(si);
});
let cluster = "";
if (kvStreams.length) {
cluster = this.nc.info?.cluster ?? "";
}
const status = kvStreams.map((si) => {
return new KvStatusImpl(si, cluster);
});
return status;
};
const subj = `${this.prefix}.STREAM.LIST`;
return new ListerImpl<KvStatus>(subj, filter, this);
}

listObjectStores(): Lister<ObjectStoreStatus> {
const filter: ListerFieldFilter<ObjectStoreStatus> = (
v: unknown,
): ObjectStoreStatus[] => {
const slr = v as StreamListResponse;
const objStreams = slr.streams.filter((v) => {
return v.config.name.startsWith(osPrefix);
});
objStreams.forEach((si) => {
this._fixInfo(si);
});
const status = objStreams.map((si) => {
return new ObjectStoreStatusImpl(si);
});
return status;
};
const subj = `${this.prefix}.STREAM.LIST`;
return new ListerImpl<ObjectStoreStatus>(subj, filter, this);
}
}

export class StoredMsgImpl implements StoredMsg {
Expand Down
94 changes: 82 additions & 12 deletions nats-base-client/kv.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,12 @@ import {
Placement,
PurgeOpts,
PurgeResponse,
Republish,
RetentionPolicy,
StorageType,
StoredMsg,
StreamConfig,
StreamInfo,
} from "./types.ts";
import {
JetStreamClientImpl,
Expand Down Expand Up @@ -102,7 +104,7 @@ export function defaultBucketOpts(): Partial<KvOptions> {
type OperationType = "PUT" | "DEL" | "PURGE";

export const kvOperationHdr = "KV-Operation";
const kvPrefix = "KV_";
export const kvPrefix = "KV_";
const kvSubjectPrefix = "$KV";

const validKeyRe = /^[-/=.\w]+$/;
Expand Down Expand Up @@ -751,16 +753,84 @@ export class Bucket implements KV, KvRemove {
async status(): Promise<KvStatus> {
const ji = this.js as JetStreamClientImpl;
const cluster = ji.nc.info?.cluster ?? "";
const si = await this.jsm.streams.info(this.bucketName());
return {
bucket: this.bucketName(),
values: si.state.messages,
history: si.config.max_msgs_per_subject,
ttl: millis(si.config.max_age),
bucket_location: cluster,
backingStore: si.config.storage,
storage: si.config.storage,
replicas: si.config.num_replicas,
} as KvStatus;
const bn = this.bucketName();
const si = await this.jsm.streams.info(bn);
return new KvStatusImpl(si, cluster);
}
}

export class KvStatusImpl implements KvStatus {
si: StreamInfo;
cluster: string;

constructor(si: StreamInfo, cluster = "") {
this.si = si;
this.cluster = cluster;
}

get bucket(): string {
return this.si.config.name.startsWith(kvPrefix)
? this.si.config.name.substring(kvPrefix.length)
: this.si.config.name;
}

get values(): number {
return this.si.state.messages;
}

get history(): number {
return this.si.config.max_msgs_per_subject;
}

get ttl(): number {
return millis(this.si.config.max_age);
}

get bucket_location(): string {
return this.cluster;
}

get backingStore(): StorageType {
return this.si.config.storage;
}

get storage(): StorageType {
return this.si.config.storage;
}

get replicas(): number {
return this.si.config.num_replicas;
}

get description(): string {
return this.si.config.description ?? "";
}

get maxBucketSize(): number {
return this.si.config.max_bytes;
}

get maxValueSize(): number {
return this.si.config.max_msg_size;
}

get max_bytes(): number {
return this.si.config.max_bytes;
}

get placement(): Placement {
return this.si.config.placement || { cluster: "", tags: [] };
}

get placementCluster(): string {
return this.si.config.placement?.cluster ?? "";
}

get republish(): Republish {
return this.si.config.republish ?? { src: "", dest: "" };
}

get streamInfo(): StreamInfo {
return this.si;
}
}
18 changes: 10 additions & 8 deletions nats-base-client/objectstore.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,10 @@ import {
ObjectInfo,
ObjectResult,
ObjectStore,
ObjectStoreInfo,
ObjectStoreMeta,
ObjectStoreMetaOptions,
ObjectStoreOptions,
ObjectStoreStatus,
PubAck,
PurgeResponse,
StorageType,
Expand All @@ -46,19 +46,21 @@ import { NatsError } from "./mod.ts";
import { QueuedIterator, QueuedIteratorImpl } from "./queued_iterator.ts";
import { SHA256 } from "./sha256.js";

export const osPrefix = "OBJ_";

export function objectStoreStreamName(bucket: string): string {
validateBucket(bucket);
return `OBJ_${bucket}`;
return `${osPrefix}${bucket}`;
}

export function objectStoreBucketName(stream: string): string {
if (stream.startsWith("OBJ_")) {
if (stream.startsWith(osPrefix)) {
return stream.substring(4);
}
return stream;
}

export class ObjectStoreInfoImpl implements ObjectStoreInfo {
export class ObjectStoreStatusImpl implements ObjectStoreStatus {
si: StreamInfo;
backingStore: string;

Expand Down Expand Up @@ -274,24 +276,24 @@ export class ObjectStoreImpl implements ObjectStore {
}
}

async seal(): Promise<ObjectStoreInfo> {
async seal(): Promise<ObjectStoreStatus> {
let info = await this._si();
if (info === null) {
return Promise.reject(new Error("object store not found"));
}
info.config.sealed = true;
info = await this.jsm.streams.update(this.stream, info.config);
return Promise.resolve(new ObjectStoreInfoImpl(info));
return Promise.resolve(new ObjectStoreStatusImpl(info));
}

async status(
opts?: Partial<StreamInfoRequestOptions>,
): Promise<ObjectStoreInfo> {
): Promise<ObjectStoreStatus> {
const info = await this._si(opts);
if (info === null) {
return Promise.reject(new Error("object store not found"));
}
return Promise.resolve(new ObjectStoreInfoImpl(info));
return Promise.resolve(new ObjectStoreStatusImpl(info));
}

destroy(): Promise<boolean> {
Expand Down
32 changes: 29 additions & 3 deletions nats-base-client/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1362,6 +1362,18 @@ export interface StreamAPI {
* @param subject
*/
find(subject: string): Promise<string>;

/**
* Returns a list of KvStatus for all streams that are identified as
* being a KV (that is having names that have the prefix `KV_`)
*/
listKvs(): Lister<KvStatus>;

/**
* Returns a list of ObjectStoreInfo for all streams that are identified as
* being a ObjectStore (that is having names that have the prefix `OBJ_`)
*/
listObjectStores(): Lister<ObjectStoreStatus>;
}

/**
Expand Down Expand Up @@ -2573,6 +2585,11 @@ export interface KvStatus extends KvLimits {
* FIXME: remove this on 1.8
*/
bucket_location: string;

/**
* The StreamInfo backing up the KV
*/
streamInfo: StreamInfo;
}

export interface KvOptions extends KvLimits {
Expand Down Expand Up @@ -2766,7 +2783,7 @@ export interface ObjectLink {
name?: string;
}

export type ObjectStoreInfo = {
export type ObjectStoreStatus = {
bucket: string;
description: string;
ttl: Nanos;
Expand All @@ -2775,8 +2792,17 @@ export type ObjectStoreInfo = {
sealed: boolean;
size: number;
backingStore: string;
/**
* The StreamInfo backing up the ObjectStore
*/
streamInfo: StreamInfo;
};

/**
* @deprecated {@see ObjectStoreStatus}
*/
export type ObjectStoreInfo = ObjectStoreStatus;

export type ObjectStoreOptions = {
description?: string;
ttl?: Nanos;
Expand Down Expand Up @@ -2811,8 +2837,8 @@ export interface ObjectStore {
}
>,
): Promise<QueuedIterator<ObjectInfo | null>>;
seal(): Promise<ObjectStoreInfo>;
status(opts?: Partial<StreamInfoRequestOptions>): Promise<ObjectStoreInfo>;
seal(): Promise<ObjectStoreStatus>;
status(opts?: Partial<StreamInfoRequestOptions>): Promise<ObjectStoreStatus>;
update(name: string, meta: Partial<ObjectStoreMeta>): Promise<PubAck>;
destroy(): Promise<boolean>;
}
Expand Down
38 changes: 38 additions & 0 deletions tests/jsm_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1497,3 +1497,41 @@ Deno.test("jsm - consumer name is validated", async () => {

await cleanup(ns, nc);
});

Deno.test("jsm - list kvs", async () => {
const { ns, nc } = await setup(
jetstreamServerConf({}, true),
);

const jsm = await nc.jetstreamManager();
await jsm.streams.add({ name: "A", subjects: ["a"] });
let kvs = await jsm.streams.listKvs().next();
assertEquals(kvs.length, 0);

const js = nc.jetstream();
await js.views.kv("A");
kvs = await jsm.streams.listKvs().next();
assertEquals(kvs.length, 1);
assertEquals(kvs[0].bucket, `A`);

await cleanup(ns, nc);
});

Deno.test("jsm - list objectstores", async () => {
const { ns, nc } = await setup(
jetstreamServerConf({}, true),
);

const jsm = await nc.jetstreamManager();
await jsm.streams.add({ name: "A", subjects: ["a"] });
let objs = await jsm.streams.listObjectStores().next();
assertEquals(objs.length, 0);

const js = nc.jetstream();
await js.views.os("A");
objs = await jsm.streams.listObjectStores().next();
assertEquals(objs.length, 1);
assertEquals(objs[0].bucket, "A");

await cleanup(ns, nc);
});
Loading

0 comments on commit 90d6df6

Please sign in to comment.