From a2adf5d504638cd3e55a2cf4a85f26ac2f75e51f Mon Sep 17 00:00:00 2001 From: Alberto Ricart Date: Thu, 2 May 2024 17:07:36 -0500 Subject: [PATCH] [FEAT] nats examples [FIX] internal EventSource creation now checks for errors from the event source on creation. [CHANGE] added support for publish/request with headers and made nats headers by a HeadersInit to allow for standard header types [FIX] added a `flush()` to help tests - need to implement that on the gateway normally. --- nhgc/README.md | 133 ++++++++++++++++++++++++++++++++++++++++-- nhgc/examples/nats.ts | 94 +++++++++++++++++++++++++++++ nhgc/kv.ts | 9 +-- nhgc/nats.ts | 72 ++++++++++++++++++----- nhgc/nats_test.ts | 46 +++++++++++++-- nhgc/types.ts | 38 ++++++++++-- nhgc/util.ts | 17 ++++++ 7 files changed, 374 insertions(+), 35 deletions(-) create mode 100644 nhgc/examples/nats.ts diff --git a/nhgc/README.md b/nhgc/README.md index ea4e1d0..ab46392 100644 --- a/nhgc/README.md +++ b/nhgc/README.md @@ -1,16 +1,15 @@ # nhgc -NHGC is a prototype API in TypeScript that interacts with the NATS KV over the +NHGC is a prototype API in TypeScript that interacts with KV and NATS over the NATS HTTP Gateway. The NATS HTTP Gateway is a Synadia Server that allows clients -to communicate with NATS KV via the HTTP protocol. The NATS HTTP Gateway is part -of the [Synadia Cloud](https://www.synadia.com/). +to communicate with NATS via the HTTP protocol. The NATS HTTP Gateway is part of +the [Synadia Cloud](https://www.synadia.com/). While no library is needed to interact with the gateway except for standard libraries built into most programming languages, a simple API wrapper makes it very easy for developers. -The NHG client is an ES module that depends on `fetch` and `EventSource` (SSE) -for KV `watch` operations. +The NHG client is an ES module that depends on `fetch` and `EventSource` (SSE). ### Installing @@ -55,7 +54,17 @@ import { newNHG } from "@synadiaorbit/nhgc"; // create an instance of the API using the specified connection details const nhg = newNHG("https://someurl.com", "someapikey"); +``` + +The NHG client exposes: + +- `kvm` property which implements `Kvm` which allows you to work with KV +- `nats` property which implements `Nats` which allows you to work with NATS + core (publish, subscribe, request) + +### KV +```typescript // generate a unique KV name const id = crypto.randomUUID(); @@ -190,3 +199,117 @@ await watch.stopped; // and yes, you can destroy a KV and all its values await nhg.kvm.destroy(id); ``` + +### NATS + +The NATS functionality only includes: + +- `publish` which allows you to publish a message to a subject +- `publishWithReply` which allows you to publish a message to a subject while + specifying a reply subject where responses can be sent. +- `subscribe` which allows to express interest to receive messages on a specific + subject +- `request` which allows to publish a request that return the first response + received. +- `flush` which allows you to verify that messages up to that point have been + forwarded to the server, this is typically useful when writing tests. + +If you have used NATS in the past this functionality will be familiar. + +```typescript +// create an nhg as shown ealier +const nc = nhg.nats; + +// to receive messages you can create a subscription. +// subscriptions are backed by an EventSource (SSE)". +// in this case we are interested in messages published to +// "hello". +let count = 0; +const sub = await nc.subscribe("hello", (err, msg) => { + if (err) { + console.error(err.message); + return; + } + if (msg) { + count++; + // print a message + console.log( + `[${count}]: msg with payload "${msg.string()}" which has ${ + msg.data?.length || 0 + } bytes`, + ); + // print the headers + console.log("\t", msg.headers); + + // if you think the data is JSON, you can use: + // msg.json() to decode it. + } +}); + +// you can publish an empty message - this have no data, but can be useful +// to signal some event where the subject is descriptive of what is happening. +await nc.publish("hello"); + +// you can specify a payload - payload can be a string, an Uint8Array or a ReadableStream +await nc.publish("hello", "world"); + +// you can also specify headers that will be included on the message. +// the only restriction is that the header keys must begin with the prefix `NatsH-`. +// Note that the prefix will be stripped. So for `NatsH-Hello` the NATS message +// will have a `Hello` header. The reason for the prefix is to ensure that +// other HTTP headers are not leaked to subscribers. +await nc.publish("hello", "world", { + headers: { + "NatsH-Language": "EN", + }, +}); + +// Implementing a service is simply a subscription that sends a message to the +// reply subject included in the request message. If a message is not responded +// it will timeout on the client. +const svc = await nc.subscribe("q", async (err, msg) => { + if (err) { + console.error(err.message); + return; + } + if (msg?.reply) { + // echo the request - we are going to echo all the headers back and because + // we are using the gateway, we need to transform them: + const headers = new Headers(); + msg.headers.forEach((v, k) => { + headers.append(`NatsH-${k}`, v); + }); + await nc.publish(msg.reply, msg.data, { headers }); + } else { + console.log("message doesn't have a reply - ignoring"); + } +}); + +// to trigger a request - this one with a payload of `question`, and some headers. +const r = await nc.request("q", "question", { + headers: { + "NatsH-My-Header": "Hi", + }, +}); +console.log( + `got a response with payload "${r.string()}" which has ${ + r.data?.length || 0 + } bytes\n\t`, + r.headers, +); + +// finally, there's also publish with reply, that redirects the response +// to a different subscription - this is only used on advanced usages, +// but shows that you can delegate someone else to process your message. +// typically you'll just use request which will return the reply to you. +await nc.publishWithReply("q", "hello", "question2", { + headers: { + "NatsH-My-Header": "Hi", + }, +}); + +await nc.flush(); + +sub.unsubscribe(); +svc.unsubscribe(); +``` diff --git a/nhgc/examples/nats.ts b/nhgc/examples/nats.ts new file mode 100644 index 0000000..feb910f --- /dev/null +++ b/nhgc/examples/nats.ts @@ -0,0 +1,94 @@ +import { newNHG } from "../mod.ts"; +import { getConnectionDetails } from "../credentials.ts"; + +// create the gateway client +const nhg = newNHG(getConnectionDetails()); + +const nc = nhg.nats; + +// to receive messages you can create a subscription. +// subscriptions are backed by an EventSource (SSE)": +let count = 0; +const sub = await nc.subscribe("hello", (err, msg) => { + if (err) { + console.error(err.message); + return; + } + if (msg) { + count++; + // print a message + console.log( + `[${count}]: msg with payload "${msg.string()}" which has ${ + msg.data?.length || 0 + } bytes`, + ); + // print the headers + console.log("\t", msg.headers); + + // if you think the data is JSON, you can use: + // msg.json() to decode it. + } +}); + +// you can publish an empty message +await nc.publish("hello"); + +// you can specify a payload - payload can be a string, an Uint8Array or a ReadableStream +await nc.publish("hello", "world"); + +// you can also specify headers that will be included on the message. +// the only restriction is that the header keys must begin with the prefix `NatsH-`. +// Note that the prefix will be stripped. So for `NatsH-Hello` the NATS message +// will have a `Hello` header. +await nc.publish("hello", "world", { + headers: { + "NatsH-Language": "EN", + }, +}); + +// Implementing a service is simply a subscription that replies +const svc = await nc.subscribe("q", async (err, msg) => { + if (err) { + console.error(err.message); + return; + } + if (msg?.reply) { + // echo the request - we are going to echo all the headers back and because + // we are using the gateway, we need to transform them: + const headers = new Headers(); + msg.headers.forEach((v, k) => { + headers.append(`NatsH-${k}`, v); + }); + await nc.publish(msg.reply, msg.data, { headers }); + } else { + console.log("message doesn't have a reply - ignoring"); + } +}); + +// to trigger a request: +const r = await nc.request("q", "question", { + headers: { + "NatsH-My-Header": "Hi", + }, +}); +console.log( + `got a response with payload "${r.string()}" which has ${ + r.data?.length || 0 + } bytes\n\t`, + r.headers, +); + +// now - it is also publish a request, that redirects to a different +// subscription - this is only used on advanced usages, but shows that +// you can delegate someone else to process your message. + +await nc.publishWithReply("q", "hello", "question2", { + headers: { + "NatsH-My-Header": "Hi", + }, +}); + +await nc.flush(); + +sub.unsubscribe(); +svc.unsubscribe(); diff --git a/nhgc/kv.ts b/nhgc/kv.ts index 187c6ef..e7fdfa3 100644 --- a/nhgc/kv.ts +++ b/nhgc/kv.ts @@ -26,7 +26,7 @@ import { Watcher, } from "./types.ts"; import { HttpImpl } from "./nhgc.ts"; -import { Deferred, deferred } from "./util.ts"; +import { addEventSource, Deferred, deferred } from "./util.ts"; type KvE = { bucket: string; @@ -272,9 +272,10 @@ export class KvImpl extends HttpImpl implements Kv { ? `/v1/kvm/buckets/${this.bucket}/watch?${qs}` : `/v1/kvm/buckets/${this.bucket}/watch`; - return Promise.resolve( - new KvWatcher(new EventSource(new URL(path, this.url)), opts.callback), - ); + return addEventSource(new URL(path, this.url)) + .then((es) => { + return new KvWatcher(es, opts.callback); + }); } async info(): Promise { diff --git a/nhgc/nats.ts b/nhgc/nats.ts index 792447b..5b719f6 100644 --- a/nhgc/nats.ts +++ b/nhgc/nats.ts @@ -8,7 +8,7 @@ import { Sub, Value, } from "./types.ts"; -import { deferred } from "./util.ts"; +import { addEventSource, deferred } from "./util.ts"; interface JsonMsg { header?: Record; @@ -22,13 +22,13 @@ export function toNatsMsg(m: MessageEvent): Msg { } class MsgImpl implements Msg { - header?: Record; + headers: Headers; subject: string; reply?: string; data?: Uint8Array; constructor(m: JsonMsg) { - this.header = m.header; + this.headers = new Headers(); this.subject = m.subject; this.reply = m.reply; if (m.data) { @@ -38,6 +38,17 @@ class MsgImpl implements Msg { this.data[i] = bin.charCodeAt(i); } } + if (m.header) { + for (const p in m.header) { + let v = m.header[p]; + if (!Array.isArray(v)) { + v = [v]; + } + v.forEach((vv) => { + this.headers.append(p, vv); + }); + } + } } string(): string { @@ -54,24 +65,31 @@ export class NatsImpl extends HttpImpl implements Nats { super(url, apiKey); } - publish(subject: string, data?: Value): Promise { - return this.publishWithReply(subject, "", data); + publish(subject: string, data?: Value, opts?: { + headers?: HeadersInit; + }): Promise { + return this.publishWithReply(subject, "", data, opts); } async publishWithReply( subject: string, reply: string, data?: Value, + opts?: { headers?: HeadersInit }, ): Promise { - const opts = []; + const args = []; if (reply?.length > 0) { - opts.push(`reply=${encodeURIComponent(reply)}`); + args.push(`reply=${encodeURIComponent(reply)}`); } - const qs = opts.length ? opts.join("&") : ""; + const qs = args.length ? args.join("&") : ""; const p = qs ? `/v1/nats/subjects/${subject}?${qs}` : `/v1/nats/subjects/${subject}`; - const r = await this.doFetch("PUT", p, data); + opts = opts || {}; + const hi = opts.headers || {}; + const headers = new Headers(hi); + + const r = await this.doFetch("PUT", p, data, { headers }); if (!r.ok) { return this.handleError(r); } @@ -80,8 +98,8 @@ export class NatsImpl extends HttpImpl implements Nats { } async request( subject: string, - data: Value, - opts?: { timeout?: number }, + data?: Value, + opts?: { timeout?: number; headers?: HeadersInit }, ): Promise { const args = []; opts = opts || {}; @@ -95,12 +113,15 @@ export class NatsImpl extends HttpImpl implements Nats { ? `/v1/nats/subjects/${subject}?${qs}` : `/v1/nats/subjects/${subject}`; + const headers = opts.headers ? new Headers(opts.headers) : new Headers(); + headers.append("Accept", "application/json"); + const r = await this.doFetch( "POST", p, data, { - headers: { "Accept": "application/json" }, + headers, }, ); @@ -119,16 +140,35 @@ export class NatsImpl extends HttpImpl implements Nats { } const qs = args.length ? args.join("&") : ""; const path = `/v1/nats/subjects/${subject}?${qs}`; - return Promise.resolve(new EventSource(new URL(path, this.url))); + + return addEventSource(new URL(path, this.url)); } async subscribe(subject: string, cb: MsgCallback): Promise { - const d = deferred(); const es = await this.sub(subject); - es.addEventListener("open", () => { - d.resolve(new SubImpl(es, cb)); + return Promise.resolve(new SubImpl(es, cb)); + } + + async flush(): Promise { + // this here until gateway supports flush directly + function inbox(length = 6): string { + return Math.random().toString(20).substring(2, length - 1); + } + + const subj = `_INBOX.${inbox()}`; + + const d = deferred(); + const sub = await this.subscribe(subj, (err) => { + if (err) { + d.reject(err.message); + sub.unsubscribe(); + } else { + d.resolve(); + sub.unsubscribe(); + } }); + await this.publish(subj).then(); return d; } } diff --git a/nhgc/nats_test.ts b/nhgc/nats_test.ts index def07a0..72df155 100644 --- a/nhgc/nats_test.ts +++ b/nhgc/nats_test.ts @@ -2,7 +2,9 @@ import { newNHG } from "./mod.ts"; import { delay } from "https://deno.land/std@0.200.0/async/delay.ts"; import { getConnectionDetails } from "./credentials.ts"; import { + assert, assertEquals, + assertExists, fail, } from "https://deno.land/std@0.207.0/assert/mod.ts"; @@ -13,22 +15,24 @@ Deno.test("nats - pub", async () => { Deno.test("nats - sub", async () => { const nhg = newNHG(getConnectionDetails()); + const msgs = []; const sub = await nhg.nats.subscribe("hello", (err, msg) => { if (err) { - console.log(err); + fail(err.message); } else if (msg) { - console.log(msg.string()); + msgs.push(msg); } }); const ticker = setInterval(() => { nhg.nats.publish("hello", new Date().toISOString()) .then(); - }, 1000); + }, 250); - await delay(5000); - clearInterval(ticker); + await delay(1000); sub.unsubscribe(); + clearInterval(ticker); + assert(msgs.length > 0); }); Deno.test("nats - request reply", async () => { @@ -46,6 +50,38 @@ Deno.test("nats - request reply", async () => { }); const r = await nc.request("q", "hello"); assertEquals(r.string(), "OK: hello"); + sub.unsubscribe(); +}); + +Deno.test("nats - publish headers", async () => { + const nhg = newNHG(getConnectionDetails()); + const nc = nhg.nats; + const sub = await nc.subscribe("q", (err, msg) => { + if (err) { + fail(err.message); + } + assertExists(msg); + assertEquals(msg.headers.get("Hello-World"), "Hi"); + }); + + await nc.publish("q", undefined, { headers: { "NatsH-Hello-World": "Hi" } }); + await nc.flush(); + sub.unsubscribe(); +}); + +Deno.test("nats - request headers", async () => { + const nhg = newNHG(getConnectionDetails()); + const nc = nhg.nats; + const sub = await nc.subscribe("q", (err, msg) => { + if (err) { + fail(err.message); + } + assertExists(msg); + assertEquals(msg.headers.get("Hello-World"), "Hi"); + assertExists(msg.reply); + nc.publish(msg.reply); + }); + await nc.request("q", undefined, { headers: { "NatsH-Hello-World": "Hi" } }); sub.unsubscribe(); }); diff --git a/nhgc/types.ts b/nhgc/types.ts index 03f9e86..119bf57 100644 --- a/nhgc/types.ts +++ b/nhgc/types.ts @@ -171,6 +171,8 @@ export interface Kv { /** * Returns a watcher that notifies you of changes to the KV. + * This creates an EventSource under the hood - note that if + * there's an error, the reason for the error is not available. * @param opts */ watch(opts: KvWatchOpts): Promise; @@ -363,7 +365,7 @@ export interface Kvm { } export interface Msg { - header?: Record; + headers: Headers; subject: string; reply?: string; data?: Uint8Array; @@ -383,8 +385,13 @@ export interface Nats { * Publishes the specified data to the specified subject. * @param subject * @param data + * @param opts - { headers?: HeadersInit } */ - publish(subject: string, data?: Value): Promise; + publish( + subject: string, + data?: Value, + opts?: { headers?: HeadersInit }, + ): Promise; /** * Publishes the specified data to the specified subject with the @@ -392,8 +399,17 @@ export interface Nats { * @param subject * @param reply * @param data + * @param opts - { headers?: HeadersInit } + * + * You can also specify headers - Note only headers that start with + * `NatsH-` are sent via NATS. */ - publishWithReply(subject: string, reply: string, data: Value): Promise; + publishWithReply( + subject: string, + reply: string, + data?: Value, + opts?: { headers?: HeadersInit }, + ): Promise; /** * Publishes a request with specified data in the specified subject expecting a @@ -401,13 +417,18 @@ export interface Nats { * Promise that resolves when the first response to the request is received. If * there are no responders (a subscription) listening on the request subject, * the request will fail as soon as the server processes it. + * + * You can specify a timeout in milliseconds - default is 2000 milliseconds. + * + * You can also specify headers - Note only headers that start with + * `NatsH-` are sent via NATS. * @param subject * @param data - * @param timeout + * @param opts - { timeout?: number; headers?: HeadersInit } */ request( subject: string, - data: Value, + data?: Value, opts?: { timeout?: number; headers?: HeadersInit }, ): Promise; @@ -415,8 +436,15 @@ export interface Nats { * Subscribe expresses interest in the specified subject. The subject may * have wildcards. Messages are delivered to the callback * subscribe(subject: string, cb: MsgCallback): Promise; + * This creates an EventSource under the hood - note that if + * there's an error, the reason for the error is not available. * @param subject * @param cb (err?: Error, msg?: Msg) => void; */ subscribe(subject: string, cb: MsgCallback): Promise; + + /** + * Performs a round trip to the server. + */ + flush(): Promise; } diff --git a/nhgc/util.ts b/nhgc/util.ts index af68d04..1b584e9 100644 --- a/nhgc/util.ts +++ b/nhgc/util.ts @@ -37,3 +37,20 @@ export function deferred(): Deferred { }); return Object.assign(p, methods) as Deferred; } + +export function addEventSource( + url: string | URL, + eventSourceInitDict?: EventSourceInit, +): Promise { + const d = deferred(); + const es = new EventSource(url, eventSourceInitDict); + es.addEventListener("open", () => { + d.resolve(es); + }); + + es.addEventListener("error", () => { + d.reject(new Error("error creating the EventSource")); + }); + + return d; +}