Skip to content

Commit

Permalink
simplified doc (#506)
Browse files Browse the repository at this point in the history
  • Loading branch information
aricart committed May 10, 2023
1 parent acd6837 commit 274dc50
Show file tree
Hide file tree
Showing 31 changed files with 921 additions and 304 deletions.
6 changes: 3 additions & 3 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,16 @@ jobs:
environment: CI
strategy:
matrix:
deno-version: [1.30.3]
deno-version: [1.32.5]

steps:
- name: Git Checkout Deno Module
uses: actions/checkout@v2
uses: actions/checkout@v3
with:
fetch-depth: 1

- name: Use Deno Version ${{ matrix.deno-version }}
uses: denolib/setup-deno@master
uses: denoland/setup-deno@v1
with:
deno-version: ${{ matrix.deno-version }}

Expand Down
34 changes: 34 additions & 0 deletions examples/jetstream/01_consumers.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Copyright 2023 The NATS Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import { connect } from "../../src/mod.ts";
import { setupStreamAndConsumer } from "./util.ts";

// create a connection
const nc = await connect();

const { stream, consumer } = await setupStreamAndConsumer(nc);

// retrieve an existing consumer
const js = nc.jetstream();
const c = await js.consumers.get(stream, consumer);
console.log(await c.info(true));

// getting an ordered consumer requires no name
// as the library will create it
const oc = await js.consumers.get(stream);
console.log(await oc.info(true));

await nc.close();
39 changes: 39 additions & 0 deletions examples/jetstream/02_next.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Copyright 2023 The NATS Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import { connect } from "../../src/mod.ts";
import { setupStreamAndConsumer } from "./util.ts";

// create a connection
const nc = await connect();

// create a stream with a random name with some messages and a consumer
const { stream, consumer } = await setupStreamAndConsumer(nc);

// retrieve an existing consumer
const js = nc.jetstream();
const c = await js.consumers.get(stream, consumer);

// the consumer is simply asked for one message by default
// this will resolve in 30s or null is returned
const m = await c.next();
if (m) {
console.log(m.subject);
m.ack();
} else {
console.log(`didn't get a message`);
}

await nc.close();
38 changes: 38 additions & 0 deletions examples/jetstream/03_batch.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Copyright 2023 The NATS Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import { connect } from "../../src/mod.ts";
import { setupStreamAndConsumer } from "./util.ts";

// create a connection
const nc = await connect();

// create a stream with a random name with some messages and a consumer
const { stream, consumer } = await setupStreamAndConsumer(nc, 10);

// retrieve an existing consumer
const js = nc.jetstream();
const c = await js.consumers.get(stream, consumer);

// with fetch the consumer can manually control the buffering
for (let i = 0; i < 3; i++) {
const messages = await c.fetch({ max_messages: 4, expires: 2000 });
for await (const m of messages) {
m.ack();
}
console.log(`batch completed: ${messages.getProcessed()} msgs processed`);
}

await nc.close();
43 changes: 43 additions & 0 deletions examples/jetstream/04_consume.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Copyright 2023 The NATS Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import { connect } from "../../src/mod.ts";
import { setupStreamAndConsumer } from "./util.ts";

// create a connection
const nc = await connect();

// create a stream with a random name with some messages and a consumer
const { stream, consumer } = await setupStreamAndConsumer(nc);

// retrieve an existing consumer
const js = nc.jetstream();
const c = await js.consumers.get(stream, consumer);

// the consumer is wrapped in loop because this way, if there's some failure
// it will re-setup consume, and carry on.
// this is the basic pattern for processing messages forever
while (true) {
console.log("waiting for messages");
const messages = await c.consume();
try {
for await (const m of messages) {
console.log(m.seq);
m.ack();
}
} catch (err) {
console.log(`consume failed: ${err.message}`);
}
}
36 changes: 36 additions & 0 deletions examples/jetstream/05_consume.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Copyright 2023 The NATS Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import { connect } from "../../src/mod.ts";
import { setupStreamAndConsumer } from "./util.ts";

// create a connection
const nc = await connect();

// create a stream with a random name with some messages and a consumer
const { stream, consumer } = await setupStreamAndConsumer(nc);

// retrieve an existing consumer
const js = nc.jetstream();
const c = await js.consumers.get(stream, consumer);

// we can consume using callbacks too
console.log("waiting for messages");
await c.consume({
callback: (m) => {
console.log(m.seq);
m.ack();
},
});
54 changes: 54 additions & 0 deletions examples/jetstream/06_heartbeats.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Copyright 2023 The NATS Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import { connect } from "../../src/mod.ts";
import { setupStreamAndConsumer } from "./util.ts";
import { ConsumerEvents } from "../../nats-base-client/types.ts";

// create a connection
const nc = await connect();

// create a stream with a random name with some messages and a consumer
const { stream, consumer } = await setupStreamAndConsumer(nc);

// retrieve an existing consumer
const js = nc.jetstream();

const c = await js.consumers.get(stream, consumer);
while (true) {
const messages = await c.consume({ max_messages: 1 });

// watch the to see if the consume operation misses heartbeats
(async () => {
for await (const s of await messages.status()) {
if (s.type === ConsumerEvents.HeartbeatsMissed) {
// you can decide how many heartbeats you are willing to miss
const n = s.data as number;
console.log(`${n} heartbeats missed`);
if (n === 2) {
// by calling `stop()` the message processing loop ends
// in this case this is wrapped by a loop, so it attempts
// to re-setup the consume
messages.stop();
}
}
}
})();

for await (const m of messages) {
console.log(`${m.seq} ${m?.subject}`);
m.ack();
}
}
56 changes: 56 additions & 0 deletions examples/jetstream/07_consume_jobs.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Copyright 2023 The NATS Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import { connect, JsMsg } from "../../src/mod.ts";
import { setupStreamAndConsumer } from "./util.ts";
import { delay, SimpleMutex } from "../../nats-base-client/util.ts";

// create a connection
const nc = await connect();

// make a stream and fill with messages, and create a consumer
// create a stream with a random name with some messages and a consumer
const { stream, consumer } = await setupStreamAndConsumer(nc, 100);

// retrieve an existing consumer
const js = nc.jetstream();
const c = await js.consumers.get(stream, consumer);

// this example uses a consume that is monitors a rate limiter
// this effectively allows it to process 5 messages as fast
// as it can but stalls the consume if no resources are available
const messages = await c.consume({ max_messages: 10 });

// this rate limiter is just example code, do not use in production
const rl = new SimpleMutex(5);

async function schedule(m: JsMsg): Promise<void> {
// pretend to do work
await delay(1000);
m.ack();
console.log(`${m.seq}`);
}

for await (const m of messages) {
await rl.lock();
schedule(m)
.catch((err) => {
console.log(`failed processing: ${err.message}`);
m.nak();
})
.finally(() => {
rl.unlock();
});
}
59 changes: 59 additions & 0 deletions examples/jetstream/08_consume_process.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Copyright 2023 The NATS Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import { connect } from "../../src/mod.ts";
import { setupStreamAndConsumer } from "./util.ts";

// create a connection
const nc = await connect();

// create a stream with a random name with some messages and a consumer
const { stream, consumer } = await setupStreamAndConsumer(nc, 10000);

// retrieve an existing consumer
const js = nc.jetstream();
const c = await js.consumers.get(stream, consumer);

// this example uses a consume that processes the stream
// creating a frequency table based on the subjects found
const messages = await c.consume();

const data = new Map<string, number>();
for await (const m of messages) {
const chunks = m.subject.split(".");
const v = data.get(chunks[1]) || 0;
data.set(chunks[1], v + 1);
m.ack();

// if no pending, then we have processed the stream
// and we can break
if (m.info.pending === 0) {
break;
}
}

// we can safely delete the consumer
await c.delete();

const keys = [];
for (const k of data.keys()) {
keys.push(k);
}
keys.sort();
keys.forEach((k) => {
console.log(`${k}: ${data.get(k)}`);
});

await nc.close();
Loading

0 comments on commit 274dc50

Please sign in to comment.