[FIXED] Stream catchup would not sync after server crash and restart. (#5362)

We had a bug that would overwrite the sync subject during parallel
stream creation, which would cause upper-layer stream catchups to fail
after a server crash and subsequent restart.
    
We were also reporting a first sequence mismatch when we hit max retries
to force a reset, but this was misleading, so we added a proper error for
hitting the max retries limit.

Also, in some extreme kill-and-restart cases our internal checks were
being called before complete state was achieved, so we now delay the
initial check and have added periodic checks to ensure replica consistency.
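The timing pattern, roughly (a sketch with made-up names; the real code wires these into the stream monitor loop and its quit channel):

package main

import (
	"fmt"
	"math/rand"
	"time"
)

func main() {
	checkInterestState := func() { fmt.Println("checking replica/ack floor consistency") }

	// Delay the initial check so consumer state can also catch up first
	// (the diff uses a random 5-15s delay after processing a snapshot).
	fire := time.Duration(rand.Intn(int(10*time.Second))) + 5*time.Second
	time.AfterFunc(fire, checkInterestState)

	// Then re-check periodically; the diff uses a 30s ticker for interest-based streams.
	ticker := time.NewTicker(30 * time.Second)
	defer ticker.Stop()

	quit := time.After(time.Minute) // stand-in for the monitor's quit channel
	for {
		select {
		case <-ticker.C:
			checkInterestState()
		case <-quit:
			return
		}
	}
}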

Signed-off-by: Derek Collison <derek@nats.io>
derekcollison committed Apr 27, 2024
2 parents f678296 + a57d5e0 commit a72ea23
Showing 6 changed files with 114 additions and 26 deletions.
12 changes: 8 additions & 4 deletions server/consumer.go
@@ -2831,9 +2831,10 @@ func (o *consumer) processAckMsg(sseq, dseq, dc uint64, doSample bool) {
}
}
}
// If nothing left set to current delivered.
// If nothing left set consumer to current delivered.
// Do not update stream.
if len(o.pending) == 0 {
o.adflr, o.asflr = o.dseq-1, o.sseq-1
o.adflr = o.dseq - 1
}
}
// We do these regardless.
@@ -3793,9 +3794,12 @@ func (o *consumer) checkAckFloor() {
}

// If we are here, and this should be rare, we still are off with our ack floor.
// We will make sure we are not doing un-necessary work here if only off by a bit
// since this could be normal for a high activity wq or stream.
// We will set it explicitly to 1 behind our current lowest in pending, or if
// pending is empty, to our current delivered -1.
if o.asflr < ss.FirstSeq-1 {
const minOffThreshold = 50
if o.asflr < ss.FirstSeq-minOffThreshold {
var psseq, pdseq uint64
for seq, p := range o.pending {
if psseq == 0 || seq < psseq {
@@ -3807,7 +3811,7 @@
psseq, pdseq = o.sseq-1, o.dseq-1
// If still not adjusted.
if psseq < ss.FirstSeq-1 {
psseq, pdseq = ss.FirstSeq-1, ss.FirstSeq-1
psseq = ss.FirstSeq - 1
}
} else {
// Since this was set via the pending, we should not include
4 changes: 4 additions & 0 deletions server/filestore.go
@@ -7021,6 +7021,10 @@ SKIP:
purged = fs.state.Msgs
}
fs.state.Msgs -= purged
if fs.state.Msgs == 0 {
fs.state.FirstSeq = fs.state.LastSeq + 1
fs.state.FirstTime = time.Time{}
}

if bytes > fs.state.Bytes {
bytes = fs.state.Bytes
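The filestore change above preserves the usual empty-store invariant after a purge drains everything: the first sequence points just past the last. A tiny standalone illustration with a simplified state struct (not the actual filestore types):

package main

import (
	"fmt"
	"time"
)

type streamState struct {
	Msgs      uint64
	FirstSeq  uint64
	LastSeq   uint64
	FirstTime time.Time
}

// purge removes up to n messages and, when the store is left empty, resets
// FirstSeq to LastSeq+1 so later catchups see a consistent starting point.
func purge(st *streamState, n uint64) {
	if n > st.Msgs {
		n = st.Msgs
	}
	st.Msgs -= n
	if st.Msgs == 0 {
		st.FirstSeq = st.LastSeq + 1
		st.FirstTime = time.Time{}
	}
}

func main() {
	st := &streamState{Msgs: 3, FirstSeq: 5, LastSeq: 7, FirstTime: time.Now()}
	purge(st, 3)
	fmt.Println(st.FirstSeq, st.LastSeq) // 8 7
}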
74 changes: 55 additions & 19 deletions server/jetstream_cluster.go
@@ -47,8 +47,8 @@ type jetStreamCluster struct {
// concurrent requests that would otherwise be accepted.
// We also record the group for the stream. This is needed since if we have
// concurrent requests for same account and stream we need to let it process to get
// a response but they need to be same group, peers etc.
inflight map[string]map[string]*raftGroup
// a response but they need to be same group, peers etc. and sync subjects.
inflight map[string]map[string]*inflightInfo
// Signals meta-leader should check the stream assignments.
streamsCheck bool
// Server.
@@ -70,6 +70,12 @@ type jetStreamCluster struct {
qch chan struct{}
}

// Used to track inflight stream add requests to properly re-use same group and sync subject.
type inflightInfo struct {
rg *raftGroup
sync string
}

// Used to guide placement of streams and meta controllers in clustered JetStream.
type Placement struct {
Cluster string `json:"cluster,omitempty"`
@@ -1576,6 +1582,7 @@ func (js *jetStream) applyMetaSnapshot(buf []byte, ru *recoveryUpdates, isRecove
}
}
}

// Now walk the ones to check and process consumers.
var caAdd, caDel []*consumerAssignment
for _, sa := range saChk {
@@ -2356,11 +2363,18 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
}
defer stopDirectMonitoring()

// For checking interest state if applicable.
var cist *time.Ticker
var cistc <-chan time.Time

if mset != nil && mset.isInterestRetention() {
// Wait on our consumers to be assigned and running before proceeding.
// This can become important when a server has lots of assets
// since we process streams first then consumers as an asset class.
mset.waitOnConsumerAssignments()
// Setup a periodic check here.
cist = time.NewTicker(30 * time.Second)
cistc = cist.C
}

// This is triggered during a scale up from R1 to clustered mode. We need the new followers to catchup,
@@ -2484,6 +2498,9 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
}
}

case <-cistc:
mset.checkInterestState()

case <-datc:
if mset == nil || isRecovering {
continue
@@ -2645,6 +2662,11 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
uch = mset.updateC()
// Also update our mqch
mqch = mset.monitorQuitC()
// Setup a periodic check here if we are interest based as well.
if mset.isInterestRetention() {
cist = time.NewTicker(30 * time.Second)
cistc = cist.C
}
}
}
if err != nil {
@@ -5495,7 +5517,7 @@ func (js *jetStream) processLeaderChange(isLeader bool) {
for acc, asa := range cc.streams {
for _, sa := range asa {
if sa.Sync == _EMPTY_ {
s.Warnf("Stream assigment corrupt for stream '%s > %s'", acc, sa.Config.Name)
s.Warnf("Stream assignment corrupt for stream '%s > %s'", acc, sa.Config.Name)
nsa := &streamAssignment{Group: sa.Group, Config: sa.Config, Subject: sa.Subject, Reply: sa.Reply, Client: sa.Client}
nsa.Sync = syncSubjForStream()
cc.meta.Propose(encodeUpdateStreamAssignment(nsa))
@@ -5999,6 +6021,7 @@ func (s *Server) jsClusteredStreamRequest(ci *ClientInfo, acc *Account, subject,

var self *streamAssignment
var rg *raftGroup
var syncSubject string

// Capture if we have existing assignment first.
if osa := js.streamAssignment(acc.Name, cfg.Name); osa != nil {
@@ -6008,7 +6031,7 @@ func (s *Server) jsClusteredStreamRequest(ci *ClientInfo, acc *Account, subject,
return
}
// This is an equal assignment.
self, rg = osa, osa.Group
self, rg, syncSubject = osa, osa.Group, osa.Sync
}

if cfg.Sealed {
@@ -6032,19 +6055,22 @@ func (s *Server) jsClusteredStreamRequest(ci *ClientInfo, acc *Account, subject,
return
}

// Make sure inflight is setup properly.
if cc.inflight == nil {
cc.inflight = make(map[string]map[string]*inflightInfo)
}
streams, ok := cc.inflight[acc.Name]
if !ok {
streams = make(map[string]*inflightInfo)
cc.inflight[acc.Name] = streams
}

// Raft group selection and placement.
if rg == nil {
// Check inflight before proposing in case we have an existing inflight proposal.
if cc.inflight == nil {
cc.inflight = make(map[string]map[string]*raftGroup)
}
streams, ok := cc.inflight[acc.Name]
if !ok {
streams = make(map[string]*raftGroup)
cc.inflight[acc.Name] = streams
} else if existing, ok := streams[cfg.Name]; ok {
// We have existing for same stream. Re-use same group.
rg = existing
if existing, ok := streams[cfg.Name]; ok {
// We have existing for same stream. Re-use same group and syncSubject.
rg, syncSubject = existing.rg, existing.sync
}
}
// Create a new one here if needed.
@@ -6060,14 +6086,17 @@ func (s *Server) jsClusteredStreamRequest(ci *ClientInfo, acc *Account, subject,
rg.setPreferred()
}

if syncSubject == _EMPTY_ {
syncSubject = syncSubjForStream()
}
// Sync subject for post snapshot sync.
sa := &streamAssignment{Group: rg, Sync: syncSubjForStream(), Config: cfg, Subject: subject, Reply: reply, Client: ci, Created: time.Now().UTC()}
sa := &streamAssignment{Group: rg, Sync: syncSubject, Config: cfg, Subject: subject, Reply: reply, Client: ci, Created: time.Now().UTC()}
if err := cc.meta.Propose(encodeAddStreamAssignment(sa)); err == nil {
// On success, add this as an inflight proposal so we can apply limits
// on concurrent create requests while this stream assignment has
// possibly not been processed yet.
if streams, ok := cc.inflight[acc.Name]; ok {
streams[cfg.Name] = rg
streams[cfg.Name] = &inflightInfo{rg, syncSubject}
}
}
}
@@ -8046,14 +8075,15 @@ func (mset *stream) isCurrent() bool {
}

// Maximum requests for the whole server that can be in flight at the same time.
const maxConcurrentSyncRequests = 16
const maxConcurrentSyncRequests = 32

var (
errCatchupCorruptSnapshot = errors.New("corrupt stream snapshot detected")
errCatchupStalled = errors.New("catchup stalled")
errCatchupStreamStopped = errors.New("stream has been stopped") // when a catchup is terminated due to the stream going away.
errCatchupBadMsg = errors.New("bad catchup msg")
errCatchupWrongSeqForSkip = errors.New("wrong sequence for skipped msg")
errCatchupTooManyRetries = errors.New("catchup failed, too many retries")
)

// Process a stream snapshot.
@@ -8118,6 +8148,13 @@ func (mset *stream) processSnapshot(snap *StreamReplicatedState) (e error) {
o.mu.Unlock()
}
mset.mu.Unlock()

// If we are interest based make sure to check our ack floor state.
// We will delay a bit to allow consumer states to also catchup.
if mset.isInterestRetention() {
fire := time.Duration(rand.Intn(int(10*time.Second))) + 5*time.Second
time.AfterFunc(fire, mset.checkInterestState)
}
}()

var releaseSem bool
@@ -8162,7 +8199,7 @@ RETRY:
numRetries++
if numRetries >= maxRetries {
// Force a hard reset here.
return errFirstSequenceMismatch
return errCatchupTooManyRetries
}

// Block here if we have too many requests in flight.
@@ -8239,7 +8276,6 @@ RETRY:
// Check for eof signaling.
if len(msg) == 0 {
msgsQ.recycle(&mrecs)
mset.checkInterestState()
return nil
}
if _, err := mset.processCatchupMsg(msg); err == nil {
42 changes: 42 additions & 0 deletions server/jetstream_cluster_3_test.go
@@ -1417,6 +1417,48 @@ func TestJetStreamClusterParallelStreamCreation(t *testing.T) {
if len(errCh) > 0 {
t.Fatalf("Expected no errors, got %d", len(errCh))
}

// We had a bug during parallel stream creation as well that would overwrite the sync subject used for catchups, etc.
// Test that here as well by shutting down a non-leader, adding a whole bunch of messages, and making sure on restart
// we properly recover.
nl := c.randomNonStreamLeader(globalAccountName, "TEST")
nl.Shutdown()

nc, js := jsClientConnect(t, c.randomServer())
defer nc.Close()

msg := bytes.Repeat([]byte("Z"), 128)
for i := 0; i < 100; i++ {
js.PublishAsync("common.foo.bar", msg)
}
select {
case <-js.PublishAsyncComplete():
case <-time.After(5 * time.Second):
t.Fatalf("Did not receive completion signal")
}
// We need to force the leader to do a snapshot so we kick in upper layer catchup which depends on syncSubject.
sl := c.streamLeader(globalAccountName, "TEST")
mset, err := sl.GlobalAccount().lookupStream("TEST")
require_NoError(t, err)
node := mset.raftNode()
require_NotNil(t, node)
node.InstallSnapshot(mset.stateSnapshot())

nl = c.restartServer(nl)
c.waitOnServerCurrent(nl)

mset, err = nl.GlobalAccount().lookupStream("TEST")
require_NoError(t, err)

// Check state directly.
mset.mu.Lock()
var state StreamState
mset.store.FastState(&state)
mset.mu.Unlock()

require_Equal(t, state.Msgs, 100)
require_Equal(t, state.FirstSeq, 1)
require_Equal(t, state.LastSeq, 100)
}

// In addition to test above, if streams were attempted to be created in parallel
6 changes: 4 additions & 2 deletions server/jetstream_test.go
@@ -19512,7 +19512,7 @@ func TestJetStreamConsumerMultipleSubjectsLast(t *testing.T) {
})
require_NoError(t, err)

sub, err := js.SubscribeSync("", nats.Bind("name", durable))
sub, err := js.SubscribeSync(_EMPTY_, nats.Bind("name", durable))
require_NoError(t, err)

msg, err := sub.NextMsg(time.Millisecond * 500)
@@ -19537,7 +19537,9 @@
require_NoError(t, err)

require_True(t, info.NumAckPending == 0)
require_True(t, info.AckFloor.Stream == 8)
// Should be 6 since we do not pull "other". We used to jump ack floor ahead
// but no longer do that.
require_True(t, info.AckFloor.Stream == 6)
require_True(t, info.AckFloor.Consumer == 1)
require_True(t, info.NumPending == 0)
}
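This assertion change matches the consumer.go fix at the top of the diff: with nothing left pending, only the consumer-side ack floor snaps to delivered-1, and the stream-side floor is no longer jumped ahead. A toy illustration with simplified fields (not the consumer's real state):

package main

import "fmt"

type ackFloors struct {
	consumer uint64 // o.adflr in the diff
	stream   uint64 // o.asflr in the diff
}

// onPendingDrained mirrors the fixed behavior: advance the consumer floor only.
// Previously the stream floor was also jumped to the delivered stream seq minus one,
// which could move it past messages this consumer never actually acked.
func onPendingDrained(f *ackFloors, deliveredConsumerSeq uint64) {
	f.consumer = deliveredConsumerSeq - 1
}

func main() {
	f := &ackFloors{consumer: 0, stream: 6}
	onPendingDrained(f, 2)
	fmt.Println(f.consumer, f.stream) // 1 6, matching the updated test expectations
}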
2 changes: 1 addition & 1 deletion server/store.go
@@ -724,7 +724,7 @@ func isOutOfSpaceErr(err error) bool {
var errFirstSequenceMismatch = errors.New("first sequence mismatch")

func isClusterResetErr(err error) bool {
return err == errLastSeqMismatch || err == ErrStoreEOF || err == errFirstSequenceMismatch
return err == errLastSeqMismatch || err == ErrStoreEOF || err == errFirstSequenceMismatch || err == errCatchupTooManyRetries
}

// Copy all fields.
