Replies: 5 comments
-
|
First of all NATS is a bit different than Kafka, including when it comes to Consumers (which are server side state full things and more like Kafka partition-less consumer groups (if such a thing existed). Also there are no "offsets" per se: there are sequence numbers and each sequence number is typically acknowledged independently from the others. So for example you can always delete an existing consumer and re-create it with the same name and starting at any sequence number and applications will transparently use the newly created consumer. That said there is now also the ability to "reset" an existing customer back to a specific point in the stream (equivalent to resetting an offset in Kafka): nats-io/nats-server#7489 |
Beta Was this translation helpful? Give feedback.
-
|
thanks, so i can send reset, does it possible to do that via nats.go package ? |
Beta Was this translation helpful? Give feedback.
-
|
nats.go/jetstream/jetstream.go Lines 216 to 224 in ba84578 |
Beta Was this translation helpful? Give feedback.
-
|
Thanks! Can you recommend me detect seq number for specific time, for example i want to reset -2h? |
Beta Was this translation helpful? Give feedback.
-
|
You can use the 'batched get' function in orbit (e.g. https://github.com/synadia-io/orbit.go/blob/main/jetstreamext/getbatch.go) to ask for a batch of size one and specifying a timestamp of now minus 2 hours in the options using Just using batched get could also be a way to do what you want. Use it as specified above but with a larger batch size, you will get all the messages (matching the filters passed to the batched get), and if there's more messages then send another batched get request but this time at 'last sequence number in the previous batch's results + 1'. Also if there's only one process actually consuming messages from the consumer, then instead of using a durable consumer that you need to reset, you could just use an 'ordered consumer' and when you need to replay, just create a new 'ordered consumer' starting at whatever point in time you want. |
Beta Was this translation helpful? Give feedback.
Uh oh!
There was an error while loading. Please reload this page.
-
I have event processor and find some but in my code, so i need to reset offset for consumer and reprocess messages.
In kafka i can reset offset for consumer , how about jetstream ?
Beta Was this translation helpful? Give feedback.
All reactions