Skip to content

Commit

Permalink
[FEAT] [CORE] added the ability to publish/respond by providing a Msg…
Browse files Browse the repository at this point in the history
… argument. This is useful downstream to some applications.
  • Loading branch information
aricart committed Apr 16, 2024
1 parent 82e8fc5 commit cadb6bc
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 1 deletion.
7 changes: 7 additions & 0 deletions nats-base-client/core.ts
Original file line number Diff line number Diff line change
Expand Up @@ -556,6 +556,13 @@ export interface NatsConnection {
*/
publishMessage(msg: Msg): void;

/**
* Replies using the reply subject of the specified message, specifying the
* data, headers in the message if any.
* @param msg
*/
respondMessage(msg: Msg): boolean;

/**
* Subscribe expresses interest in the specified subject. The subject may
* have wildcards. Messages are delivered to the {@link SubOpts#callback |SubscriptionOptions callback}
Expand Down
11 changes: 11 additions & 0 deletions nats-base-client/nats.ts
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,17 @@ export class NatsConnectionImpl implements NatsConnection {
});
}

respondMessage(msg: Msg) {
if (msg.reply) {
this.publish(msg.reply, msg.data, {
reply: msg.reply,
headers: msg.headers,
});
return true;
}
return false;
}

subscribe(
subject: string,
opts: SubscriptionOptions = {},
Expand Down
24 changes: 23 additions & 1 deletion tests/basics_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1381,7 +1381,7 @@ Deno.test("basics - sync subscription", async () => {
await cleanup(ns, nc);
});

Deno.test("basics - respond message", async () => {
Deno.test("basics - publish message", async () => {
const { ns, nc } = await setup();
const sub = nc.subscribe("q");

Expand All @@ -1403,6 +1403,28 @@ Deno.test("basics - respond message", async () => {
await cleanup(ns, nc);
});

Deno.test("basics - respond message", async () => {
const { ns, nc } = await setup();
const sub = nc.subscribe("q");

const nis = new MM(nc);
nis.data = new TextEncoder().encode("not in service");

(async () => {
for await (const m of sub) {
if (m.reply) {
nis.reply = m.reply;
nc.respondMessage(nis);
}
}
})().then();

const r = await nc.request("q");
assertEquals(r.string(), "not in service");

await cleanup(ns, nc);
});

class MM implements Msg {
data!: Uint8Array;
sid: number;
Expand Down

0 comments on commit cadb6bc

Please sign in to comment.