Skip to content

Commit

Permalink
[IMPROVED] Reduce use of locking in raftlog.go
Browse files Browse the repository at this point in the history
raftLog is a wrapper to bolt's DB database. I think that we
can remove some of the raftLog's locking since bolt's DB itself
will ensure locking.

I am not expecting much of a performance boost with this PR, but
it has been observed contention between GetLog and StoreLogs in
some cases, so that may help.

Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
  • Loading branch information
kozlovic committed Feb 18, 2021
1 parent 73f2531 commit 09c7909
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 31 deletions.
44 changes: 14 additions & 30 deletions server/raft_log.go
@@ -1,4 +1,4 @@
// Copyright 2018-2020 The NATS Authors
// Copyright 2018-2021 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
Expand Down Expand Up @@ -48,6 +48,7 @@ type raftLog struct {
closed bool

// Our cache
hasCache bool // Immutable
cache []*raft.Log // Simple array containing sequence based on modulo
cacheSize uint64 // Save size as uint64 to not have to case during store/load

Expand Down Expand Up @@ -102,6 +103,7 @@ func newRaftLog(log logger.Logger, fileName string, opts *Options) (*raftLog, er
r.encryptOffset = eds.EncryptionOffset()
}
if cacheSize := opts.Clustering.LogCacheSize; cacheSize > 0 {
r.hasCache = true
r.cacheSize = uint64(cacheSize)
r.cache = make([]*raft.Log, cacheSize)
}
Expand Down Expand Up @@ -189,18 +191,12 @@ func (r *raftLog) Close() error {

// FirstIndex implements the LogStore interface
func (r *raftLog) FirstIndex() (uint64, error) {
r.RLock()
idx, err := r.getIndex(true)
r.RUnlock()
return idx, err
return r.getIndex(true)
}

// LastIndex implements the LogStore interface
func (r *raftLog) LastIndex() (uint64, error) {
r.RLock()
idx, err := r.getIndex(false)
r.RUnlock()
return idx, err
return r.getIndex(false)
}

// returns either the first (if first is true) or the last
Expand Down Expand Up @@ -229,18 +225,17 @@ func (r *raftLog) getIndex(first bool) (uint64, error) {

// GetLog implements the LogStore interface
func (r *raftLog) GetLog(idx uint64, log *raft.Log) error {
r.RLock()
if r.cache != nil {
if r.hasCache {
r.RLock()
cached := r.cache[idx%r.cacheSize]
r.RUnlock()
if cached != nil && cached.Index == idx {
*log = *cached
r.RUnlock()
return nil
}
}
tx, err := r.conn.Begin(false)
if err != nil {
r.RUnlock()
return err
}
var key [8]byte
Expand All @@ -253,7 +248,6 @@ func (r *raftLog) GetLog(idx uint64, log *raft.Log) error {
err = r.decodeRaftLog(val, log)
}
tx.Rollback()
r.RUnlock()
return err
}

Expand All @@ -264,10 +258,8 @@ func (r *raftLog) StoreLog(log *raft.Log) error {

// StoreLogs implements the LogStore interface
func (r *raftLog) StoreLogs(logs []*raft.Log) error {
r.Lock()
tx, err := r.conn.Begin(true)
if err != nil {
r.Unlock()
return err
}
bucket := tx.Bucket(logsBucket)
Expand All @@ -290,25 +282,26 @@ func (r *raftLog) StoreLogs(logs []*raft.Log) error {
tx.Rollback()
} else {
err = tx.Commit()
if err == nil && r.cache != nil {
if err == nil && r.hasCache {
r.Lock()
// Cache only on success
for _, l := range logs {
r.cache[l.Index%r.cacheSize] = l
}
r.Unlock()
}
}
r.Unlock()
return err
}

// DeleteRange implements the LogStore interface
func (r *raftLog) DeleteRange(min, max uint64) (retErr error) {
r.Lock()
defer r.Unlock()

if r.cacheSize > 0 {
if r.hasCache {
r.Lock()
// Reset cache
r.cache = make([]*raft.Log, int(r.cacheSize))
r.Unlock()
}

start := time.Now()
Expand Down Expand Up @@ -354,10 +347,8 @@ func (r *raftLog) Set(k, v []byte) error {
}

func (r *raftLog) set(bucketName, k, v []byte) error {
r.Lock()
tx, err := r.conn.Begin(true)
if err != nil {
r.Unlock()
return err
}
bucket := tx.Bucket(bucketName)
Expand All @@ -367,7 +358,6 @@ func (r *raftLog) set(bucketName, k, v []byte) error {
} else {
err = tx.Commit()
}
r.Unlock()
return err
}

Expand All @@ -377,10 +367,8 @@ func (r *raftLog) Get(k []byte) ([]byte, error) {
}

func (r *raftLog) get(bucketName, k []byte) ([]byte, error) {
r.RLock()
tx, err := r.conn.Begin(false)
if err != nil {
r.RUnlock()
return nil, err
}
var v []byte
Expand All @@ -393,7 +381,6 @@ func (r *raftLog) get(bucketName, k []byte) ([]byte, error) {
v = append([]byte(nil), val...)
}
tx.Rollback()
r.RUnlock()
return v, err
}

Expand Down Expand Up @@ -428,10 +415,8 @@ func (r *raftLog) SetChannelID(name string, id uint64) error {
}

func (r *raftLog) DeleteChannelID(name string) error {
r.Lock()
tx, err := r.conn.Begin(true)
if err != nil {
r.Unlock()
return err
}
bucket := tx.Bucket(chanBucket)
Expand All @@ -441,7 +426,6 @@ func (r *raftLog) DeleteChannelID(name string) error {
} else {
err = tx.Commit()
}
r.Unlock()
return err
}

Expand Down
8 changes: 7 additions & 1 deletion stores/cryptostore.go
@@ -1,4 +1,4 @@
// Copyright 2018-2019 The NATS Authors
// Copyright 2018-2021 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
Expand Down Expand Up @@ -84,6 +84,7 @@ type EDStore struct {
aesgcm cipher.AEAD // This is to decrypt data encrypted with this AES cipher
chachagcm cipher.AEAD // This is to decrypt data encrypted with this Chacha cipher
cryptoOverhead int
nonceMu sync.Mutex
nonce []byte
nonceSize int
}
Expand Down Expand Up @@ -176,8 +177,10 @@ func newEDStore(cipherCode byte, keyHash []byte, idx uint64) (*EDStore, error) {
s.gcm = s.chachagcm
}

s.nonceMu.Lock()
s.nonce = make([]byte, s.nonceSize)
if _, err := io.ReadFull(rand.Reader, s.nonce); err != nil {
s.nonceMu.Unlock()
return nil, err
}
idx++
Expand All @@ -188,6 +191,7 @@ func newEDStore(cipherCode byte, keyHash []byte, idx uint64) (*EDStore, error) {
pos++
b -= 8
}
s.nonceMu.Unlock()
return s, nil
}

Expand All @@ -211,6 +215,7 @@ func (s *EDStore) Encrypt(pbuf *[]byte, data []byte) ([]byte, error) {
*pbuf = buf
}
buf[0] = s.code
s.nonceMu.Lock()
copy(buf[1:], s.nonce)
copy(buf[1+s.nonceSize:], data)
dst := buf[1+s.nonceSize : 1+s.nonceSize+len(data)]
Expand All @@ -221,6 +226,7 @@ func (s *EDStore) Encrypt(pbuf *[]byte, data []byte) ([]byte, error) {
break
}
}
s.nonceMu.Unlock()
return buf[:1+s.nonceSize+len(ed)], nil
}

Expand Down

0 comments on commit 09c7909

Please sign in to comment.