Skip to content

Commit

Permalink
added more examples
Browse files Browse the repository at this point in the history
  • Loading branch information
aricart committed May 9, 2023
1 parent 1ae18cc commit 9276c1e
Show file tree
Hide file tree
Showing 12 changed files with 729 additions and 99 deletions.
35 changes: 35 additions & 0 deletions examples/jetstream/simplified/01_consumers.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Copyright 2023 The NATS Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import { connect } from "../../../src/mod.ts";
import { newTestStream } from "./util.ts";

// create a connection
const nc = await connect();

// make a stream and fill with messages, and create a consumer
const { stream, consumer } = await newTestStream(nc);

// retrieve an existing consumer
const js = nc.jetstream();
const c = await js.consumers.get(stream, consumer);
console.log(await c.info(true));

// getting an ordered consumer requires no name
// as the library will create it
const oc = await js.consumers.get(stream);
console.log(await oc.info(true));

await nc.close();
39 changes: 39 additions & 0 deletions examples/jetstream/simplified/02_next.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Copyright 2023 The NATS Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import { connect } from "../../../src/mod.ts";
import { newTestStream } from "./util.ts";

// create a connection
const nc = await connect();

// make a stream and fill with messages, and create a consumer
const { stream, consumer } = await newTestStream(nc, 1);

// retrieve an existing consumer
const js = nc.jetstream();
const c = await js.consumers.get(stream, consumer);

// the consumer is simply asked for one message by default
// this will resolve in 30s or null is returned
const m = await c.next();
if (m) {
console.log(m.subject);
m.ack();
} else {
console.log(`didn't get a message`);
}

await nc.close();
38 changes: 38 additions & 0 deletions examples/jetstream/simplified/03_batch.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Copyright 2023 The NATS Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import { connect } from "../../../src/mod.ts";
import { newTestStream } from "./util.ts";

// create a connection
const nc = await connect();

// make a stream and fill with messages, and create a consumer
const { stream, consumer } = await newTestStream(nc, 10);

// retrieve an existing consumer
const js = nc.jetstream();
const c = await js.consumers.get(stream, consumer);

// with fetch the consumer can manually control the buffering
for (let i = 0; i < 3; i++) {
const messages = await c.fetch({ max_messages: 4, expires: 2000 });
for await (const m of messages) {
m.ack();
}
console.log(`batch completed: ${messages.getProcessed()} msgs processed`);
}

await nc.close();
43 changes: 43 additions & 0 deletions examples/jetstream/simplified/04_consume.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Copyright 2023 The NATS Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import { connect } from "../../../src/mod.ts";
import { newTestStream } from "./util.ts";

// create a connection
const nc = await connect();

// make a stream and fill with messages, and create a consumer
const { stream, consumer } = await newTestStream(nc);

// retrieve an existing consumer
const js = nc.jetstream();
const c = await js.consumers.get(stream, consumer);

// the consumer is wrapped in loop because this way, if there's some failure
// it will re-setup consume, and carry on.
// this is the basic pattern for processing messages forever
while (true) {
console.log("waiting for messages");
const messages = await c.consume();
try {
for await (const m of messages) {
console.log(m.seq);
m.ack();
}
} catch (err) {
console.log(`consume failed: ${err.message}`);
}
}
36 changes: 36 additions & 0 deletions examples/jetstream/simplified/04b_consume.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Copyright 2023 The NATS Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import { connect } from "../../../src/mod.ts";
import { newTestStream } from "./util.ts";

// create a connection
const nc = await connect();

// make a stream and fill with messages, and create a consumer
const { stream, consumer } = await newTestStream(nc);

// retrieve an existing consumer
const js = nc.jetstream();
const c = await js.consumers.get(stream, consumer);

// we can consume using callbacks too
console.log("waiting for messages");
await c.consume({
callback: (m) => {
console.log(m.seq);
m.ack();
},
});
53 changes: 53 additions & 0 deletions examples/jetstream/simplified/06_heartbeats.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Copyright 2023 The NATS Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import { connect } from "../../../src/mod.ts";
import { newTestStream } from "./util.ts";
import { ConsumerEvents } from "../../../nats-base-client/types.ts";

// create a connection
const nc = await connect();

// make a stream and fill with messages, and create a consumer
const { stream, consumer } = await newTestStream(nc);

// retrieve an existing consumer
const js = nc.jetstream();

const c = await js.consumers.get(stream, consumer);
while (true) {
const messages = await c.consume({ max_messages: 1 });

// watch the to see if the consume operation misses heartbeats
(async () => {
for await (const s of await messages.status()) {
if (s.type === ConsumerEvents.HeartbeatsMissed) {
// you can decide how many heartbeats you are willing to miss
const n = s.data as number;
console.log(`${n} heartbeats missed`);
if (n === 2) {
// by calling `stop()` the message processing loop ends
// in this case this is wrapped by a loop, so it attempts
// to re-setup the consume
messages.stop();
}
}
}
})();
for await (const m of messages) {
console.log(`${m.seq} ${m?.subject}`);
m.ack();
}
}
55 changes: 55 additions & 0 deletions examples/jetstream/simplified/07_consume_jobs.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Copyright 2023 The NATS Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import { connect, JsMsg } from "../../../src/mod.ts";
import { newTestStream, RateLimiter } from "./util.ts";
import { delay } from "../../../nats-base-client/util.ts";

// create a connection
const nc = await connect();

// make a stream and fill with messages, and create a consumer
const { stream, consumer } = await newTestStream(nc, 100);

// retrieve an existing consumer
const js = nc.jetstream();
const c = await js.consumers.get(stream, consumer);

// this example uses a consume that is monitors a rate limiter
// this effectively allows it to process 5 messages as fast
// as it can but stalls the consume if no resources are available
const messages = await c.consume({ max_messages: 10 });

// this rate limiter is just example code, do not use in production
const rl = new RateLimiter(5);

async function schedule(m: JsMsg): Promise<void> {
// pretend to do work
await delay(1000);
m.ack();
console.log(`${m.seq}`);
}

for await (const m of messages) {
await rl.lock();
schedule(m)
.catch((err) => {
console.log(`failed processing: ${err.message}`);
m.nak();
})
.finally(() => {
rl.unlock();
});
}
59 changes: 59 additions & 0 deletions examples/jetstream/simplified/08_consume_process.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Copyright 2023 The NATS Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import { connect } from "../../../src/mod.ts";
import { newTestStream } from "./util.ts";

// create a connection
const nc = await connect();

// make a stream and fill with messages, and create a consumer
const { stream, consumer } = await newTestStream(nc, 10000, true);

// retrieve an existing consumer
const js = nc.jetstream();
const c = await js.consumers.get(stream, consumer);

// this example uses a consume that processes the stream
// creating a frequency table based on the subjects found
const messages = await c.consume();

const data = new Map<string, number>();
for await (const m of messages) {
const chunks = m.subject.split(".");
const v = data.get(chunks[1]) || 0;
data.set(chunks[1], v + 1);
m.ack();

// if no pending, then we have processed the stream
// and we can break
if (m.info.pending === 0) {
break;
}
}

// we can safely delete the consumer
await c.delete();

const keys = [];
for (const k of data.keys()) {
keys.push(k);
}
keys.sort();
keys.forEach((k) => {
console.log(`${k}: ${data.get(k)}`);
});

await nc.close();
Loading

0 comments on commit 9276c1e

Please sign in to comment.