
Commit

feat: fix tests
coufalja committed May 31, 2024
1 parent 9a3e905 commit 0cdbeca
Showing 11 changed files with 49 additions and 298 deletions.
59 changes: 0 additions & 59 deletions raft/internal/tan/compaction_test.go
@@ -25,9 +25,7 @@ import (

"github.com/cockroachdb/errors/oserror"

"github.com/jamf/regatta/raft/config"
pb "github.com/jamf/regatta/raft/raftpb"
"github.com/lni/goutils/leaktest"
"github.com/lni/vfs"
"github.com/stretchr/testify/require"
)
@@ -96,63 +94,6 @@ func TestRemoveEntries(t *testing.T) {
runTanTest(t, opts, tf, fs)
}

func TestRemovedEntriesMultiplexedLogSetup(t *testing.T) {
defer leaktest.AfterTest(t)()
cfg := config.NodeHostConfig{
Expert: config.ExpertConfig{FS: vfs.NewMem()},
}
require.NoError(t, cfg.Prepare())
dirs := []string{"db-dir"}
ldb, err := CreateLogMultiplexedTan(cfg, nil, dirs, []string{})
require.NoError(t, err)
defer ldb.Close()
for i := uint64(0); i < 16; i++ {
updates := []pb.Update{
{
ShardID: 1,
ReplicaID: 1,
Snapshot: pb.Snapshot{Index: i * uint64(100), Term: 10},
State: pb.State{Commit: i * uint64(100), Term: 10},
EntriesToSave: []pb.Entry{
{Index: i*2 + 1, Term: 10},
{Index: i*2 + 2, Term: 10},
},
},
{
ShardID: 17,
ReplicaID: 1,
Snapshot: pb.Snapshot{Index: i * uint64(200), Term: 20},
State: pb.State{Commit: i * uint64(200), Term: 20},
EntriesToSave: []pb.Entry{
{Index: i*3 + 1, Term: 20},
{Index: i*3 + 2, Term: 20},
{Index: i*3 + 3, Term: 20},
},
},
}
require.NoError(t, ldb.SaveRaftState(updates, 1))
db, err := ldb.collection.getDB(1, 1)
require.NoError(t, err)
// switchToNewLog() should only be called when db.mu is locked
db.mu.Lock()
require.NoError(t, db.switchToNewLog())
db.mu.Unlock()
}
db, err := ldb.collection.getDB(1, 1)
require.NoError(t, err)
current := db.mu.versions.currentVersion()
fileCount := len(current.files)
// not supposed to have any log file removed
require.NoError(t, ldb.RemoveEntriesTo(1, 1, 32))
current = db.mu.versions.currentVersion()
require.Equal(t, fileCount, len(current.files))
// this should trigger log compaction
require.NoError(t, ldb.RemoveEntriesTo(17, 1, 48))
current = db.mu.versions.currentVersion()
// a log file and an empty log file just created by the last switchToNewLog
require.Equal(t, 2, len(current.files))
}

func TestRemoveAll(t *testing.T) {
fs := vfs.NewMem()
opts := &Options{
71 changes: 2 additions & 69 deletions raft/internal/tan/db_keeper.go
@@ -26,9 +26,7 @@ import (

// dbKeeper keeps all tan db instances managed by a tan LogDB.
type dbKeeper interface {
multiplexedLog() bool
name(shardID uint64, replicaID uint64) string
key(shardID uint64) uint64
get(shardID uint64, replicaID uint64) (*db, bool)
set(shardID uint64, replicaID uint64, db *db)
iterate(f func(*db) error) error
@@ -47,18 +45,10 @@ func newRegularDBKeeper() *regularKeeper {
}
}

func (k *regularKeeper) multiplexedLog() bool {
return false
}

func (k *regularKeeper) name(shardID uint64, replicaID uint64) string {
return fmt.Sprintf("node-%d-%d", shardID, replicaID)
}

func (k *regularKeeper) key(shardID uint64) uint64 {
panic("not suppose to be called")
}

func (k *regularKeeper) get(shardID uint64, replicaID uint64) (*db, bool) {
ni := raftio.NodeInfo{ShardID: shardID, ReplicaID: replicaID}
v, ok := k.dbs[ni]
@@ -79,78 +69,21 @@ func (k *regularKeeper) iterate(f func(*db) error) error {
return nil
}

var _ dbKeeper = (*multiplexedKeeper)(nil)

// multiplexedKeeper divides all raft nodes into groups and assigns nodes within
// the same group to a unique tan db instance. Each raft node is assigned to
// such a group by a so-called key value.
type multiplexedKeeper struct {
dbs map[uint64]*db
}

func newMultiplexedDBKeeper() *multiplexedKeeper {
return &multiplexedKeeper{dbs: make(map[uint64]*db)}
}

func (k *multiplexedKeeper) multiplexedLog() bool {
return true
}

func (k *multiplexedKeeper) name(shardID uint64, replicaID uint64) string {
return fmt.Sprintf("shard-%d", k.key(shardID))
}

func (k *multiplexedKeeper) key(shardID uint64) uint64 {
return shardID % 16
}

func (k *multiplexedKeeper) get(shardID uint64, replicaID uint64) (*db, bool) {
v, ok := k.dbs[k.key(shardID)]
return v, ok
}

func (k *multiplexedKeeper) set(shardID uint64, replicaID uint64, db *db) {
k.dbs[k.key(shardID)] = db
}

func (k *multiplexedKeeper) iterate(f func(*db) error) error {
for _, db := range k.dbs {
if err := f(db); err != nil {
return err
}
}
return nil
}

// collection owns a collection of tan db instances.
type collection struct {
fs vfs.FS
dirname string
keeper dbKeeper
}

func newCollection(dirname string, fs vfs.FS, regular bool) collection {
var k dbKeeper
if regular {
k = newRegularDBKeeper()
} else {
k = newMultiplexedDBKeeper()
}
func newCollection(dirname string, fs vfs.FS) collection {
return collection{
fs: fs,
dirname: dirname,
keeper: k,
keeper: newRegularDBKeeper(),
}
}

func (c *collection) multiplexedLog() bool {
return c.keeper.multiplexedLog()
}

func (c *collection) key(shardID uint64) uint64 {
return c.keeper.key(shardID)
}

func (c *collection) getDB(shardID uint64, replicaID uint64) (*db, error) {
db, ok := c.keeper.get(shardID, replicaID)
if ok {
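
The removed multiplexedKeeper grouped raft shards onto shared tan db instances by a modulo key, so that many shards could reuse one physical log. A minimal, self-contained Go sketch of that grouping (illustrative only, not part of this commit: groupKey and groups are invented names here, and the 16-way split mirrors the shardID % 16 seen above):

package main

import "fmt"

// groups is an assumed group count, mirroring the removed shardID % 16 mapping.
const groups = 16

// groupKey maps a shard to its log group; shards 1 and 17 both land in group 1.
func groupKey(shardID uint64) uint64 {
	return shardID % groups
}

func main() {
	for _, shard := range []uint64{1, 2, 17, 18} {
		fmt.Printf("shard %d -> group %d\n", shard, groupKey(shard))
	}
}

Shards whose IDs differ by a multiple of 16 shared a single log under this scheme, which is why the removed tests wrote updates for shards 1 and 17 through the same db instance.
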
72 changes: 3 additions & 69 deletions raft/internal/tan/logdb.go
@@ -69,18 +69,6 @@ func (factory) Create(cfg config.NodeHostConfig,
return CreateTan(cfg, cb, dirs, wals)
}

// MultiplexedLogFactory is a LogDB factory instance used for creating a
// tan DB with multiplexed logs.
var MultiplexedLogFactory = multiplexLogFactory{}

type multiplexLogFactory struct{}

// Create creates a tan instance that uses multiplexed log files.
func (multiplexLogFactory) Create(cfg config.NodeHostConfig,
cb config.LogDBCallback, dirs []string, wals []string) (raftio.ILogDB, error) {
return CreateLogMultiplexedTan(cfg, cb, dirs, wals)
}

// Name returns the name of the tan instance.
func (factory) Name() string {
return tanLogDBName
@@ -104,21 +92,11 @@ type LogDB struct {
// be backed by a dedicated log file.
func CreateTan(cfg config.NodeHostConfig, cb config.LogDBCallback,
dirs []string, wals []string) (*LogDB, error) {
return createTan(cfg, cb, dirs, wals, true)
}

// CreateLogMultiplexedTan creates and returns a tan instance that uses
// multiplexed log files. A multiplexed log allows multiple raft shards to
// share the same underlying physical log file; this is required when you
// want to run thousands of raft nodes on the same server without having
// thousands of action log files.
func CreateLogMultiplexedTan(cfg config.NodeHostConfig, cb config.LogDBCallback,
dirs []string, wals []string) (*LogDB, error) {
return createTan(cfg, cb, dirs, wals, false)
return createTan(cfg, cb, dirs, wals)
}

func createTan(cfg config.NodeHostConfig, cb config.LogDBCallback,
dirs []string, wals []string, singleNodeLog bool) (*LogDB, error) {
dirs []string, wals []string) (*LogDB, error) {
if cfg.Expert.FS == nil {
panic("fs not set")
}
@@ -131,7 +109,7 @@ func createTan(cfg config.NodeHostConfig, cb config.LogDBCallback,
fs: cfg.Expert.FS,
buffers: make([][]byte, defaultShards),
wgs: make([]*sync.WaitGroup, defaultShards),
collection: newCollection(dirname, cfg.Expert.FS, singleNodeLog),
collection: newCollection(dirname, cfg.Expert.FS),
}
for i := 0; i < len(ldb.buffers); i++ {
ldb.buffers[i] = make([]byte, cfg.Expert.LogDB.KVWriteBufferSize)
@@ -256,53 +234,9 @@ func (l *LogDB) GetBootstrapInfo(shardID uint64,
// SaveRaftState atomically saves the Raft states, log entries and snapshots
// metadata found in the pb.Update list to the log DB.
func (l *LogDB) SaveRaftState(updates []pb.Update, shardID uint64) error {
if l.collection.multiplexedLog() {
return l.concurrentSaveState(updates, shardID)
}
return l.sequentialSaveState(updates, shardID)
}

func (l *LogDB) concurrentSaveState(updates []pb.Update, shardID uint64) error {
var buf []byte
if shardID-1 < uint64(len(l.buffers)) {
buf = l.buffers[shardID-1]
} else {
buf = make([]byte, defaultBufferSize)
}
syncLog := false
var selected *db
var usedShardID uint64
for idx, ud := range updates {
if idx == 0 {
usedShardID = l.collection.key(ud.ShardID)
} else {
if usedShardID != l.collection.key(ud.ShardID) {
panic("shard ID changed")
}
}
db, err := l.getDB(ud.ShardID, ud.ReplicaID)
if err != nil {
return err
}
if selected == nil {
selected = db
}
sync, err := db.write(ud, buf)
if err != nil {
return err
}
if sync {
syncLog = true
}
}
if syncLog && selected != nil {
if err := selected.sync(); err != nil {
return err
}
}
return nil
}

func (l *LogDB) sequentialSaveState(updates []pb.Update, shardID uint64) error {
var wg *sync.WaitGroup
var buf []byte
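
The removed concurrentSaveState relied on an invariant: every update in a single SaveRaftState batch had to map to the same multiplexed log group (it panicked with "shard ID changed" otherwise), so the whole batch could be written and synced through one shared db. A small, self-contained Go sketch of that invariant check (illustrative only, not part of this commit; update, key, and sameGroup are simplified stand-ins for the real types):

package main

import "fmt"

type update struct{ ShardID uint64 }

// key mirrors the removed 16-way shard-to-group mapping.
func key(shardID uint64) uint64 { return shardID % 16 }

// sameGroup reports whether all updates in a batch land in one log group.
func sameGroup(updates []update) bool {
	if len(updates) == 0 {
		return true
	}
	want := key(updates[0].ShardID)
	for _, u := range updates[1:] {
		if key(u.ShardID) != want {
			return false
		}
	}
	return true
}

func main() {
	fmt.Println(sameGroup([]update{{1}, {17}})) // true: both map to group 1
	fmt.Println(sameGroup([]update{{1}, {2}}))  // false: groups 1 and 2 differ
}

With the multiplexed path removed, SaveRaftState always takes the sequential path, so this batching invariant no longer applies.
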
89 changes: 0 additions & 89 deletions raft/internal/tan/logdb_test.go
@@ -191,92 +191,3 @@ func TestSaveRaftState(t *testing.T) {
require.Equal(t, updates[0].State, rs.State)
require.NoError(t, ldb.Close())
}

func TestConcurrentSaveRaftState(t *testing.T) {
defer leaktest.AfterTest(t)()
cfg := config.NodeHostConfig{
Expert: config.ExpertConfig{FS: vfs.NewMem()},
}
require.NoError(t, cfg.Prepare())
dirs := []string{"db-dir"}
ldb, err := CreateLogMultiplexedTan(cfg, nil, dirs, []string{})
require.NoError(t, err)
defer ldb.Close()
for i := uint64(0); i < 16; i++ {
updates := []pb.Update{
{
ShardID: 1,
ReplicaID: 1,
Snapshot: pb.Snapshot{Index: i * uint64(100), Term: 10},
State: pb.State{Commit: i * uint64(100), Term: 10},
EntriesToSave: []pb.Entry{
{Index: i*2 + 1, Term: 10},
{Index: i*2 + 2, Term: 10},
},
},
{
ShardID: 17,
ReplicaID: 1,
Snapshot: pb.Snapshot{Index: i * uint64(200), Term: 20},
State: pb.State{Commit: i * uint64(200), Term: 20},
EntriesToSave: []pb.Entry{
{Index: i*3 + 1, Term: 20},
{Index: i*3 + 2, Term: 20},
{Index: i*3 + 3, Term: 20},
},
},
}
require.NoError(t, ldb.SaveRaftState(updates, 1))
}
for i := uint64(0); i < 16; i++ {
updates := []pb.Update{
{
ShardID: 2,
ReplicaID: 1,
Snapshot: pb.Snapshot{Index: i * uint64(100), Term: 30},
State: pb.State{Commit: i * uint64(100), Term: 30},
EntriesToSave: []pb.Entry{
{Index: i*2 + 1, Term: 30},
{Index: i*2 + 2, Term: 30},
},
},
{
ShardID: 18,
ReplicaID: 1,
Snapshot: pb.Snapshot{Index: i * uint64(100), Term: 40},
State: pb.State{Commit: i * uint64(100), Term: 40},
EntriesToSave: []pb.Entry{
{Index: i * 3, Term: 40},
{Index: i*3 + 1, Term: 40},
{Index: i*3 + 2, Term: 40},
},
},
}
require.NoError(t, ldb.SaveRaftState(updates, 2))
}
// TODO: add checks to see whether there are shard directories named as
// shard-1 and shard-2
var entries []pb.Entry
results, _, err := ldb.IterateEntries(entries, 0, 1, 1, 1, 33, math.MaxUint64)
require.NoError(t, err)
require.Equal(t, 32, len(results))
var entries2 []pb.Entry
results, _, err = ldb.IterateEntries(entries2, 0, 17, 1, 1, 49, math.MaxUint64)
require.NoError(t, err)
require.Equal(t, 48, len(results))
var entries3 []pb.Entry
results, _, err = ldb.IterateEntries(entries3, 0, 2, 1, 1, 33, math.MaxUint64)
require.NoError(t, err)
require.Equal(t, 32, len(results))

ss1, err := ldb.GetSnapshot(1, 1)
require.NoError(t, err)
require.Equal(t, uint64(1500), ss1.Index)
ss2, err := ldb.GetSnapshot(17, 1)
require.NoError(t, err)
require.Equal(t, uint64(3000), ss2.Index)
ss3, err := ldb.GetSnapshot(2, 1)
require.NoError(t, err)
require.Equal(t, uint64(30), ss3.Term)
require.Equal(t, uint64(1500), ss3.Index)
}
4 changes: 3 additions & 1 deletion raft/internal/tan/read_state.go
@@ -18,7 +18,9 @@

package tan

import "sync/atomic"
import (
"sync/atomic"
)

// readState encapsulates the state needed for reading (the current version and
// list of memtables). Loading the readState is done without grabbing
