Skip to content

Commit

Permalink
Merge branch 'dev' into streams
Browse files Browse the repository at this point in the history
  • Loading branch information
aricart committed May 12, 2023
2 parents 2d50da9 + 5e71814 commit 0673309
Show file tree
Hide file tree
Showing 5 changed files with 23 additions and 24 deletions.
2 changes: 1 addition & 1 deletion examples/jetstream/06_heartbeats.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ while (true) {
const n = s.data as number;
console.log(`${n} heartbeats missed`);
if (n === 2) {
// by calling `stop()` the message processing loop ends
// by calling `stop()` the message processing loop ends.
// in this case this is wrapped by a loop, so it attempts
// to re-setup the consume
messages.stop();
Expand Down
7 changes: 3 additions & 4 deletions examples/jetstream/07_consume_jobs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,11 @@ const { stream, consumer } = await setupStreamAndConsumer(nc, 100);
const js = nc.jetstream();
const c = await js.consumers.get(stream, consumer);

// this example uses a consume that is monitors a rate limiter
// this effectively allows it to process 5 messages as fast
// as it can but stalls the consume if no resources are available
const messages = await c.consume({ max_messages: 10 });

// this rate limiter is just example code, do not use in production
// this example controls parallel processing of the messages
// by only allowing 5 concurrent messages to be processed
// and then only allowing additional processing as others complete
const rl = new SimpleMutex(5);

async function schedule(m: JsMsg): Promise<void> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ await jsm.streams.add({ name: "a", subjects: ["a.*"] });
// create a jetstream client:
const js = nc.jetstream();

// to publish messages to a stream:
// publish a message received by a stream
let pa = await js.publish("a.b");
// the jetstream returns an acknowledgement with the
// stream that captured the message, it's assigned sequence
Expand All @@ -20,8 +20,9 @@ console.log(
// More interesting is the ability to prevent duplicates
// on messages that are stored in the server. If
// you assign a message ID, the server will keep looking
// for the same ID for a configured amount of time, and
// reject messages that sport the same ID:
// for the same ID for a configured amount of time (within a
// configurable time window), and reject messages that
// have the same ID:
await js.publish("a.b", Empty, { msgID: "a" });

// you can also specify constraints that should be satisfied.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ streams.forEach((si) => {
console.log(si);
});

// add a stream
// add a stream - jetstream can capture nats core messages
const stream = "mystream";
const subj = `mystream.*`;
await jsm.streams.add({ name: stream, subjects: [subj] });
Expand Down
29 changes: 14 additions & 15 deletions jetstream.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ Messages are replayed from a stream by _consumers_. A consumer configuration
specifies which messages should be presented. For example a consumer may only be
interested in viewing messages from a specific sequence or starting from a
specific time, or having a specific subject. The configuration also specifies if
the server should require for a message to be acknowledged, and how long to wait
the server should require messages to be acknowledged and how long to wait
for acknowledgements. The consumer configuration also specifies options to
control the rate at which messages are presented to the client.

Expand Down Expand Up @@ -42,12 +42,11 @@ streams.forEach((si) => {
console.log(si);
});

// add a stream
// add a stream - jetstream can capture nats core messages
const stream = "mystream";
const subj = `mystream.*`;
await jsm.streams.add({ name: stream, subjects: [subj] });

// jetstream can capture nats core messages
for (let i = 0; i < 100; i++) {
nc.publish(`${subj}.a`, Empty);
}
Expand Down Expand Up @@ -107,7 +106,7 @@ await jsm.consumers.delete(stream, "me");

## JetStream Client

The JetStream client presents an API for working with messages stored on a
The JetStream client presents an API for working with messages stored in a
stream.

```typescript
Expand All @@ -118,7 +117,7 @@ await jsm.streams.add({ name: "a", subjects: ["a.*"] });
// create a jetstream client:
const js = nc.jetstream();

// to publish messages to a stream
// publish a message received by a stream
let pa = await js.publish("a.b");
// jetstream returns an acknowledgement with the
// stream that captured the message, it's assigned sequence
Expand All @@ -130,9 +129,9 @@ const duplicate = pa.duplicate;
// More interesting is the ability to prevent duplicates
// on messages that are stored in the server. If
// you assign a message ID, the server will keep looking
// for the same ID for a configured amount of time (specified
// by the stream configuration), and reject messages that
// sport the same ID:
// for the same ID for a configured amount of time (within a
// configurable time window), and reject messages that
// have the same ID:
await js.publish("a.b", Empty, { msgID: "a" });

// you can also specify constraints that should be satisfied.
Expand Down Expand Up @@ -251,7 +250,7 @@ const c = await js.consumers.get(stream, consumer);
const oc = await js.consumers.get(stream);
```

[full example](examples/jetstream/simplified/01_consumers.ts)
[full example](examples/jetstream/01_consumers.ts)

With the consumer in hand, the client can start reading messages using whatever
API is appropriate for the application.
Expand Down Expand Up @@ -300,7 +299,7 @@ if (m) {
}
```

[full example](examples/jetstream/simplified/02_next.ts)
[full example](examples/jetstream/02_next.ts)

The operation takes an optional argument. Currently, the only option is an
`expires` option which specifies the maximum number of milliseconds to wait for
Expand Down Expand Up @@ -335,7 +334,7 @@ for (let i = 0; i < 3; i++) {
}
```

[full example](examples/jetstream/simplified/03_batch.ts)
[full example](examples/03_batch.ts)

Fetching batches is useful if you parallelize a number of requests to take
advantage of the asynchronous processing of data with a number of workers for
Expand Down Expand Up @@ -364,7 +363,7 @@ for await (const m of messages) {
}
```

[full example](examples/jetstream/simplified/04_consume.ts)
[full example](examples/jetstream/04_consume.ts)

Note that it is possible to do an automatic version of `next()` by simply
setting the maximum number of messages to buffer to `1`:
Expand Down Expand Up @@ -493,7 +492,7 @@ for await (const m of messages) {
}
```

[full example](examples/jetstream/simplified/07_consume_jobs.ts)
[full example](examples/jetstream/07_consume_jobs.ts)

#### Processing a Stream

Expand Down Expand Up @@ -534,7 +533,7 @@ keys.forEach((k) => {
});
```

[full example](examples/jetstream/simplified/08_consume_process.ts)
[full example](examples/jetstream/08_consume_process.ts)

### Heartbeats

Expand Down Expand Up @@ -577,7 +576,7 @@ while (true) {
}
```

[full example](examples/jetstream/simplified/06_heartbeats.ts)
[full example](examples/jetstream/06_heartbeats.ts)

Note that while the heartbeat interval is configurable, you shouldn't change it.

Expand Down

0 comments on commit 0673309

Please sign in to comment.