diff --git a/server/clustering.go b/server/clustering.go index 4b48fda3..48b27da1 100644 --- a/server/clustering.go +++ b/server/clustering.go @@ -297,6 +297,7 @@ func (s *StanServer) createRaftNode(name string) (bool, error) { if err != nil { return false, err } + store.setCacheSize(s.opts.Clustering.LogCacheSize) // Go through the list of channels that we have recovered from streaming store // and set their corresponding UID. @@ -311,12 +312,6 @@ func (s *StanServer) createRaftNode(name string) (bool, error) { } s.channels.Unlock() - cacheStore, err := raft.NewLogCache(s.opts.Clustering.LogCacheSize, store) - if err != nil { - store.Close() - return false, err - } - addr := s.getClusteringAddr(name) config := raft.DefaultConfig() // For tests @@ -383,7 +378,7 @@ func (s *StanServer) createRaftNode(name string) (bool, error) { fsm.Unlock() } s.raft.fsm = fsm - node, err := raft.NewRaft(config, fsm, cacheStore, store, snapshotStore, transport) + node, err := raft.NewRaft(config, fsm, store, store, snapshotStore, transport) if err != nil { transport.Close() store.Close() @@ -392,7 +387,7 @@ func (s *StanServer) createRaftNode(name string) (bool, error) { if testPauseAfterNewRaftCalled { time.Sleep(time.Second) } - existingState, err := raft.HasExistingState(cacheStore, store, snapshotStore) + existingState, err := raft.HasExistingState(store, store, snapshotStore) if err != nil { node.Shutdown() transport.Close() diff --git a/server/raft_log.go b/server/raft_log.go index ca81ad7d..055247db 100644 --- a/server/raft_log.go +++ b/server/raft_log.go @@ -47,6 +47,10 @@ type raftLog struct { codec *codec.MsgpackHandle closed bool + // Our cache + cache []*raft.Log + cacheSize uint64 // Save size as uint64 to not have to cast during store/load + // If the store is using encryption encryption bool eds *stores.EDStore @@ -91,6 +95,13 @@ func newRaftLog(log logger.Logger, fileName string, sync bool, _ int, encrypt bo return r, nil } +func (r *raftLog) setCacheSize(cacheSize 
int) { + r.Lock() + defer r.Unlock() + r.cacheSize = uint64(cacheSize) + r.cache = make([]*raft.Log, cacheSize) +} + func (r *raftLog) init() error { tx, err := r.conn.Begin(true) if err != nil { @@ -213,6 +224,14 @@ func (r *raftLog) getIndex(first bool) (uint64, error) { // GetLog implements the LogStore interface func (r *raftLog) GetLog(idx uint64, log *raft.Log) error { r.RLock() + if r.cacheSize > 0 { + cached := r.cache[idx%r.cacheSize] + if cached != nil && cached.Index == idx { + *log = *cached + r.RUnlock() + return nil + } + } tx, err := r.conn.Begin(false) if err != nil { r.RUnlock() @@ -265,6 +284,12 @@ func (r *raftLog) StoreLogs(logs []*raft.Log) error { tx.Rollback() } else { err = tx.Commit() + if err == nil && r.cacheSize > 0 { + // Cache only on success + for _, l := range logs { + r.cache[l.Index%r.cacheSize] = l + } + } } r.Unlock() return err @@ -275,6 +300,11 @@ func (r *raftLog) DeleteRange(min, max uint64) (retErr error) { r.Lock() defer r.Unlock() + if r.cacheSize > 0 { + // Reset cache + r.cache = make([]*raft.Log, int(r.cacheSize)) + } + start := time.Now() r.log.Noticef("Deleting raft logs from %v to %v", min, max) err := r.deleteRange(min, max) diff --git a/server/raft_log_test.go b/server/raft_log_test.go index 6329bcec..8005bdda 100644 --- a/server/raft_log_test.go +++ b/server/raft_log_test.go @@ -512,3 +512,93 @@ func TestRaftLogChannelID(t *testing.T) { getID("bar", 0) getID("baz", 0) } + +func TestRaftLogCache(t *testing.T) { + cleanupRaftLog(t) + defer cleanupRaftLog(t) + + store := createTestRaftLog(t, false, 0) + defer store.Close() + + store.setCacheSize(10) + + store.RLock() + lc := len(store.cache) + cs := store.cacheSize + store.RUnlock() + if lc != 10 { + t.Fatalf("Expected cache len to be 10, got %v", lc) + } + if cs != 10 { + t.Fatalf("Expected cacheSize to be 10, got %v", cs) + } + + l1 := &raft.Log{Index: 1, Data: []byte("msg1")} + if err := store.StoreLog(l1); err != nil { + t.Fatalf("Error on store: %v", err) + } + + 
l2 := &raft.Log{Index: 2, Data: []byte("msg2")} + l3 := &raft.Log{Index: 3, Data: []byte("msg3")} + if err := store.StoreLogs([]*raft.Log{l2, l3}); err != nil { + t.Fatalf("Error on store: %v", err) + } + + store.RLock() + cl1 := store.cache[1%10] + cl2 := store.cache[2%10] + cl3 := store.cache[3%10] + store.RUnlock() + if cl1 == nil || cl2 == nil || cl3 == nil || cl1.Index != 1 || cl2.Index != 2 || cl3.Index != 3 { + t.Fatalf("Wrong content: l1=%v l2=%v l3=%v", cl1, cl2, cl3) + } + + l11 := &raft.Log{Index: 11, Data: []byte("msg11")} + if err := store.StoreLog(l11); err != nil { + t.Fatalf("Error on store: %v", err) + } + var cl11 raft.Log + if err := store.GetLog(11, &cl11); err != nil || cl11.Index != 11 || string(cl11.Data) != "msg11" { + t.Fatalf("Unexpected err=%v msg=%v", err, cl11) + } + + if err := store.DeleteRange(1, 2); err != nil { + t.Fatalf("Error on delete range: %v", err) + } + var err error + store.RLock() + lc = len(store.cache) + cs = store.cacheSize + for _, l := range store.cache { + if l != nil { + err = fmt.Errorf("Log still in cache: %v", l) + break + } + } + store.RUnlock() + if lc != 10 { + t.Fatalf("Expected cache len to be 10, got %v", lc) + } + if cs != 10 { + t.Fatalf("Expected cacheSize to be 10, got %v", cs) + } + if err != nil { + t.Fatal(err.Error()) + } + + // Now cause encoding to fail so that storelog fails and + // we check that log was not cached. 
+ store.Lock() + store.conn.Close() + store.Unlock() + l4 := &raft.Log{Index: 4, Data: []byte("msg4")} + if err := store.StoreLog(l4); err == nil { + t.Fatal("Expected error on store") + } + store.RLock() + cl4 := store.cache[4%10] + store.RUnlock() + if cl4 != nil { + t.Fatalf("Expected log 4 not to be cached, got %v", cl4) + } +} diff --git a/stores/filestore_test.go b/stores/filestore_test.go index 1b7cbc3d..3f696741 100644 --- a/stores/filestore_test.go +++ b/stores/filestore_test.go @@ -1781,7 +1781,7 @@ func TestFSFilesClosedOnRecovery(t *testing.T) { cleanupFSDatastore(t) defer cleanupFSDatastore(t) - s := createDefaultFileStore(t, SliceConfig(1, 0, 0, "")) + s := createDefaultFileStore(t, SliceConfig(1, 0, 0, ""), DoSync(false)) defer s.Close() limits := testDefaultStoreLimits