Skip to content

Commit

Permalink
[FIX] better pull example that allows seeing redeliveries
Browse files Browse the repository at this point in the history
  • Loading branch information
aricart committed Apr 3, 2022
1 parent ac241ba commit 8bbf320
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 32 deletions.
58 changes: 35 additions & 23 deletions examples/jetstream/07_pullsub.ts
Original file line number Diff line number Diff line change
@@ -1,46 +1,58 @@
import { AckPolicy, connect } from "../../src/mod.ts";
import { AckPolicy, connect, nanos } from "../../src/mod.ts";
import { nuid } from "../../nats-base-client/nuid.ts";
import { delay } from "../../nats-base-client/util.ts";

const nc = await connect();

const stream = nuid.next();
const subj = nuid.next();
const durable = nuid.next();

const jsm = await nc.jetstreamManager();
await jsm.streams.add(
{ name: stream, subjects: [subj] },
);

await jsm.consumers.add(stream, {
durable_name: "c",
ack_policy: AckPolicy.Explicit,
});

const js = nc.jetstream();
await js.publish(subj);
await js.publish(subj);
await js.publish(subj);
await js.publish(subj);

// this is similar to fetch, but the consumer is created
// behind the scenes. To pull messages, you call `pull()` on
// the PullSubscription.
const psub = await js.pullSubscribe(subj, {
config: { durable_name: "c" },
mack: true,
// artificially low ack_wait, to show some messages
// not getting acked being redelivered
config: {
durable_name: durable,
ack_policy: AckPolicy.Explicit,
ack_wait: nanos(4000),
},
});
const done = (async () => {

(async () => {
for await (const m of psub) {
console.log(`${m.info.stream}[${m.seq}]`);
m.ack();
console.log(
`[${m.seq}] ${
m.redelivered ? `- redelivery ${m.info.redeliveryCount}` : ""
}`,
);
if (m.seq % 2 === 0) {
m.ack();
}
}
})();
psub.unsubscribe(4);
psub.pull({ batch: 2 });

await delay(500);
console.log(`pull more`);
psub.pull({ batch: 2 });
await done;
console.log("done");
await nc.close();

const fn = () => {
console.log("[PULL]");
psub.pull({ batch: 1000, expires: 10000 });
};

// do the initial pull
fn();
// and now schedule a pull every so often
const interval = setInterval(fn, 10000); // and repeat every 2s

setTimeout(() => {
clearInterval(interval);
nc.drain();
}, 20000);
38 changes: 29 additions & 9 deletions jetstream.md
Original file line number Diff line number Diff line change
Expand Up @@ -235,19 +235,39 @@ ask for messages and only for those messages it can handle during its pull
interval.

```typescript
const psub = await js.pullSubscribe(subj, { config: { durable_name: "c" } });
const done = (async () => {
const psub = await js.pullSubscribe(subj, {
mack: true,
// artificially low ack_wait, to show some messages
// not getting acked being redelivered
config: {
durable_name: durable,
ack_policy: AckPolicy.Explicit,
ack_wait: nanos(4000),
},
});

(async () => {
for await (const m of psub) {
console.log(`${m.info.stream}[${m.seq}]`);
m.ack();
console.log(
`[${m.seq}] ${
m.redelivered ? `- redelivery ${m.info.redeliveryCount}` : ""
}`,
);
if (m.seq % 2 === 0) {
m.ack();
}
}
})();
psub.unsubscribe(4);

// To start receiving messages you pull the subscription
setInterval(() => {
psub.pull({ batch: 10, expires: 10000 });
}, 10000);
const fn = () => {
console.log("[PULL]");
psub.pull({ batch: 1000, expires: 10000 });
};

// do the initial pull
fn();
// and now schedule a pull every so often
const interval = setInterval(fn, 10000); // and repeat every 2s
```

Note the above example is contrived, as the pull interval is fixed based on some
Expand Down

0 comments on commit 8bbf320

Please sign in to comment.