[FIXED] Stream catchup would not sync after server crash and restart. #5362

Merged · 8 commits · Apr 27, 2024
11 changes: 5 additions & 6 deletions server/consumer.go
@@ -2831,10 +2831,6 @@ func (o *consumer) processAckMsg(sseq, dseq, dc uint64, doSample bool) {
}
}
}
// If nothing left set to current delivered.
if len(o.pending) == 0 {
o.adflr, o.asflr = o.dseq-1, o.sseq-1
}
}
// We do these regardless.
delete(o.rdc, sseq)
@@ -3793,9 +3789,12 @@ func (o *consumer) checkAckFloor() {
}

// If we are here, and this should be rare, we still are off with our ack floor.
// We will make sure we are not doing un-necessary work here if only off by a bit
// since this could be normal for a high activity wq or stream.
// We will set it explicitly to 1 behind our current lowest in pending, or if
// pending is empty, to our current delivered -1.
if o.asflr < ss.FirstSeq-1 {
const minOffThreshold = 50
if o.asflr < ss.FirstSeq-minOffThreshold {
var psseq, pdseq uint64
for seq, p := range o.pending {
if psseq == 0 || seq < psseq {
@@ -3807,7 +3806,7 @@
psseq, pdseq = o.sseq-1, o.dseq-1
// If still not adjusted.
if psseq < ss.FirstSeq-1 {
psseq, pdseq = ss.FirstSeq-1, ss.FirstSeq-1
psseq = ss.FirstSeq - 1
}
} else {
// Since this was set via the pending, we should not include
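
Note: the flattened hunks above interleave removed and added lines, so here is a minimal, runnable sketch of the new behavior in checkAckFloor: the ack floor is only force-corrected when it trails the stream's first sequence by a wide margin, since small gaps are normal for a busy work-queue stream. The constant mirrors the hunk; the helper function and its signature are illustrative stand-ins, not the server's API.

package main

import "fmt"

// minOffThreshold mirrors the constant introduced above. The helper below is a
// hypothetical stand-in used only to illustrate the check, not server code.
const minOffThreshold = 50

// needsAckFloorCorrection reports whether the consumer ack floor (asflr) trails
// the stream's first sequence far enough to warrant an explicit correction.
// The comparison is written as asflr+minOffThreshold < firstSeq so this sketch
// avoids unsigned underflow; the hunk compares o.asflr < ss.FirstSeq-minOffThreshold,
// which is equivalent once FirstSeq >= minOffThreshold.
func needsAckFloorCorrection(asflr, firstSeq uint64) bool {
	return asflr+minOffThreshold < firstSeq
}

func main() {
	fmt.Println(needsAckFloorCorrection(10, 20))  // false: a small lag, left alone
	fmt.Println(needsAckFloorCorrection(10, 200)) // true: far behind, floor gets corrected
}
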
4 changes: 4 additions & 0 deletions server/filestore.go
@@ -7021,6 +7021,10 @@ SKIP:
purged = fs.state.Msgs
}
fs.state.Msgs -= purged
if fs.state.Msgs == 0 {
fs.state.FirstSeq = fs.state.LastSeq + 1
fs.state.FirstTime = time.Time{}
}

if bytes > fs.state.Bytes {
bytes = fs.state.Bytes
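
Note: a small, runnable sketch of the bookkeeping invariant added above, using simplified stand-in types rather than the server's FileStore: once a purge removes the last message, the first sequence is advanced to one past the last sequence and the first timestamp is cleared, so a later catchup does not resync from a stale floor.

package main

import (
	"fmt"
	"time"
)

// streamState is a simplified stand-in; the field names mirror the hunk above.
type streamState struct {
	Msgs      uint64
	FirstSeq  uint64
	LastSeq   uint64
	FirstTime time.Time
}

// applyPurge mirrors the added bookkeeping: clamp the purge count, then, if the
// store is now empty, point FirstSeq just past LastSeq and clear FirstTime.
func applyPurge(st *streamState, purged uint64) {
	if purged > st.Msgs {
		purged = st.Msgs
	}
	st.Msgs -= purged
	if st.Msgs == 0 {
		st.FirstSeq = st.LastSeq + 1
		st.FirstTime = time.Time{}
	}
}

func main() {
	st := streamState{Msgs: 3, FirstSeq: 5, LastSeq: 7, FirstTime: time.Now()}
	applyPurge(&st, 10)
	fmt.Println(st.Msgs, st.FirstSeq) // 0 8
}
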
74 changes: 55 additions & 19 deletions server/jetstream_cluster.go
@@ -47,8 +47,8 @@ type jetStreamCluster struct {
// concurrent requests that would otherwise be accepted.
// We also record the group for the stream. This is needed since if we have
// concurrent requests for same account and stream we need to let it process to get
// a response but they need to be same group, peers etc.
inflight map[string]map[string]*raftGroup
// a response but they need to be same group, peers etc. and sync subjects.
inflight map[string]map[string]*inflightInfo
// Signals meta-leader should check the stream assignments.
streamsCheck bool
// Server.
@@ -70,6 +70,12 @@ type jetStreamCluster struct {
qch chan struct{}
}

// Used to track inflight stream add requests to properly re-use same group and sync subject.
type inflightInfo struct {
rg *raftGroup
sync string
}

// Used to guide placement of streams and meta controllers in clustered JetStream.
type Placement struct {
Cluster string `json:"cluster,omitempty"`
@@ -1576,6 +1582,7 @@ func (js *jetStream) applyMetaSnapshot(buf []byte, ru *recoveryUpdates, isRecove
}
}
}

// Now walk the ones to check and process consumers.
var caAdd, caDel []*consumerAssignment
for _, sa := range saChk {
@@ -2356,11 +2363,18 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
}
defer stopDirectMonitoring()

// For checking interest state if applicable.
var cist *time.Ticker
var cistc <-chan time.Time

if mset != nil && mset.isInterestRetention() {
// Wait on our consumers to be assigned and running before proceeding.
// This can become important when a server has lots of assets
// since we process streams first then consumers as an asset class.
mset.waitOnConsumerAssignments()
// Setup a periodic check here.
cist = time.NewTicker(30 * time.Second)
cistc = cist.C
}

// This is triggered during a scale up from R1 to clustered mode. We need the new followers to catchup,
@@ -2484,6 +2498,9 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
}
}

case <-cistc:
mset.checkInterestState()

case <-datc:
if mset == nil || isRecovering {
continue
@@ -2645,6 +2662,11 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
uch = mset.updateC()
// Also update our mqch
mqch = mset.monitorQuitC()
// Setup a periodic check here if we are interest based as well.
if mset.isInterestRetention() {
cist = time.NewTicker(30 * time.Second)
cistc = cist.C
}
}
}
if err != nil {
@@ -5495,7 +5517,7 @@ func (js *jetStream) processLeaderChange(isLeader bool) {
for acc, asa := range cc.streams {
for _, sa := range asa {
if sa.Sync == _EMPTY_ {
s.Warnf("Stream assigment corrupt for stream '%s > %s'", acc, sa.Config.Name)
s.Warnf("Stream assignment corrupt for stream '%s > %s'", acc, sa.Config.Name)
nsa := &streamAssignment{Group: sa.Group, Config: sa.Config, Subject: sa.Subject, Reply: sa.Reply, Client: sa.Client}
nsa.Sync = syncSubjForStream()
cc.meta.Propose(encodeUpdateStreamAssignment(nsa))
@@ -5999,6 +6021,7 @@ func (s *Server) jsClusteredStreamRequest(ci *ClientInfo, acc *Account, subject,

var self *streamAssignment
var rg *raftGroup
var syncSubject string

// Capture if we have existing assignment first.
if osa := js.streamAssignment(acc.Name, cfg.Name); osa != nil {
@@ -6008,7 +6031,7 @@ func (s *Server) jsClusteredStreamRequest(ci *ClientInfo, acc *Account, subject,
return
}
// This is an equal assignment.
self, rg = osa, osa.Group
self, rg, syncSubject = osa, osa.Group, osa.Sync
}

if cfg.Sealed {
@@ -6032,19 +6055,22 @@ func (s *Server) jsClusteredStreamRequest(ci *ClientInfo, acc *Account, subject,
return
}

// Make sure inflight is setup properly.
if cc.inflight == nil {
cc.inflight = make(map[string]map[string]*inflightInfo)
}
streams, ok := cc.inflight[acc.Name]
if !ok {
streams = make(map[string]*inflightInfo)
cc.inflight[acc.Name] = streams
}

// Raft group selection and placement.
if rg == nil {
// Check inflight before proposing in case we have an existing inflight proposal.
if cc.inflight == nil {
cc.inflight = make(map[string]map[string]*raftGroup)
}
streams, ok := cc.inflight[acc.Name]
if !ok {
streams = make(map[string]*raftGroup)
cc.inflight[acc.Name] = streams
} else if existing, ok := streams[cfg.Name]; ok {
// We have existing for same stream. Re-use same group.
rg = existing
if existing, ok := streams[cfg.Name]; ok {
// We have existing for same stream. Re-use same group and syncSubject.
rg, syncSubject = existing.rg, existing.sync
}
}
// Create a new one here if needed.
@@ -6060,14 +6086,17 @@ func (s *Server) jsClusteredStreamRequest(ci *ClientInfo, acc *Account, subject,
rg.setPreferred()
}

if syncSubject == _EMPTY_ {
syncSubject = syncSubjForStream()
}
// Sync subject for post snapshot sync.
sa := &streamAssignment{Group: rg, Sync: syncSubjForStream(), Config: cfg, Subject: subject, Reply: reply, Client: ci, Created: time.Now().UTC()}
sa := &streamAssignment{Group: rg, Sync: syncSubject, Config: cfg, Subject: subject, Reply: reply, Client: ci, Created: time.Now().UTC()}
if err := cc.meta.Propose(encodeAddStreamAssignment(sa)); err == nil {
// On success, add this as an inflight proposal so we can apply limits
// on concurrent create requests while this stream assignment has
// possibly not been processed yet.
if streams, ok := cc.inflight[acc.Name]; ok {
streams[cfg.Name] = rg
streams[cfg.Name] = &inflightInfo{rg, syncSubject}
}
}
}
@@ -8046,14 +8075,15 @@ func (mset *stream) isCurrent() bool {
}

// Maximum requests for the whole server that can be in flight at the same time.
const maxConcurrentSyncRequests = 16
const maxConcurrentSyncRequests = 32

var (
errCatchupCorruptSnapshot = errors.New("corrupt stream snapshot detected")
errCatchupStalled = errors.New("catchup stalled")
errCatchupStreamStopped = errors.New("stream has been stopped") // when a catchup is terminated due to the stream going away.
errCatchupBadMsg = errors.New("bad catchup msg")
errCatchupWrongSeqForSkip = errors.New("wrong sequence for skipped msg")
errCatchupTooManyRetries = errors.New("catchup failed, too many retries")
)

// Process a stream snapshot.
@@ -8118,6 +8148,13 @@ func (mset *stream) processSnapshot(snap *StreamReplicatedState) (e error) {
o.mu.Unlock()
}
mset.mu.Unlock()

// If we are interest based make sure to check our ack floor state.
// We will delay a bit to allow consumer states to also catchup.
if mset.isInterestRetention() {
fire := time.Duration(rand.Intn(int(10*time.Second))) + 5*time.Second
time.AfterFunc(fire, mset.checkInterestState)
}
}()

var releaseSem bool
@@ -8162,7 +8199,7 @@ RETRY:
numRetries++
if numRetries >= maxRetries {
// Force a hard reset here.
return errFirstSequenceMismatch
return errCatchupTooManyRetries
}

// Block here if we have too many requests in flight.
@@ -8239,7 +8276,6 @@
// Check for eof signaling.
if len(msg) == 0 {
msgsQ.recycle(&mrecs)
mset.checkInterestState()
return nil
}
if _, err := mset.processCatchupMsg(msg); err == nil {
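
Note: the jsClusteredStreamRequest hunks above are the hardest to read flattened, so here is a hedged reconstruction of how the inflight tracking fits together after this change. The identifiers come from the hunks; placement and error handling are omitted, and the final bookkeeping is slightly simplified (the diff re-looks up cc.inflight[acc.Name] before recording).

// Reconstruction (not verbatim) of the reworked inflight handling.

// Track inflight stream add requests so concurrent creates re-use the same
// raft group and sync subject.
type inflightInfo struct {
	rg   *raftGroup
	sync string
}

// Inside jsClusteredStreamRequest, after any existing assignment has already
// populated rg and syncSubject:
if cc.inflight == nil {
	cc.inflight = make(map[string]map[string]*inflightInfo)
}
streams, ok := cc.inflight[acc.Name]
if !ok {
	streams = make(map[string]*inflightInfo)
	cc.inflight[acc.Name] = streams
}

// Raft group selection: a prior, still-unprocessed proposal for the same
// stream wins, so both its group and its sync subject are re-used.
if rg == nil {
	if existing, ok := streams[cfg.Name]; ok {
		rg, syncSubject = existing.rg, existing.sync
	}
	// Otherwise a new group is created and placed (omitted here).
}
// Only mint a new sync subject if nothing supplied one.
if syncSubject == _EMPTY_ {
	syncSubject = syncSubjForStream()
}

sa := &streamAssignment{Group: rg, Sync: syncSubject, Config: cfg, Subject: subject, Reply: reply, Client: ci, Created: time.Now().UTC()}
if err := cc.meta.Propose(encodeAddStreamAssignment(sa)); err == nil {
	// Record the proposal so later duplicates converge on the same group and sync subject.
	streams[cfg.Name] = &inflightInfo{rg, syncSubject}
}
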
42 changes: 42 additions & 0 deletions server/jetstream_cluster_3_test.go
@@ -1417,6 +1417,48 @@ func TestJetStreamClusterParallelStreamCreation(t *testing.T) {
if len(errCh) > 0 {
t.Fatalf("Expected no errors, got %d", len(errCh))
}

// We had a bug during parallel stream creation as well that would overwrite the sync subject used for catchups, etc.
// Test that here as well by shutting down a non-leader, adding a whole bunch of messages, and making sure on restart
// we properly recover.
nl := c.randomNonStreamLeader(globalAccountName, "TEST")
nl.Shutdown()

nc, js := jsClientConnect(t, c.randomServer())
defer nc.Close()

msg := bytes.Repeat([]byte("Z"), 128)
for i := 0; i < 100; i++ {
js.PublishAsync("common.foo.bar", msg)
}
select {
case <-js.PublishAsyncComplete():
case <-time.After(5 * time.Second):
t.Fatalf("Did not receive completion signal")
}
// We need to force the leader to do a snapshot so we kick in upper layer catchup which depends on syncSubject.
sl := c.streamLeader(globalAccountName, "TEST")
mset, err := sl.GlobalAccount().lookupStream("TEST")
require_NoError(t, err)
node := mset.raftNode()
require_NotNil(t, node)
node.InstallSnapshot(mset.stateSnapshot())

nl = c.restartServer(nl)
c.waitOnServerCurrent(nl)

mset, err = nl.GlobalAccount().lookupStream("TEST")
require_NoError(t, err)

// Check state directly.
mset.mu.Lock()
var state StreamState
mset.store.FastState(&state)
mset.mu.Unlock()

require_Equal(t, state.Msgs, 100)
require_Equal(t, state.FirstSeq, 1)
require_Equal(t, state.LastSeq, 100)
}

// In addition to test above, if streams were attempted to be created in parallel
6 changes: 4 additions & 2 deletions server/jetstream_test.go
@@ -19512,7 +19512,7 @@ func TestJetStreamConsumerMultipleSubjectsLast(t *testing.T) {
})
require_NoError(t, err)

sub, err := js.SubscribeSync("", nats.Bind("name", durable))
sub, err := js.SubscribeSync(_EMPTY_, nats.Bind("name", durable))
require_NoError(t, err)

msg, err := sub.NextMsg(time.Millisecond * 500)
@@ -19537,7 +19537,9 @@ func TestJetStreamConsumerMultipleSubjectsLast(t *testing.T) {
require_NoError(t, err)

require_True(t, info.NumAckPending == 0)
require_True(t, info.AckFloor.Stream == 8)
// Should be 6 since we do not pull "other". We used to jump ack floor ahead
// but no longer do that.
require_True(t, info.AckFloor.Stream == 6)
require_True(t, info.AckFloor.Consumer == 1)
require_True(t, info.NumPending == 0)
}
2 changes: 1 addition & 1 deletion server/store.go
@@ -724,7 +724,7 @@ func isOutOfSpaceErr(err error) bool {
var errFirstSequenceMismatch = errors.New("first sequence mismatch")

func isClusterResetErr(err error) bool {
return err == errLastSeqMismatch || err == ErrStoreEOF || err == errFirstSequenceMismatch
return err == errLastSeqMismatch || err == ErrStoreEOF || err == errFirstSequenceMismatch || err == errCatchupTooManyRetries
}

// Copy all fields.
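
Note: for context on why errCatchupTooManyRetries was added to isClusterResetErr, the sketch below shows an assumed caller on the stream monitor side. processSnapshot and isClusterResetErr appear in this diff, while resetClusteredState and the exact call site are paraphrased from the surrounding server code, not quoted from this PR.

// Assumed caller (illustrative): when a catchup gives up after too many
// retries, the error is now classified as a cluster reset condition, so the
// monitor performs a hard reset of the clustered stream state instead of
// looping on a catchup that cannot make progress.
if err := mset.processSnapshot(snap); err != nil {
	if isClusterResetErr(err) {
		mset.resetClusteredState(err)
	}
}
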