Skip to content

Commit

Permalink
Merge pull request #1156 from nats-io/expose_boltdb_conf
Browse files Browse the repository at this point in the history
[ADDED] Configuration parameters for RAFT's BoltDB store
  • Loading branch information
kozlovic committed Feb 17, 2021
2 parents b69ffa9 + f2288d2 commit 73f2531
Show file tree
Hide file tree
Showing 6 changed files with 96 additions and 38 deletions.
19 changes: 16 additions & 3 deletions server/clustering.go
Expand Up @@ -89,6 +89,21 @@ type ClusteringOptions struct {
RaftElectionTimeout time.Duration
RaftLeaseTimeout time.Duration
RaftCommitTimeout time.Duration

// These options influence the RAFT store implementation which uses bolt DB.
//
// Sync freelist to disk. This reduces the database write performance, but
// speed up recovery since there is no need for a full database re-sync.
BoltFreeListSync bool

// BoltFreeListArray sets the backend freelist type to "array".
// There are two options:
// - "array" which is simple but suffers dramatic performance degradation if database is
// large and framentation in freelist is common.
// - "hashmap" which is faster in almost all circumstances but doesn't guarantee
// that it offers the smallest page id available. In normal case it is safe.
// Since v0.21.0, the default is "hashmap". Set this option to "true" to use "array" instead.
BoltFreeListArray bool
}

