Skip to content

Commit

Permalink
added more examples
Browse files Browse the repository at this point in the history
  • Loading branch information
aricart committed May 10, 2023
1 parent 1ae18cc commit 808ef9e
Show file tree
Hide file tree
Showing 30 changed files with 764 additions and 139 deletions.
34 changes: 34 additions & 0 deletions examples/jetstream/01_consumers.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Copyright 2023 The NATS Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import { connect } from "../../src/mod.ts";
import { setupStreamAndConsumer } from "./util.ts";

// create a connection
const nc = await connect();

const { stream, consumer } = await setupStreamAndConsumer(nc);

// retrieve an existing consumer
const js = nc.jetstream();
const c = await js.consumers.get(stream, consumer);
console.log(await c.info(true));

// getting an ordered consumer requires no name
// as the library will create it
const oc = await js.consumers.get(stream);
console.log(await oc.info(true));

await nc.close();
39 changes: 39 additions & 0 deletions examples/jetstream/02_next.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Copyright 2023 The NATS Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import { connect } from "../../src/mod.ts";
import { setupStreamAndConsumer } from "./util.ts";

// create a connection
const nc = await connect();

// create a stream with a random name with some messages and a consumer
const { stream, consumer } = await setupStreamAndConsumer(nc);

// retrieve an existing consumer
const js = nc.jetstream();
const c = await js.consumers.get(stream, consumer);

// the consumer is simply asked for one message by default
// this will resolve in 30s or null is returned
const m = await c.next();
if (m) {
console.log(m.subject);
m.ack();
} else {
console.log(`didn't get a message`);
}

await nc.close();
38 changes: 38 additions & 0 deletions examples/jetstream/03_batch.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Copyright 2023 The NATS Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import { connect } from "../../src/mod.ts";
import { setupStreamAndConsumer } from "./util.ts";

// create a connection
const nc = await connect();

// create a stream with a random name with some messages and a consumer
const { stream, consumer } = await setupStreamAndConsumer(nc, 10);

// retrieve an existing consumer
const js = nc.jetstream();
const c = await js.consumers.get(stream, consumer);

// with fetch the consumer can manually control the buffering
for (let i = 0; i < 3; i++) {
const messages = await c.fetch({ max_messages: 4, expires: 2000 });
for await (const m of messages) {
m.ack();
}
console.log(`batch completed: ${messages.getProcessed()} msgs processed`);
}

await nc.close();
43 changes: 43 additions & 0 deletions examples/jetstream/04_consume.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Copyright 2023 The NATS Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import { connect } from "../../src/mod.ts";
import { setupStreamAndConsumer } from "./util.ts";

// create a connection
const nc = await connect();

// create a stream with a random name with some messages and a consumer
const { stream, consumer } = await setupStreamAndConsumer(nc);

// retrieve an existing consumer
const js = nc.jetstream();
const c = await js.consumers.get(stream, consumer);

// the consumer is wrapped in loop because this way, if there's some failure
// it will re-setup consume, and carry on.
// this is the basic pattern for processing messages forever
while (true) {
console.log("waiting for messages");
const messages = await c.consume();
try {
for await (const m of messages) {
console.log(m.seq);
m.ack();
}
} catch (err) {
console.log(`consume failed: ${err.message}`);
}
}
36 changes: 36 additions & 0 deletions examples/jetstream/05_consume.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Copyright 2023 The NATS Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import { connect } from "../../src/mod.ts";
import { setupStreamAndConsumer } from "./util.ts";

// create a connection
const nc = await connect();

// create a stream with a random name with some messages and a consumer
const { stream, consumer } = await setupStreamAndConsumer(nc);

// retrieve an existing consumer
const js = nc.jetstream();
const c = await js.consumers.get(stream, consumer);

// we can consume using callbacks too
console.log("waiting for messages");
await c.consume({
callback: (m) => {
console.log(m.seq);
m.ack();
},
});
54 changes: 54 additions & 0 deletions examples/jetstream/06_heartbeats.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Copyright 2023 The NATS Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import { connect } from "../../src/mod.ts";
import { setupStreamAndConsumer } from "./util.ts";
import { ConsumerEvents } from "../../nats-base-client/types.ts";

