Skip to content

Commit

Permalink
[FEAT] [JS] stream object can now delete a message - also stream info…
Browse files Browse the repository at this point in the history
… is able to specify info options
  • Loading branch information
aricart committed May 30, 2023
1 parent e774c6d commit ac81ec0
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 3 deletions.
11 changes: 9 additions & 2 deletions jetstream/jsmstream_api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -185,11 +185,14 @@ export class StreamImpl implements Stream {
}
}

info(cached = false): Promise<StreamInfo> {
info(
cached = false,
opts?: Partial<StreamInfoRequestOptions>,
): Promise<StreamInfo> {
if (cached) {
return Promise.resolve(this._info);
}
return this.api.info(this.name)
return this.api.info(this.name, opts)
.then((si) => {
this._info = si;
return this._info;
Expand All @@ -206,6 +209,10 @@ export class StreamImpl implements Stream {
getMessage(query: MsgRequest): Promise<StoredMsg> {
return this.api.getMessage(this.name, query);
}

deleteMessage(seq: number, erase?: boolean): Promise<boolean> {
return this.api.deleteMessage(this.name, seq, erase);
}
}

export class StreamAPIImpl extends BaseApiClient implements StreamAPI {
Expand Down
32 changes: 32 additions & 0 deletions jetstream/tests/streams_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -154,3 +154,35 @@ Deno.test("streams - consumers", async () => {

await cleanup(ns, nc);
});

Deno.test("streams - delete message", async () => {
const { ns, nc } = await setup(jetstreamServerConf({}, true));
const js = nc.jetstream();

// add a stream and a message
const { stream, subj } = await initStream(nc);
await Promise.all([js.publish(subj), js.publish(subj), js.publish(subj)]);

// retrieve the stream
const s = await js.streams.get(stream);
assertExists(s);
assertEquals(s.name, stream);

// get a message
const sm = await s.getMessage({ seq: 2 });
assertExists(sm);

assertEquals(await s.deleteMessage(2, true), true);
await assertRejects(
async () => {
await s.getMessage({ seq: 2 });
},
Error,
"no message found",
);

const si = await s.info(false, { deleted_details: true });
assertEquals(si.state.deleted, [2]);

await cleanup(ns, nc);
});
7 changes: 6 additions & 1 deletion jetstream/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -852,7 +852,10 @@ export enum AdvisoryKind {
export interface Stream {
name: string;

info(cached?: boolean): Promise<StreamInfo>;
info(
cached?: boolean,
opts?: Partial<StreamInfoRequestOptions>,
): Promise<StreamInfo>;

alternates(): Promise<StreamAlternate[]>;

Expand All @@ -863,6 +866,8 @@ export interface Stream {
): Promise<Consumer>;

getMessage(query: MsgRequest): Promise<StoredMsg>;

deleteMessage(seq: number, erase?: boolean): Promise<boolean>;
}

export enum JsHeaders {
Expand Down

0 comments on commit ac81ec0

Please sign in to comment.