(2.12) Initial atomic batch publish #6966
base: main
Conversation
Haven't played with the branch yet but a few comments from a glance.
mset.mu.Lock()
if mset.batches == nil {
	mset.batches = &batching{
Presumably we don't need to allocate this if batching is disabled on the stream?
Correct, we only initialize and allocate if a message comes in with the Nats-Batch-Id header and atomic publish is enabled.
That does remind me to add the following to the current set of limitations listed in the description to be picked up:
Support enabling(/disabling) of AllowAtomicPublish, and tear down resources when/if disabled.
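For illustration, a minimal sketch of that lazy path. The `batching`/`batchGroup` types and `mset.batches` field come from the diff above; the config field, header name lookup, and helper function are assumptions made for the sketch, not the PR's actual code.

```go
// Sketch only: allocate the batching state lazily, and only when atomic
// publish is enabled on the stream and the message actually carries a batch ID.
func (mset *stream) batchGroupFor(hdr []byte) *batchGroup {
	batchId := string(getHeader("Nats-Batch-Id", hdr)) // assumed helper/header lookup
	if batchId == _EMPTY_ {
		return nil // Not part of a batch, nothing to allocate.
	}
	mset.mu.Lock()
	defer mset.mu.Unlock()
	if !mset.cfg.AllowAtomicPublish { // assumed config field
		return nil // Feature disabled on this stream, never allocate.
	}
	if mset.batches == nil {
		mset.batches = &batching{group: make(map[string]*batchGroup, 1)}
	}
	bg, ok := mset.batches.group[batchId]
	if !ok {
		bg = &batchGroup{}
		mset.batches.group[batchId] = bg
	}
	return bg
}
```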
mset.mu.Lock()
if mset.batches == nil {
	mset.batches = &batching{
		group: make(map[string]*batchGroup, 1),
Wonder if we should be making sure that this map can't grow infinitely, especially because maps never shrink.
Indeed, the ADR mentions the following initial set of limits:
The server will operate under limits to safeguard itself:
- Each stream can only have 50 batches in flight at any time
- Each server can only have 1000 batches in flight at any time
- A batch that has not had traffic for 10 seconds will be abandoned
- Each batch can have maximum 1000 messages
Have mentioned this in the current set of limitations as well; it's one of the things to be added assuming we agree on the API/ADR:
There are no limits yet to the amount of batches, how large they are, how long the batch ID can be, when they time out and are abandoned. Timing out of batches is also not implemented yet.
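As a sketch of how those numbers could translate into code: constant and function names are illustrative, only the values come from the ADR quote above.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// Illustrative limits; only the values come from the ADR quoted above.
const (
	maxBatchesPerStream = 50               // batches in flight per stream
	maxBatchesPerServer = 1000             // batches in flight per server
	batchIdleTimeout    = 10 * time.Second // abandon a batch idle for this long
	maxMsgsPerBatch     = 1000             // messages per batch
)

// canAcceptBatchMsg sketches the checks a server could run before staging
// another message for a batch.
func canAcceptBatchMsg(streamBatches, serverBatches, batchMsgs int, lastActivity time.Time) error {
	switch {
	case streamBatches > maxBatchesPerStream:
		return errors.New("too many batches in flight for this stream")
	case serverBatches > maxBatchesPerServer:
		return errors.New("too many batches in flight for this server")
	case batchMsgs > maxMsgsPerBatch:
		return errors.New("batch exceeds maximum message count")
	case time.Since(lastActivity) > batchIdleTimeout:
		return errors.New("batch abandoned after inactivity")
	}
	return nil
}

func main() {
	fmt.Println(canAcceptBatchMsg(1, 10, 5, time.Now()))
}
```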
I don't think this is right. If you accept these headers and check them against the pre-batch state before committing the batch, and while the lock is held, it all makes perfect sense. I might want to put messages into a batch for many subjects and I want to be sure that none of those subjects had updates while the batch was moving from client to servers; then it makes sense to express these limits and to verify them pre-batch. The batch will always be consistent so they don't need to reference into the batch, but they totally can reference pre-batch state, and that makes these headers do the right thing. Restricting the headers to the first message only doesn't make sense since the batch can have many subjects; the first one isn't useful.
This is where the problem lies. We can't deny the batch up front, even with locks held, because all the replicas do the last sequence and last msg ID checks on their own. So with the current code we simply can't deny it up front from the leader.
Headers are NOT restricted to the first message only. I meant that the last sequence and last msg ID checks only really work if set on the first message.
Last msg ID doesn't make sense to me for anything but the first message, because if you use last msg ID on all of them, you will error if one of them was a duplicate, so one msg ID didn't need to be applied. The current PR has no opinions about these specific headers, to be clear.
OK, we definitely need to think about these. Agree for now we can reject them, but I think we will need something here, as atomic publishes seem quite likely to want to express consistency.
Indeed, hence me saying the state as it was before the batch landed; conceptually that makes complete sense, but I hear you about the implementation issues.
@MauriceVanVeen why doesn't just the leader do this?
Indeed, this is important for the event sourcing use case, specifically,
In this case, if there was a batch, let's say 3 messages with For the case of a batch where the So the checks are relative to the unique subjects in the batch (whether For
@bruth, to clarify because there are many moving pieces to the whole batching story. (And I'm trying to keep this PR as small and as reviewable as possible as well) The
Correct, it's specifically these two headers that the leader does not check, and all replicas check on their own. Why the leader doesn't do this is twofold. One is simply for historic reasons I think; it was never necessary, but for batching it will be important to have the leader do this check and subsequently fail the batch if there's a mismatch. But maybe I'm just overly protective 😅, i.e. we couldn't guarantee this before in previous server versions but now with all the Raft fixes we can. And I'll just need to go through the code again and confirm that if the leader says "YES" to accept and propose the message, the replica will also always say "YES" to applying the message. I'll adjust the PR description to reflect this is a current limitation, and we shouldn't reject those headers but work on a fix for a subsequent PR. 👍

Side-note for the
There are currently also some limitations that blocks any changes to a message when I believe the fix that's required for this will be very similar to the one that's needed for TL;DR
Have created sub-issues for the current set of limitations, so they are already tracked.
Nice work in general here!
A few questions sprinkled throughout..
// checkMsgHeadersPreClusteredProposal checks the message for expected/consistency headers.
// mset.mu lock must NOT be held or used.
// mset.clMu lock must be held.
func checkMsgHeadersPreClusteredProposal(
This feels like it might be better named prepareBatch(), which checks conditions once a client has sent the COMMIT instruction. This should work for R1 or RN and is checked once the batch has been staged. Once this passes we can submit to NRG for R>1, or simply lock and call processStreamMsg() if R1.
This is not solely for batching, it's also used for normal clustered proposals. That logic was extracted into this function.
During normal proposals we simply call this method once for the to-be-proposed message.
During batching we stage the batch of N messages; once we get the COMMIT we loop over the staged messages and call this N times in-order to do the consistency checks, and then we either propose (or not) the whole batch.
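Roughly, that commit path could look like the following sketch. checkMsgHeadersPreClusteredProposal is the function from this PR, but its signature is simplified here, and the surrounding method, field names, and propose helper are assumptions for illustration only.

```go
// Sketch of the commit path described above, not the PR's actual code.
func (mset *stream) commitBatch(b *batchGroup) error {
	mset.clMu.Lock()
	defer mset.clMu.Unlock()

	// Run the same consistency checks used for normal clustered proposals,
	// in order, against every staged message before anything is proposed.
	for i, m := range b.staged {
		if err := checkMsgHeadersPreClusteredProposal(mset, m.subj, m.hdr, m.msg); err != nil {
			// One failing message fails the whole batch; nothing gets proposed.
			return fmt.Errorf("batch message %d: %w", i+1, err)
		}
	}
	// Only now propose the whole batch (or apply directly for R1).
	return mset.proposeBatch(b.staged) // hypothetical helper
}
```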
resp.Error = NewJSStreamNotMatchError()
b, _ := json.Marshal(resp)
outq.sendMsg(reply, b)
b, _ := json.Marshal(&JSPubAckResponse{PubAck: &PubAck{Stream: name}, Error: err})
This seems to imply that every message has a reply. Not sure that should be the case; perhaps we should only respond to the last message in a batch. So possibly we should destage the messages on an interior error, but store the error and simply ignore all other messages until the last commit one, which we respond to with the captured error, which is the first error.
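A sketch of that suggestion (type and method names are illustrative, not from the PR): on the first interior error the staged messages are dropped and the error is remembered, everything else in the batch is ignored, and only the commit message gets a reply carrying that first error.

```go
// Sketch only: capture the first interior error, destage, and reply once on commit.
type stagedMsg struct {
	subj     string
	hdr, msg []byte
}

type pendingBatch struct {
	staged   []stagedMsg
	firstErr error // first error seen for this batch, if any
}

func (b *pendingBatch) add(m stagedMsg, stageErr error) {
	if b.firstErr != nil {
		return // batch already failed, ignore everything until the commit
	}
	if stageErr != nil {
		b.firstErr = stageErr
		b.staged = nil // destage on the first interior error
		return
	}
	b.staged = append(b.staged, m)
}

// onCommit returns the error (if any) that becomes the single reply for the batch.
func (b *pendingBatch) onCommit() error { return b.firstErr }
```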
What we are going for here is that if you send a batch over many, many messages, you might want to flow control or just know it's even getting to the stream.
So you could add a reply when you want some feedback and control over the flow, but keep the replies off to have minimal impact for most of the messages.
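A client-side sketch of that idea using nats.go and the batch headers from this PR. The commit header value and the exact wire contract are assumptions here (see the ADR for the real protocol): interior messages are plain publishes with no reply, and only the commit message (or an occasional interior message, if you want flow control) waits for an ack.

```go
package main

import (
	"fmt"
	"strconv"
	"time"

	"github.com/nats-io/nats.go"
)

func main() {
	nc, err := nats.Connect(nats.DefaultURL)
	if err != nil {
		panic(err)
	}
	defer nc.Close()

	const batchID = "batch-1"
	payloads := [][]byte{[]byte("a"), []byte("b"), []byte("c")}

	for i, p := range payloads {
		m := nats.NewMsg("orders.new")
		m.Header.Set("Nats-Batch-Id", batchID)
		m.Header.Set("Nats-Batch-Sequence", strconv.Itoa(i+1))
		m.Data = p

		if i == len(payloads)-1 {
			// Commit message: the only one we request a reply for.
			m.Header.Set("Nats-Batch-Commit", "1") // value format assumed
			ack, err := nc.RequestMsg(m, 2*time.Second)
			fmt.Println(ack, err)
		} else {
			// Interior message: fire-and-forget, no reply subject.
			if err := nc.PublishMsg(m); err != nil {
				panic(err)
			}
		}
	}
}
```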
Not sure this makes sense - I understand what you are saying, just trying to think through this realistically. If we bound the upper limit of batches in terms of messages and/or bytes, and I want to go fast, I do not want flow control per se but do want small batches that most of the time can be committed without anything else.
The above means you can do that if you wish?
We put some limits in - but quite big ones, like 1000 messages per batch - so in that case it feels like it would be needed. But maybe that limit is just too high?
Tbh we had no idea what sane limits would be, so we just picked numbers to ensure the code paths are there to check for limits; what those limits should be I don't think we know yet - but 1k sounds too high, and even with 100 on a slowish link I probably want to know the server is getting them at the 10, 20, 30 etc. marks?
I don't think it's too high if messages are small, hence we probably need both limits, and allow them to be set on a per-stream basis?
The client is backed by TCP/IP so flow control is in place to the ingest server (not the ultimate destination), but if something at that point was missed we can capture that in the commit error back to the client and they can repeat. The app code needs to be able to do this anyway, although we could possibly do a retry inside the client lib if we know the error is transient, like a missing sequence number etc.
Might be more important for batch to know if we should retry or not on error.
For ordering, if we think this is a common use case, client can still generate monotonic batch sequences, and once we process all the messages we could re-order? Not saying we should, just trying to look at common use cases and make sure we think we can work on those..
What does that ordering number mean per se? I am assuming it will be the order of the message in a given batch, correct?
What I would not like is to have to have single-writer clients.
For ordering, if we think this is a common use case, client can still generate monotonic batch sequences, and once we process all the messages we could re-order? Not saying we should, just trying to look at common use cases and make sure we think we can work on those..
One for @MauriceVanVeen: in theory this seems fine, but how the data gets staged might make it hard?
What would be the use case of re-ordering the messages after sending, versus prior to sending?
Or rather, why would you not mind sending the messages out-of-order, but once stored you do care about ordering?
From the server perspective I'd highly prefer things to be published/received in the order they need to be persisted in (if the checks pass). That could also allow optimizing certain parts in-memory prior to commit, versus needing to loop over everything and sort.
In my mind, either you care about ordering and you send them in the order you want them to be stored in.
Or you don't care about ordering, and you simply send them in whichever order, as long as the batch sequence monotonically increases, which the client can simply do for the user. (Depending on client API design, the user doesn't even need to see the batch sequence being used.)
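For reference, the server-side cost of requiring in-order, monotonically increasing batch sequences is tiny. A sketch, where the contiguity rule and the names are assumptions rather than the PR's code:

```go
// Sketch: reject a staged message unless its Nats-Batch-Sequence is exactly
// one higher than the previous one seen for this batch ID.
type stagedBatchSeq struct {
	lastSeq uint64 // highest batch sequence accepted so far, 0 if none
}

func (b *stagedBatchSeq) stage(batchSeq uint64) error {
	if batchSeq != b.lastSeq+1 {
		return fmt.Errorf("batch sequence out of order: got %d, expected %d", batchSeq, b.lastSeq+1)
	}
	b.lastSeq = batchSeq
	return nil
}
```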
// lower layers. But we still need to pull out the msgId.
if msgId = getMsgId(hdr); msgId != _EMPTY_ {
	// Do real check only if not clustered or traceOnly flag is set.
	if !isClustered || traceOnly {
		if dde := mset.checkMsgId(msgId); dde != nil {
			mset.ddMu.Lock()
Since this is a new lock vs mset.mu, should the check acquire the lock itself instead?
If mset.checkMsgId would lock and unlock itself, it would return a pointer to a ddentry that we could then read without it being locked. So we need to hold the lock for as long as we're accessing that data.
It did point to one place where this was not properly done, which is now fixed:
var seq uint64
mset.ddMu.Lock()
dde := mset.checkMsgId(msgId)
if dde != nil {
	seq = dde.seq
}
mset.ddMu.Unlock()
if seq > 0 {
Looking at ddentry, it could be copied by value I think vs returning a pointer. Not sure that is worth it, just pointing it out since it's a string and 2 8-byte numbers.
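A sketch of that alternative (the wrapper name is hypothetical; ddentry, checkMsgId and ddMu are from the PR): returning a copy means the caller no longer needs to hold ddMu while reading the entry, at the cost of copying a string header plus two 8-byte fields.

```go
// Sketch: hypothetical wrapper that returns the dedupe entry by value, so the
// call site above would not need to take ddMu itself.
func (mset *stream) checkMsgIdCopy(id string) (ddentry, bool) {
	mset.ddMu.Lock()
	defer mset.ddMu.Unlock()
	if dde := mset.checkMsgId(id); dde != nil {
		return *dde, true // copy out while still holding the lock
	}
	return ddentry{}, false
}
```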
This PR implements an initial version for atomic/batch publishing, as described in this ADR.

In general:
- Tests are added under TestJetStreamAtomicPublish.
- AllowAtomicPublish is added, as well as support for the batch headers Nats-Batch-Id, Nats-Batch-Sequence, Nats-Batch-Commit, feature versioning, and support for batching as per the ADR.
- The dedupe state (mset.ddmap, mset.ddarr, mset.ddindex) is now guarded by a separate lock (mset.ddMu). This is required because we need to keep holding mset.clMu while accessing the dedupe state, and we can't use mset.mu as that would be a lock order violation. (Usage added to locksordering.txt; a rough locking sketch follows below.)
- The header checks (Nats-Expected-* and Nats-Msg-Id) prior to proposal have been moved to jetstream_batching.go, such that they can be used to do checks for all messages in the batch prior to accepting it fully.

Implementation:
- Batches are staged in a StreamStore, currently memstore only.
- Upon Nats-Batch-Commit, we loop through all messages and do the required header checks prior to proposing.

Atomic publish is not complete after this PR; importantly, it has the following limitations that will need to be fixed in separate PRs:
- Nats-Expected-Last-Sequence and Nats-Expected-Last-Msg-Id header support #6975
- Nats-Expected-Last-Subject-Sequence in the same batch #6976

Relates to #6549
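As a rough sketch of the locking described above (not the PR's actual code; only ddMu, clMu, checkMsgId and dde.seq appear in this PR/thread): clMu stays held across the dedupe lookup, ddMu is taken only for the duration of the read, and mset.mu is never needed on this path.

```go
// Sketch of the lock ordering: clMu -> ddMu, never touching mset.mu here.
mset.clMu.Lock()
// ... per-message consistency checks for the proposal/batch ...
mset.ddMu.Lock()
var dupSeq uint64
if dde := mset.checkMsgId(msgId); dde != nil {
	dupSeq = dde.seq // read the dedupe entry only while ddMu is held
}
mset.ddMu.Unlock()
// ... use dupSeq to decide whether to propose, still under clMu ...
mset.clMu.Unlock()
```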
Signed-off-by: Maurice van Veen github@mauricevanveen.com