Skip to content

Commit

Permalink
Merge pull request #686 from nats-io/pub-msg
Browse files Browse the repository at this point in the history
[FEAT] [CORE] added the ability to publish by providing a Msg argument
  • Loading branch information
aricart committed Apr 19, 2024
2 parents e3ccac2 + 9781b7b commit a4d0f29
Show file tree
Hide file tree
Showing 4 changed files with 117 additions and 2 deletions.
2 changes: 0 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
.PHONY: build test bundle lint

export DENO_JOBS=4

build: test

lint:
Expand Down
14 changes: 14 additions & 0 deletions nats-base-client/core.ts
Original file line number Diff line number Diff line change
Expand Up @@ -549,6 +549,20 @@ export interface NatsConnection {
options?: PublishOptions,
): void;

/**
* Publishes using the subject of the specified message, specifying the
* data, headers and reply found in the message if any.
* @param msg
*/
publishMessage(msg: Msg): void;

/**
* Replies using the reply subject of the specified message, specifying the
* data, headers in the message if any.
* @param msg
*/
respondMessage(msg: Msg): boolean;

/**
* Subscribe expresses interest in the specified subject. The subject may
* have wildcards. Messages are delivered to the {@link SubOpts#callback |SubscriptionOptions callback}
Expand Down
18 changes: 18 additions & 0 deletions nats-base-client/nats.ts
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,24 @@ export class NatsConnectionImpl implements NatsConnection {
this.protocol.publish(subject, data, options);
}

publishMessage(msg: Msg) {
return this.publish(msg.subject, msg.data, {
reply: msg.reply,
headers: msg.headers,
});
}

respondMessage(msg: Msg) {
if (msg.reply) {
this.publish(msg.reply, msg.data, {
reply: msg.reply,
headers: msg.headers,
});
return true;
}
return false;
}

subscribe(
subject: string,
opts: SubscriptionOptions = {},
Expand Down
85 changes: 85 additions & 0 deletions tests/basics_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,17 @@ import {
headers,
isIP,
NatsConnectionImpl,
Payload,
PublishOptions,
RequestStrategy,
SubscriptionImpl,
} from "../nats-base-client/internal_mod.ts";
import { Feature } from "../nats-base-client/semver.ts";
import { syncIterator } from "../nats-base-client/core.ts";
import {
MsgHdrs,
Publisher,
} from "https://deno.land/x/nats@v1.18.0/nats-base-client/core.ts";

Deno.test("basics - connect port", async () => {
const ns = await NatsServer.start();
Expand Down Expand Up @@ -1374,3 +1380,82 @@ Deno.test("basics - sync subscription", async () => {

await cleanup(ns, nc);
});

Deno.test("basics - publish message", async () => {
const { ns, nc } = await setup();
const sub = nc.subscribe("q");

const nis = new MM(nc);
nis.data = new TextEncoder().encode("not in service");

(async () => {
for await (const m of sub) {
if (m.reply) {
nis.subject = m.reply;
nc.publishMessage(nis);
}
}
})().then();

const r = await nc.request("q");
assertEquals(r.string(), "not in service");

await cleanup(ns, nc);
});

Deno.test("basics - respond message", async () => {
const { ns, nc } = await setup();
const sub = nc.subscribe("q");

const nis = new MM(nc);
nis.data = new TextEncoder().encode("not in service");

(async () => {
for await (const m of sub) {
if (m.reply) {
nis.reply = m.reply;
nc.respondMessage(nis);
}
}
})().then();

const r = await nc.request("q");
assertEquals(r.string(), "not in service");

await cleanup(ns, nc);
});

class MM implements Msg {
data!: Uint8Array;
sid: number;
subject!: string;
reply?: string;
headers?: MsgHdrs;
publisher: Publisher;

constructor(p: Publisher) {
this.publisher = p;
this.sid = -1;
}

json<T>(): T {
throw new Error("not implemented");
}

respond(payload?: Payload, opts?: PublishOptions): boolean {
if (!this.reply) {
return false;
}
payload = payload || Empty;
this.publisher.publish(this.reply, payload, opts);
return true;
}

respondMessage(m: Msg): boolean {
return this.respond(m.data, { headers: m.headers, reply: m.reply });
}

string(): string {
return "";
}
}

0 comments on commit a4d0f29

Please sign in to comment.