Skip to content

Commit

Permalink
feature: follower writes - applied index notifier
Browse files Browse the repository at this point in the history
  • Loading branch information
coufalja committed Jan 22, 2024
1 parent acad351 commit c76e20b
Show file tree
Hide file tree
Showing 13 changed files with 329 additions and 152 deletions.
6 changes: 1 addition & 5 deletions cmd/follower.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,11 +196,7 @@ func follower(_ *cobra.Command, _ []string) error {
if err != nil {
return fmt.Errorf("failed to create API server: %w", err)
}
fkv := regattaserver.NewForwardingKVServer(engine, regattapb.NewKVClient(conn), nQueue)
defer func() {
_ = conn.Close()
}()
regattapb.RegisterKVServer(regatta, fkv)
regattapb.RegisterKVServer(regatta, regattaserver.NewForwardingKVServer(engine, regattapb.NewKVClient(conn), nQueue))
regattapb.RegisterClusterServer(regatta, &regattaserver.ClusterServer{
Cluster: engine,
Config: viperConfigReader,
Expand Down
15 changes: 8 additions & 7 deletions storage/logreader/logreader.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,20 +41,15 @@ func (l *Simple) QueryRaftLog(ctx context.Context, clusterID uint64, logRange dr
}

type ShardCache struct {
shardCache util.SyncMap[uint64, *shard]
ShardCacheSize int
shardCache *util.SyncMap[uint64, *shard]
}

func (l *ShardCache) NodeDeleted(shardID uint64) {
l.shardCache.Delete(shardID)
}

func (l *ShardCache) NodeReady(shardID uint64) {
l.shardCache.ComputeIfAbsent(shardID, func(shardId uint64) *shard { return &shard{cache: newCache(l.ShardCacheSize)} })
}

func (l *ShardCache) LogCompacted(shardID uint64) {
l.shardCache.Store(shardID, &shard{cache: newCache(l.ShardCacheSize)})
l.shardCache.Delete(shardID)
}

type Cached struct {
Expand Down Expand Up @@ -156,3 +151,9 @@ func fixSize(entries []raftpb.Entry, maxSize uint64) []raftpb.Entry {
}
return entries
}

func NewShardCache(size int) *ShardCache {
return &ShardCache{shardCache: util.NewSyncMap(func(k uint64) *shard {
return &shard{cache: newCache(size)}
})}
}
91 changes: 9 additions & 82 deletions storage/logreader/logreader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

serror "github.com/jamf/regatta/storage/errors"
"github.com/jamf/regatta/util"
"github.com/jamf/regatta/util/iter"
"github.com/lni/dragonboat/v4"
"github.com/lni/dragonboat/v4/raftio"
"github.com/lni/dragonboat/v4/raftpb"
Expand Down Expand Up @@ -69,32 +70,30 @@ func TestCached_NodeDeleted(t *testing.T) {
}{
{
name: "remove existing cache shard",
fields: fields{ShardCache: &ShardCache{}},
fields: fields{ShardCache: NewShardCache(1)},
args: args{info: raftio.NodeInfo{
ShardID: 1,
ReplicaID: 1,
}},
assert: func(t *testing.T, s *util.SyncMap[uint64, *shard]) {
_, ok := s.Load(uint64(1))
require.False(t, ok, "unexpected cache shard")
require.False(t, iter.Contains(s.Keys(), uint64(1)), "unexpected cache shard")
},
},
{
name: "remove non-existent cache shard",
fields: fields{ShardCache: &ShardCache{}},
fields: fields{ShardCache: NewShardCache(1)},
args: args{info: raftio.NodeInfo{
ShardID: 1,
ReplicaID: 1,
}},
assert: func(t *testing.T, s *util.SyncMap[uint64, *shard]) {
_, ok := s.Load(uint64(1))
require.False(t, ok, "unexpected cache shard")
require.False(t, iter.Contains(s.Keys(), uint64(1)), "unexpected cache shard")
},
},
{
name: "remove existent cache shard from list",
fields: fields{ShardCache: func() *ShardCache {
c := &ShardCache{}
c := NewShardCache(100)
for i := 1; i <= 4; i++ {
c.shardCache.Store(uint64(i), &shard{})
}
Expand All @@ -105,86 +104,15 @@ func TestCached_NodeDeleted(t *testing.T) {
ReplicaID: 1,
}},
assert: func(t *testing.T, s *util.SyncMap[uint64, *shard]) {
_, ok := s.Load(uint64(2))
require.False(t, ok, "unexpected cache shard")
require.False(t, iter.Contains(s.Keys(), uint64(2)), "unexpected cache shard")
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
l := &Cached{ShardCache: tt.fields.ShardCache}
l.NodeDeleted(tt.args.info.ShardID)
tt.assert(t, &l.shardCache)
})
}
}

func TestCached_NodeReady(t *testing.T) {
type fields struct {
ShardCache *ShardCache
}
type args struct {
info raftio.NodeInfo
}
tests := []struct {
name string
args args
fields fields
assert func(*testing.T, *util.SyncMap[uint64, *shard])
}{
{
name: "add ready node",
fields: fields{ShardCache: &ShardCache{}},
args: args{info: raftio.NodeInfo{
ShardID: 1,
ReplicaID: 1,
}},
assert: func(t *testing.T, s *util.SyncMap[uint64, *shard]) {
_, ok := s.Load(uint64(1))
require.True(t, ok, "missing cache shard")
},
},
{
name: "add existing node",
fields: fields{ShardCache: func() *ShardCache {
c := &ShardCache{}
c.shardCache.Store(uint64(1), &shard{})
return c
}()},
args: args{info: raftio.NodeInfo{
ShardID: 1,
ReplicaID: 1,
}},
assert: func(t *testing.T, s *util.SyncMap[uint64, *shard]) {
_, ok := s.Load(uint64(1))
require.True(t, ok, "missing cache shard")
},
},
{
name: "add ready node to list",
fields: fields{ShardCache: func() *ShardCache {
c := &ShardCache{}
c.shardCache.Store(uint64(1), &shard{})
c.shardCache.Store(uint64(3), &shard{})
c.shardCache.Store(uint64(5), &shard{})
c.shardCache.Store(uint64(6), &shard{})
return c
}()},
args: args{info: raftio.NodeInfo{
ShardID: 2,
ReplicaID: 1,
}},
assert: func(t *testing.T, s *util.SyncMap[uint64, *shard]) {
_, ok := s.Load(uint64(2))
require.True(t, ok, "missing cache shard")
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
l := &Cached{ShardCache: tt.fields.ShardCache}
l.NodeReady(tt.args.info.ShardID)
tt.assert(t, &l.shardCache)
tt.assert(t, l.shardCache)
})
}
}
Expand Down Expand Up @@ -483,10 +411,9 @@ func TestCached_QueryRaftLog(t *testing.T) {
tt.on(querier)
}
l := &Cached{
ShardCache: &ShardCache{ShardCacheSize: tt.fields.ShardCacheSize},
ShardCache: NewShardCache(tt.fields.ShardCacheSize),
LogQuerier: querier,
}
l.shardCache.ComputeIfAbsent(tt.args.clusterID, func(uint642 uint64) *shard { return &shard{cache: newCache(tt.fields.ShardCacheSize)} })
if len(tt.cacheContent) > 0 {
v, _ := l.shardCache.
Load(tt.args.clusterID)
Expand Down
3 changes: 2 additions & 1 deletion storage/table/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,8 @@ type TableConfig struct {
// TableCacheSize shared table cache size, the cache is used to hold handles to open SSTs.
TableCacheSize int
// RecoveryType the in-cluster snapshot recovery type.
RecoveryType SnapshotRecoveryType
RecoveryType SnapshotRecoveryType
AppliedIndexListener func(table string, rev uint64)
}

type MetaConfig struct {
Expand Down
17 changes: 16 additions & 1 deletion storage/table/fsm/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,10 +86,13 @@ func (s *snapshotHeader) snapshotType() SnapshotRecoveryType {
return SnapshotRecoveryType(s[6])
}

func New(tableName, stateMachineDir string, fs vfs.FS, blockCache *pebble.Cache, tableCache *pebble.TableCache, srt SnapshotRecoveryType) sm.CreateOnDiskStateMachineFunc {
func New(tableName, stateMachineDir string, fs vfs.FS, blockCache *pebble.Cache, tableCache *pebble.TableCache, srt SnapshotRecoveryType, af func(applied uint64)) sm.CreateOnDiskStateMachineFunc {
if fs == nil {
fs = vfs.Default
}
if af == nil {
af = func(applied uint64) {}
}
return func(clusterID uint64, nodeID uint64) sm.IOnDiskStateMachine {
hostname, _ := os.Hostname()
dbDirName := rp.GetNodeDBDirName(stateMachineDir, hostname, fmt.Sprintf("%s-%d", tableName, clusterID))
Expand All @@ -105,6 +108,7 @@ func New(tableName, stateMachineDir string, fs vfs.FS, blockCache *pebble.Cache,
log: zap.S().Named("table").Named(tableName),
metrics: newMetrics(tableName, clusterID),
recoveryType: srt,
appliedFunc: af,
}
}
}
Expand All @@ -123,6 +127,7 @@ type FSM struct {
tableCache *pebble.TableCache
metrics *metrics
recoveryType SnapshotRecoveryType
appliedFunc func(applied uint64)
}

func (p *FSM) Open(_ <-chan struct{}) (uint64, error) {
Expand Down Expand Up @@ -178,6 +183,11 @@ func (p *FSM) Open(_ <-chan struct{}) (uint64, error) {
return 0, err
}
p.metrics.applied.Store(idx)
p.appliedFunc(idx)
lx, _ := readLocalIndex(db, sysLeaderIndex)
if lx != 0 {
p.appliedFunc(lx)
}
return idx, nil
}

Expand Down Expand Up @@ -301,6 +311,11 @@ func (p *FSM) Update(updates []sm.Entry) ([]sm.Entry, error) {
}

p.metrics.applied.Store(idx)
if ctx.leaderIndex != nil {
p.appliedFunc(*ctx.leaderIndex)
} else {
p.appliedFunc(idx)
}
return updates, nil
}

Expand Down
17 changes: 9 additions & 8 deletions storage/table/fsm/fsm_feature_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -490,14 +490,15 @@ func generateFiles(t *testing.T, version int, inputCommands []*regattapb.Command

func createTestFSM() (*FSM, error) {
fsm := &FSM{
fs: vfs.NewMem(),
clusterID: 1,
nodeID: 1,
tableName: "test",
dirname: "/tmp",
closed: false,
log: zap.NewNop().Sugar(),
metrics: newMetrics("test", 1),
fs: vfs.NewMem(),
clusterID: 1,
nodeID: 1,
tableName: "test",
dirname: "/tmp",
closed: false,
log: zap.NewNop().Sugar(),
metrics: newMetrics("test", 1),
appliedFunc: func(applied uint64) {},
}

db, err := rp.OpenDB(fsm.dirname, rp.WithFS(fsm.fs))
Expand Down
Loading

0 comments on commit c76e20b

Please sign in to comment.