diff --git a/server/clustering.go b/server/clustering.go index 757596c8..25fdc27e 100644 --- a/server/clustering.go +++ b/server/clustering.go @@ -89,6 +89,21 @@ type ClusteringOptions struct { RaftElectionTimeout time.Duration RaftLeaseTimeout time.Duration RaftCommitTimeout time.Duration + + // These options influence the RAFT store implementation which uses bolt DB. + // + // Sync freelist to disk. This reduces the database write performance, but + // speed up recovery since there is no need for a full database re-sync. + BoltFreeListSync bool + + // BoltFreeListArray sets the backend freelist type to "array". + // There are two options: + // - "array" which is simple but suffers dramatic performance degradation if database is + // large and framentation in freelist is common. + // - "hashmap" which is faster in almost all circumstances but doesn't guarantee + // that it offers the smallest page id available. In normal case it is safe. + // Since v0.21.0, the default is "hashmap". Set this option to "true" to use "array" instead. + BoltFreeListArray bool } // raftNode is a handle to a member in a Raft consensus group. @@ -304,12 +319,10 @@ func (s *StanServer) createRaftNode(name string) (bool, error) { s.raft = &raftNode{} raftLogFileName := filepath.Join(path, raftLogFile) - store, err := newRaftLog(s.log, raftLogFileName, s.opts.Clustering.Sync, int(s.opts.Clustering.TrailingLogs), - s.opts.Encrypt, s.opts.EncryptionCipher, s.opts.EncryptionKey) + store, err := newRaftLog(s.log, raftLogFileName, s.opts) if err != nil { return false, err } - store.setCacheSize(s.opts.Clustering.LogCacheSize) // Go through the list of channels that we have recovered from streaming store // and set their corresponding UID. diff --git a/server/conf.go b/server/conf.go index e8ff6fe4..e907ef11 100644 --- a/server/conf.go +++ b/server/conf.go @@ -351,6 +351,16 @@ func parseCluster(itf interface{}, opts *Options) error { case "raft_commit_timeout": opts.Clustering.RaftCommitTimeout = dur } + case "bolt_free_list_sync": + if err := checkType(k, reflect.Bool, v); err != nil { + return err + } + opts.Clustering.BoltFreeListSync = v.(bool) + case "bolt_free_list_array": + if err := checkType(k, reflect.Bool, v); err != nil { + return err + } + opts.Clustering.BoltFreeListArray = v.(bool) } } return nil diff --git a/server/conf_test.go b/server/conf_test.go index 06f0331d..105d31d2 100644 --- a/server/conf_test.go +++ b/server/conf_test.go @@ -275,6 +275,12 @@ func TestParseConfig(t *testing.T) { if !opts.Clustering.AllowAddRemoveNode { t.Fatal("Expected AllowAddRemoveNode to be true") } + if !opts.Clustering.BoltFreeListSync { + t.Fatal("Expected BoltFreeListSync to be true") + } + if !opts.Clustering.BoltFreeListArray { + t.Fatal("Expected BoltFreeListArray to be true") + } if opts.SQLStoreOpts.Driver != "mysql" { t.Fatalf("Expected SQL Driver to be %q, got %q", "mysql", opts.SQLStoreOpts.Driver) } @@ -505,6 +511,8 @@ func TestParseWrongTypes(t *testing.T) { expectFailureFor(t, "cluster:{raft_commit_timeout:123}", wrongTypeErr) expectFailureFor(t, "cluster:{raft_commit_timeout:\"not_a_time\"}", wrongTimeErr) expectFailureFor(t, "cluster:{allow_add_remove_node:1}", wrongTypeErr) + expectFailureFor(t, "cluster:{bolt_free_list_sync:123}", wrongTypeErr) + expectFailureFor(t, "cluster:{bolt_free_list_array:123}", wrongTypeErr) expectFailureFor(t, "sql:{driver:false}", wrongTypeErr) expectFailureFor(t, "sql:{source:false}", wrongTypeErr) expectFailureFor(t, "sql:{no_caching:123}", wrongTypeErr) diff --git a/server/raft_log.go b/server/raft_log.go index 08de7db1..217905fd 100644 --- a/server/raft_log.go +++ b/server/raft_log.go @@ -48,8 +48,8 @@ type raftLog struct { closed bool // Our cache - cache []*raft.Log - cacheSize uint64 // Save size as uint64 to not have to case during store/load + cache []*raft.Log // Simple array containing sequence based on modulo + cacheSize uint64 // Save size as uint64 to not have to case during store/load // If the store is using encryption encryption bool @@ -58,16 +58,23 @@ type raftLog struct { encryptOffset int } -func newRaftLog(log logger.Logger, fileName string, sync bool, _ int, encrypt bool, encryptionCipher string, encryptionKey []byte) (*raftLog, error) { +func newRaftLog(log logger.Logger, fileName string, opts *Options) (*raftLog, error) { + if opts == nil { + opts = GetDefaultOptions() + } r := &raftLog{ log: log, fileName: fileName, codec: &codec.MsgpackHandle{}, } + freeListType := bolt.FreelistMapType + if opts.Clustering.BoltFreeListArray { + freeListType = bolt.FreelistArrayType + } dbOpts := &bolt.Options{ - NoSync: !sync, - NoFreelistSync: true, - FreelistType: bolt.FreelistMapType, + NoSync: !opts.Clustering.Sync, + NoFreelistSync: !opts.Clustering.BoltFreeListSync, + FreelistType: freeListType, } db, err := bolt.Open(fileName, 0600, dbOpts) if err != nil { @@ -78,13 +85,13 @@ func newRaftLog(log logger.Logger, fileName string, sync bool, _ int, encrypt bo r.conn.Close() return nil, err } - if encrypt { + if opts.Encrypt { lastIndex, err := r.getIndex(false) if err != nil { r.conn.Close() return nil, err } - eds, err := stores.NewEDStore(encryptionCipher, encryptionKey, lastIndex) + eds, err := stores.NewEDStore(opts.EncryptionCipher, opts.EncryptionKey, lastIndex) if err != nil { r.conn.Close() return nil, err @@ -94,16 +101,13 @@ func newRaftLog(log logger.Logger, fileName string, sync bool, _ int, encrypt bo r.encryptBuf = make([]byte, 100) r.encryptOffset = eds.EncryptionOffset() } + if cacheSize := opts.Clustering.LogCacheSize; cacheSize > 0 { + r.cacheSize = uint64(cacheSize) + r.cache = make([]*raft.Log, cacheSize) + } return r, nil } -func (r *raftLog) setCacheSize(cacheSize int) { - r.Lock() - defer r.Unlock() - r.cacheSize = uint64(cacheSize) - r.cache = make([]*raft.Log, cacheSize) -} - func (r *raftLog) init() error { tx, err := r.conn.Begin(true) if err != nil { diff --git a/server/raft_log_test.go b/server/raft_log_test.go index 8005bdda..1dde5c8d 100644 --- a/server/raft_log_test.go +++ b/server/raft_log_test.go @@ -27,12 +27,12 @@ import ( "github.com/nats-io/nats-streaming-server/stores" ) -func createTestRaftLog(t tLogger, sync bool, trailingLogs int) *raftLog { +func createTestRaftLog(t tLogger, opts *Options) *raftLog { if err := os.MkdirAll(defaultRaftLog, os.ModeDir+os.ModePerm); err != nil { stackFatalf(t, "Unable to create raft log directory: %v", err) } fileName := filepath.Join(defaultRaftLog, raftLogFile) - store, err := newRaftLog(testLogger, fileName, sync, trailingLogs, false, stores.CryptoCipherAutoSelect, nil) + store, err := newRaftLog(testLogger, fileName, opts) if err != nil { stackFatalf(t, "Error creating store: %v", err) } @@ -44,7 +44,7 @@ func TestRaftLogDeleteRange(t *testing.T) { defer cleanupRaftLog(t) // No sync (will check that conn's NoSync value is correct after a recreating the file) - store := createTestRaftLog(t, false, 0) + store := createTestRaftLog(t, nil) defer store.Close() // Store in dbConf bucket @@ -145,7 +145,7 @@ func TestRaftLogEncodeDecodeLogs(t *testing.T) { cleanupRaftLog(t) defer cleanupRaftLog(t) - store := createTestRaftLog(t, false, 0) + store := createTestRaftLog(t, nil) defer store.Close() total := 50000 @@ -236,7 +236,7 @@ func TestRaftLogWithEncryption(t *testing.T) { cleanupRaftLog(t) defer cleanupRaftLog(t) - store := createTestRaftLog(t, false, 0) + store := createTestRaftLog(t, nil) defer store.Close() // Plain text log store.StoreLog(&raft.Log{Index: 1, Term: 1, Type: raft.LogCommand, Data: []byte("abcd")}) @@ -248,7 +248,11 @@ func TestRaftLogWithEncryption(t *testing.T) { store.Close() // Re-open as encrypted store - store, err := newRaftLog(testLogger, fileName, false, 0, true, stores.CryptoCipherAES, []byte("testkey")) + opts := GetDefaultOptions() + opts.Encrypt = true + opts.EncryptionCipher = stores.CryptoCipherAES + opts.EncryptionKey = []byte("testkey") + store, err := newRaftLog(testLogger, fileName, opts) if err != nil { t.Fatalf("Error opening store: %v", err) } @@ -275,7 +279,8 @@ func TestRaftLogWithEncryption(t *testing.T) { fileName = filepath.Join(defaultRaftLog, raftLogFile) key := []byte("testkey") - store, err = newRaftLog(testLogger, fileName, false, 0, true, stores.CryptoCipherAES, key) + opts.EncryptionKey = key + store, err = newRaftLog(testLogger, fileName, opts) if err != nil { t.Fatalf("Error creating store: %v", err) } @@ -325,7 +330,9 @@ func TestRaftLogWithEncryption(t *testing.T) { if err := os.Setenv(stores.CryptoStoreEnvKeyName, "testkey"); err != nil { t.Fatalf("Unable to set environment variable: %v", err) } - store, err = newRaftLog(testLogger, fileName, false, 0, true, stores.CryptoCipherAES, nil) + opts.EncryptionCipher = stores.CryptoCipherAES + opts.EncryptionKey = nil + store, err = newRaftLog(testLogger, fileName, opts) if err != nil { t.Fatalf("Error creating store: %v", err) } @@ -341,7 +348,8 @@ func TestRaftLogWithEncryption(t *testing.T) { // Ensure that env key override config by providing a wrong key // and notice that we have correct decrypt. - store, err = newRaftLog(testLogger, fileName, false, 0, true, stores.CryptoCipherAES, []byte("wrongkey")) + opts.EncryptionKey = []byte("wrongkey") + store, err = newRaftLog(testLogger, fileName, opts) if err != nil { t.Fatalf("Error creating store: %v", err) } @@ -357,7 +365,9 @@ func TestRaftLogWithEncryption(t *testing.T) { // Now unset env variable and re-open with wrong key os.Unsetenv(stores.CryptoStoreEnvKeyName) - store, err = newRaftLog(testLogger, fileName, false, 0, true, stores.CryptoCipherAES, []byte("wrongkey")) + opts.EncryptionCipher = stores.CryptoCipherAES + opts.EncryptionKey = []byte("wrongkey") + store, err = newRaftLog(testLogger, fileName, opts) if err != nil { t.Fatalf("Error creating store: %v", err) } @@ -369,7 +379,8 @@ func TestRaftLogWithEncryption(t *testing.T) { store.Close() // Re-open with encryption but no key, this should fail. - store, err = newRaftLog(testLogger, fileName, false, 0, true, stores.CryptoCipherAES, nil) + opts.EncryptionKey = nil + store, err = newRaftLog(testLogger, fileName, opts) if err == nil || !strings.Contains(err.Error(), stores.ErrCryptoStoreRequiresKey.Error()) { if store != nil { store.Close() @@ -387,7 +398,9 @@ func TestRaftLogMultipleCiphers(t *testing.T) { } fileName := filepath.Join(defaultRaftLog, raftLogFile) - store, err := newRaftLog(testLogger, fileName, false, 0, false, stores.CryptoCipherAutoSelect, nil) + opts := GetDefaultOptions() + opts.EncryptionCipher = stores.CryptoCipherAutoSelect + store, err := newRaftLog(testLogger, fileName, opts) if err != nil { t.Fatalf("Error creating store: %v", err) } @@ -406,7 +419,11 @@ func TestRaftLogMultipleCiphers(t *testing.T) { storeWithEncryption := func(t *testing.T, encryptionCipher string, payloadIdx int) { t.Helper() - store, err := newRaftLog(testLogger, fileName, false, 0, true, encryptionCipher, []byte("mykey")) + opts := GetDefaultOptions() + opts.Encrypt = true + opts.EncryptionCipher = encryptionCipher + opts.EncryptionKey = []byte("mykey") + store, err := newRaftLog(testLogger, fileName, opts) if err != nil { t.Fatalf("Error creating store: %v", err) } @@ -422,7 +439,11 @@ func TestRaftLogMultipleCiphers(t *testing.T) { // Now re-open with any cipher, use the auto-select one. // We should be able to get all 3 messages correctly. - store, err = newRaftLog(testLogger, fileName, false, 0, true, stores.CryptoCipherAutoSelect, []byte("mykey")) + opts = GetDefaultOptions() + opts.Encrypt = true + opts.EncryptionCipher = stores.CryptoCipherAutoSelect + opts.EncryptionKey = []byte("mykey") + store, err = newRaftLog(testLogger, fileName, opts) if err != nil { t.Fatalf("Error creating store: %v", err) } @@ -442,7 +463,7 @@ func TestRaftLogChannelID(t *testing.T) { cleanupRaftLog(t) defer cleanupRaftLog(t) - store := createTestRaftLog(t, false, 0) + store := createTestRaftLog(t, nil) defer store.Close() storeID := func(name string, id uint64) { @@ -488,7 +509,7 @@ func TestRaftLogChannelID(t *testing.T) { deleteID("baz") store.Close() - store = createTestRaftLog(t, false, 0) + store = createTestRaftLog(t, nil) defer store.Close() // Make sure that last ID is returned @@ -504,7 +525,7 @@ func TestRaftLogChannelID(t *testing.T) { deleteID("bar") store.Close() - store = createTestRaftLog(t, false, 0) + store = createTestRaftLog(t, nil) defer store.Close() // No ID returned @@ -517,11 +538,11 @@ func TestRaftLogCache(t *testing.T) { cleanupRaftLog(t) defer cleanupRaftLog(t) - store := createTestRaftLog(t, false, 0) + opts := GetDefaultOptions() + opts.Clustering.LogCacheSize = 10 + store := createTestRaftLog(t, opts) defer store.Close() - store.setCacheSize(10) - store.RLock() lc := len(store.cache) cs := store.cacheSize diff --git a/test/configs/test_parse.conf b/test/configs/test_parse.conf index ef765d3a..0e1ca973 100644 --- a/test/configs/test_parse.conf +++ b/test/configs/test_parse.conf @@ -92,6 +92,8 @@ streaming: { raft_lease_timeout: "500ms" raft_commit_timeout: "50ms" allow_add_remove_node: true + bolt_free_list_sync: true + bolt_free_list_array: true } sql: {