// raftNode is a handle to a member in a Raft consensus group.
Expand Down Expand Up @@ -304,12 +319,10 @@ func (s *StanServer) createRaftNode(name string) (bool, error) {
s.raft = &raftNode{}

raftLogFileName := filepath.Join(path, raftLogFile)
store, err := newRaftLog(s.log, raftLogFileName, s.opts.Clustering.Sync, int(s.opts.Clustering.TrailingLogs),
s.opts.Encrypt, s.opts.EncryptionCipher, s.opts.EncryptionKey)
store, err := newRaftLog(s.log, raftLogFileName, s.opts)
if err != nil {
return false, err
}
store.setCacheSize(s.opts.Clustering.LogCacheSize)

// Go through the list of channels that we have recovered from streaming store
// and set their corresponding UID.
Expand Down
10 changes: 10 additions & 0 deletions server/conf.go
Expand Up @@ -351,6 +351,16 @@ func parseCluster(itf interface{}, opts *Options) error {
case "raft_commit_timeout":
opts.Clustering.RaftCommitTimeout = dur
}
case "bolt_free_list_sync":
if err := checkType(k, reflect.Bool, v); err != nil {
return err
}
opts.Clustering.BoltFreeListSync = v.(bool)
case "bolt_free_list_array":
if err := checkType(k, reflect.Bool, v); err != nil {
return err
}
opts.Clustering.BoltFreeListArray = v.(bool)
}
}
return nil
Expand Down
8 changes: 8 additions & 0 deletions server/conf_test.go
Expand Up @@ -275,6 +275,12 @@ func TestParseConfig(t *testing.T) {
if !opts.Clustering.AllowAddRemoveNode {
t.Fatal("Expected AllowAddRemoveNode to be true")
}
if !opts.Clustering.BoltFreeListSync {
t.Fatal("Expected BoltFreeListSync to be true")
}
if !opts.Clustering.BoltFreeListArray {
t.Fatal("Expected BoltFreeListArray to be true")
}
if opts.SQLStoreOpts.Driver != "mysql" {
t.Fatalf("Expected SQL Driver to be %q, got %q", "mysql", opts.SQLStoreOpts.Driver)
}
Expand Down Expand Up @@ -505,6 +511,8 @@ func TestParseWrongTypes(t *testing.T) {
expectFailureFor(t, "cluster:{raft_commit_timeout:123}", wrongTypeErr)
expectFailureFor(t, "cluster:{raft_commit_timeout:\"not_a_time\"}", wrongTimeErr)
expectFailureFor(t, "cluster:{allow_add_remove_node:1}", wrongTypeErr)
expectFailureFor(t, "cluster:{bolt_free_list_sync:123}", wrongTypeErr)
expectFailureFor(t, "cluster:{bolt_free_list_array:123}", wrongTypeErr)
expectFailureFor(t, "sql:{driver:false}", wrongTypeErr)
expectFailureFor(t, "sql:{source:false}", wrongTypeErr)
expectFailureFor(t, "sql:{no_caching:123}", wrongTypeErr)
Expand Down
34 changes: 19 additions & 15 deletions server/raft_log.go
Expand Up @@ -48,8 +48,8 @@ type raftLog struct {
closed bool

// Our cache
cache []*raft.Log
cacheSize uint64 // Save size as uint64 to not have to case during store/load
cache []*raft.Log // Simple array containing sequence based on modulo
cacheSize uint64 // Save size as uint64 to not have to case during store/load

// If the store is using encryption
encryption bool
Expand All @@ -58,16 +58,23 @@ type raftLog struct {
encryptOffset int
}

func newRaftLog(log logger.Logger, fileName string, sync bool, _ int, encrypt bool, encryptionCipher string, encryptionKey []byte) (*raftLog, error) {
func newRaftLog(log logger.Logger, fileName string, opts *Options) (*raftLog, error) {
if opts == nil {
opts = GetDefaultOptions()
}
r := &raftLog{
log: log,
fileName: fileName,
codec: &codec.MsgpackHandle{},
}
freeListType := bolt.FreelistMapType
if opts.Clustering.BoltFreeListArray {
freeListType = bolt.FreelistArrayType
}
dbOpts := &bolt.Options{
NoSync: !sync,
NoFreelistSync: true,
FreelistType: bolt.FreelistMapType,
NoSync: !opts.Clustering.Sync,
NoFreelistSync: !opts.Clustering.BoltFreeListSync,
FreelistType: freeListType,
}
db, err := bolt.Open(fileName, 0600, dbOpts)
if err != nil {
Expand All @@ -78,13 +85,13 @@ func newRaftLog(log logger.Logger, fileName string, sync bool, _ int, encrypt bo
r.conn.Close()
return nil, err
}
if encrypt {
if opts.Encrypt {
lastIndex, err := r.getIndex(false)
if err != nil {
r.conn.Close()
return nil, err
}
eds, err := stores.NewEDStore(encryptionCipher, encryptionKey, lastIndex)
eds, err := stores.NewEDStore(opts.EncryptionCipher, opts.EncryptionKey, lastIndex)
if err != nil {
r.conn.Close()
return nil, err
Expand All @@ -94,16 +101,13 @@ func newRaftLog(log logger.Logger, fileName string, sync bool, _ int, encrypt bo
r.encryptBuf = make([]byte, 100)
r.encryptOffset = eds.EncryptionOffset()
}
if cacheSize := opts.Clustering.LogCacheSize; cacheSize > 0 {
r.cacheSize = uint64(cacheSize)
r.cache = make([]*raft.Log, cacheSize)
}
return r, nil
}

func (r *raftLog) setCacheSize(cacheSize int) {
r.Lock()
defer r.Unlock()
r.cacheSize = uint64(cacheSize)
r.cache = make([]*raft.Log, cacheSize)
}

func (r *raftLog) init() error {
tx, err := r.conn.Begin(true)
if err != nil {
Expand Down
61 changes: 41 additions & 20 deletions server/raft_log_test.go
Expand Up @@ -27,12 +27,12 @@ import (
"github.com/nats-io/nats-streaming-server/stores"
)

func createTestRaftLog(t tLogger, sync bool, trailingLogs int) *raftLog {
func createTestRaftLog(t tLogger, opts *Options) *raftLog {
if err := os.MkdirAll(defaultRaftLog, os.ModeDir+os.ModePerm); err != nil {
stackFatalf(t, "Unable to create raft log directory: %v", err)
}
fileName := filepath.Join(defaultRaftLog, raftLogFile)
store, err := newRaftLog(testLogger, fileName, sync, trailingLogs, false, stores.CryptoCipherAutoSelect, nil)
store, err := newRaftLog(testLogger, fileName, opts)
if err != nil {
stackFatalf(t, "Error creating store: %v", err)
}
Expand All @@ -44,7 +44,7 @@ func TestRaftLogDeleteRange(t *testing.T) {
defer cleanupRaftLog(t)

// No sync (will check that conn's NoSync value is correct after a recreating the file)
store := createTestRaftLog(t, false, 0)
store := createTestRaftLog(t, nil)
defer store.Close()

// Store in dbConf bucket
Expand Down Expand Up @@ -145,7 +145,7 @@ func TestRaftLogEncodeDecodeLogs(t *testing.T) {
cleanupRaftLog(t)
defer cleanupRaftLog(t)

store := createTestRaftLog(t, false, 0)
store := createTestRaftLog(t, nil)
defer store.Close()

total := 50000
Expand Down Expand Up @@ -236,7 +236,7 @@ func TestRaftLogWithEncryption(t *testing.T) {
cleanupRaftLog(t)
defer cleanupRaftLog(t)

store := createTestRaftLog(t, false, 0)
store := createTestRaftLog(t, nil)
defer store.Close()
// Plain text log
store.StoreLog(&raft.Log{Index: 1, Term: 1, Type: raft.LogCommand, Data: []byte("abcd")})
Expand All @@ -248,7 +248,11 @@ func TestRaftLogWithEncryption(t *testing.T) {
store.Close()

// Re-open as encrypted store
store, err := newRaftLog(testLogger, fileName, false, 0, true, stores.CryptoCipherAES, []byte("testkey"))
opts := GetDefaultOptions()
opts.Encrypt = true
opts.EncryptionCipher = stores.CryptoCipherAES
opts.EncryptionKey = []byte("testkey")
store, err := newRaftLog(testLogger, fileName, opts)
if err != nil {
t.Fatalf("Error opening store: %v", err)
}
Expand All @@ -275,7 +279,8 @@ func TestRaftLogWithEncryption(t *testing.T) {
fileName = filepath.Join(defaultRaftLog, raftLogFile)

key := []byte("testkey")
store, err = newRaftLog(testLogger, fileName, false, 0, true, stores.CryptoCipherAES, key)
opts.EncryptionKey = key
store, err = newRaftLog(testLogger, fileName, opts)
if err != nil {
t.Fatalf("Error creating store: %v", err)
}
Expand Down Expand Up @@ -325,7 +330,9 @@ func TestRaftLogWithEncryption(t *testing.T) {
if err := os.Setenv(stores.CryptoStoreEnvKeyName, "testkey"); err != nil {
t.Fatalf("Unable to set environment variable: %v", err)
}
store, err = newRaftLog(testLogger, fileName, false, 0, true, stores.CryptoCipherAES, nil)
opts.EncryptionCipher = stores.CryptoCipherAES
opts.EncryptionKey = nil
store, err = newRaftLog(testLogger, fileName, opts)
if err != nil {
t.Fatalf("Error creating store: %v", err)
}
Expand All @@ -341,7 +348,8 @@ func TestRaftLogWithEncryption(t *testing.T) {

// Ensure that env key override config by providing a wrong key
// and notice that we have correct decrypt.
store, err = newRaftLog(testLogger, fileName, false, 0, true, stores.CryptoCipherAES, []byte("wrongkey"))
opts.EncryptionKey = []byte("wrongkey")
store, err = newRaftLog(testLogger, fileName, opts)
if err != nil {
t.Fatalf("Error creating store: %v", err)
}
Expand All @@ -357,7 +365,9 @@ func TestRaftLogWithEncryption(t *testing.T) {

// Now unset env variable and re-open with wrong key
os.Unsetenv(stores.CryptoStoreEnvKeyName)
store, err = newRaftLog(testLogger, fileName, false, 0, true, stores.CryptoCipherAES, []byte("wrongkey"))
opts.EncryptionCipher = stores.CryptoCipherAES
opts.EncryptionKey = []byte("wrongkey")
store, err = newRaftLog(testLogger, fileName, opts)
if err != nil {
t.Fatalf("Error creating store: %v", err)
}
Expand All @@ -369,7 +379,8 @@ func TestRaftLogWithEncryption(t *testing.T) {
store.Close()

// Re-open with encryption but no key, this should fail.
store, err = newRaftLog(testLogger, fileName, false, 0, true, stores.CryptoCipherAES, nil)
opts.EncryptionKey = nil
store, err = newRaftLog(testLogger, fileName, opts)
if err == nil || !strings.Contains(err.Error(), stores.ErrCryptoStoreRequiresKey.Error()) {
if store != nil {
store.Close()
Expand All @@ -387,7 +398,9 @@ func TestRaftLogMultipleCiphers(t *testing.T) {
}
fileName := filepath.Join(defaultRaftLog, raftLogFile)

store, err := newRaftLog(testLogger, fileName, false, 0, false, stores.CryptoCipherAutoSelect, nil)
opts := GetDefaultOptions()
opts.EncryptionCipher = stores.CryptoCipherAutoSelect
store, err := newRaftLog(testLogger, fileName, opts)
if err != nil {
t.Fatalf("Error creating store: %v", err)
}
Expand All @@ -406,7 +419,11 @@ func TestRaftLogMultipleCiphers(t *testing.T) {

storeWithEncryption := func(t *testing.T, encryptionCipher string, payloadIdx int) {
t.Helper()
store, err := newRaftLog(testLogger, fileName, false, 0, true, encryptionCipher, []byte("mykey"))
opts := GetDefaultOptions()
opts.Encrypt = true
opts.EncryptionCipher = encryptionCipher
opts.EncryptionKey = []byte("mykey")
store, err := newRaftLog(testLogger, fileName, opts)
if err != nil {
t.Fatalf("Error creating store: %v", err)
}
Expand All @@ -422,7 +439,11 @@ func TestRaftLogMultipleCiphers(t *testing.T) {

// Now re-open with any cipher, use the auto-select one.
// We should be able to get all 3 messages correctly.
store, err = newRaftLog(testLogger, fileName, false, 0, true, stores.CryptoCipherAutoSelect, []byte("mykey"))
opts = GetDefaultOptions()
opts.Encrypt = true
opts.EncryptionCipher = stores.CryptoCipherAutoSelect
opts.EncryptionKey = []byte("mykey")
store, err = newRaftLog(testLogger, fileName, opts)
if err != nil {
t.Fatalf("Error creating store: %v", err)
}
Expand All @@ -442,7 +463,7 @@ func TestRaftLogChannelID(t *testing.T) {
cleanupRaftLog(t)
defer cleanupRaftLog(t)

store := createTestRaftLog(t, false, 0)
store := createTestRaftLog(t, nil)
defer store.Close()

storeID := func(name string, id uint64) {
Expand Down Expand Up @@ -488,7 +509,7 @@ func TestRaftLogChannelID(t *testing.T) {
deleteID("baz")

store.Close()
store = createTestRaftLog(t, false, 0)
store = createTestRaftLog(t, nil)
defer store.Close()

// Make sure that last ID is returned
Expand All @@ -504,7 +525,7 @@ func TestRaftLogChannelID(t *testing.T) {
deleteID("bar")

store.Close()
store = createTestRaftLog(t, false, 0)
store = createTestRaftLog(t, nil)
defer store.Close()

// No ID returned
Expand All @@ -517,11 +538,11 @@ func TestRaftLogCache(t *testing.T) {
cleanupRaftLog(t)
defer cleanupRaftLog(t)

store := createTestRaftLog(t, false, 0)
opts := GetDefaultOptions()
opts.Clustering.LogCacheSize = 10
store := createTestRaftLog(t, opts)
defer store.Close()

store.setCacheSize(10)

store.RLock()
lc := len(store.cache)
cs := store.cacheSize
Expand Down
2 changes: 2 additions & 0 deletions test/configs/test_parse.conf
Expand Up @@ -92,6 +92,8 @@ streaming: {
raft_lease_timeout: "500ms"
raft_commit_timeout: "50ms"
allow_add_remove_node: true
bolt_free_list_sync: true
bolt_free_list_array: true
}

sql: {
Expand Down

0 comments on commit 73f2531

Please sign in to comment.