From 62e432c56cef8c20bb673915ff05f2837d33ee9b Mon Sep 17 00:00:00 2001
From: Derek Collison
Date: Sat, 27 Apr 2024 09:31:29 -0700
Subject: [PATCH 1/8] Fix for skew in sync subjects during parallel stream creation.

We had a bug that would overwrite the sync subject during parallel stream
creation, which would cause upper layer stream catchups to fail on server
restarts.

We also were reporting first sequence mismatch when we hit max retries to
force a reset, but this was misleading, so added a proper error for the max
retries limit.

Signed-off-by: Derek Collison
---
 server/jetstream_cluster.go        | 49 +++++++++++++++++++-----------
 server/jetstream_cluster_3_test.go | 42 +++++++++++++++++++++++++
 server/store.go                    |  2 +-
 3 files changed, 75 insertions(+), 18 deletions(-)

diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go
index 770b6a24aef..31d0c32c895 100644
--- a/server/jetstream_cluster.go
+++ b/server/jetstream_cluster.go
@@ -47,8 +47,8 @@ type jetStreamCluster struct {
 	// concurrent requests that would otherwise be accepted.
 	// We also record the group for the stream. This is needed since if we have
 	// concurrent requests for same account and stream we need to let it process to get
-	// a response but they need to be same group, peers etc.
-	inflight map[string]map[string]*raftGroup
+	// a response but they need to be same group, peers etc. and sync subjects.
+	inflight map[string]map[string]*inflightInfo
 	// Signals meta-leader should check the stream assignments.
 	streamsCheck bool
 	// Server.
@@ -70,6 +70,12 @@ type jetStreamCluster struct {
 	qch chan struct{}
 }
 
+// Used to track inflight stream add requests to properly re-use same group and sync subject.
+type inflightInfo struct {
+	rg   *raftGroup
+	sync string
+}
+
 // Used to guide placement of streams and meta controllers in clustered JetStream.
 type Placement struct {
 	Cluster string `json:"cluster,omitempty"`
@@ -1576,6 +1582,7 @@ func (js *jetStream) applyMetaSnapshot(buf []byte, ru *recoveryUpdates, isRecove
 			}
 		}
 	}
+
 	// Now walk the ones to check and process consumers.
 	var caAdd, caDel []*consumerAssignment
 	for _, sa := range saChk {
@@ -5495,7 +5502,7 @@ func (js *jetStream) processLeaderChange(isLeader bool) {
 	for acc, asa := range cc.streams {
 		for _, sa := range asa {
 			if sa.Sync == _EMPTY_ {
-				s.Warnf("Stream assigment corrupt for stream '%s > %s'", acc, sa.Config.Name)
+				s.Warnf("Stream assignment corrupt for stream '%s > %s'", acc, sa.Config.Name)
 				nsa := &streamAssignment{Group: sa.Group, Config: sa.Config, Subject: sa.Subject, Reply: sa.Reply, Client: sa.Client}
 				nsa.Sync = syncSubjForStream()
 				cc.meta.Propose(encodeUpdateStreamAssignment(nsa))
@@ -5999,6 +6006,7 @@ func (s *Server) jsClusteredStreamRequest(ci *ClientInfo, acc *Account, subject,
 
 	var self *streamAssignment
 	var rg *raftGroup
+	var syncSubject string
 
 	// Capture if we have existing assignment first.
 	if osa := js.streamAssignment(acc.Name, cfg.Name); osa != nil {
@@ -6008,7 +6016,7 @@ func (s *Server) jsClusteredStreamRequest(ci *ClientInfo, acc *Account, subject,
 			return
 		}
 		// This is an equal assignment.
-		self, rg = osa, osa.Group
+		self, rg, syncSubject = osa, osa.Group, osa.Sync
 	}
 
 	if cfg.Sealed {
@@ -6032,19 +6040,22 @@ func (s *Server) jsClusteredStreamRequest(ci *ClientInfo, acc *Account, subject,
 		return
 	}
 
+	// Make sure inflight is setup properly.
+	if cc.inflight == nil {
+		cc.inflight = make(map[string]map[string]*inflightInfo)
+	}
+	streams, ok := cc.inflight[acc.Name]
+	if !ok {
+		streams = make(map[string]*inflightInfo)
+		cc.inflight[acc.Name] = streams
+	}
+
 	// Raft group selection and placement.
 	if rg == nil {
 		// Check inflight before proposing in case we have an existing inflight proposal.
-		if cc.inflight == nil {
-			cc.inflight = make(map[string]map[string]*raftGroup)
-		}
-		streams, ok := cc.inflight[acc.Name]
-		if !ok {
-			streams = make(map[string]*raftGroup)
-			cc.inflight[acc.Name] = streams
-		} else if existing, ok := streams[cfg.Name]; ok {
-			// We have existing for same stream. Re-use same group.
-			rg = existing
+		if existing, ok := streams[cfg.Name]; ok {
+			// We have existing for same stream. Re-use same group and syncSubject.
+			rg, syncSubject = existing.rg, existing.sync
 		}
 	}
 	// Create a new one here if needed.
@@ -6060,14 +6071,17 @@ func (s *Server) jsClusteredStreamRequest(ci *ClientInfo, acc *Account, subject,
 		rg.setPreferred()
 	}
 
+	if syncSubject == _EMPTY_ {
+		syncSubject = syncSubjForStream()
+	}
 	// Sync subject for post snapshot sync.
-	sa := &streamAssignment{Group: rg, Sync: syncSubjForStream(), Config: cfg, Subject: subject, Reply: reply, Client: ci, Created: time.Now().UTC()}
+	sa := &streamAssignment{Group: rg, Sync: syncSubject, Config: cfg, Subject: subject, Reply: reply, Client: ci, Created: time.Now().UTC()}
 	if err := cc.meta.Propose(encodeAddStreamAssignment(sa)); err == nil {
 		// On success, add this as an inflight proposal so we can apply limits
 		// on concurrent create requests while this stream assignment has
 		// possibly not been processed yet.
 		if streams, ok := cc.inflight[acc.Name]; ok {
-			streams[cfg.Name] = rg
+			streams[cfg.Name] = &inflightInfo{rg, syncSubject}
 		}
 	}
 }
@@ -8054,6 +8068,7 @@ var (
 	errCatchupStreamStopped   = errors.New("stream has been stopped") // when a catchup is terminated due to the stream going away.
 	errCatchupBadMsg          = errors.New("bad catchup msg")
 	errCatchupWrongSeqForSkip = errors.New("wrong sequence for skipped msg")
+	errCatchupTooManyRetries  = errors.New("catchup failed, too many retries")
 )
 
 // Process a stream snapshot.
@@ -8162,7 +8177,7 @@ RETRY:
 		numRetries++
 		if numRetries >= maxRetries {
 			// Force a hard reset here.
-			return errFirstSequenceMismatch
+			return errCatchupTooManyRetries
 		}
 
 		// Block here if we have too many requests in flight.
diff --git a/server/jetstream_cluster_3_test.go b/server/jetstream_cluster_3_test.go
index 2e5e5337386..1e141ad23b4 100644
--- a/server/jetstream_cluster_3_test.go
+++ b/server/jetstream_cluster_3_test.go
@@ -1417,6 +1417,48 @@ func TestJetStreamClusterParallelStreamCreation(t *testing.T) {
 	if len(errCh) > 0 {
 		t.Fatalf("Expected no errors, got %d", len(errCh))
 	}
+
+	// We had a bug during parallel stream creation as well that would overwrite the sync subject used for catchups, etc.
+	// Test that here as well by shutting down a non-leader, adding a whole bunch of messages, and making sure on restart
+	// we properly recover.
+	nl := c.randomNonStreamLeader(globalAccountName, "TEST")
+	nl.Shutdown()
+
+	nc, js := jsClientConnect(t, c.randomServer())
+	defer nc.Close()
+
+	msg := bytes.Repeat([]byte("Z"), 128)
+	for i := 0; i < 100; i++ {
+		js.PublishAsync("common.foo.bar", msg)
+	}
+	select {
+	case <-js.PublishAsyncComplete():
+	case <-time.After(5 * time.Second):
+		t.Fatalf("Did not receive completion signal")
+	}
+	// We need to force the leader to do a snapshot so we kick in upper layer catchup which depends on syncSubject.
+	sl := c.streamLeader(globalAccountName, "TEST")
+	mset, err := sl.GlobalAccount().lookupStream("TEST")
+	require_NoError(t, err)
+	node := mset.raftNode()
+	require_NotNil(t, node)
+	node.InstallSnapshot(mset.stateSnapshot())
+
+	nl = c.restartServer(nl)
+	c.waitOnServerCurrent(nl)
+
+	mset, err = nl.GlobalAccount().lookupStream("TEST")
+	require_NoError(t, err)
+
+	// Check state directly.
+	mset.mu.Lock()
+	var state StreamState
+	mset.store.FastState(&state)
+	mset.mu.Unlock()
+
+	require_Equal(t, state.Msgs, 100)
+	require_Equal(t, state.FirstSeq, 1)
+	require_Equal(t, state.LastSeq, 100)
 }
 
 // In addition to test above, if streams were attempted to be created in parallel
diff --git a/server/store.go b/server/store.go
index 515b305871c..6948d022e89 100644
--- a/server/store.go
+++ b/server/store.go
@@ -724,7 +724,7 @@ func isOutOfSpaceErr(err error) bool {
 var errFirstSequenceMismatch = errors.New("first sequence mismatch")
 
 func isClusterResetErr(err error) bool {
-	return err == errLastSeqMismatch || err == ErrStoreEOF || err == errFirstSequenceMismatch
+	return err == errLastSeqMismatch || err == ErrStoreEOF || err == errFirstSequenceMismatch || err == errCatchupTooManyRetries
 }
 
 // Copy all fields.

From f396d6d6ba183439e04a99e82b551a313fdd17cd Mon Sep 17 00:00:00 2001
From: Derek Collison
Date: Sat, 27 Apr 2024 10:07:34 -0700
Subject: [PATCH 2/8] Delay checking interest state after processing a snapshot/catchup.

Consumers could still be catching up as well.

Signed-off-by: Derek Collison
---
 server/jetstream_cluster.go | 8 +++++++-
 1 file changed, 7 insertions(+), 1 deletion(-)

diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go
index 31d0c32c895..cf96b552165 100644
--- a/server/jetstream_cluster.go
+++ b/server/jetstream_cluster.go
@@ -8133,6 +8133,13 @@ func (mset *stream) processSnapshot(snap *StreamReplicatedState) (e error) {
 			o.mu.Unlock()
 		}
 		mset.mu.Unlock()
+
+		// If we are interest based make sure to check our ack floor state.
+		// We will delay a bit to allow consumer states to also catchup.
+		if mset.isInterestRetention() {
+			fire := time.Duration(rand.Intn(int(10*time.Second))) + 5*time.Second
+			time.AfterFunc(fire, mset.checkInterestState)
+		}
 	}()
 
 	var releaseSem bool
@@ -8254,7 +8261,6 @@ RETRY:
 		// Check for eof signaling.
 		if len(msg) == 0 {
 			msgsQ.recycle(&mrecs)
-			mset.checkInterestState()
 			return nil
 		}
 		if _, err := mset.processCatchupMsg(msg); err == nil {

From b5738cb2b81c1c0317d4e1f5f4cbe550066265b4 Mon Sep 17 00:00:00 2001
From: Derek Collison
Date: Sat, 27 Apr 2024 10:08:18 -0700
Subject: [PATCH 3/8] Increase allowed concurrent catchup requests.

Signed-off-by: Derek Collison
---
 server/jetstream_cluster.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go
index cf96b552165..8c60849898e 100644
--- a/server/jetstream_cluster.go
+++ b/server/jetstream_cluster.go
@@ -8060,7 +8060,7 @@ func (mset *stream) isCurrent() bool {
 }
 
 // Maximum requests for the whole server that can be in flight at the same time.
-const maxConcurrentSyncRequests = 16
+const maxConcurrentSyncRequests = 32
 
 var (
 	errCatchupCorruptSnapshot = errors.New("corrupt stream snapshot detected")

From ece6b10b259cb134655fa81f33ed15823b223a2c Mon Sep 17 00:00:00 2001
From: Derek Collison
Date: Sat, 27 Apr 2024 11:13:44 -0700
Subject: [PATCH 4/8] On ack fixup do not jump delivered based on stream, and
 only double check if beyond a minimum threshold.

On an active stream the ack floor periodic checks could trigger under normal
circumstances, so use a minimum threshold. Also do not jump delivered in that
logic based on stream sequences. And finally do not have the leader jump ack
floors when pending is empty; this keeps the consistency checks consistent
across all replicas.

Signed-off-by: Derek Collison
---
 server/consumer.go | 11 +++++------
 1 file changed, 5 insertions(+), 6 deletions(-)

diff --git a/server/consumer.go b/server/consumer.go
index 4b04caf5c5f..c7443b3214a 100644
--- a/server/consumer.go
+++ b/server/consumer.go
@@ -2831,10 +2831,6 @@ func (o *consumer) processAckMsg(sseq, dseq, dc uint64, doSample bool) {
 				}
 			}
 		}
-		// If nothing left set to current delivered.
-		if len(o.pending) == 0 {
-			o.adflr, o.asflr = o.dseq-1, o.sseq-1
-		}
 	}
 	// We do these regardless.
 	delete(o.rdc, sseq)
@@ -3793,9 +3789,12 @@ func (o *consumer) checkAckFloor() {
 	}
 
 	// If we are here, and this should be rare, we still are off with our ack floor.
+	// We will make sure we are not doing un-necessary work here if only off by a bit
+	// since this could be normal for a high activity wq or stream.
 	// We will set it explicitly to 1 behind our current lowest in pending, or if
 	// pending is empty, to our current delivered -1.
-	if o.asflr < ss.FirstSeq-1 {
+	const minOffThreshold = 50
+	if o.asflr < ss.FirstSeq-minOffThreshold {
 		var psseq, pdseq uint64
 		for seq, p := range o.pending {
 			if psseq == 0 || seq < psseq {
@@ -3807,7 +3806,7 @@ func (o *consumer) checkAckFloor() {
 			psseq, pdseq = o.sseq-1, o.dseq-1
 			// If still not adjusted.
 			if psseq < ss.FirstSeq-1 {
-				psseq, pdseq = ss.FirstSeq-1, ss.FirstSeq-1
+				psseq = ss.FirstSeq - 1
 			}
 		} else {
 			// Since this was set via the pending, we should not include

From c9585160b692466968baa60fc38fea11b135eb95 Mon Sep 17 00:00:00 2001
From: Derek Collison
Date: Sat, 27 Apr 2024 12:07:36 -0700
Subject: [PATCH 5/8] If empty at end of Compact() set first appropriately

Signed-off-by: Derek Collison
---
 server/filestore.go | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/server/filestore.go b/server/filestore.go
index 51ad3a442cd..3cc601d2288 100644
--- a/server/filestore.go
+++ b/server/filestore.go
@@ -7021,6 +7021,10 @@ SKIP:
 		purged = fs.state.Msgs
 	}
 	fs.state.Msgs -= purged
+	if fs.state.Msgs == 0 {
+		fs.state.FirstSeq = fs.state.LastSeq + 1
+		fs.state.FirstTime = time.Time{}
+	}
 
 	if bytes > fs.state.Bytes {
 		bytes = fs.state.Bytes

From 0488f6da9f44e1c6e3cbe9342d53f007ac0e293c Mon Sep 17 00:00:00 2001
From: Derek Collison
Date: Sat, 27 Apr 2024 12:08:00 -0700
Subject: [PATCH 6/8] Periodically check interest state on interest based streams

Signed-off-by: Derek Collison
---
 server/jetstream_cluster.go | 15 +++++++++++++++
 1 file changed, 15 insertions(+)

diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go
index 8c60849898e..d0adca501ee 100644
--- a/server/jetstream_cluster.go
+++ b/server/jetstream_cluster.go
@@ -2363,11 +2363,18 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
 	}
 	defer stopDirectMonitoring()
 
+	// For checking interest state if applicable.
+	var cist *time.Ticker
+	var cistc <-chan time.Time
+
 	if mset != nil && mset.isInterestRetention() {
 		// Wait on our consumers to be assigned and running before proceeding.
 		// This can become important when a server has lots of assets
 		// since we process streams first then consumers as an asset class.
 		mset.waitOnConsumerAssignments()
+		// Setup a periodic check here.
+		cist = time.NewTicker(30 * time.Second)
+		cistc = cist.C
 	}
 
 	// This is triggered during a scale up from R1 to clustered mode. We need the new followers to catchup,
@@ -2491,6 +2498,9 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
 				}
 			}
 
+		case <-cistc:
+			mset.checkInterestState()
+
 		case <-datc:
 			if mset == nil || isRecovering {
 				continue
@@ -2652,6 +2662,11 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
 				uch = mset.updateC()
 				// Also update our mqch
 				mqch = mset.monitorQuitC()
+				// Setup a periodic check here if we are interest based as well.
+				if mset.isInterestRetention() {
+					cist = time.NewTicker(30 * time.Second)
+					cistc = cist.C
+				}
 			}
 		}
 		if err != nil {

From 52f0794b0fb585155a5f89e90a647116550e7e4e Mon Sep 17 00:00:00 2001
From: Derek Collison
Date: Sat, 27 Apr 2024 12:36:35 -0700
Subject: [PATCH 7/8] Fix test since we do not jump ack floors now

Signed-off-by: Derek Collison
---
 server/jetstream_test.go | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/server/jetstream_test.go b/server/jetstream_test.go
index 3944d23b252..50aa1d74b7e 100644
--- a/server/jetstream_test.go
+++ b/server/jetstream_test.go
@@ -19512,7 +19512,7 @@ func TestJetStreamConsumerMultipleSubjectsLast(t *testing.T) {
 	})
 	require_NoError(t, err)
 
-	sub, err := js.SubscribeSync("", nats.Bind("name", durable))
+	sub, err := js.SubscribeSync(_EMPTY_, nats.Bind("name", durable))
 	require_NoError(t, err)
 
 	msg, err := sub.NextMsg(time.Millisecond * 500)
@@ -19537,7 +19537,9 @@ func TestJetStreamConsumerMultipleSubjectsLast(t *testing.T) {
 	require_NoError(t, err)
 
 	require_True(t, info.NumAckPending == 0)
-	require_True(t, info.AckFloor.Stream == 8)
+	// Should be 6 since we do not pull "other". We used to jump ack floor ahead
+	// but no longer do that.
+	require_True(t, info.AckFloor.Stream == 6)
 	require_True(t, info.AckFloor.Consumer == 1)
 	require_True(t, info.NumPending == 0)
 }

From a57d5e0626d53c9d196ae204eeeef2601a593fdd Mon Sep 17 00:00:00 2001
From: Derek Collison
Date: Sat, 27 Apr 2024 13:19:52 -0700
Subject: [PATCH 8/8] Do jump consumer ack floor but not stream

Signed-off-by: Derek Collison
---
 server/consumer.go | 5 +++++
 1 file changed, 5 insertions(+)

diff --git a/server/consumer.go b/server/consumer.go
index c7443b3214a..8594a98cfa8 100644
--- a/server/consumer.go
+++ b/server/consumer.go
@@ -2831,6 +2831,11 @@ func (o *consumer) processAckMsg(sseq, dseq, dc uint64, doSample bool) {
 				}
 			}
 		}
+		// If nothing left set consumer to current delivered.
+		// Do not update stream.
+		if len(o.pending) == 0 {
+			o.adflr = o.dseq - 1
+		}
 	}
 	// We do these regardless.
 	delete(o.rdc, sseq)
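
The core of patch 1 is that an inflight stream create now pins the sync subject alongside the raft group, so parallel create requests for the same stream cannot skew. Below is a minimal, self-contained sketch of that bookkeeping; the type and field names mirror the patch, but the helper and the map are illustrative only (the real logic lives in jsClusteredStreamRequest and runs under the JetStream cluster lock).

package main

import "fmt"

// Illustrative stand-in, not the server's real definition.
type raftGroup struct{ Name string }

// inflightInfo mirrors the idea in the patch: an inflight stream create must
// pin both the raft group and the sync subject so a concurrent create of the
// same stream reuses them instead of minting a new sync subject.
type inflightInfo struct {
	rg   *raftGroup
	sync string
}

// inflight is keyed by account name, then stream name.
var inflight = map[string]map[string]*inflightInfo{}

// resolveInflight returns the pinned group and sync subject if a create for
// this stream is already in flight, otherwise it records the newly minted ones.
func resolveInflight(acc, stream string, mint func() (*raftGroup, string)) (*raftGroup, string) {
	streams, ok := inflight[acc]
	if !ok {
		streams = make(map[string]*inflightInfo)
		inflight[acc] = streams
	}
	if existing, ok := streams[stream]; ok {
		// Concurrent request for the same stream: reuse group AND sync subject.
		return existing.rg, existing.sync
	}
	rg, sync := mint()
	streams[stream] = &inflightInfo{rg, sync}
	return rg, sync
}

func main() {
	mint := func() (*raftGroup, string) { return &raftGroup{Name: "S-R3-1"}, "$JSC.SYNC.example" }
	rg1, s1 := resolveInflight("$G", "TEST", mint)
	rg2, s2 := resolveInflight("$G", "TEST", mint) // a second, parallel request
	fmt.Println(rg1 == rg2, s1 == s2)              // true true: no skew between requests
}

If the second request minted its own sync subject, its proposal would overwrite the first one, which is exactly the skew the commit message describes causing upper layer catchups to fail after a restart.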
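Patches 2 and 6 both move interest-state checking onto timers: a one-shot, randomized delay right after a snapshot catchup (so consumers get time to finish their own catchups), plus a 30 second ticker in the stream monitor loop. A standalone sketch of the two patterns follows; checkInterestState here is only a logging placeholder, not the server's method, and the quit channel stands in for the monitor's quit signal.

package main

import (
	"fmt"
	"math/rand"
	"time"
)

// checkInterestState stands in for the server's consistency check on
// interest-based streams; here it only logs when it runs.
func checkInterestState() {
	fmt.Println("checking interest state at", time.Now().Format(time.RFC3339))
}

func main() {
	// One-shot, delayed check after a catchup completes (patch 2): wait 5-15s
	// so consumer state has a chance to catch up before we compare.
	fire := time.Duration(rand.Intn(int(10*time.Second))) + 5*time.Second
	time.AfterFunc(fire, checkInterestState)

	// Periodic check driven from the stream monitor loop (patch 6).
	cist := time.NewTicker(30 * time.Second)
	defer cist.Stop()
	quit := time.After(70 * time.Second) // stand-in for the monitor quit channel
	for {
		select {
		case <-cist.C:
			checkInterestState()
		case <-quit:
			return
		}
	}
}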
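Patch 4 stops checkAckFloor from reacting to small, normal skews by requiring the ack floor to trail the stream's first sequence by at least minOffThreshold (50) before forcing a correction. A tiny sketch of that guard, with the unsigned subtraction handled explicitly; the helper name is hypothetical and not part of the server code.

package main

import "fmt"

// Only repair the ack floor when it trails the stream's first sequence by
// more than the threshold, since small gaps are normal on a busy stream.
const minOffThreshold = 50

func needsAckFloorRepair(asflr, firstSeq uint64) bool {
	if firstSeq < minOffThreshold {
		// Guard the unsigned subtraction below.
		return false
	}
	return asflr < firstSeq-minOffThreshold
}

func main() {
	fmt.Println(needsAckFloorRepair(100, 120)) // false: off by 20, within threshold
	fmt.Println(needsAckFloorRepair(100, 200)) // true: off by 100, force a repair
}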