Skip to content

Commit

Permalink
[FIXED] Clustering: possible panic on restart with "log not found"
Browse files Browse the repository at this point in the history
This was found by chance while making the backend boltdb store return an
error when storing the raft log. The fact that a front-end log
cache was used and was improperly caching logs that had failed to
be stored caused an issue for the raft library. I have filed an
issue (and a PR to fix it) with the hashicorp/raft repo, but
decided not to use their LogCache since it can easily be added
to our raftLog implementation, reducing locking.

Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
  • Loading branch information
kozlovic committed Sep 15, 2020
1 parent 4208132 commit e5e9dab
Show file tree
Hide file tree
Showing 4 changed files with 124 additions and 9 deletions.
11 changes: 3 additions & 8 deletions server/clustering.go
Expand Up @@ -297,6 +297,7 @@ func (s *StanServer) createRaftNode(name string) (bool, error) {
if err != nil {
return false, err
}
store.setCacheSize(s.opts.Clustering.LogCacheSize)

// Go through the list of channels that we have recovered from streaming store
// and set their corresponding UID.
Expand All @@ -311,12 +312,6 @@ func (s *StanServer) createRaftNode(name string) (bool, error) {
}
s.channels.Unlock()

cacheStore, err := raft.NewLogCache(s.opts.Clustering.LogCacheSize, store)
if err != nil {
store.Close()
return false, err
}

addr := s.getClusteringAddr(name)
config := raft.DefaultConfig()
// For tests
Expand Down Expand Up @@ -383,7 +378,7 @@ func (s *StanServer) createRaftNode(name string) (bool, error) {
fsm.Unlock()
}
s.raft.fsm = fsm
node, err := raft.NewRaft(config, fsm, cacheStore, store, snapshotStore, transport)
node, err := raft.NewRaft(config, fsm, store, store, snapshotStore, transport)
if err != nil {
transport.Close()
store.Close()
Expand All @@ -392,7 +387,7 @@ func (s *StanServer) createRaftNode(name string) (bool, error) {
if testPauseAfterNewRaftCalled {
time.Sleep(time.Second)
}
existingState, err := raft.HasExistingState(cacheStore, store, snapshotStore)
existingState, err := raft.HasExistingState(store, store, snapshotStore)
if err != nil {
node.Shutdown()
transport.Close()
Expand Down
30 changes: 30 additions & 0 deletions server/raft_log.go
Expand Up @@ -47,6 +47,10 @@ type raftLog struct {
codec *codec.MsgpackHandle
closed bool

// Our cache
cache []*raft.Log
cacheSize uint64 // Save size as uint64 to not have to cast during store/load

// If the store is using encryption
encryption bool
eds *stores.EDStore
Expand Down Expand Up @@ -91,6 +95,13 @@ func newRaftLog(log logger.Logger, fileName string, sync bool, _ int, encrypt bo
return r, nil
}

// setCacheSize configures the in-memory log cache used to serve GetLog
// for recently stored entries without hitting the bolt store.
// A cacheSize <= 0 disables caching by leaving r.cache nil (the nil
// check in the read/store paths then bypasses the cache). Allocating a
// zero-length slice here instead would make `idx % r.cacheSize` panic
// with a division by zero and `r.cache[...]` index out of range.
func (r *raftLog) setCacheSize(cacheSize int) {
	r.Lock()
	defer r.Unlock()
	if cacheSize <= 0 {
		// Disable the cache entirely.
		r.cacheSize = 0
		r.cache = nil
		return
	}
	r.cacheSize = uint64(cacheSize)
	r.cache = make([]*raft.Log, cacheSize)
}

func (r *raftLog) init() error {
tx, err := r.conn.Begin(true)
if err != nil {
Expand Down Expand Up @@ -213,6 +224,14 @@ func (r *raftLog) getIndex(first bool) (uint64, error) {
// GetLog implements the LogStore interface
func (r *raftLog) GetLog(idx uint64, log *raft.Log) error {
r.RLock()
if r.cache != nil {
cached := r.cache[idx%r.cacheSize]
if cached != nil && cached.Index == idx {
*log = *cached
r.RUnlock()
return nil
}
}
tx, err := r.conn.Begin(false)
if err != nil {
r.RUnlock()
Expand Down Expand Up @@ -265,6 +284,12 @@ func (r *raftLog) StoreLogs(logs []*raft.Log) error {
tx.Rollback()
} else {
err = tx.Commit()
if err == nil && r.cache != nil {
// Cache only on success
for _, l := range logs {
r.cache[l.Index%r.cacheSize] = l
}
}
}
r.Unlock()
return err
Expand All @@ -275,6 +300,11 @@ func (r *raftLog) DeleteRange(min, max uint64) (retErr error) {
r.Lock()
defer r.Unlock()

if r.cacheSize > 0 {
// Reset cache
r.cache = make([]*raft.Log, int(r.cacheSize))
}

start := time.Now()
r.log.Noticef("Deleting raft logs from %v to %v", min, max)
err := r.deleteRange(min, max)
Expand Down
90 changes: 90 additions & 0 deletions server/raft_log_test.go
Expand Up @@ -512,3 +512,93 @@ func TestRaftLogChannelID(t *testing.T) {
getID("bar", 0)
getID("baz", 0)
}

func TestRaftLogCache(t *testing.T) {
	cleanupRaftLog(t)
	defer cleanupRaftLog(t)

	store := createTestRaftLog(t, false, 0)
	defer store.Close()

	store.setCacheSize(10)

	// Snapshot cache length and configured size under the read lock.
	store.RLock()
	gotLen := len(store.cache)
	gotSize := store.cacheSize
	store.RUnlock()
	if gotLen != 10 {
		t.Fatalf("Expected cache len to be 10, got %v", gotLen)
	}
	if gotSize != 10 {
		t.Fatalf("Expected cacheSize to be 10, got %v", gotSize)
	}

	// Store one log via StoreLog and two more via StoreLogs.
	first := &raft.Log{Index: 1, Data: []byte("msg1")}
	if err := store.StoreLog(first); err != nil {
		t.Fatalf("Error on store: %v", err)
	}
	second := &raft.Log{Index: 2, Data: []byte("msg2")}
	third := &raft.Log{Index: 3, Data: []byte("msg3")}
	if err := store.StoreLogs([]*raft.Log{second, third}); err != nil {
		t.Fatalf("Error on store: %v", err)
	}

	// All three should now sit in their respective cache slots.
	store.RLock()
	cl1 := store.cache[1%10]
	cl2 := store.cache[2%10]
	cl3 := store.cache[3%10]
	store.RUnlock()
	allCached := cl1 != nil && cl2 != nil && cl3 != nil &&
		cl1.Index == 1 && cl2.Index == 2 && cl3.Index == 3
	if !allCached {
		t.Fatalf("Wrong content: l1=%v l2=%v l3=%v", cl1, cl2, cl3)
	}

	// Index 11 wraps around to slot 1; GetLog should still return it.
	if err := store.StoreLog(&raft.Log{Index: 11, Data: []byte("msg11")}); err != nil {
		t.Fatalf("Error on store: %v", err)
	}
	var fetched raft.Log
	if err := store.GetLog(11, &fetched); err != nil || fetched.Index != 11 || string(fetched.Data) != "msg11" {
		t.Fatalf("Unexpected err=%v msg=%v", err, fetched)
	}

	if err := store.DeleteRange(1, 2); err != nil {
		t.Fatalf("Error on delete range: %v", err)
	}
	// DeleteRange resets the cache: same size/length, every slot empty.
	var stale error
	store.RLock()
	gotLen = len(store.cache)
	gotSize = store.cacheSize
	for _, entry := range store.cache {
		if entry != nil {
			stale = fmt.Errorf("Log still in cache: %v", entry)
			break
		}
	}
	store.RUnlock()
	if gotLen != 10 {
		t.Fatalf("Expected cache len to be 10, got %v", gotLen)
	}
	if gotSize != 10 {
		t.Fatalf("Expected cacheSize to be 10, got %v", gotSize)
	}
	if stale != nil {
		t.Fatal(stale.Error())
	}

	// Close the underlying connection so the next store fails, then
	// verify the failed log did not make it into the cache.
	store.Lock()
	store.conn.Close()
	store.Unlock()
	if err := store.StoreLog(&raft.Log{Index: 4, Data: []byte("msg4")}); err == nil {
		t.Fatal("Expected error on store")
	}
	store.RLock()
	cached := store.cache[4%10]
	store.RUnlock()
	if cached != nil {
		t.Fatalf("Expected log 4 not to be cached, got %v", cached)
	}
}
2 changes: 1 addition & 1 deletion stores/filestore_test.go
Expand Up @@ -1781,7 +1781,7 @@ func TestFSFilesClosedOnRecovery(t *testing.T) {
cleanupFSDatastore(t)
defer cleanupFSDatastore(t)

s := createDefaultFileStore(t, SliceConfig(1, 0, 0, ""))
s := createDefaultFileStore(t, SliceConfig(1, 0, 0, ""), DoSync(false))
defer s.Close()

limits := testDefaultStoreLimits
Expand Down

0 comments on commit e5e9dab

Please sign in to comment.