From 09c7909aabe5ad8247de015f6f26133ef9c1c97b Mon Sep 17 00:00:00 2001 From: Ivan Kozlovic Date: Thu, 18 Feb 2021 12:54:22 -0700 Subject: [PATCH] [IMPROVED] Reduce use of locking in raftlog.go raftLog is a wrapper to bolt's DB database. I think that we can remove some of the raftLog's locking since bolt's DB itself will ensure locking. I am not expecting much of a performance boost with this PR, but it has been observed contention between GetLog and StoreLogs in some cases, so that may help. Signed-off-by: Ivan Kozlovic --- server/raft_log.go | 44 ++++++++++++++----------------------------- stores/cryptostore.go | 8 +++++++- 2 files changed, 21 insertions(+), 31 deletions(-) diff --git a/server/raft_log.go b/server/raft_log.go index 217905fd..d3d637f3 100644 --- a/server/raft_log.go +++ b/server/raft_log.go @@ -1,4 +1,4 @@ -// Copyright 2018-2020 The NATS Authors +// Copyright 2018-2021 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -48,6 +48,7 @@ type raftLog struct { closed bool // Our cache + hasCache bool // Immutable cache []*raft.Log // Simple array containing sequence based on modulo cacheSize uint64 // Save size as uint64 to not have to case during store/load @@ -102,6 +103,7 @@ func newRaftLog(log logger.Logger, fileName string, opts *Options) (*raftLog, er r.encryptOffset = eds.EncryptionOffset() } if cacheSize := opts.Clustering.LogCacheSize; cacheSize > 0 { + r.hasCache = true r.cacheSize = uint64(cacheSize) r.cache = make([]*raft.Log, cacheSize) } @@ -189,18 +191,12 @@ func (r *raftLog) Close() error { // FirstIndex implements the LogStore interface func (r *raftLog) FirstIndex() (uint64, error) { - r.RLock() - idx, err := r.getIndex(true) - r.RUnlock() - return idx, err + return r.getIndex(true) } // LastIndex implements the LogStore interface func (r *raftLog) LastIndex() (uint64, error) { - r.RLock() - idx, err := r.getIndex(false) - r.RUnlock() - return idx, err + return r.getIndex(false) } // returns either the first (if first is true) or the last @@ -229,18 +225,17 @@ func (r *raftLog) getIndex(first bool) (uint64, error) { // GetLog implements the LogStore interface func (r *raftLog) GetLog(idx uint64, log *raft.Log) error { - r.RLock() - if r.cache != nil { + if r.hasCache { + r.RLock() cached := r.cache[idx%r.cacheSize] + r.RUnlock() if cached != nil && cached.Index == idx { *log = *cached - r.RUnlock() return nil } } tx, err := r.conn.Begin(false) if err != nil { - r.RUnlock() return err } var key [8]byte @@ -253,7 +248,6 @@ func (r *raftLog) GetLog(idx uint64, log *raft.Log) error { err = r.decodeRaftLog(val, log) } tx.Rollback() - r.RUnlock() return err } @@ -264,10 +258,8 @@ func (r *raftLog) StoreLog(log *raft.Log) error { // StoreLogs implements the LogStore interface func (r *raftLog) StoreLogs(logs []*raft.Log) error { - r.Lock() tx, err := r.conn.Begin(true) if err != nil { - r.Unlock() return err } bucket := tx.Bucket(logsBucket) @@ -290,25 +282,26 @@ func (r *raftLog) StoreLogs(logs []*raft.Log) error { tx.Rollback() } else { err = tx.Commit() - if err == nil && r.cache != nil { + if err == nil && r.hasCache { + r.Lock() // Cache only on success for _, l := range logs { r.cache[l.Index%r.cacheSize] = l } + r.Unlock() } } - r.Unlock() return err } // DeleteRange implements the LogStore interface func (r *raftLog) DeleteRange(min, max uint64) (retErr error) { - r.Lock() - defer r.Unlock() - if r.cacheSize > 0 { + if r.hasCache { + r.Lock() // Reset cache r.cache = make([]*raft.Log, int(r.cacheSize)) + r.Unlock() } start := time.Now() @@ -354,10 +347,8 @@ func (r *raftLog) Set(k, v []byte) error { } func (r *raftLog) set(bucketName, k, v []byte) error { - r.Lock() tx, err := r.conn.Begin(true) if err != nil { - r.Unlock() return err } bucket := tx.Bucket(bucketName) @@ -367,7 +358,6 @@ func (r *raftLog) set(bucketName, k, v []byte) error { } else { err = tx.Commit() } - r.Unlock() return err } @@ -377,10 +367,8 @@ func (r *raftLog) Get(k []byte) ([]byte, error) { } func (r *raftLog) get(bucketName, k []byte) ([]byte, error) { - r.RLock() tx, err := r.conn.Begin(false) if err != nil { - r.RUnlock() return nil, err } var v []byte @@ -393,7 +381,6 @@ func (r *raftLog) get(bucketName, k []byte) ([]byte, error) { v = append([]byte(nil), val...) } tx.Rollback() - r.RUnlock() return v, err } @@ -428,10 +415,8 @@ func (r *raftLog) SetChannelID(name string, id uint64) error { } func (r *raftLog) DeleteChannelID(name string) error { - r.Lock() tx, err := r.conn.Begin(true) if err != nil { - r.Unlock() return err } bucket := tx.Bucket(chanBucket) @@ -441,7 +426,6 @@ func (r *raftLog) DeleteChannelID(name string) error { } else { err = tx.Commit() } - r.Unlock() return err } diff --git a/stores/cryptostore.go b/stores/cryptostore.go index a9fd837a..8f52117b 100644 --- a/stores/cryptostore.go +++ b/stores/cryptostore.go @@ -1,4 +1,4 @@ -// Copyright 2018-2019 The NATS Authors +// Copyright 2018-2021 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -84,6 +84,7 @@ type EDStore struct { aesgcm cipher.AEAD // This is to decrypt data encrypted with this AES cipher chachagcm cipher.AEAD // This is to decrypt data encrypted with this Chacha cipher cryptoOverhead int + nonceMu sync.Mutex nonce []byte nonceSize int } @@ -176,8 +177,10 @@ func newEDStore(cipherCode byte, keyHash []byte, idx uint64) (*EDStore, error) { s.gcm = s.chachagcm } + s.nonceMu.Lock() s.nonce = make([]byte, s.nonceSize) if _, err := io.ReadFull(rand.Reader, s.nonce); err != nil { + s.nonceMu.Unlock() return nil, err } idx++ @@ -188,6 +191,7 @@ func newEDStore(cipherCode byte, keyHash []byte, idx uint64) (*EDStore, error) { pos++ b -= 8 } + s.nonceMu.Unlock() return s, nil } @@ -211,6 +215,7 @@ func (s *EDStore) Encrypt(pbuf *[]byte, data []byte) ([]byte, error) { *pbuf = buf } buf[0] = s.code + s.nonceMu.Lock() copy(buf[1:], s.nonce) copy(buf[1+s.nonceSize:], data) dst := buf[1+s.nonceSize : 1+s.nonceSize+len(data)] @@ -221,6 +226,7 @@ func (s *EDStore) Encrypt(pbuf *[]byte, data []byte) ([]byte, error) { break } } + s.nonceMu.Unlock() return buf[:1+s.nonceSize+len(ed)], nil }