Skip to content

Commit

Permalink
[FEAT] [JS] consumer/stream metadata
Browse files Browse the repository at this point in the history
  • Loading branch information
aricart committed Feb 14, 2023
1 parent 6213c59 commit 2293a2f
Show file tree
Hide file tree
Showing 5 changed files with 128 additions and 0 deletions.
6 changes: 6 additions & 0 deletions nats-base-client/jsmconsumer_api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,12 @@ export class ConsumerAPIImpl extends BaseApiClient implements ConsumerAPI {
}
newAPI = false;
}
if (cfg.metadata) {
const { min, ok } = nci.features.get(Feature.JS_STREAM_CONSUMER_METADATA);
if (!ok) {
throw new Error(`consumer 'metadata' requires server ${min}`);
}
}
if (newAPI) {
consumerName = cfg.name ?? cfg.durable_name ?? "";
}
Expand Down
16 changes: 16 additions & 0 deletions nats-base-client/jsmstream_api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ import { kvPrefix, KvStatusImpl } from "./kv.ts";
import { ObjectStoreStatusImpl, osPrefix } from "./objectstore.ts";
import { Codec, JSONCodec } from "./codec.ts";
import { TD } from "./encoders.ts";
import { Feature } from "./semver.ts";
import { NatsConnectionImpl } from "./nats.ts";

export function convertStreamSourceDomain(s?: StreamSource) {
if (s === undefined) {
Expand Down Expand Up @@ -76,6 +78,13 @@ export class StreamAPIImpl extends BaseApiClient implements StreamAPI {
}

async add(cfg = {} as Partial<StreamConfig>): Promise<StreamInfo> {
const nci = this.nc as NatsConnectionImpl;
if (cfg.metadata) {
const { min, ok } = nci.features.get(Feature.JS_STREAM_CONSUMER_METADATA);
if (!ok) {
throw new Error(`stream 'metadata' requires server ${min}`);
}
}
validateStreamName(cfg.name);
cfg.mirror = convertStreamSourceDomain(cfg.mirror);
//@ts-ignore: the sources are either set or not - so no item should be undefined in the list
Expand Down Expand Up @@ -108,6 +117,13 @@ export class StreamAPIImpl extends BaseApiClient implements StreamAPI {
`\u001B[33m >> streams.update(config: StreamConfig) api changed to streams.update(name: string, config: StreamUpdateConfig) - this shim will be removed - update your code. \u001B[0m`,
);
}
const nci = this.nc as NatsConnectionImpl;
if (cfg.metadata) {
const { min, ok } = nci.features.get(Feature.JS_STREAM_CONSUMER_METADATA);
if (!ok) {
throw new Error(`stream 'metadata' requires server ${min}`);
}
}
validateStreamName(name);
const old = await this.info(name);
const update = Object.assign(old.config, cfg);
Expand Down
2 changes: 2 additions & 0 deletions nats-base-client/semver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ export enum Feature {
JS_NEW_CONSUMER_CREATE_API = "js_new_consumer_create",
JS_ALLOW_DIRECT = "js_allow_direct",
JS_MULTIPLE_CONSUMER_FILTER = "js_multiple_consumer_filter",
JS_STREAM_CONSUMER_METADATA = "js_stream_consumer_metadata",
}

type FeatureVersion = {
Expand Down Expand Up @@ -93,6 +94,7 @@ export class Features {
this.set(Feature.JS_NEW_CONSUMER_CREATE_API, "2.9.0");
this.set(Feature.JS_ALLOW_DIRECT, "2.9.0");
this.set(Feature.JS_MULTIPLE_CONSUMER_FILTER, "2.10.0");
this.set(Feature.JS_STREAM_CONSUMER_METADATA, "2.10.0");

this.disabled.forEach((f) => {
this.features.delete(f);
Expand Down
12 changes: 12 additions & 0 deletions nats-base-client/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1907,6 +1907,12 @@ export interface StreamUpdateConfig {
* onto new subjects for partitioning and more
*/
republish?: Republish;
/**
* Metadata field to store additional information about the stream. Note that
* keys starting with `_nats` are reserved. This feature only supported on servers
* 2.10.x and better.
*/
metadata?: Record<string, string>;
}

export interface Republish {
Expand Down Expand Up @@ -2562,6 +2568,12 @@ export interface ConsumerUpdateConfig {
* This is exclusive of {@link filter_subject}.
*/
"filter_subjects"?: string[];
/**
* Metadata field to store additional information about the consumer. Note that
* keys starting with `_nats` are reserved. This feature only supported on servers
* 2.10.x and better.
*/
metadata?: Record<string, string>;
}

export interface Consumer {
Expand Down
92 changes: 92 additions & 0 deletions tests/jsm_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2057,3 +2057,95 @@ Deno.test("jsm - stored msg decode", async () => {

await cleanup(ns, nc);
});

Deno.test("jsm - stream/consumer metadata", async () => {
const { ns, nc } = await setup(jetstreamServerConf());
if (await notCompatible(ns, nc, "2.10.0")) {
return;
}
const jsm = await nc.jetstreamManager() as JetStreamManagerImpl;

async function addStream(name: string, md?: Record<string, string>) {
const si = await jsm.streams.add({
name,
subjects: [name],
metadata: md,
});
assertEquals(si.config.metadata, md);
}
async function updateStream(name: string, md?: Record<string, string>) {
const si = await jsm.streams.update(name, {
metadata: md,
});
assertEquals(si.config.metadata, md);
}
async function addConsumer(
stream: string,
name: string,
md?: Record<string, string>,
) {
const ci = await jsm.consumers.add(stream, {
durable_name: name,
metadata: md,
});
assertEquals(ci.config.metadata, md);
}

async function updateConsumer(
stream: string,
name: string,
md?: Record<string, string>,
) {
const ci = await jsm.consumers.update(stream, name, { metadata: md });
assertEquals(ci.config.metadata, md);
}
// we should be able to add/update metadata
let stream = nuid.next();
let consumer = nuid.next();
await addStream(stream, { hello: "world" });
await updateStream(stream, { one: "two" });
await addConsumer(stream, consumer, { test: "true" });
await updateConsumer(stream, consumer, { foo: "bar" });

// fake a server version change
(nc as NatsConnectionImpl).features.update("2.9.0");
stream = nuid.next();
consumer = nuid.next();
await assertRejects(
async () => {
await addStream(stream, { hello: "world" });
},
Error,
"stream 'metadata' requires server 2.10.0",
);
// add without md
await addStream(stream);
// should fail update w/ metadata
await assertRejects(
async () => {
await updateStream(stream, { hello: "world" });
},
Error,
"stream 'metadata' requires server 2.10.0",
);
// should fail adding consumer with md
await assertRejects(
async () => {
await addConsumer(stream, consumer, { hello: "world" });
},
Error,
"consumer 'metadata' requires server 2.10.0",
);
// add w/o metadata
await addConsumer(stream, consumer);
// should fail to update consumer with md
await assertRejects(
async () => {
await updateConsumer(stream, consumer, { hello: "world" });
},
Error,
"consumer 'metadata' requires server 2.10.0",
);

await cleanup(ns, nc);
});

0 comments on commit 2293a2f

Please sign in to comment.