diff --git a/jetstream/jsmstream_api.ts b/jetstream/jsmstream_api.ts index e5006e9e..35c233c1 100644 --- a/jetstream/jsmstream_api.ts +++ b/jetstream/jsmstream_api.ts @@ -185,11 +185,14 @@ export class StreamImpl implements Stream { } } - info(cached = false): Promise { + info( + cached = false, + opts?: Partial, + ): Promise { if (cached) { return Promise.resolve(this._info); } - return this.api.info(this.name) + return this.api.info(this.name, opts) .then((si) => { this._info = si; return this._info; @@ -206,6 +209,10 @@ export class StreamImpl implements Stream { getMessage(query: MsgRequest): Promise { return this.api.getMessage(this.name, query); } + + deleteMessage(seq: number, erase?: boolean): Promise { + return this.api.deleteMessage(this.name, seq, erase); + } } export class StreamAPIImpl extends BaseApiClient implements StreamAPI { diff --git a/jetstream/tests/streams_test.ts b/jetstream/tests/streams_test.ts index 16b8be31..680ad023 100644 --- a/jetstream/tests/streams_test.ts +++ b/jetstream/tests/streams_test.ts @@ -154,3 +154,35 @@ Deno.test("streams - consumers", async () => { await cleanup(ns, nc); }); + +Deno.test("streams - delete message", async () => { + const { ns, nc } = await setup(jetstreamServerConf({}, true)); + const js = nc.jetstream(); + + // add a stream and a message + const { stream, subj } = await initStream(nc); + await Promise.all([js.publish(subj), js.publish(subj), js.publish(subj)]); + + // retrieve the stream + const s = await js.streams.get(stream); + assertExists(s); + assertEquals(s.name, stream); + + // get a message + const sm = await s.getMessage({ seq: 2 }); + assertExists(sm); + + assertEquals(await s.deleteMessage(2, true), true); + await assertRejects( + async () => { + await s.getMessage({ seq: 2 }); + }, + Error, + "no message found", + ); + + const si = await s.info(false, { deleted_details: true }); + assertEquals(si.state.deleted, [2]); + + await cleanup(ns, nc); +}); diff --git a/jetstream/types.ts b/jetstream/types.ts index 50eeb505..258929c6 100644 --- a/jetstream/types.ts +++ b/jetstream/types.ts @@ -852,7 +852,10 @@ export enum AdvisoryKind { export interface Stream { name: string; - info(cached?: boolean): Promise; + info( + cached?: boolean, + opts?: Partial, + ): Promise; alternates(): Promise; @@ -863,6 +866,8 @@ export interface Stream { ): Promise; getMessage(query: MsgRequest): Promise; + + deleteMessage(seq: number, erase?: boolean): Promise; } export enum JsHeaders {