Improvements to meta raft layer around snapshots and recovery.
Signed-off-by: Derek Collison <derek@nats.io>
derekcollison committed Oct 12, 2021
1 parent 5e1276c commit bbffd71
Showing 5 changed files with 169 additions and 112 deletions.
11 changes: 9 additions & 2 deletions server/jetstream.go
@@ -463,9 +463,16 @@ func (s *Server) setJetStreamDisabled() {
}
}

func (s *Server) handleOutOfSpace(stream string) {
func (s *Server) handleOutOfSpace(mset *stream) {
if s.JetStreamEnabled() && !s.jetStreamOOSPending() {
s.Errorf("JetStream out of resources, will be DISABLED")
var stream string
if mset != nil {
stream = mset.name()
s.Errorf("JetStream out of %s resources, will be DISABLED", mset.Store().Type())
} else {
s.Errorf("JetStream out of resources, will be DISABLED")
}

go s.DisableJetStream()

adv := &JSServerOutOfSpaceAdvisory{
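
A minimal, self-contained sketch of the pattern in the hunk above, using hypothetical stub types rather than the server's real ones: handleOutOfSpace now takes the stream that hit the limit, so the log line can name the storage backend, while a nil stream keeps the generic message. In the real server the captured stream name also feeds the out-of-space advisory.

package main

import "fmt"

// StorageType stands in for the store's Type() in the real code.
type StorageType int

const (
	FileStorage StorageType = iota
	MemoryStorage
)

func (t StorageType) String() string {
	if t == MemoryStorage {
		return "Memory"
	}
	return "File"
}

// stream is a stub carrying just enough state for the example.
type stream struct {
	name    string
	storage StorageType
}

// handleOutOfSpace mirrors the shape of the change: name the storage type
// when a stream is known, otherwise fall back to the generic message.
func handleOutOfSpace(mset *stream) {
	var name string
	if mset != nil {
		name = mset.name
		fmt.Printf("JetStream out of %s resources, will be DISABLED\n", mset.storage)
	} else {
		fmt.Println("JetStream out of resources, will be DISABLED")
	}
	// In the real server the captured name is used in the advisory payload.
	_ = name
}

func main() {
	handleOutOfSpace(&stream{name: "ORDERS", storage: FileStorage})
	handleOutOfSpace(nil)
}
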
23 changes: 15 additions & 8 deletions server/jetstream_cluster.go
@@ -753,9 +753,19 @@ func (js *jetStream) monitorCluster() {
isLeader bool
lastSnap []byte
lastSnapTime time.Time
isRecovering bool
beenLeader bool
)

// Set to true to start.
isRecovering = true

// Snapshotting function.
doSnapshot := func() {
// Suppress during recovery.
if isRecovering {
return
}
if snap := js.metaSnapshot(); !bytes.Equal(lastSnap, snap) {
if err := n.InstallSnapshot(snap); err == nil {
lastSnap = snap
@@ -764,9 +774,6 @@ }
}
}

isRecovering := true
beenLeader := false

for {
select {
case <-s.quitCh:
@@ -787,7 +794,7 @@
// Since we received one make sure we have our own since we do not store
// our meta state outside of raft.
doSnapshot()
} else if nb > uint64(len(lastSnap)*4) {
} else if lls := len(lastSnap); nb > uint64(lls*8) && lls > 0 {
doSnapshot()
}
}
@@ -1743,7 +1750,7 @@ func (js *jetStream) applyStreamEntries(mset *stream, ce *CommittedEntry, isReco
s.Debugf("Got error processing JetStream msg: %v", err)
}
if isOutOfSpaceErr(err) {
s.handleOutOfSpace(mset.name())
s.handleOutOfSpace(mset)
return err
}
}
@@ -4539,7 +4546,7 @@ func (mset *stream) processClusteredInboundMsg(subject, reply string, hdr, msg [
}

if err != nil && isOutOfSpaceErr(err) {
s.handleOutOfSpace(name)
s.handleOutOfSpace(mset)
}

return err
@@ -4660,7 +4667,7 @@ func (mset *stream) processSnapshot(snap *streamSnapshot) {
mset.mu.Lock()
state := mset.store.State()
sreq := mset.calculateSyncRequest(&state, snap)
s, js, subject, n, name := mset.srv, mset.js, mset.sa.Sync, mset.node, mset.cfg.Name
s, js, subject, n := mset.srv, mset.js, mset.sa.Sync, mset.node
mset.mu.Unlock()

// Just return if up to date or already exceeded limits.
@@ -4767,7 +4774,7 @@ RETRY:
return
}
} else if isOutOfSpaceErr(err) {
s.handleOutOfSpace(name)
s.handleOutOfSpace(mset)
return
} else if err == NewJSInsufficientResourcesError() {
if mset.js.limitsExceeded(mset.cfg.Storage) {
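
A minimal, runnable sketch of the snapshot policy the monitorCluster hunks above move to, with hypothetical names and the two gates folded into one helper for illustration: suppress snapshots entirely while the meta layer is still recovering, and otherwise only install a snapshot when it differs from the last one and the applied data has grown well past the previous snapshot's size (8x, as in the diff).

package main

import (
	"bytes"
	"fmt"
)

// metaLayer is a stub holding just the state the policy needs.
type metaLayer struct {
	isRecovering bool
	lastSnap     []byte
}

// install stands in for n.InstallSnapshot in the real code.
func (m *metaLayer) install(snap []byte) error {
	m.lastSnap = snap
	return nil
}

// maybeSnapshot combines the gates shown in the diff: skip during recovery,
// require the applied bytes to exceed 8x the previous snapshot size once one
// exists, and skip snapshots identical to the last one installed.
func (m *metaLayer) maybeSnapshot(snap []byte, appliedBytes uint64) bool {
	if m.isRecovering {
		return false
	}
	if lls := len(m.lastSnap); lls > 0 && appliedBytes <= uint64(lls*8) {
		return false
	}
	if bytes.Equal(m.lastSnap, snap) {
		return false
	}
	return m.install(snap) == nil
}

func main() {
	m := &metaLayer{isRecovering: true}
	fmt.Println(m.maybeSnapshot([]byte("meta-state-1"), 1024)) // false: still recovering
	m.isRecovering = false
	fmt.Println(m.maybeSnapshot([]byte("meta-state-1"), 1024)) // true: first snapshot installed
	fmt.Println(m.maybeSnapshot([]byte("meta-state-2"), 16))   // false: not enough new applied data
}
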
90 changes: 6 additions & 84 deletions server/jetstream_cluster_test.go
@@ -25,7 +25,6 @@ import (
"os"
"path"
"reflect"
"runtime"
"strconv"
"strings"
"sync"
@@ -5845,18 +5844,8 @@ func TestJetStreamClusterMultiRestartBug(t *testing.T) {
c.waitOnStreamLeader("$G", "TEST")

s = c.serverByName(s.Name())
opts = s.getOpts()

c.waitOnStreamCurrent(s, "$G", "TEST")

snaps, err := ioutil.ReadDir(path.Join(opts.StoreDir, JetStreamStoreDir, "$SYS", "_js_", "_meta_", "snapshots"))
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if len(snaps) == 0 {
t.Fatalf("Expected a meta snapshot for the restarted server")
}

// Now restart them all..
c.stopAll()
c.restartAll()
@@ -5868,8 +5857,12 @@ func TestJetStreamClusterMultiRestartBug(t *testing.T) {
defer nc.Close()

// Make sure the replicas are current.
checkFor(t, 10*time.Second, 100*time.Millisecond, func() error {
si, _ := js.StreamInfo("TEST")
js2, err := nc.JetStream(nats.MaxWait(250 * time.Millisecond))
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
checkFor(t, 10*time.Second, 250*time.Millisecond, func() error {
si, _ := js2.StreamInfo("TEST")
if si == nil || si.Cluster == nil {
t.Fatalf("Did not get stream info")
}
@@ -8497,76 +8490,6 @@ func TestJetStreamDeadlockOnVarz(t *testing.T) {
wg.Wait()
}

// Make sure when we try to hard reset a stream state in a cluster that we also re-create the consumers.
func TestJetStreamClusterStreamReset(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()

// Client based API
s := c.randomServer()
nc, js := jsClientConnect(t, s)
defer nc.Close()

_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"foo.*"},
Replicas: 2,
Retention: nats.WorkQueuePolicy,
})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

numRequests := 20
for i := 0; i < numRequests; i++ {
js.Publish("foo.created", []byte("REQ"))
}

// Durable.
sub, err := js.SubscribeSync("foo.created", nats.Durable("d1"))
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
defer sub.Unsubscribe()

si, err := js.StreamInfo("TEST")
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if si.State.Msgs != uint64(numRequests) {
t.Fatalf("Expected %d msgs, got bad state: %+v", numRequests, si.State)
}
// Let settle a bit.
time.Sleep(250 * time.Millisecond)

// Grab number go routines.
base := runtime.NumGoroutine()

// Grab a server that is the consumer leader for the durable.
cl := c.consumerLeader("$G", "TEST", "d1")
mset, err := cl.GlobalAccount().lookupStream("TEST")
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
// Do a hard reset here by hand.
mset.resetClusteredState()
// Wait til we have the leader elected.
c.waitOnConsumerLeader("$G", "TEST", "d1")

// So do not wait 10s in call in checkFor.
js2, _ := nc.JetStream(nats.MaxWait(250 * time.Millisecond))
// Make sure we can get the consumer info eventually.
checkFor(t, 5*time.Second, 200*time.Millisecond, func() error {
_, err := js2.ConsumerInfo("TEST", "d1")
return err
})

// Grab number go routines.
if after := runtime.NumGoroutine(); base > after {
t.Fatalf("Expected %d go routines, got %d", base, after)
}
}

// Issue #2397
func TestJetStreamClusterStreamCatchupNoState(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R2S", 2)
@@ -9245,7 +9168,6 @@ func TestJetStreamAppendOnly(t *testing.T) {
if resp.Error == nil {
t.Fatalf("Expected an error")
}

}

// Support functions
108 changes: 106 additions & 2 deletions server/norace_test.go
@@ -2042,8 +2042,6 @@ func TestNoRaceJetStreamClusterSuperClusterRIPStress(t *testing.T) {
nc, js := jsClientConnect(t, s)
defer nc.Close()

fmt.Printf("CONNECT is %v\n", s.ClientURL())

scm := make(map[string][]string)

// Create 50 streams per cluster.
@@ -3561,3 +3559,109 @@ func TestNoRaceJetStreamClusterMaxConsumersAndDirect(t *testing.T) {
return nil
})
}

// Make sure when we try to hard reset a stream state in a cluster that we also re-create the consumers.
func TestNoRaceJetStreamClusterStreamReset(t *testing.T) {
// Speed up raft
minElectionTimeout = 250 * time.Millisecond
maxElectionTimeout = time.Second
hbInterval = 50 * time.Millisecond

c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()

// Client based API
s := c.randomServer()
nc, js := jsClientConnect(t, s)
defer nc.Close()

_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"foo.*"},
Replicas: 2,
Retention: nats.WorkQueuePolicy,
})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

numRequests := 20
for i := 0; i < numRequests; i++ {
js.Publish("foo.created", []byte("REQ"))
}

// Durable.
sub, err := js.SubscribeSync("foo.created", nats.Durable("d1"))
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
defer sub.Unsubscribe()

si, err := js.StreamInfo("TEST")
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if si.State.Msgs != uint64(numRequests) {
t.Fatalf("Expected %d msgs, got bad state: %+v", numRequests, si.State)
}

// Let settle a bit for Go routine checks.
time.Sleep(250 * time.Millisecond)

// Grab number go routines.
base := runtime.NumGoroutine()

// Make the consumer busy here by async sending a bunch of messages.
for i := 0; i < numRequests*10; i++ {
js.PublishAsync("foo.created", []byte("REQ"))
}

// Grab a server that is the consumer leader for the durable.
cl := c.consumerLeader("$G", "TEST", "d1")
mset, err := cl.GlobalAccount().lookupStream("TEST")
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
// Do a hard reset here by hand.
mset.resetClusteredState()

// Wait til we have the consumer leader re-elected.
c.waitOnConsumerLeader("$G", "TEST", "d1")

// So we do not wait all 10s in each call to ConsumerInfo.
js2, _ := nc.JetStream(nats.MaxWait(250 * time.Millisecond))
// Make sure we can get the consumer info eventually.
checkFor(t, 5*time.Second, 200*time.Millisecond, func() error {
_, err := js2.ConsumerInfo("TEST", "d1")
return err
})

// Grab number go routines.
if after := runtime.NumGoroutine(); base > after {
t.Fatalf("Expected %d go routines, got %d", base, after)
}

// Simulate a low level write error on our consumer and make sure we can recover etc.
cl = c.consumerLeader("$G", "TEST", "d1")
mset, err = cl.GlobalAccount().lookupStream("TEST")
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
o := mset.lookupConsumer("d1")
if o == nil {
t.Fatalf("Did not retrieve consumer")
}
node := o.raftNode().(*raft)
if node == nil {
t.Fatalf("could not retrieve the raft node for consumer")
}

nc.Close()
node.setWriteErr(io.ErrShortWrite)

c.stopAll()
c.restartAll()

c.waitOnStreamLeader("$G", "TEST")
c.waitOnConsumerLeader("$G", "TEST", "d1")
}
