Skip to content

Commit

Permalink
feature: follower writes - applied index notifier
Browse files Browse the repository at this point in the history
  • Loading branch information
coufalja committed Jan 22, 2024
1 parent 446664f commit 6a32f54
Show file tree
Hide file tree
Showing 16 changed files with 343 additions and 162 deletions.
6 changes: 1 addition & 5 deletions cmd/follower.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,11 +197,7 @@ func follower(_ *cobra.Command, _ []string) error {
if err != nil {
return fmt.Errorf("failed to create API server: %w", err)
}
fkv := regattaserver.NewForwardingKVServer(engine, regattapb.NewKVClient(conn), nQueue)
defer func() {
_ = conn.Close()
}()
regattapb.RegisterKVServer(regatta, fkv)
regattapb.RegisterKVServer(regatta, regattaserver.NewForwardingKVServer(engine, regattapb.NewKVClient(conn), nQueue))
regattapb.RegisterClusterServer(regatta, &regattaserver.ClusterServer{
Cluster: engine,
Config: viperConfigReader,
Expand Down
8 changes: 5 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ require (
github.com/benbjohnson/clock v1.3.5
github.com/cenkalti/backoff/v4 v4.2.1
github.com/cockroachdb/pebble v0.0.0-20221207173255-0f086d933dac
github.com/google/uuid v1.4.0
github.com/grpc-ecosystem/go-grpc-middleware v1.4.0
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
github.com/hashicorp/memberlist v0.5.0
Expand Down Expand Up @@ -48,14 +49,13 @@ require (
github.com/cockroachdb/redact v1.1.5 // indirect
github.com/cpuguy83/go-md2man/v2 v2.0.3 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/envoyproxy/protoc-gen-validate v1.0.2 // indirect
github.com/envoyproxy/protoc-gen-validate v1.0.4 // indirect
github.com/fsnotify/fsnotify v1.7.0 // indirect
github.com/getsentry/sentry-go v0.25.0 // indirect
github.com/getsentry/sentry-go v0.26.0 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/google/btree v1.1.2 // indirect
github.com/google/uuid v1.4.0 // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-immutable-radix v1.3.1 // indirect
github.com/hashicorp/go-msgpack v0.5.5 // indirect
Expand All @@ -64,11 +64,13 @@ require (
github.com/hashicorp/golang-lru v1.0.2 // indirect
github.com/hashicorp/hcl v1.0.0 // indirect
github.com/huandu/xstrings v1.4.0 // indirect
github.com/iancoleman/strcase v0.3.0 // indirect
github.com/imdario/mergo v0.3.13 // indirect
github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/kr/pretty v0.3.1 // indirect
github.com/kr/text v0.2.0 // indirect
github.com/lni/goutils v1.4.0 // indirect
github.com/lyft/protoc-gen-star/v2 v2.0.3 // indirect
github.com/magiconair/properties v1.8.7 // indirect
github.com/miekg/dns v1.1.56 // indirect
github.com/mitchellh/copystructure v1.2.0 // indirect
Expand Down
8 changes: 8 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,8 @@ github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1m
github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
github.com/envoyproxy/protoc-gen-validate v1.0.2 h1:QkIBuU5k+x7/QXPvPPnWXWlCdaBFApVqftFV6k087DA=
github.com/envoyproxy/protoc-gen-validate v1.0.2/go.mod h1:GpiZQP3dDbg4JouG/NNS7QWXpgx6x8QiMKdmN72jogE=
github.com/envoyproxy/protoc-gen-validate v1.0.4 h1:gVPz/FMfvh57HdSJQyvBtF00j8JU4zdyUgIUNhlgg0A=
github.com/envoyproxy/protoc-gen-validate v1.0.4/go.mod h1:qys6tmnRsYrQqIhm2bvKZH4Blx/1gTIZ2UKVY1M+Yew=
github.com/etcd-io/bbolt v1.3.3/go.mod h1:ZF2nL25h33cCyBtcyWeZ2/I3HQOfTP+0PIEvHjkjCrw=
github.com/fasthttp-contrib/websocket v0.0.0-20160511215533-1f3b11f56072/go.mod h1:duJ4Jxv5lDcvg4QuQr0oowTf7dz4/CR8NtyCooz9HL8=
github.com/fatih/structs v1.1.0/go.mod h1:9NiDSp5zOcgEDl+j00MP/WkGVPOlPRLejGD8Ga6PJ7M=
Expand All @@ -106,6 +108,8 @@ github.com/fsnotify/fsnotify v1.7.0/go.mod h1:40Bi/Hjc2AVfZrqy+aj+yEI+/bRxZnMJyT
github.com/gavv/httpexpect v2.0.0+incompatible/go.mod h1:x+9tiU1YnrOvnB725RkpoLv1M62hOWzwo5OXotisrKc=
github.com/getsentry/sentry-go v0.25.0 h1:q6Eo+hS+yoJlTO3uu/azhQadsD8V+jQn2D8VvX1eOyI=
github.com/getsentry/sentry-go v0.25.0/go.mod h1:lc76E2QywIyW8WuBnwl8Lc4bkmQH4+w1gwTf25trprY=
github.com/getsentry/sentry-go v0.26.0 h1:IX3++sF6/4B5JcevhdZfdKIHfyvMmAq/UnqcyT2H6mA=
github.com/getsentry/sentry-go v0.26.0/go.mod h1:lc76E2QywIyW8WuBnwl8Lc4bkmQH4+w1gwTf25trprY=
github.com/ghemawat/stream v0.0.0-20171120220530-696b145b53b9/go.mod h1:106OIgooyS7OzLDOpUGgm9fA3bQENb/cFSyyBmMoJDs=
github.com/gin-contrib/sse v0.0.0-20190301062529-5545eab6dad3/go.mod h1:VJ0WA2NBN22VlZ2dKZQPAPnyWw5XTlK1KymzLKsr59s=
github.com/gin-gonic/gin v1.4.0/go.mod h1:OW2EZn3DO8Ln9oIKOvM++LBO+5UPHJJDH72/q/3rZdM=
Expand Down Expand Up @@ -207,6 +211,8 @@ github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpO
github.com/huandu/xstrings v1.4.0 h1:D17IlohoQq4UcpqD7fDk80P7l+lwAmlFaBHgOipl2FU=
github.com/huandu/xstrings v1.4.0/go.mod h1:y5/lhBue+AyNmUVz9RLU9xbLR0o4KIIExikq4ovT0aE=
github.com/hydrogen18/memlistener v0.0.0-20141126152155-54553eb933fb/go.mod h1:qEIFzExnS6016fRpRfxrExeVn2gbClQA99gQhnIcdhE=
github.com/iancoleman/strcase v0.3.0 h1:nTXanmYxhfFAMjZL34Ov6gkzEsSJZ5DbhxWjvSASxEI=
github.com/iancoleman/strcase v0.3.0/go.mod h1:iwCmte+B7n89clKwxIoIXy/HfoL7AsD47ZCWhYzw7ho=
github.com/imdario/mergo v0.3.13 h1:lFzP57bqS/wsqKssCGmtLAb8A0wKjLGrve2q3PPVcBk=
github.com/imdario/mergo v0.3.13/go.mod h1:4lJ1jqUDcsbIECGy0RUJAXNIhg+6ocWgb1ALK2O4oXg=
github.com/imkira/go-interpol v1.1.0/go.mod h1:z0h2/2T3XF8kyEPpRgJ3kmNv+C43p+I/CoI+jC3w2iA=
Expand Down Expand Up @@ -257,6 +263,8 @@ github.com/lni/goutils v1.4.0 h1:e1tNN+4zsbTpNvhG5cxirkH9Pdz96QAZ2j6+5tmjvqg=
github.com/lni/goutils v1.4.0/go.mod h1:LIHvF0fflR+zyXUQFQOiHPpKANf3UIr7DFIv5CBPOoU=
github.com/lni/vfs v0.2.1-0.20220616104132-8852fd867376 h1:jX9CoRWNPwrZ2yY3RJFTSwa49qDQqtXglrCByGdQGZg=
github.com/lni/vfs v0.2.1-0.20220616104132-8852fd867376/go.mod h1:LOatfyR8Xeej1jbXybwYGVfCccR0u+BQRG9xg7BD7xo=
github.com/lyft/protoc-gen-star/v2 v2.0.3 h1:/3+/2sWyXeMLzKd1bX+ixWKgEMsULrIivpDsuaF441o=
github.com/lyft/protoc-gen-star/v2 v2.0.3/go.mod h1:amey7yeodaJhXSbf/TlLvWiqQfLOSpEk//mLlc+axEk=
github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ=
github.com/magiconair/properties v1.8.7 h1:IeQXZAiQcpL9mgcAe1Nu6cX9LLw6ExEHKjN0VQdvPDY=
github.com/magiconair/properties v1.8.7/go.mod h1:Dhd985XPs7jluiymwWYZ0G4Z61jb3vdS329zhj2hYo0=
Expand Down
8 changes: 1 addition & 7 deletions storage/engine_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,7 @@ func (e *events) dispatchEvents() {
switch ev := evt.(type) {
case nodeHostShuttingDown:
return
case leaderUpdated, nodeUnloaded, membershipChanged:
e.engine.Cluster.Notify()
case nodeReady:
if ev.ReplicaID == e.engine.cfg.NodeID && e.engine.LogCache != nil {
e.engine.LogCache.NodeReady(ev.ShardID)
}
case leaderUpdated, nodeUnloaded, membershipChanged, nodeReady:
e.engine.Cluster.Notify()
case nodeDeleted:
if ev.ReplicaID == e.engine.cfg.NodeID && e.engine.LogCache != nil {
Expand All @@ -33,7 +28,6 @@ func (e *events) dispatchEvents() {
if ev.ReplicaID == e.engine.cfg.NodeID && e.engine.LogCache != nil {
e.engine.LogCache.LogCompacted(ev.ShardID)
}
e.engine.Cluster.Notify()
}
}
}
Expand Down
15 changes: 8 additions & 7 deletions storage/logreader/logreader.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,20 +41,15 @@ func (l *Simple) QueryRaftLog(ctx context.Context, clusterID uint64, logRange dr
}

type ShardCache struct {
shardCache util.SyncMap[uint64, *shard]
ShardCacheSize int
shardCache *util.SyncMap[uint64, *shard]
}

func (l *ShardCache) NodeDeleted(shardID uint64) {
l.shardCache.Delete(shardID)
}

func (l *ShardCache) NodeReady(shardID uint64) {
l.shardCache.ComputeIfAbsent(shardID, func(shardId uint64) *shard { return &shard{cache: newCache(l.ShardCacheSize)} })
}

func (l *ShardCache) LogCompacted(shardID uint64) {
l.shardCache.Store(shardID, &shard{cache: newCache(l.ShardCacheSize)})
l.shardCache.Delete(shardID)
}

type Cached struct {
Expand Down Expand Up @@ -156,3 +151,9 @@ func fixSize(entries []raftpb.Entry, maxSize uint64) []raftpb.Entry {
}
return entries
}

func NewShardCache(size int) *ShardCache {
return &ShardCache{shardCache: util.NewSyncMap(func(k uint64) *shard {
return &shard{cache: newCache(size)}
})}
}
91 changes: 9 additions & 82 deletions storage/logreader/logreader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

serror "github.com/jamf/regatta/storage/errors"
"github.com/jamf/regatta/util"
"github.com/jamf/regatta/util/iter"
"github.com/lni/dragonboat/v4"
"github.com/lni/dragonboat/v4/raftio"
"github.com/lni/dragonboat/v4/raftpb"
Expand Down Expand Up @@ -69,32 +70,30 @@ func TestCached_NodeDeleted(t *testing.T) {
}{
{
name: "remove existing cache shard",
fields: fields{ShardCache: &ShardCache{}},
fields: fields{ShardCache: NewShardCache(1)},
args: args{info: raftio.NodeInfo{
ShardID: 1,
ReplicaID: 1,
}},
assert: func(t *testing.T, s *util.SyncMap[uint64, *shard]) {
_, ok := s.Load(uint64(1))
require.False(t, ok, "unexpected cache shard")
require.False(t, iter.Contains(s.Keys(), uint64(1)), "unexpected cache shard")
},
},
{
name: "remove non-existent cache shard",
fields: fields{ShardCache: &ShardCache{}},
fields: fields{ShardCache: NewShardCache(1)},
args: args{info: raftio.NodeInfo{
ShardID: 1,
ReplicaID: 1,
}},
assert: func(t *testing.T, s *util.SyncMap[uint64, *shard]) {
_, ok := s.Load(uint64(1))
require.False(t, ok, "unexpected cache shard")
require.False(t, iter.Contains(s.Keys(), uint64(1)), "unexpected cache shard")
},
},
{
name: "remove existent cache shard from list",
fields: fields{ShardCache: func() *ShardCache {
c := &ShardCache{}
c := NewShardCache(100)
for i := 1; i <= 4; i++ {
c.shardCache.Store(uint64(i), &shard{})
}
Expand All @@ -105,86 +104,15 @@ func TestCached_NodeDeleted(t *testing.T) {
ReplicaID: 1,
}},
assert: func(t *testing.T, s *util.SyncMap[uint64, *shard]) {
_, ok := s.Load(uint64(2))
require.False(t, ok, "unexpected cache shard")
require.False(t, iter.Contains(s.Keys(), uint64(2)), "unexpected cache shard")
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
l := &Cached{ShardCache: tt.fields.ShardCache}
l.NodeDeleted(tt.args.info.ShardID)
tt.assert(t, &l.shardCache)
})
}
}

func TestCached_NodeReady(t *testing.T) {
type fields struct {
ShardCache *ShardCache
}
type args struct {
info raftio.NodeInfo
}
tests := []struct {
name string
args args
fields fields
assert func(*testing.T, *util.SyncMap[uint64, *shard])
}{
{
name: "add ready node",
fields: fields{ShardCache: &ShardCache{}},
args: args{info: raftio.NodeInfo{
ShardID: 1,
ReplicaID: 1,
}},
assert: func(t *testing.T, s *util.SyncMap[uint64, *shard]) {
_, ok := s.Load(uint64(1))
require.True(t, ok, "missing cache shard")
},
},
{
name: "add existing node",
fields: fields{ShardCache: func() *ShardCache {
c := &ShardCache{}
c.shardCache.Store(uint64(1), &shard{})
return c
}()},
args: args{info: raftio.NodeInfo{
ShardID: 1,
ReplicaID: 1,
}},
assert: func(t *testing.T, s *util.SyncMap[uint64, *shard]) {
_, ok := s.Load(uint64(1))
require.True(t, ok, "missing cache shard")
},
},
{
name: "add ready node to list",
fields: fields{ShardCache: func() *ShardCache {
c := &ShardCache{}
c.shardCache.Store(uint64(1), &shard{})
c.shardCache.Store(uint64(3), &shard{})
c.shardCache.Store(uint64(5), &shard{})
c.shardCache.Store(uint64(6), &shard{})
return c
}()},
args: args{info: raftio.NodeInfo{
ShardID: 2,
ReplicaID: 1,
}},
assert: func(t *testing.T, s *util.SyncMap[uint64, *shard]) {
_, ok := s.Load(uint64(2))
require.True(t, ok, "missing cache shard")
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
l := &Cached{ShardCache: tt.fields.ShardCache}
l.NodeReady(tt.args.info.ShardID)
tt.assert(t, &l.shardCache)
tt.assert(t, l.shardCache)
})
}
}
Expand Down Expand Up @@ -483,10 +411,9 @@ func TestCached_QueryRaftLog(t *testing.T) {
tt.on(querier)
}
l := &Cached{
ShardCache: &ShardCache{ShardCacheSize: tt.fields.ShardCacheSize},
ShardCache: NewShardCache(tt.fields.ShardCacheSize),
LogQuerier: querier,
}
l.shardCache.ComputeIfAbsent(tt.args.clusterID, func(uint642 uint64) *shard { return &shard{cache: newCache(tt.fields.ShardCacheSize)} })
if len(tt.cacheContent) > 0 {
v, _ := l.shardCache.
Load(tt.args.clusterID)
Expand Down
3 changes: 2 additions & 1 deletion storage/table/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,8 @@ type TableConfig struct {
// TableCacheSize shared table cache size, the cache is used to hold handles to open SSTs.
TableCacheSize int
// RecoveryType the in-cluster snapshot recovery type.
RecoveryType SnapshotRecoveryType
RecoveryType SnapshotRecoveryType
AppliedIndexListener func(table string, rev uint64)
}

type MetaConfig struct {
Expand Down
17 changes: 16 additions & 1 deletion storage/table/fsm/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,10 +86,13 @@ func (s *snapshotHeader) snapshotType() SnapshotRecoveryType {
return SnapshotRecoveryType(s[6])
}

func New(tableName, stateMachineDir string, fs vfs.FS, blockCache *pebble.Cache, tableCache *pebble.TableCache, srt SnapshotRecoveryType) sm.CreateOnDiskStateMachineFunc {
func New(tableName, stateMachineDir string, fs vfs.FS, blockCache *pebble.Cache, tableCache *pebble.TableCache, srt SnapshotRecoveryType, af func(applied uint64)) sm.CreateOnDiskStateMachineFunc {
if fs == nil {
fs = vfs.Default
}
if af == nil {
af = func(applied uint64) {}
}
return func(clusterID uint64, nodeID uint64) sm.IOnDiskStateMachine {
hostname, _ := os.Hostname()
dbDirName := rp.GetNodeDBDirName(stateMachineDir, hostname, fmt.Sprintf("%s-%d", tableName, clusterID))
Expand All @@ -105,6 +108,7 @@ func New(tableName, stateMachineDir string, fs vfs.FS, blockCache *pebble.Cache,
log: zap.S().Named("table").Named(tableName),
metrics: newMetrics(tableName, clusterID),
recoveryType: srt,
appliedFunc: af,
}
}
}
Expand All @@ -123,6 +127,7 @@ type FSM struct {
tableCache *pebble.TableCache
metrics *metrics
recoveryType SnapshotRecoveryType
appliedFunc func(applied uint64)
}

func (p *FSM) Open(_ <-chan struct{}) (uint64, error) {
Expand Down Expand Up @@ -178,6 +183,11 @@ func (p *FSM) Open(_ <-chan struct{}) (uint64, error) {
return 0, err
}
p.metrics.applied.Store(idx)
p.appliedFunc(idx)
lx, _ := readLocalIndex(db, sysLeaderIndex)
if lx != 0 {
p.appliedFunc(lx)
}
return idx, nil
}

Expand Down Expand Up @@ -301,6 +311,11 @@ func (p *FSM) Update(updates []sm.Entry) ([]sm.Entry, error) {
}

p.metrics.applied.Store(idx)
if ctx.leaderIndex != nil {
p.appliedFunc(*ctx.leaderIndex)
} else {
p.appliedFunc(idx)
}
return updates, nil
}

Expand Down
17 changes: 9 additions & 8 deletions storage/table/fsm/fsm_feature_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -490,14 +490,15 @@ func generateFiles(t *testing.T, version int, inputCommands []*regattapb.Command

func createTestFSM() (*FSM, error) {
fsm := &FSM{
fs: vfs.NewMem(),
clusterID: 1,
nodeID: 1,
tableName: "test",
dirname: "/tmp",
closed: false,
log: zap.NewNop().Sugar(),
metrics: newMetrics("test", 1),
fs: vfs.NewMem(),
clusterID: 1,
nodeID: 1,
tableName: "test",
dirname: "/tmp",
closed: false,
log: zap.NewNop().Sugar(),
metrics: newMetrics("test", 1),
appliedFunc: func(applied uint64) {},
}

db, err := rp.OpenDB(fsm.dirname, rp.WithFS(fsm.fs))
Expand Down
Loading

0 comments on commit 6a32f54

Please sign in to comment.