From bbffd71c4ae82c0d07fc4f1c178adc8266b779a1 Mon Sep 17 00:00:00 2001
From: Derek Collison
Date: Mon, 11 Oct 2021 18:54:30 -0700
Subject: [PATCH] Improvements to meta raft layer around snapshots and recovery.

Signed-off-by: Derek Collison
---
 server/jetstream.go              |  11 +++-
 server/jetstream_cluster.go      |  23 ++++---
 server/jetstream_cluster_test.go |  90 ++------------------------
 server/norace_test.go            | 108 ++++++++++++++++++++++++++++++-
 server/raft.go                   |  49 +++++++++----
 5 files changed, 169 insertions(+), 112 deletions(-)

diff --git a/server/jetstream.go b/server/jetstream.go
index 6caa3388cc..b6eac8514e 100644
--- a/server/jetstream.go
+++ b/server/jetstream.go
@@ -463,9 +463,16 @@ func (s *Server) setJetStreamDisabled() {
 	}
 }
 
-func (s *Server) handleOutOfSpace(stream string) {
+func (s *Server) handleOutOfSpace(mset *stream) {
 	if s.JetStreamEnabled() && !s.jetStreamOOSPending() {
-		s.Errorf("JetStream out of resources, will be DISABLED")
+		var stream string
+		if mset != nil {
+			stream = mset.name()
+			s.Errorf("JetStream out of %s resources, will be DISABLED", mset.Store().Type())
+		} else {
+			s.Errorf("JetStream out of resources, will be DISABLED")
+		}
+
 		go s.DisableJetStream()
 
 		adv := &JSServerOutOfSpaceAdvisory{
diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go
index 3146c52a10..661dd95021 100644
--- a/server/jetstream_cluster.go
+++ b/server/jetstream_cluster.go
@@ -753,9 +753,19 @@ func (js *jetStream) monitorCluster() {
 		isLeader     bool
 		lastSnap     []byte
 		lastSnapTime time.Time
+		isRecovering bool
+		beenLeader   bool
 	)
 
+	// Set to true to start.
+	isRecovering = true
+
+	// Snapshotting function.
 	doSnapshot := func() {
+		// Suppress during recovery.
+		if isRecovering {
+			return
+		}
 		if snap := js.metaSnapshot(); !bytes.Equal(lastSnap, snap) {
 			if err := n.InstallSnapshot(snap); err == nil {
 				lastSnap = snap
@@ -764,9 +774,6 @@ func (js *jetStream) monitorCluster() {
 		}
 	}
 
-	isRecovering := true
-	beenLeader := false
-
 	for {
 		select {
 		case <-s.quitCh:
@@ -787,7 +794,7 @@ func (js *jetStream) monitorCluster() {
 				// Since we received one make sure we have our own since we do not store
 				// our meta state outside of raft.
 				doSnapshot()
-			} else if nb > uint64(len(lastSnap)*4) {
+			} else if lls := len(lastSnap); nb > uint64(lls*8) && lls > 0 {
 				doSnapshot()
 			}
 		}
@@ -1743,7 +1750,7 @@ func (js *jetStream) applyStreamEntries(mset *stream, ce *CommittedEntry, isReco
 					s.Debugf("Got error processing JetStream msg: %v", err)
 				}
 				if isOutOfSpaceErr(err) {
-					s.handleOutOfSpace(mset.name())
+					s.handleOutOfSpace(mset)
 					return err
 				}
 			}
@@ -4539,7 +4546,7 @@ func (mset *stream) processClusteredInboundMsg(subject, reply string, hdr, msg [
 	}
 
 	if err != nil && isOutOfSpaceErr(err) {
-		s.handleOutOfSpace(name)
+		s.handleOutOfSpace(mset)
 	}
 
 	return err
@@ -4660,7 +4667,7 @@ func (mset *stream) processSnapshot(snap *streamSnapshot) {
 	mset.mu.Lock()
 	state := mset.store.State()
 	sreq := mset.calculateSyncRequest(&state, snap)
-	s, js, subject, n, name := mset.srv, mset.js, mset.sa.Sync, mset.node, mset.cfg.Name
+	s, js, subject, n := mset.srv, mset.js, mset.sa.Sync, mset.node
 	mset.mu.Unlock()
 
 	// Just return if up to date or already exceeded limits.
@@ -4767,7 +4774,7 @@ RETRY:
 			return
 		}
 	} else if isOutOfSpaceErr(err) {
-		s.handleOutOfSpace(name)
+		s.handleOutOfSpace(mset)
 		return
 	} else if err == NewJSInsufficientResourcesError() {
 		if mset.js.limitsExceeded(mset.cfg.Storage) {
diff --git a/server/jetstream_cluster_test.go b/server/jetstream_cluster_test.go
index dced30d09d..53ce4c57af 100644
--- a/server/jetstream_cluster_test.go
+++ b/server/jetstream_cluster_test.go
@@ -25,7 +25,6 @@ import (
 	"os"
 	"path"
 	"reflect"
-	"runtime"
 	"strconv"
 	"strings"
 	"sync"
@@ -5845,18 +5844,8 @@ func TestJetStreamClusterMultiRestartBug(t *testing.T) {
 	c.waitOnStreamLeader("$G", "TEST")
 
 	s = c.serverByName(s.Name())
-	opts = s.getOpts()
-
 	c.waitOnStreamCurrent(s, "$G", "TEST")
 
-	snaps, err := ioutil.ReadDir(path.Join(opts.StoreDir, JetStreamStoreDir, "$SYS", "_js_", "_meta_", "snapshots"))
-	if err != nil {
-		t.Fatalf("Unexpected error: %v", err)
-	}
-	if len(snaps) == 0 {
-		t.Fatalf("Expected a meta snapshot for the restarted server")
-	}
-
 	// Now restart them all..
 	c.stopAll()
 	c.restartAll()
@@ -5868,8 +5857,12 @@ func TestJetStreamClusterMultiRestartBug(t *testing.T) {
 	defer nc.Close()
 
 	// Make sure the replicas are current.
-	checkFor(t, 10*time.Second, 100*time.Millisecond, func() error {
-		si, _ := js.StreamInfo("TEST")
+	js2, err := nc.JetStream(nats.MaxWait(250 * time.Millisecond))
+	if err != nil {
+		t.Fatalf("Unexpected error: %v", err)
+	}
+	checkFor(t, 10*time.Second, 250*time.Millisecond, func() error {
+		si, _ := js2.StreamInfo("TEST")
 		if si == nil || si.Cluster == nil {
 			t.Fatalf("Did not get stream info")
 		}
@@ -8497,76 +8490,6 @@ func TestJetStreamDeadlockOnVarz(t *testing.T) {
 	wg.Wait()
 }
 
-// Make sure when we try to hard reset a stream state in a cluster that we also re-create the consumers.
-func TestJetStreamClusterStreamReset(t *testing.T) {
-	c := createJetStreamClusterExplicit(t, "R3S", 3)
-	defer c.shutdown()
-
-	// Client based API
-	s := c.randomServer()
-	nc, js := jsClientConnect(t, s)
-	defer nc.Close()
-
-	_, err := js.AddStream(&nats.StreamConfig{
-		Name:      "TEST",
-		Subjects:  []string{"foo.*"},
-		Replicas:  2,
-		Retention: nats.WorkQueuePolicy,
-	})
-	if err != nil {
-		t.Fatalf("Unexpected error: %v", err)
-	}
-
-	numRequests := 20
-	for i := 0; i < numRequests; i++ {
-		js.Publish("foo.created", []byte("REQ"))
-	}
-
-	// Durable.
-	sub, err := js.SubscribeSync("foo.created", nats.Durable("d1"))
-	if err != nil {
-		t.Fatalf("Unexpected error: %v", err)
-	}
-	defer sub.Unsubscribe()
-
-	si, err := js.StreamInfo("TEST")
-	if err != nil {
-		t.Fatalf("Unexpected error: %v", err)
-	}
-	if si.State.Msgs != uint64(numRequests) {
-		t.Fatalf("Expected %d msgs, got bad state: %+v", numRequests, si.State)
-	}
-	// Let settle a bit.
-	time.Sleep(250 * time.Millisecond)
-
-	// Grab number go routines.
-	base := runtime.NumGoroutine()
-
-	// Grab a server that is the consumer leader for the durable.
-	cl := c.consumerLeader("$G", "TEST", "d1")
-	mset, err := cl.GlobalAccount().lookupStream("TEST")
-	if err != nil {
-		t.Fatalf("Unexpected error: %v", err)
-	}
-	// Do a hard reset here by hand.
-	mset.resetClusteredState()
-	// Wait til we have the leader elected.
-	c.waitOnConsumerLeader("$G", "TEST", "d1")
-
-	// So do not wait 10s in call in checkFor.
-	js2, _ := nc.JetStream(nats.MaxWait(250 * time.Millisecond))
-	// Make sure we can get the consumer info eventually.
-	checkFor(t, 5*time.Second, 200*time.Millisecond, func() error {
-		_, err := js2.ConsumerInfo("TEST", "d1")
-		return err
-	})
-
-	// Grab number go routines.
-	if after := runtime.NumGoroutine(); base > after {
-		t.Fatalf("Expected %d go routines, got %d", base, after)
-	}
-}
-
 // Issue #2397
 func TestJetStreamClusterStreamCatchupNoState(t *testing.T) {
 	c := createJetStreamClusterExplicit(t, "R2S", 2)
@@ -9245,7 +9168,6 @@ func TestJetStreamAppendOnly(t *testing.T) {
 	if resp.Error == nil {
 		t.Fatalf("Expected an error")
 	}
-
 }
 
 // Support functions
diff --git a/server/norace_test.go b/server/norace_test.go
index 9dc386c731..08d0f6f6ce 100644
--- a/server/norace_test.go
+++ b/server/norace_test.go
@@ -2042,8 +2042,6 @@ func TestNoRaceJetStreamClusterSuperClusterRIPStress(t *testing.T) {
 	nc, js := jsClientConnect(t, s)
 	defer nc.Close()
 
-	fmt.Printf("CONNECT is %v\n", s.ClientURL())
-
 	scm := make(map[string][]string)
 
 	// Create 50 streams per cluster.
@@ -3561,3 +3559,109 @@ func TestNoRaceJetStreamClusterMaxConsumersAndDirect(t *testing.T) {
 		return nil
 	})
 }
+
+// Make sure when we try to hard reset a stream state in a cluster that we also re-create the consumers.
+func TestNoRaceJetStreamClusterStreamReset(t *testing.T) {
+	// Speed up raft
+	minElectionTimeout = 250 * time.Millisecond
+	maxElectionTimeout = time.Second
+	hbInterval = 50 * time.Millisecond
+
+	c := createJetStreamClusterExplicit(t, "R3S", 3)
+	defer c.shutdown()
+
+	// Client based API
+	s := c.randomServer()
+	nc, js := jsClientConnect(t, s)
+	defer nc.Close()
+
+	_, err := js.AddStream(&nats.StreamConfig{
+		Name:      "TEST",
+		Subjects:  []string{"foo.*"},
+		Replicas:  2,
+		Retention: nats.WorkQueuePolicy,
+	})
+	if err != nil {
+		t.Fatalf("Unexpected error: %v", err)
+	}
+
+	numRequests := 20
+	for i := 0; i < numRequests; i++ {
+		js.Publish("foo.created", []byte("REQ"))
+	}
+
+	// Durable.
+	sub, err := js.SubscribeSync("foo.created", nats.Durable("d1"))
+	if err != nil {
+		t.Fatalf("Unexpected error: %v", err)
+	}
+	defer sub.Unsubscribe()
+
+	si, err := js.StreamInfo("TEST")
+	if err != nil {
+		t.Fatalf("Unexpected error: %v", err)
+	}
+	if si.State.Msgs != uint64(numRequests) {
+		t.Fatalf("Expected %d msgs, got bad state: %+v", numRequests, si.State)
+	}
+
+	// Let settle a bit for Go routine checks.
+	time.Sleep(250 * time.Millisecond)
+
+	// Grab number go routines.
+	base := runtime.NumGoroutine()
+
+	// Make the consumer busy here by async sending a bunch of messages.
+	for i := 0; i < numRequests*10; i++ {
+		js.PublishAsync("foo.created", []byte("REQ"))
+	}
+
+	// Grab a server that is the consumer leader for the durable.
+	cl := c.consumerLeader("$G", "TEST", "d1")
+	mset, err := cl.GlobalAccount().lookupStream("TEST")
+	if err != nil {
+		t.Fatalf("Unexpected error: %v", err)
+	}
+	// Do a hard reset here by hand.
+	mset.resetClusteredState()
+
+	// Wait til we have the consumer leader re-elected.
+	c.waitOnConsumerLeader("$G", "TEST", "d1")
+
+	// So we do not wait all 10s in each call to ConsumerInfo.
+	js2, _ := nc.JetStream(nats.MaxWait(250 * time.Millisecond))
+	// Make sure we can get the consumer info eventually.
+	checkFor(t, 5*time.Second, 200*time.Millisecond, func() error {
+		_, err := js2.ConsumerInfo("TEST", "d1")
+		return err
+	})
+
+	// Grab number go routines.
+	if after := runtime.NumGoroutine(); base > after {
+		t.Fatalf("Expected %d go routines, got %d", base, after)
+	}
+
+	// Simulate a low level write error on our consumer and make sure we can recover etc.
+	cl = c.consumerLeader("$G", "TEST", "d1")
+	mset, err = cl.GlobalAccount().lookupStream("TEST")
+	if err != nil {
+		t.Fatalf("Unexpected error: %v", err)
+	}
+	o := mset.lookupConsumer("d1")
+	if o == nil {
+		t.Fatalf("Did not retrieve consumer")
+	}
+	node := o.raftNode().(*raft)
+	if node == nil {
+		t.Fatalf("could not retrieve the raft node for consumer")
+	}
+
+	nc.Close()
+	node.setWriteErr(io.ErrShortWrite)
+
+	c.stopAll()
+	c.restartAll()
+
+	c.waitOnStreamLeader("$G", "TEST")
+	c.waitOnConsumerLeader("$G", "TEST", "d1")
+}
diff --git a/server/raft.go b/server/raft.go
index 944f0360f4..b3fbbf3bf6 100644
--- a/server/raft.go
+++ b/server/raft.go
@@ -221,12 +221,21 @@ type lps struct {
 }
 
 const (
-	minElectionTimeout = 2 * time.Second
-	maxElectionTimeout = 5 * time.Second
-	minCampaignTimeout = 100 * time.Millisecond
-	maxCampaignTimeout = 4 * minCampaignTimeout
-	hbInterval         = 500 * time.Millisecond
-	lostQuorumInterval = hbInterval * 5
+	minElectionTimeoutDefault = 2 * time.Second
+	maxElectionTimeoutDefault = 5 * time.Second
+	minCampaignTimeoutDefault = 100 * time.Millisecond
+	maxCampaignTimeoutDefault = 4 * minCampaignTimeoutDefault
+	hbIntervalDefault         = 500 * time.Millisecond
+	lostQuorumIntervalDefault = hbIntervalDefault * 5
+)
+
+var (
+	minElectionTimeout = minElectionTimeoutDefault
+	maxElectionTimeout = maxElectionTimeoutDefault
+	minCampaignTimeout = minCampaignTimeoutDefault
+	maxCampaignTimeout = maxCampaignTimeoutDefault
+	hbInterval         = hbIntervalDefault
+	lostQuorumInterval = lostQuorumIntervalDefault
 )
 
 type RaftConfig struct {
@@ -388,19 +397,18 @@ func (s *Server) startRaftNode(cfg *RaftConfig) (RaftNode, error) {
 	key := sha256.Sum256([]byte(n.group))
 	n.hh, _ = highwayhash.New64(key[:])
 
-	if term, vote, err := n.readTermVote(); err != nil && term > 0 {
+	if term, vote, err := n.readTermVote(); err == nil && term > 0 {
 		n.term = term
 		n.vote = vote
 	}
 
-	if err := os.MkdirAll(path.Join(cfg.Store, snapshotsDir), 0750); err != nil {
+	if err := os.MkdirAll(path.Join(n.sd, snapshotsDir), 0750); err != nil {
 		return nil, fmt.Errorf("could not create snapshots directory - %v", err)
 	}
 
 	// Can't recover snapshots if memory based.
 	if _, ok := n.wal.(*memStore); ok {
-		snapDir := path.Join(n.sd, snapshotsDir, "*")
-		os.RemoveAll(snapDir)
+		os.Remove(path.Join(n.sd, snapshotsDir, "*"))
 	} else {
 		// See if we have any snapshots and if so load and process on startup.
 		n.setupLastSnapshot()
@@ -937,6 +945,7 @@ func (n *raft) InstallSnapshot(data []byte) error {
 		n.setWriteErr(err)
 		return err
 	}
+
 	n.Unlock()
 
 	psnaps, _ := ioutil.ReadDir(snapDir)
@@ -1110,7 +1119,7 @@ func (n *raft) isCurrent() bool {
 
 	// Check to see that we have heard from the current leader lately.
 	if n.leader != noLeader && n.leader != n.id && n.catchup == nil {
-		const okInterval = int64(hbInterval) * 2
+		okInterval := int64(hbInterval) * 2
 		ts := time.Now().UnixNano()
 		if ps := n.peers[n.leader]; ps != nil && ps.ts > 0 && (ts-ps.ts) <= okInterval {
 			return true
@@ -3060,18 +3069,26 @@ func (n *raft) readTermVote() (term uint64, voted string, err error) {
 
 // Lock should be held.
 func (n *raft) setWriteErrLocked(err error) {
+	// Ignore if already set.
+	if n.werr == err {
+		return
+	}
 	// Ignore non-write errors.
 	if err != nil {
 		if err == ErrStoreClosed || err == ErrStoreEOF || err == ErrInvalidSequence || err == ErrStoreMsgNotFound || err == errNoPending {
 			return
 		}
+		// If this is a not found report but do not disable.
+		if os.IsNotExist(err) {
+			n.error("Resource not found: %v", err)
+			return
+		}
+		n.error("Critical write error: %v", err)
 	}
 	n.werr = err
 
 	// For now since this can be happening all under the covers, we will call up and disable JetStream.
-	n.Unlock()
-	n.s.handleOutOfSpace(_EMPTY_)
-	n.Lock()
+	go n.s.handleOutOfSpace(nil)
 }
 
 // Capture our write error if any and hold.
@@ -3101,7 +3118,7 @@ func (n *raft) fileWriter() {
 			n.RUnlock()
 			if err := ioutil.WriteFile(tvf, buf[:], 0640); err != nil {
 				n.setWriteErr(err)
-				n.error("Error writing term and vote file for %q: %v", n.group, err)
+				n.warn("Error writing term and vote file for %q: %v", n.group, err)
 			}
 		case <-n.wpsch:
 			n.RLock()
@@ -3109,7 +3126,7 @@ func (n *raft) fileWriter() {
 			n.RUnlock()
 			if err := ioutil.WriteFile(psf, buf, 0640); err != nil {
 				n.setWriteErr(err)
-				n.error("Error writing peer state file for %q: %v", n.group, err)
+				n.warn("Error writing peer state file for %q: %v", n.group, err)
 			}
 		}
 	}
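
Note on the test knobs introduced in server/raft.go above: the election and heartbeat timings are now package-level variables seeded from the *Default constants, so a test in the server package can shrink them and restore the defaults when it is done (the new norace test overrides them directly). A minimal sketch, assuming only the names from the diff (minElectionTimeout, maxElectionTimeout, hbInterval and their *Default constants); the helper name speedUpRaftForTest is hypothetical and not part of the patch:

	// speedUpRaftForTest lowers the raft timing variables so elections and
	// heartbeats happen quickly in tests, and returns a func that restores
	// the compiled-in defaults.
	func speedUpRaftForTest() (restore func()) {
		minElectionTimeout = 250 * time.Millisecond
		maxElectionTimeout = time.Second
		hbInterval = 50 * time.Millisecond
		return func() {
			minElectionTimeout = minElectionTimeoutDefault
			maxElectionTimeout = maxElectionTimeoutDefault
			hbInterval = hbIntervalDefault
		}
	}

	// Usage inside a test in package server:
	//	defer speedUpRaftForTest()()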