Skip to content

Commit

Permalink
fixed ci by adding an entry to demo.nats.io - gha seems to have stale…
Browse files Browse the repository at this point in the history
… data, and tests fail
  • Loading branch information
aricart committed Nov 1, 2021
1 parent 8513d54 commit 34bdd3c
Show file tree
Hide file tree
Showing 5 changed files with 209 additions and 2 deletions.
7 changes: 6 additions & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,12 @@ jobs:
deno-version: ${{ matrix.deno-version }}

- name: Set NATS Server Version
run: echo "NATS_VERSION=v2.6.2" >> $GITHUB_ENV
run: echo "NATS_VERSION=v2.6.3" >> $GITHUB_ENV

# this here because dns seems to be wedged on gha
- name: Add hosts to /etc/hosts
run: |
sudo echo "145.40.102.131 demo.nats.io" | sudo tee -a /etc/hosts
- name: Get nats-server
run: |
Expand Down
104 changes: 104 additions & 0 deletions jetstream.md
Original file line number Diff line number Diff line change
Expand Up @@ -425,3 +425,107 @@ await jsm.consumers.add(stream, {

await sub.closed;
```

#### JetStream Ordered Consumers

An Ordered Consumers is a specialized push consumer that puts together flow
control, heartbeats, and additional logic to handle message gaps. Ordered
consumers cannot operate on a queue and cannot be durable.

As the ordered consumer processes messages, it enforces that messages are
presented to the client with the correct sequence. If a gap is detected, the
consumer is recreated at the expected sequence.

Most consumer options are rejected, as the ordered consumer has manages its
configuration in a very specific way.

To create an ordered consumer (assuming a stream that captures `my.messages`):

```typescript
const subj = "my.messages";
const opts = consumerOpts();
opts.orderedConsumer();
const sub = await js.subscribe(subj, opts);
```

Use callbacks or iterators as desired to process the messages.
`sub.unsubscribe()` or break out of an iterator to stop processing messages.

#### JetStream Materialized Views

JetStream clients can use streams to store and access data. The materialized
views present a different _API_ to interact with the data stored in a stream.
First materialized view for JetStream is the _KV_ view. The _KV_ view implements
a Key-Value API on top of JetStream. Clients can store and retrieve values by
Keys:

```typescript
const sc = StringCodec();
const js = nc.jetstream();
// create the named KV or bind to it if it exists:
const kv = await js.views.kv("testing", { history: 5 });

// create an entry - this is similar to a put, but will fail if the
// key exists
await kv.create("hello.world", sc.encode("hi"));

// Values in KV are stored as KvEntries:
// {
// bucket: string,
// key: string,
// value: Uint8Array,
// created: Date,
// revision: number,
// delta?: number,
// operation: "PUT"|"DEL"|"PURGE"
// }
// The operation property specifies whether the value was
// updated (PUT), deleted (DEL) or purged (PURGE).

// you can monitor values modification in a KV by watching.
// You can watch specific key subset or everything.
// Watches start with the latest value for each key in the
// set of keys being watched - in this case all keys
const watch = await kv.watch();
(async () => {
for await (const e of watch) {
// do something with the change
}
})().then();

// update the entry
await kv.put("hello.world", sc.encode("world"));
// retrieve the KvEntry storing the value
// returns null if the value is not found
const e = await kv.get("hello.world");
assert(e);
// initial value of "hi" was overwritten above
assertEquals(sc.decode(e.value), "world");

const keys = await kv.keys();
assertEquals(keys.length, 1);
assertEquals(keys[0], "hello.world");

let h = await kv.history({ key: "hello.world" });
(async () => {
for await (const e of h) {
// do something with the historical value
// you can test e.operation for "PUT", "DEL", or "PURGE"
// to know if the entry is a marker for a value set
// or for a deletion or purge.
}
})().then();

// deletes the key - the delete is recorded
await kv.delete("hello.world");

// purge is like delete, but all history values
// are dropped and only the purge remains.
await kv.purge("hello.world");

// stop the watch operation above
watch.stop();

// danger: destroys all values in the KV!
await kv.destroy();
```
17 changes: 16 additions & 1 deletion nats-base-client/jsclient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
* limitations under the License.
*/

import type { ConsumerOptsBuilder } from "./types.ts";
import type { ConsumerOptsBuilder, KV, KvOptions, Views } from "./types.ts";
import {
AckPolicy,
ConsumerAPI,
Expand Down Expand Up @@ -68,6 +68,7 @@ import { Timeout, timeout } from "./util.ts";
import { createInbox } from "./protocol.ts";
import { headers } from "./headers.ts";
import { consumerOpts, isConsumerOptsBuilder } from "./jsconsumeropts.ts";
import { Bucket } from "./kv.ts";

export interface JetStreamSubscriptionInfoable {
info: JetStreamSubscriptionInfo | null;
Expand All @@ -81,6 +82,16 @@ enum PubHeaders {
ExpectedLastSubjectSequenceHdr = "Nats-Expected-Last-Subject-Sequence",
}

class ViewsImpl implements Views {
js: JetStreamClientImpl;
constructor(js: JetStreamClientImpl) {
this.js = js;
}
async kv(name: string, opts: Partial<KvOptions> = {}): Promise<KV> {
return Bucket.create(this.js.nc, name, opts);
}
}

export class JetStreamClientImpl extends BaseApiClient
implements JetStreamClient {
api: ConsumerAPI;
Expand All @@ -89,6 +100,10 @@ export class JetStreamClientImpl extends BaseApiClient
this.api = new ConsumerAPIImpl(nc, opts);
}

get views(): Views {
return new ViewsImpl(this);
}

async publish(
subj: string,
data: Uint8Array = Empty,
Expand Down
5 changes: 5 additions & 0 deletions nats-base-client/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,10 @@ export type JetStreamPullSubscription = JetStreamSubscription & Pullable;

export type JsMsgCallback = (err: NatsError | null, msg: JsMsg | null) => void;

export interface Views {
kv: (name: string, opts?: Partial<KvOptions>) => Promise<KV>;
}

// FIXME: pulls must limit to maxAcksInFlight
export interface JetStreamClient {
publish(
Expand All @@ -321,6 +325,7 @@ export interface JetStreamClient {
subject: string,
opts: ConsumerOptsBuilder | Partial<ConsumerOpts>,
): Promise<JetStreamSubscription>;
views: Views;
}

export interface ConsumerOpts {
Expand Down
78 changes: 78 additions & 0 deletions tests/jetstream_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2605,3 +2605,81 @@ Deno.test("jetstream - headers only", async () => {

await cleanup(ns, nc);
});

Deno.test("jetstream - can access kv", async () => {
const { ns, nc } = await setup(jetstreamServerConf({}, true));
if (await notCompatible(ns, nc, "2.6.2")) {
return;
}
const sc = StringCodec();

const js = nc.jetstream();
// create the named KV or bind to it if it exists:
const kv = await js.views.kv("testing", { history: 5 });

// create an entry - this is similar to a put, but will fail if the
// key exists
await kv.create("hello.world", sc.encode("hi"));

// Values in KV are stored as KvEntries:
// {
// bucket: string,
// key: string,
// value: Uint8Array,
// created: Date,
// revision: number,
// delta?: number,
// operation: "PUT"|"DEL"|"PURGE"
// }
// The operation property specifies whether the value was
// updated (PUT), deleted (DEL) or purged (PURGE).

// you can monitor values modification in a KV by watching.
// You can watch specific key subset or everything.
// Watches start with the latest value for each key in the
// set of keys being watched - in this case all keys
const watch = await kv.watch();
(async () => {
for await (const _e of watch) {
// do something with the change
}
})().then();

// update the entry
await kv.put("hello.world", sc.encode("world"));
// retrieve the KvEntry storing the value
// returns null if the value is not found
const e = await kv.get("hello.world");
assert(e);
// initial value of "hi" was overwritten above
assertEquals(sc.decode(e.value), "world");

const keys = await kv.keys();
assertEquals(keys.length, 1);
assertEquals(keys[0], "hello.world");

const h = await kv.history({ key: "hello.world" });
(async () => {
for await (const _e of h) {
// do something with the historical value
// you can test e.operation for "PUT", "DEL", or "PURGE"
// to know if the entry is a marker for a value set
// or for a deletion or purge.
}
})().then();

// deletes the key - the delete is recorded
await kv.delete("hello.world");

// purge is like delete, but all history values
// are dropped and only the purge remains.
await kv.purge("hello.world");

// stop the watch operation above
watch.stop();

// danger: destroys all values in the KV!
await kv.destroy();

await cleanup(ns, nc);
});

0 comments on commit 34bdd3c

Please sign in to comment.