Skip to content

Commit

Permalink
typed stream update - this creates a breaking change - for api symm…
Browse files Browse the repository at this point in the history
…etry with update consumer, the name of the stream is now a required argument.
  • Loading branch information
aricart committed Nov 29, 2021
1 parent a11616d commit 25cf542
Show file tree
Hide file tree
Showing 5 changed files with 29 additions and 22 deletions.
2 changes: 1 addition & 1 deletion examples/jetstream/jsm_readme_jsm_example.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ const si = await jsm.streams.info(name);

// update a stream configuration
si.config.subjects?.push("a.b");
await jsm.streams.update(si.config);
await jsm.streams.update(name, si.config);

// get a particular stored message in the stream by sequence
// this is not associated with a consumer
Expand Down
10 changes: 7 additions & 3 deletions nats-base-client/jsmstream_api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import {
StreamInfoRequestOptions,
StreamListResponse,
StreamMsgResponse,
StreamUpdateConfig,
SuccessResponse,
} from "./types.ts";
import { BaseApiClient } from "./jsbaseclient_api.ts";
Expand Down Expand Up @@ -61,10 +62,13 @@ export class StreamAPIImpl extends BaseApiClient implements StreamAPI {
return cr.success;
}

async update(cfg = {} as StreamConfig): Promise<StreamInfo> {
validateStreamName(cfg.name);
async update(
name: string,
cfg = {} as StreamUpdateConfig,
): Promise<StreamInfo> {
validateStreamName(name);
const r = await this._request(
`${this.prefix}.STREAM.UPDATE.${cfg.name}`,
`${this.prefix}.STREAM.UPDATE.${name}`,
cfg,
);
const si = r as StreamInfo;
Expand Down
27 changes: 15 additions & 12 deletions nats-base-client/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -433,7 +433,7 @@ export type StreamInfoRequestOptions = {
export interface StreamAPI {
info(stream: string, opts?: StreamInfoRequestOptions): Promise<StreamInfo>;
add(cfg: Partial<StreamConfig>): Promise<StreamInfo>;
update(cfg: StreamConfig): Promise<StreamInfo>;
update(name: string, cfg: StreamUpdateConfig): Promise<StreamInfo>;
purge(stream: string, opts?: PurgeOpts): Promise<PurgeResponse>;
delete(stream: string): Promise<boolean>;
list(): Lister<StreamInfo>;
Expand Down Expand Up @@ -536,29 +536,32 @@ export interface StreamInfo {
sources?: StreamSourceInfo[];
}

export interface StreamConfig {
export interface StreamConfig extends StreamUpdateConfig {
name: string;
description?: string;
subjects?: string[];
retention: RetentionPolicy;
storage: StorageType;
"num_replicas": number;
"template_owner"?: string;
placement?: Placement;
mirror?: StreamSource; // same as a source
sealed: boolean;
"deny_delete": boolean;
"deny_purge": boolean;
}

export interface StreamUpdateConfig {
description?: string;
"max_consumers": number;
"max_msgs_per_subject"?: number;
"max_msgs": number;
"max_bytes": number;
"max_age": Nanos;
"max_bytes": number;
"max_msg_size"?: number;
storage: StorageType;
discard?: DiscardPolicy;
"num_replicas": number;
"no_ack"?: boolean;
"template_owner"?: string;
"duplicate_window"?: Nanos;
placement?: Placement;
mirror?: StreamSource; // same as a source
sources?: StreamSource[];
sealed: boolean;
"deny_delete": boolean;
"deny_purge": boolean;
"allow_rollup_hdrs": boolean;
}

Expand Down
2 changes: 1 addition & 1 deletion tests/jetstream_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2346,7 +2346,7 @@ Deno.test("jetstream - seal", async () => {
await jsm.streams.deleteMessage(stream, 1);

si.config.sealed = true;
const usi = await jsm.streams.update(si.config);
const usi = await jsm.streams.update(stream, si.config);
assertEquals(usi.config.sealed, true);

await assertThrowsAsync(
Expand Down
10 changes: 5 additions & 5 deletions tests/jsm_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -113,13 +113,13 @@ Deno.test("jsm - empty stream config update fails", async () => {

await assertThrowsAsync(
async () => {
await jsm.streams.update({} as StreamConfig);
await jsm.streams.update("", {} as StreamConfig);
},
Error,
StreamNameRequired,
);
ci!.config!.subjects!.push("foo");
ci = await jsm.streams.update(ci.config);
ci = await jsm.streams.update(name, ci.config);
assertEquals(ci!.config!.subjects!.length, 2);
await cleanup(ns, nc);
});
Expand Down Expand Up @@ -631,7 +631,7 @@ Deno.test("jsm - update stream", async () => {
assertEquals(si.config!.subjects!.length, 1);

si.config!.subjects!.push("foo");
si = await jsm.streams.update(si.config);
si = await jsm.streams.update(stream, si.config);
assertEquals(si.config!.subjects!.length, 2);
await cleanup(ns, nc);
});
Expand Down Expand Up @@ -804,9 +804,9 @@ Deno.test("jsm - cross account streams", async () => {
assertEquals(si.state.messages, 0);

// update
const config = streams[0].config;
const config = streams[0].config as StreamConfig;
config.subjects!.push(`${stream}.B`);
si = await jsm.streams.update(config);
si = await jsm.streams.update(config.name, config);
assertEquals(si.config.subjects!.length, 2);

// find
Expand Down

0 comments on commit 25cf542

Please sign in to comment.