// create a connection
const nc = await connect();

// create a stream with a random name with some messages and a consumer
const { stream, consumer } = await setupStreamAndConsumer(nc);

// retrieve an existing consumer
const js = nc.jetstream();

const c = await js.consumers.get(stream, consumer);
while (true) {
const messages = await c.consume({ max_messages: 1 });

// watch the to see if the consume operation misses heartbeats
(async () => {
for await (const s of await messages.status()) {
if (s.type === ConsumerEvents.HeartbeatsMissed) {
// you can decide how many heartbeats you are willing to miss
const n = s.data as number;
console.log(`${n} heartbeats missed`);
if (n === 2) {
// by calling `stop()` the message processing loop ends
// in this case this is wrapped by a loop, so it attempts
// to re-setup the consume
messages.stop();
}
}
}
})();

for await (const m of messages) {
console.log(`${m.seq} ${m?.subject}`);
m.ack();
}
}
56 changes: 56 additions & 0 deletions examples/jetstream/07_consume_jobs.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Copyright 2023 The NATS Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import { connect, JsMsg } from "../../src/mod.ts";
import { setupStreamAndConsumer } from "./util.ts";
import { delay, SimpleMutex } from "../../nats-base-client/util.ts";

// create a connection
const nc = await connect();

// make a stream and fill with messages, and create a consumer
// create a stream with a random name with some messages and a consumer
const { stream, consumer } = await setupStreamAndConsumer(nc, 100);

// retrieve an existing consumer
const js = nc.jetstream();
const c = await js.consumers.get(stream, consumer);

// this example uses a consume that is monitors a rate limiter
// this effectively allows it to process 5 messages as fast
// as it can but stalls the consume if no resources are available
const messages = await c.consume({ max_messages: 10 });

// this rate limiter is just example code, do not use in production
const rl = new SimpleMutex(5);

async function schedule(m: JsMsg): Promise<void> {
// pretend to do work
await delay(1000);
m.ack();
console.log(`${m.seq}`);
}

for await (const m of messages) {
await rl.lock();
schedule(m)
.catch((err) => {
console.log(`failed processing: ${err.message}`);
m.nak();
})
.finally(() => {
rl.unlock();
});
}
59 changes: 59 additions & 0 deletions examples/jetstream/08_consume_process.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Copyright 2023 The NATS Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import { connect } from "../../src/mod.ts";
import { setupStreamAndConsumer } from "./util.ts";

// create a connection
const nc = await connect();

// create a stream with a random name with some messages and a consumer
const { stream, consumer } = await setupStreamAndConsumer(nc, 10000);

// retrieve an existing consumer
const js = nc.jetstream();
const c = await js.consumers.get(stream, consumer);

// this example uses a consume that processes the stream
// creating a frequency table based on the subjects found
const messages = await c.consume();

const data = new Map<string, number>();
for await (const m of messages) {
const chunks = m.subject.split(".");
const v = data.get(chunks[1]) || 0;
data.set(chunks[1], v + 1);
m.ack();

// if no pending, then we have processed the stream
// and we can break
if (m.info.pending === 0) {
break;
}
}

// we can safely delete the consumer
await c.delete();

const keys = [];
for (const k of data.keys()) {
keys.push(k);
}
keys.sort();
keys.forEach((k) => {
console.log(`${k}: ${data.get(k)}`);
});

await nc.close();
30 changes: 30 additions & 0 deletions examples/jetstream/util.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Copyright 2023 The NATS Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import { nuid } from "../../nats-base-client/nuid.ts";
import { createConsumer, fill, initStream } from "../../tests/jstest_util.ts";
import { NatsConnection } from "../../src/mod.ts";

export async function setupStreamAndConsumer(
nc: NatsConnection,
messages = 100,
): Promise<{ stream: string; consumer: string }> {
const stream = nuid.next();
await initStream(nc, stream, { subjects: [`${stream}.*`] });
await fill(nc, stream, messages, { randomize: true });
const consumer = await createConsumer(nc, stream);

return { stream, consumer };
}
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
Loading

0 comments on commit 808ef9e

Please sign in to comment.