(2.12) Initial atomic batch publish #6966

Open · wants to merge 1 commit into main from maurice/init-atomic-batch

Conversation


@MauriceVanVeen commented Jun 11, 2025

This PR implements an initial version of atomic/batch publishing, as described in this ADR.

In general:

  • A client can now be implemented against the new batch API (given the current limitations listed below). For an example, see TestJetStreamAtomicPublish.
  • Adds the stream config field AllowAtomicPublish, support for the batch headers Nats-Batch-Id, Nats-Batch-Sequence, and Nats-Batch-Commit, feature versioning, and batching behavior as per the ADR.
  • Dedupe state (mset.ddmap, mset.ddarr, mset.ddindex) is now guarded by a separate lock (mset.ddMu). This is required because we need to keep holding mset.clMu while accessing the dedupe state, and we can't use mset.mu as that would be a lock order violation. (Usage added to locksordering.txt; see the sketch after this list.)
  • All clustered header checks (like Nats-Expected-* and Nats-Msg-Id) prior to proposal have been moved to jetstream_batching.go, such that they can be used to do checks for all messages in the batch prior to accepting it fully.
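
To illustrate the ordering mentioned in the dedupe-state item above, a standalone sketch follows (only the lock and field names follow the PR; the types and the dedupe map's value type are simplified and hypothetical):

package main

import "sync"

// Hypothetical mirror of the locks and dedupe state involved; only the
// names follow the PR, everything else is simplified.
type stream struct {
	mu    sync.RWMutex      // stream lock; must NOT be acquired after ddMu
	clMu  sync.Mutex        // clustered/proposal lock
	ddMu  sync.Mutex        // new lock guarding ddmap/ddarr/ddindex
	ddmap map[string]uint64 // stands in for the real msg-ID dedupe map
}

// Allowed order per locksordering.txt: clMu -> ddMu.
func (mset *stream) hasMsgId(msgId string) bool {
	mset.clMu.Lock()
	defer mset.clMu.Unlock()
	mset.ddMu.Lock()
	defer mset.ddMu.Unlock()
	_, ok := mset.ddmap[msgId]
	return ok
}

func main() {
	mset := &stream{ddmap: map[string]uint64{}}
	_ = mset.hasMsgId("id-1")
}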

Implementation:

  • Client publishes batches as per the ADR.
  • Server receives these, and:
    • Stores each message in the batch into a StreamStore, currently memstore only.
    • Once Nats-Batch-Commit is received, we loop through all messages and do the required header checks prior to proposing.
  • Server rejects the batch if gaps are detected, or any required header checks fail.
  • Otherwise, Server proposes append entries consisting of the batched entries.
  • Followers check that they've received all proposed entries containing the batch; if and only if the full batch is received, they can simply apply all entries and don't need to do further header checks.
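
As an end-to-end illustration of the flow above, here is a minimal client-side sketch using the ADR's header names. The subjects, batch ID, payloads, and the single reply on the commit message are illustrative assumptions, not a final client API:

package main

import (
	"fmt"
	"log"
	"strconv"
	"time"

	"github.com/nats-io/nats.go"
)

func main() {
	nc, err := nats.Connect(nats.DefaultURL)
	if err != nil {
		log.Fatal(err)
	}
	defer nc.Close()

	const batchID = "b1" // illustrative batch ID
	for i := 1; i <= 3; i++ {
		m := nats.NewMsg(fmt.Sprintf("orders.%d", i)) // illustrative subjects
		m.Header.Set("Nats-Batch-Id", batchID)
		m.Header.Set("Nats-Batch-Sequence", strconv.Itoa(i))
		m.Data = []byte("payload")
		if i < 3 {
			// Interior messages can be published without a reply.
			if err := nc.PublishMsg(m); err != nil {
				log.Fatal(err)
			}
			continue
		}
		// The final message carries the commit header; assuming the PubAck
		// (or error) for the whole batch comes back on its reply.
		m.Header.Set("Nats-Batch-Commit", "1")
		resp, err := nc.RequestMsg(m, 2*time.Second)
		if err != nil {
			log.Fatal(err)
		}
		fmt.Printf("batch committed: %s\n", resp.Data)
	}
}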

Atomic publish is not complete after this PR; importantly, it has the following limitations, which will need to be fixed in separate PRs:

Relates to #6549

Signed-off-by: Maurice van Veen github@mauricevanveen.com

@MauriceVanVeen requested a review from a team as a code owner on June 11, 2025 16:13
@neilalexander left a comment

Haven't played with the branch yet but a few comments from a glance.


mset.mu.Lock()
if mset.batches == nil {
	mset.batches = &batching{
Member

Presumably we don't need to allocate this if batching is disabled on the stream?

Member Author

Correct, we only initialize and allocate if a message comes in with the Nats-Batch-Id header, and atomic publish is enabled.

That does remind me to add the following to the current set of limitations listed in the description, to be picked up later:

Support enabling/disabling of AllowAtomicPublish, and tear down resources when/if disabled.

mset.mu.Lock()
if mset.batches == nil {
	mset.batches = &batching{
		group: make(map[string]*batchGroup, 1),
Member

Wonder if we should be making sure that this map can't grow unboundedly, especially because Go maps never shrink.

Member Author

Indeed, the ADR mentions the following initial set of limits:

The server will operate under limits to safeguard itself:

  • Each stream can only have 50 batches in flight at any time
  • Each server can only have 1000 batches in flight at any time
  • A batch that has not had traffic for 10 seconds will be abandoned
  • Each batch can have a maximum of 1000 messages

Have mentioned this in the current set of limitations as well; it is one of the things to be added, assuming we agree on the API/ADR:

There are no limits yet on the number of batches, how large they are, how long the batch ID can be, or when they time out and are abandoned. Timing out of batches is also not implemented yet.
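
Purely as a sketch of where such limits might hang, using the ADR's initial numbers (the constant names, the batching struct's fields beyond group, and the admission logic are all hypothetical, not from the PR):

package main

import (
	"errors"
	"fmt"
	"sync"
	"time"
)

// Hypothetical mirror of the PR's batching state; only "group" follows the PR.
type batchGroup struct{ lastActive time.Time }

type batching struct {
	mu    sync.Mutex
	group map[string]*batchGroup
}

// ADR's initial numbers; constant names are illustrative.
const (
	maxBatchesPerStream = 50
	batchIdleTimeout    = 10 * time.Second
)

// startBatch sketches admission control: abandon batches that have been
// idle too long, and reject a new batch when the stream already has the
// maximum number in flight.
func (b *batching) startBatch(id string, now time.Time) error {
	b.mu.Lock()
	defer b.mu.Unlock()
	for bid, g := range b.group {
		if now.Sub(g.lastActive) > batchIdleTimeout {
			delete(b.group, bid) // abandon stale batch
		}
	}
	if len(b.group) >= maxBatchesPerStream {
		return errors.New("too many in-flight batches for this stream")
	}
	b.group[id] = &batchGroup{lastActive: now}
	return nil
}

func main() {
	b := &batching{group: make(map[string]*batchGroup)}
	fmt.Println(b.startBatch("b1", time.Now()))
}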

@MauriceVanVeen force-pushed the maurice/init-atomic-batch branch from d2b3e2c to e084b8a on June 11, 2025 17:25
@MauriceVanVeen changed the title from "(2.12) Initial atomic publish" to "(2.12) Initial atomic batch publish" on Jun 11, 2025
@ripienaar (Contributor)

Headers like Nats-Expected-Last-Sequence and Nats-Expected-Last-Msg-Id should be rejected. For example, given these headers a batch would fail if a batch starts and anything other than the first message contains these headers, or if, between our batch start and commit, a new publish or batch was admitted. We also don't know with absolute certainty what the sequence or message ID of any proposed-but-not-yet-applied message is. Usage of these headers should be rejected so we can ensure batches don't need to be continuously retried.

I don't think this is right; if you accept these headers and check them against the pre-batch state, before committing the batch and while the lock is held, it all makes perfect sense.

I might want to put messages into a batch for many subjects, and I want to be sure that none of those subjects had updates while the batch was moving from client to server; then it makes sense to express these constraints and to verify them pre-batch. The batch will always be consistent, so they don't need to reference into the batch, but they totally can reference pre-batch state, and that makes these headers do the right thing.

Restricting the headers to the first message only doesn't make sense, since the batch can have many subjects; the first one isn't useful.

@MauriceVanVeen (Member Author)

if you accept these headers and check them against the pre-batch state before committing the batch and while the lock is held it all makes perfect sense.

This is where the problem lies. We can't deny the batch up front, even with locks held, because all the replicas do the last-sequence and last-msg-ID checks on their own. So with the current code we simply can't deny it up front from the leader.
Maybe we could make that work, but I highly doubt it, because we can't guarantee what the last sequence is going to be until it is applied on a replica.
I don't have an answer for this issue yet, but I'd rather have us initially reject the headers altogether if we don't have a fix, instead of partially applying a batch (which is what would happen now).

Restricting the headers to first message only doesn't make sense since the batch can have many subjects, first one isnt useful.

Headers are NOT restricted to the first message only. I meant that the last sequence and last msg ID checks only really work if set on the first message.
For example, if you have a stream with 5 messages and you send the following batch:

  • msg1 no headers, msg2 expected last seq must be 5 => this batch will fail, because the last sequence after applying msg1 will be 6
  • msg1 expected last seq must be 5, msg2 expected last seq must be 5 => this batch will fail, because the last sequence is bumped to 6 by msg1, so the consistency check would fail on msg2

Last msg ID doesn't make sense to me for anything but the first message, because if you use last msg ID on all of them, you will error if one of them was a duplicate, i.e. one msg ID didn't need to be applied.
Last seq does work if used for more than only the first message, but only if it increases for every single message; and even then it would be tricky, because if you use a msg ID for deduplication and the message is a duplicate, then the whole batch will fail as well.
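
A toy model of that failure mode (nothing here is server code; the types and function are illustrative):

package main

import "fmt"

// applyBatch models the stream-level check: each applied message bumps
// the stream's last sequence, so an Expected-Last-Sequence on any message
// after the first compares against an already-advanced sequence.
func applyBatch(lastSeq uint64, expected []int64) error { // -1 = header unset
	for i, exp := range expected {
		if exp >= 0 && uint64(exp) != lastSeq {
			return fmt.Errorf("msg%d: expected last seq %d, stream is at %d", i+1, exp, lastSeq)
		}
		lastSeq++ // each applied message advances the stream sequence
	}
	return nil
}

func main() {
	// Stream with 5 messages; msg2 expects last seq 5 but sees 6.
	fmt.Println(applyBatch(5, []int64{-1, 5}))
}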

The current PR has no opinions about these specific headers, to be clear.
But we need to do something with them: either reject them or fix the problem. Rejecting is easy for 2.12; fixing the problem could be tricky.

@ripienaar (Contributor)

OK, we definitely need to think about these. Agree that for now we can reject them, but I think we will need something here, as atomic publishes seem quite likely to want to express consistency.

  • msg1 no headers, msg2 expected last seq must be 5 => this batch will fail, because the last sequence after applying msg1 will be 6

Indeed, hence me saying the state as it was before the batch landed; conceptually that makes complete sense, but I hear you about the implementation issues.

@MauriceVanVeen force-pushed the maurice/init-atomic-batch branch from e084b8a to 20b4c38 on June 13, 2025 09:55
@ripienaar (Contributor)

natscli main can create and edit these streams, and @ploubser did some extensive end-to-end testing and found it to be working well for R>1.

@bruth (Member) commented Jun 14, 2025

Because all the replicas do the last sequence and last msg ID checks on their own.

@MauriceVanVeen why doesn't just the leader do this?

I think we will need some thing here as atomic publishes seem quite likely to want to express consistency

Indeed, this is important for the event sourcing use case, specifically, Nats-Expected-Last-Subject-Sequence and the fairly new Nats-Expected-Last-Subject-Sequence-Subject, e.g.

PUB orders.123.order-placed
Nats-Expected-Last-Subject-Sequence-Subject: orders.123.*
Nats-Expected-Last-Subject-Sequence: 4

In this case, if there was a batch of, say, 3 messages with a Sequence-Subject of orders.123.*, then beyond the first message the sequence-number check should not be required (although in this case it would be monotonic by 1).

For the case of a batch where the Sequence-Subject is not specified, for each message with a different subject, the last sequence should be checked.

So the checks are relative to the unique subjects in the batch (whether Sequence-Subject or the default case of the message subject).

For Nats-Expected-Last-Sequence and Nats-Expected-Last-Msg-Id this is a stream-level sequence check, so any concurrent commits would fail the batch.
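
A toy sketch of the per-subject bookkeeping this implies (not server code: the types and function are hypothetical, and wildcard Sequence-Subject matching is hand-waved as plain map keys):

package main

import "fmt"

// batchMsg carries the per-message expectation; expected == 0 means the
// Nats-Expected-Last-Subject-Sequence header is not set.
type batchMsg struct {
	subject  string
	expected uint64
}

// checkBatch verifies each message's expected last subject sequence,
// starting from the pre-batch state and advancing as the batch applies,
// so only the first message per subject needs (or passes) a check.
func checkBatch(preBatch map[string]uint64, nextSeq uint64, batch []batchMsg) error {
	last := make(map[string]uint64, len(preBatch))
	for s, seq := range preBatch {
		last[s] = seq
	}
	for _, m := range batch {
		if m.expected != 0 && last[m.subject] != m.expected {
			return fmt.Errorf("subject %q: expected last seq %d, have %d",
				m.subject, m.expected, last[m.subject])
		}
		last[m.subject] = nextSeq // this message becomes the subject's last
		nextSeq++
	}
	return nil
}

func main() {
	pre := map[string]uint64{"orders.123.order-placed": 4}
	fmt.Println(checkBatch(pre, 6, []batchMsg{
		{subject: "orders.123.order-placed", expected: 4}, // checked against pre-batch state
		{subject: "orders.123.order-placed"},              // subsequent: no check required
	}))
}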

@MauriceVanVeen (Member Author)

@bruth, to clarify, because there are many moving pieces to the whole batching story (and I'm trying to keep this PR as small and as reviewable as possible):

The Nats-Expected-Last-Subject-Sequence and Nats-Expected-Last-Subject-Sequence-Subject are already fully supported with just this PR. I believe that already covers the majority of the event sourcing use case you mentioned?
*There's one side-note I'll address at the bottom of this message, before the TL;DR.

For Nats-Expected-Last-Sequence and Nats-Expected-Last-Msg-Id this is a stream-level sequence check, so any concurrent commits would fail the batch.
...
why doesn't just the leader do this?

Correct, it's specifically these two headers that the leader does not check; all replicas check on their own. The reason the leader doesn't do this is twofold. One is simply historic, I think: it was never necessary. But for batching it will be important to have the leader do this check and subsequently fail the batch if there's a mismatch.
The other issue is that we can't 100% guarantee the checks work correctly if the leader does do them (can we really be 100% sure the last sequence/msg ID checks work even with multiple in-flight proposals, etc.).

But maybe I'm just being overly protective 😅, i.e. we couldn't guarantee this in previous server versions, but now with all the Raft fixes we can. I'll just need to go through the code again and confirm that if the leader says "YES" to accepting and proposing the message, the replica will also always say "YES" to applying the message.

I'll adjust the PR description to reflect that this is a current limitation, and that we shouldn't reject those headers but work on a fix in a subsequent PR. 👍
(assuming we can ensure the right guarantees, but I'm thinking it should be possible and we just need to confirm)

Side-note

Side-note for the Nats-Expected-Last-Sequence and Nats-Expected-Last-Subject-Sequence headers. As you specify here (if I understood you correctly):

beyond the first message, the sequence number check should not be required (although in this case, it would be monotonic by 1).

There is currently also a limitation that blocks any changes to a message when Nats-Expected-Last-Subject-Sequence is used. Once you publish one message under subject A, if you try to do another such consistency check while the first message is still actively being proposed and not yet applied, it will fail, prompting you to retry.
This is what will currently prevent you from having a batch that uses the same Nats-Expected-Last-Subject-Sequence-Subject or message subject for multiple messages. This error was added because it was one way we could desync for the KV use case (or for event sourcing, etc.), so it can't simply be removed.

I believe the fix that's required for this will be very similar to the one that's needed for Nats-Expected-Last-Sequence to work and be checked by the leader. I'll work on a subsequent PR once that's figured out and works.
(This will probably also fix another issue where we could desync if you publish to subject A both with and without the expected last subject sequence check, have server restarts, catchups, and message deletions prior to catchup)

TL;DR

  • Trying to keep this PR contained. There's still loads of work to do, so I want to make sure we agree on the API and commit to fixing the listed (or yet to be discovered) current limitations in separate PRs, so as not to let this PR explode in terms of content and discussion (and to make all of them easier to review and merge).
  • Nats-Expected-Last-Sequence and Nats-Expected-Last-Msg-Id currently don't properly fail the batch if the check is invalid (because the replica, not the leader, does these checks). This will need to be fixed in a separate PR.
  • If a batch consists of multiple messages using the same Nats-Expected-Last-Subject-Sequence-Subject or message subject, then the batch will fail instead of pass, even if the consistency checks should be successful. This will also need to be fixed in a separate PR. The fix will probably be very similar to the one needed to fix Nats-Expected-Last-Sequence usage, so that'll likely be the same PR instead of separate ones.

@MauriceVanVeen (Member Author)

Have created sub-issues for the current set of limitations, so they are already tracked.

@derekcollison left a comment

Nice work in general here!

A few questions sprinkled throughout...

// checkMsgHeadersPreClusteredProposal checks the message for expected/consistency headers.
// mset.mu lock must NOT be held or used.
// mset.clMu lock must be held.
func checkMsgHeadersPreClusteredProposal(
Member

This feels like it might be better named prepareBatch(), which checks conditions once a client has sent the COMMIT instruction. This should work for R1 or RN and is checked once the batch has been staged. Once this passes, we can submit to NRG for R>1, or simply lock and call processStreamMsg() if R1.

Member Author

This is not solely for batching; it's also used for normal clustered proposals. That logic was extracted into this function.

During normal proposals we simply call this method once for the to-be-proposed message.
During batching we stage the batch of N messages; once we get the COMMIT, we loop over the staged messages and call this N times in order to do the consistency checks, and then we either propose (or don't propose) the whole batch.
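
Roughly like the following sketch (schematic stand-ins only; the PR's real types, signatures, and locking differ):

package main

// Schematic stand-ins; the PR's real types and signatures differ.
type stagedMsg struct {
	subject string
	hdr     []byte
}

type stream struct{}

// per-message consistency checks (Nats-Expected-*, Nats-Msg-Id, ...)
func (mset *stream) checkMsgHeaders(m stagedMsg) error { return nil }

// NRG append-entry proposal
func (mset *stream) propose(m stagedMsg) {}

// commitBatch sketches the described flow: on Nats-Batch-Commit, run the
// header checks over every staged message in order, and only if all pass
// propose the whole batch.
func commitBatch(mset *stream, staged []stagedMsg) error {
	for _, m := range staged {
		if err := mset.checkMsgHeaders(m); err != nil {
			return err // first failed check rejects the entire batch
		}
	}
	for _, m := range staged {
		mset.propose(m)
	}
	return nil
}

func main() {
	_ = commitBatch(&stream{}, []stagedMsg{{subject: "orders.1"}})
}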

resp.Error = NewJSStreamNotMatchError()
b, _ := json.Marshal(resp)
outq.sendMsg(reply, b)
b, _ := json.Marshal(&JSPubAckResponse{PubAck: &PubAck{Stream: name}, Error: err})
Member

This seems to imply that every message has a reply. Not sure that should be the case; we should perhaps only respond to the last message in a batch. So possibly we should de-stage the messages on an interior error, but store the error and simply ignore all other messages until the last commit one, which we respond to with the captured error, which is the first error.
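
A sketch of that suggested behavior (all names hypothetical, not server code):

package main

import (
	"errors"
	"fmt"
)

// Hypothetical staged message; commit marks the Nats-Batch-Commit msg,
// reply is only set where the client asked for one.
type batchMsg struct {
	commit bool
	reply  string
}

// processBatch sketches the suggestion: capture the first interior error,
// stop storing and ignore the remaining messages, and answer only the
// commit message with that captured error (or success).
func processBatch(msgs []batchMsg, store func(batchMsg) error, respond func(reply string, err error)) {
	var firstErr error
	for _, m := range msgs {
		if firstErr == nil {
			firstErr = store(m)
		}
		if m.commit {
			respond(m.reply, firstErr) // single reply for the whole batch
		}
	}
}

func main() {
	store := func(batchMsg) error { return errors.New("example interior error") }
	respond := func(reply string, err error) { fmt.Println(reply, err) }
	processBatch([]batchMsg{{}, {}, {commit: true, reply: "_INBOX.x"}}, store, respond)
}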

Contributor

What we are going for here is that if you send a batch of many, many messages, you might want flow control, or just to know it's even getting to the stream.

So you could add a reply when you want some feedback and control over the flow, but keep replies off to have minimal impact for most of the messages.

Member

Not sure this makes sense. I understand what you are saying; just trying to think through this realistically, if we bound the upper limit of batches in terms of messages and/or bytes. If I want to go fast, I do not want flow control per se, but I do want small batches that most of the time can be committed without anything else.

Contributor

The above means you can do that if you wish?

We put some limits in, but quite big ones, like 1000 messages per batch, so in that case it feels like it would be needed. But maybe that limit is just too high?

Tbh, we had no idea what sane limits would be, so we just picked numbers to ensure the code paths are there to check for limits; what those limits should be I don't think we know yet. 1k sounds too high, but even with 100 on a slowish link I'd probably want to know the server is getting them at, like, the 10, 20, 30 etc. marks?

Member

I don't think it's too high if messages are small; hence we probably need both limits, and to allow them to be set on a per-stream basis?

The client is backed by TCP/IP, so flow control is in place to the ingest server (not the ultimate destination), but if something at that point was missed, we can capture that in the commit error back to the client and they can repeat. The app code needs to be able to do this anyway, although we could possibly do a retry inside the client lib if we know the error is transient, like a missing sequence number etc.

Member

It might be more important for batch to know whether or not we should retry on error.

Member

For ordering, if we think this is a common use case, the client can still generate monotonic batch sequences, and once we process all the messages we could re-order? Not saying we should; just trying to look at common use cases and make sure we think we can support them.


What does that ordering number mean, per se? I am assuming it will be the order of the message in a given batch, correct?

What I would not like is to have to have single-writer clients.

Contributor

For ordering, if we think this is a common use case, client can still generate monotonic batch sequences, and once we process all the messages we could re-order? Not saying we should, just trying to look at common use cases and make sure we think we can work on those..

One for @MauriceVanVeen: in theory this seems fine, but how the data gets staged might make it hard?

Member Author

What would be the use case for re-ordering the messages after sending, versus prior to sending?
Or rather, why would you not mind sending the messages out of order, but once stored you do care about ordering?

From the server perspective I'd highly prefer things to be published/received in the order in which they need to be persisted (if the checks pass). That could also allow optimizing certain parts in memory prior to commit, versus needing to loop over everything and sort.

In my mind, either you care about ordering and you send them in the order you want them to be stored in,
or you don't care about ordering and you simply send them in whichever order, as long as the batch sequence monotonically increases, which the client can simply do for the user. (Depending on client API design, the user doesn't even need to see the batch sequence being used.)

// lower layers. But we still need to pull out the msgId.
if msgId = getMsgId(hdr); msgId != _EMPTY_ {
// Do real check only if not clustered or traceOnly flag is set.
if !isClustered || traceOnly {
if dde := mset.checkMsgId(msgId); dde != nil {
mset.ddMu.Lock()
Member

Since this is a new lock vs mset.mu, should checkMsgId acquire the lock itself instead?

Member Author

If mset.checkMsgId locked and unlocked itself, it would return a pointer to a ddentry that we could then read without the lock being held. So we need to hold the lock for as long as we're accessing that data.

It did point to one place where this was not properly done, which is now fixed:

var seq uint64
mset.ddMu.Lock()
dde := mset.checkMsgId(msgId)
if dde != nil {
	seq = dde.seq
}
mset.ddMu.Unlock()
if seq > 0 {

Member

Looking at ddentry, it could be copied by value, I think, vs returning a pointer. Not sure that is worth it; just pointing it out since it's a string and two 8-byte numbers.

@MauriceVanVeen force-pushed the maurice/init-atomic-batch branch from 20b4c38 to 066c34a on June 16, 2025 16:13
Signed-off-by: Maurice van Veen <github@mauricevanveen.com>
@MauriceVanVeen force-pushed the maurice/init-atomic-batch branch from 066c34a to faef006 on June 16, 2025 18:36