JetStreamActor (#3) ships with push-consumer support only: the consumer pump uses the kafkajs-style `for await (const m of subscription)` AsyncIterator and waits for an explicit `ack` / `nak` / `term` from the handler.
NATS JetStream also supports pull consumers, which fit some workloads better:
- The application explicitly fetches a batch of messages: `consumer.fetch({ batch: 10, expires: 5_000 })`. This allows the actor system to apply its own backpressure (only fetch when the downstream is ready).
- Better fit for slow / bursty consumers — push delivery can overrun the actor's mailbox; pull lets the consumer self-pace.
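
A minimal sketch of the self-pacing idea. It assumes a hypothetical `PullSource` whose `fetch` resolves to an array (the real client's option names and return type may differ) and a hypothetical `Downstream` that can report its free capacity. It asks for no more messages than the downstream can currently accept:

```typescript
// Hypothetical shapes for illustration only; not the NATS client API.
interface Msg { seq: number }
interface PullSource {
  fetch(opts: { batch: number; expires: number }): Promise<Msg[]>;
}
interface Downstream {
  capacity(): number;        // free mailbox slots right now (assumed to exist)
  deliver(m: Msg): void;
}

// Fetch at most as many messages as the downstream has room for.
async function pullOnce(
  src: PullSource,
  down: Downstream,
  maxBatch: number,
): Promise<number> {
  const room = Math.min(maxBatch, down.capacity());
  if (room <= 0) return 0;   // downstream is full: skip the fetch entirely
  const msgs = await src.fetch({ batch: room, expires: 5_000 });
  for (const m of msgs) down.deliver(m);
  return msgs.length;
}
```

With push delivery the server decides when messages arrive; here the `room <= 0` check is the only thing gating delivery, so the mailbox cannot be overrun by this path.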
Scope:
- New command `{ kind: 'fetch'; batch: number; expiresMs?: number }` on `JetStreamCmd`.
- Internal pump alternates: in pull mode there is no auto-iterating subscription, only an explicit `consumer.fetch(...)` per `fetch` cmd.
- `ack`/`nak`/`term` handshake unchanged.
- Stream + consumer creation paths stay the same; the `consumer.create` config gains a `'push' | 'pull'` discriminator.
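
One possible shape for the scope items above, as a sketch rather than the final design. Only the `fetch` command fields and the `'push' | 'pull'` values come from this issue. The field name `mode`, the `stop` variant, `PullConsumer`, `handleCmd`, the default expiry, and the choice to reject `fetch` in push mode are all assumptions made for illustration.

```typescript
// Assumption: the discriminator field is called `mode`; the issue only fixes its values.
type ConsumerConfig =
  | { mode: 'push'; durable: string }
  | { mode: 'pull'; durable: string };

// The `fetch` variant is from this issue; `stop` stands in for existing commands.
type JetStreamCmd =
  | { kind: 'fetch'; batch: number; expiresMs?: number }
  | { kind: 'stop' };

interface Msg { seq: number }

// Hypothetical pull handle; real client option names may differ.
interface PullConsumer {
  fetch(opts: { batch: number; expires: number }): Promise<Msg[]>;
}

const DEFAULT_EXPIRES_MS = 5_000; // assumed default, not specified in the issue

// In pull mode, each `fetch` cmd triggers exactly one consumer.fetch(...).
// Returns the number of messages forwarded to the target.
async function handleCmd(
  cfg: ConsumerConfig,
  consumer: PullConsumer,
  cmd: JetStreamCmd,
  forward: (m: Msg) => void,
): Promise<number> {
  if (cmd.kind !== 'fetch') return 0; // other commands are out of this sketch
  if (cfg.mode !== 'pull') {
    // Assumption: a fetch against a push consumer is treated as a caller error.
    throw new Error("'fetch' is only valid for pull consumers");
  }
  const msgs = await consumer.fetch({
    batch: cmd.batch,
    expires: cmd.expiresMs ?? DEFAULT_EXPIRES_MS,
  });
  for (const m of msgs) forward(m);
  return msgs.length;
}
```

Keeping `expiresMs` optional on the command and resolving the default in one place means callers that do not care about timing never have to pass it.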
Out of scope: flow-control / token-bucket auto-fetching (separate issue if needed).
Estimate: 2-3 days.
Verification:
- Test that pulling N messages results in exactly N forwarded to target.
- Test that `expires` triggers a clean empty-fetch path (not an error).
- Test that subsequent fetch resumes from the right offset on the durable consumer.
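
The three checks could be exercised against an in-memory stand-in before a real server is involved. A sketch, assuming a hypothetical `FakeDurableConsumer` that keeps a cursor in the way a durable consumer keeps its position. An empty result models `expires` elapsing with nothing to deliver; no timer, ack, or redelivery behaviour is simulated.

```typescript
interface Msg { seq: number }

// Hypothetical in-memory stand-in for a durable pull consumer.
class FakeDurableConsumer {
  private cursor = 0; // index of the next undelivered message
  constructor(private readonly stream: Msg[]) {}

  // Resolves with up to `batch` messages. An empty array models an expired
  // fetch; `expires` is accepted but not timed in this fake.
  async fetch(opts: { batch: number; expires: number }): Promise<Msg[]> {
    const out = this.stream.slice(this.cursor, this.cursor + opts.batch);
    this.cursor += out.length;
    return out;
  }
}

async function runChecks(): Promise<string[]> {
  const log: string[] = [];
  const stream: Msg[] = [1, 2, 3, 4, 5].map((seq) => ({ seq }));
  const c = new FakeDurableConsumer(stream);

  // Pulling N messages yields exactly N.
  const first = await c.fetch({ batch: 3, expires: 50 });
  log.push("first=" + first.map((m) => m.seq).join(","));

  // A subsequent fetch resumes after the last delivered message.
  const second = await c.fetch({ batch: 3, expires: 50 });
  log.push("second=" + second.map((m) => m.seq).join(","));

  // Nothing left: the fetch resolves empty instead of throwing.
  const empty = await c.fetch({ batch: 3, expires: 50 });
  log.push("empty=" + empty.length);
  return log;
}
```

The same assertions can later be pointed at the real consumer behind the actor, with the fake kept around for fast unit runs.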