Skip to content

Commit

Permalink
[FEAT] nats examples
Browse files Browse the repository at this point in the history
[FIX] internal EventSource creation now checks for errors from the event source on creation.
[CHANGE] added support for publish/request with headers and made nats headers by a HeadersInit to allow for standard header types
[FIX] added a `flush()` to help tests - need to implement that on the gateway normally.
  • Loading branch information
aricart committed May 2, 2024
1 parent 139e5fd commit a2adf5d
Show file tree
Hide file tree
Showing 7 changed files with 374 additions and 35 deletions.
133 changes: 128 additions & 5 deletions nhgc/README.md
Original file line number Diff line number Diff line change
@@ -1,16 +1,15 @@
# nhgc

NHGC is a prototype API in TypeScript that interacts with the NATS KV over the
NHGC is a prototype API in TypeScript that interacts with KV and NATS over the
NATS HTTP Gateway. The NATS HTTP Gateway is a Synadia Server that allows clients
to communicate with NATS KV via the HTTP protocol. The NATS HTTP Gateway is part
of the [Synadia Cloud](https://www.synadia.com/).
to communicate with NATS via the HTTP protocol. The NATS HTTP Gateway is part of
the [Synadia Cloud](https://www.synadia.com/).

While no library is needed to interact with the gateway except for standard
libraries built into most programming languages, a simple API wrapper makes it
very easy for developers.

The NHG client is an ES module that depends on `fetch` and `EventSource` (SSE)
for KV `watch` operations.
The NHG client is an ES module that depends on `fetch` and `EventSource` (SSE).

### Installing

Expand Down Expand Up @@ -55,7 +54,17 @@ import { newNHG } from "@synadiaorbit/nhgc";

// create an instance of the API using the specified connection details
const nhg = newNHG("https://someurl.com", "someapikey");
```

The NHG client exposes:

- `kvm` property which implements `Kvm` which allows you to work with KV
- `nats` property which implements `Nats` which allows you to work with NATS
core (publish, subscribe, request)

### KV

```typescript
// generate a unique KV name
const id = crypto.randomUUID();

Expand Down Expand Up @@ -190,3 +199,117 @@ await watch.stopped;
// and yes, you can destroy a KV and all its values
await nhg.kvm.destroy(id);
```

### NATS

The NATS functionality only includes:

- `publish` which allows you to publish a message to a subject
- `publishWithReply` which allows you to publish a message to a subject while
specifying a reply subject where responses can be sent.
- `subscribe` which allows to express interest to receive messages on a specific
subject
- `request` which allows to publish a request that return the first response
received.
- `flush` which allows you to verify that messages up to that point have been
forwarded to the server, this is typically useful when writing tests.

If you have used NATS in the past this functionality will be familiar.

```typescript
// create an nhg as shown ealier
const nc = nhg.nats;

// to receive messages you can create a subscription.
// subscriptions are backed by an EventSource (SSE)".
// in this case we are interested in messages published to
// "hello".
let count = 0;
const sub = await nc.subscribe("hello", (err, msg) => {
if (err) {
console.error(err.message);
return;
}
if (msg) {
count++;
// print a message
console.log(
`[${count}]: msg with payload "${msg.string()}" which has ${
msg.data?.length || 0
} bytes`,
);
// print the headers
console.log("\t", msg.headers);

// if you think the data is JSON, you can use:
// msg.json() to decode it.
}
});

// you can publish an empty message - this have no data, but can be useful
// to signal some event where the subject is descriptive of what is happening.
await nc.publish("hello");

// you can specify a payload - payload can be a string, an Uint8Array or a ReadableStream<Uint8Array>
await nc.publish("hello", "world");

// you can also specify headers that will be included on the message.
// the only restriction is that the header keys must begin with the prefix `NatsH-`.
// Note that the prefix will be stripped. So for `NatsH-Hello` the NATS message
// will have a `Hello` header. The reason for the prefix is to ensure that
// other HTTP headers are not leaked to subscribers.
await nc.publish("hello", "world", {
headers: {
"NatsH-Language": "EN",
},
});

// Implementing a service is simply a subscription that sends a message to the
// reply subject included in the request message. If a message is not responded
// it will timeout on the client.
const svc = await nc.subscribe("q", async (err, msg) => {
if (err) {
console.error(err.message);
return;
}
if (msg?.reply) {
// echo the request - we are going to echo all the headers back and because
// we are using the gateway, we need to transform them:
const headers = new Headers();
msg.headers.forEach((v, k) => {
headers.append(`NatsH-${k}`, v);
});
await nc.publish(msg.reply, msg.data, { headers });
} else {
console.log("message doesn't have a reply - ignoring");
}
});

// to trigger a request - this one with a payload of `question`, and some headers.
const r = await nc.request("q", "question", {
headers: {
"NatsH-My-Header": "Hi",
},
});
console.log(
`got a response with payload "${r.string()}" which has ${
r.data?.length || 0
} bytes\n\t`,
r.headers,
);

// finally, there's also publish with reply, that redirects the response
// to a different subscription - this is only used on advanced usages,
// but shows that you can delegate someone else to process your message.
// typically you'll just use request which will return the reply to you.
await nc.publishWithReply("q", "hello", "question2", {
headers: {
"NatsH-My-Header": "Hi",
},
});

await nc.flush();

sub.unsubscribe();
svc.unsubscribe();
```
94 changes: 94 additions & 0 deletions nhgc/examples/nats.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
import { newNHG } from "../mod.ts";
import { getConnectionDetails } from "../credentials.ts";

// create the gateway client
const nhg = newNHG(getConnectionDetails());

const nc = nhg.nats;

// to receive messages you can create a subscription.
// subscriptions are backed by an EventSource (SSE)":
let count = 0;
const sub = await nc.subscribe("hello", (err, msg) => {
if (err) {
console.error(err.message);
return;
}
if (msg) {
count++;
// print a message
console.log(
`[${count}]: msg with payload "${msg.string()}" which has ${
msg.data?.length || 0
} bytes`,
);
// print the headers
console.log("\t", msg.headers);

// if you think the data is JSON, you can use:
// msg.json() to decode it.
}
});

// you can publish an empty message
await nc.publish("hello");

// you can specify a payload - payload can be a string, an Uint8Array or a ReadableStream<Uint8Array>
await nc.publish("hello", "world");

// you can also specify headers that will be included on the message.
// the only restriction is that the header keys must begin with the prefix `NatsH-`.
// Note that the prefix will be stripped. So for `NatsH-Hello` the NATS message
// will have a `Hello` header.
await nc.publish("hello", "world", {
headers: {
"NatsH-Language": "EN",
},
});

// Implementing a service is simply a subscription that replies
const svc = await nc.subscribe("q", async (err, msg) => {
if (err) {
console.error(err.message);
return;
}
if (msg?.reply) {
// echo the request - we are going to echo all the headers back and because
// we are using the gateway, we need to transform them:
const headers = new Headers();
msg.headers.forEach((v, k) => {
headers.append(`NatsH-${k}`, v);
});
await nc.publish(msg.reply, msg.data, { headers });
} else {
console.log("message doesn't have a reply - ignoring");
}
});

// to trigger a request:
const r = await nc.request("q", "question", {
headers: {
"NatsH-My-Header": "Hi",
},
});
console.log(
`got a response with payload "${r.string()}" which has ${
r.data?.length || 0
} bytes\n\t`,
r.headers,
);

// now - it is also publish a request, that redirects to a different
// subscription - this is only used on advanced usages, but shows that
// you can delegate someone else to process your message.

await nc.publishWithReply("q", "hello", "question2", {
headers: {
"NatsH-My-Header": "Hi",
},
});

await nc.flush();

sub.unsubscribe();
svc.unsubscribe();
9 changes: 5 additions & 4 deletions nhgc/kv.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import {
Watcher,
} from "./types.ts";
import { HttpImpl } from "./nhgc.ts";
import { Deferred, deferred } from "./util.ts";
import { addEventSource, Deferred, deferred } from "./util.ts";

type KvE = {
bucket: string;
Expand Down Expand Up @@ -272,9 +272,10 @@ export class KvImpl extends HttpImpl implements Kv {
? `/v1/kvm/buckets/${this.bucket}/watch?${qs}`
: `/v1/kvm/buckets/${this.bucket}/watch`;

return Promise.resolve(
new KvWatcher(new EventSource(new URL(path, this.url)), opts.callback),
);
return addEventSource(new URL(path, this.url))
.then((es) => {
return new KvWatcher(es, opts.callback);
});
}

async info(): Promise<KvBucketInfo> {
Expand Down
Loading

0 comments on commit a2adf5d

Please sign in to comment.