Merge e583a0a into 6a58cd2
coufalja committed Jan 22, 2024
2 parents 6a58cd2 + e583a0a commit 406e22d
Showing 19 changed files with 667 additions and 184 deletions.
51 changes: 28 additions & 23 deletions cmd/follower.go
@@ -107,6 +107,11 @@ func follower(_ *cobra.Command, _ []string) error {
return fmt.Errorf("failed to parse raft.logdb: %w", err)
}

nQueue := storage.NewNotificationQueue()
go nQueue.Run()
defer func() {
_ = nQueue.Close()
}()
engine, err := storage.New(storage.Config{
Log: engineLog.Sugar(),
ClientAddress: viper.GetString("api.advertise-address"),
@@ -128,16 +133,17 @@ func follower(_ *cobra.Command, _ []string) error {
NodeName: viper.GetString("memberlist.node-name"),
},
Table: storage.TableConfig{
FS: vfs.Default,
ElectionRTT: viper.GetUint64("raft.election-rtt"),
HeartbeatRTT: viper.GetUint64("raft.heartbeat-rtt"),
SnapshotEntries: viper.GetUint64("raft.snapshot-entries"),
CompactionOverhead: viper.GetUint64("raft.compaction-overhead"),
MaxInMemLogSize: viper.GetUint64("raft.max-in-mem-log-size"),
DataDir: viper.GetString("raft.state-machine-dir"),
RecoveryType: toRecoveryType(viper.GetString("raft.snapshot-recovery-type")),
BlockCacheSize: viper.GetInt64("storage.block-cache-size"),
TableCacheSize: viper.GetInt("storage.table-cache-size"),
FS: vfs.Default,
ElectionRTT: viper.GetUint64("raft.election-rtt"),
HeartbeatRTT: viper.GetUint64("raft.heartbeat-rtt"),
SnapshotEntries: viper.GetUint64("raft.snapshot-entries"),
CompactionOverhead: viper.GetUint64("raft.compaction-overhead"),
MaxInMemLogSize: viper.GetUint64("raft.max-in-mem-log-size"),
DataDir: viper.GetString("raft.state-machine-dir"),
RecoveryType: toRecoveryType(viper.GetString("raft.snapshot-recovery-type")),
BlockCacheSize: viper.GetInt64("storage.block-cache-size"),
TableCacheSize: viper.GetInt("storage.table-cache-size"),
AppliedIndexListener: nQueue.Notify,
},
Meta: storage.MetaConfig{
ElectionRTT: viper.GetUint64("raft.election-rtt"),
@@ -157,15 +163,14 @@ func follower(_ *cobra.Command, _ []string) error {
defer engine.Close()

// Replication
conn, err := createReplicationConn()
defer func() {
_ = conn.Close()
}()
if err != nil {
return fmt.Errorf("cannot create replication conn: %w", err)
}
{
conn, err := createReplicationConn()
defer func() {
_ = conn.Close()
}()
if err != nil {
return fmt.Errorf("cannot create replication conn: %w", err)
}

d := replication.NewManager(engine.Manager, engine.NodeHost, conn, replication.Config{
ReconcileInterval: viper.GetDuration("replication.reconcile-interval"),
Workers: replication.WorkerConfig{
@@ -191,11 +196,11 @@ func follower(_ *cobra.Command, _ []string) error {
if err != nil {
return fmt.Errorf("failed to create API server: %w", err)
}
regattapb.RegisterKVServer(regatta, &regattaserver.ReadonlyKVServer{
KVServer: regattaserver.KVServer{
Storage: engine,
},
})
fkv := regattaserver.NewForwardingKVServer(engine, regattapb.NewKVClient(conn), nQueue)
defer func() {
_ = conn.Close()
}()
regattapb.RegisterKVServer(regatta, fkv)
regattapb.RegisterClusterServer(regatta, &regattaserver.ClusterServer{
Cluster: engine,
Config: viperConfigReader,
1 change: 1 addition & 0 deletions cmd/leader.go
@@ -220,6 +220,7 @@ func leader(_ *cobra.Command, _ []string) error {
)
regattapb.RegisterMetadataServer(replication, &regattaserver.MetadataServer{Tables: engine})
regattapb.RegisterSnapshotServer(replication, &regattaserver.SnapshotServer{Tables: engine})
regattapb.RegisterKVServer(replication, &regattaserver.KVServer{Storage: engine})
regattapb.RegisterLogServer(replication, ls)
// Start server
go func() {
61 changes: 61 additions & 0 deletions regattaserver/kv.go
@@ -5,8 +5,10 @@ package regattaserver
import (
"context"
"errors"
"fmt"

"github.com/jamf/regatta/regattapb"
"github.com/jamf/regatta/storage"
serrors "github.com/jamf/regatta/storage/errors"
"github.com/jamf/regatta/util/iter"
"google.golang.org/grpc/codes"
@@ -205,3 +207,62 @@ func (r *ReadonlyKVServer) Txn(ctx context.Context, req *regattapb.TxnRequest) (
}
return nil, status.Error(codes.Unimplemented, "writable Txn not implemented for follower")
}

func NewForwardingKVServer(storage KVService, client regattapb.KVClient, q *storage.IndexNotificationQueue) *ForwardingKVServer {
return &ForwardingKVServer{
KVServer: KVServer{Storage: storage},
client: client,
q: q,
}
}

// ForwardingKVServer forwards write requests to the leader cluster and waits until the produced revision is applied locally before responding.
type ForwardingKVServer struct {
KVServer
client regattapb.KVClient
q *storage.IndexNotificationQueue
}

// Put implements proto/regatta.proto KV.Put method.
func (r *ForwardingKVServer) Put(ctx context.Context, req *regattapb.PutRequest) (*regattapb.PutResponse, error) {
put, err := r.client.Put(ctx, req)
if err != nil {
if s, ok := status.FromError(err); ok {
return nil, status.Error(s.Code(), fmt.Sprintf("leader error: %v", s.Err()))
}
return nil, status.Error(codes.FailedPrecondition, "forward error")
}

return put, <-r.q.Add(ctx, string(req.Table), put.Header.Revision)
}

// DeleteRange implements proto/regatta.proto KV.DeleteRange method.
func (r *ForwardingKVServer) DeleteRange(ctx context.Context, req *regattapb.DeleteRangeRequest) (*regattapb.DeleteRangeResponse, error) {
del, err := r.client.DeleteRange(ctx, req)
if err != nil {
if s, ok := status.FromError(err); ok {
return nil, status.Error(s.Code(), fmt.Sprintf("leader error: %v", s.Err()))
}
return nil, status.Error(codes.FailedPrecondition, "forward error")
}
return del, <-r.q.Add(ctx, string(req.Table), del.Header.Revision)
}

// Txn processes multiple requests in a single transaction.
// A txn request increments the revision of the key-value store
// and generates events with the same revision for every completed request.
// It is allowed to modify the same key several times within one txn (the result will be the last Op that modified the key).
// Readonly transactions are allowed using the follower API.
func (r *ForwardingKVServer) Txn(ctx context.Context, req *regattapb.TxnRequest) (*regattapb.TxnResponse, error) {
if req.IsReadonly() {
return r.KVServer.Txn(ctx, req)
}
txn, err := r.client.Txn(ctx, req)
if err != nil {
if s, ok := status.FromError(err); ok {
return nil, status.Error(s.Code(), fmt.Sprintf("leader error: %v", s.Err()))
}
return nil, status.Error(codes.FailedPrecondition, "forward error")
}
return txn, <-r.q.Add(ctx, string(req.Table), txn.Header.Revision)
}
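
The three forwarding methods above share one pattern: the write is forwarded to the leader over gRPC, and the handler then blocks on <-r.q.Add(ctx, table, revision) until replication has applied that revision to the local table, giving read-your-writes semantics on the follower API. The sketch below illustrates only that waiting mechanism; it is not the storage.IndexNotificationQueue added in this commit, and the listener signature it assumes (table name plus applied index) is inferred from the cmd/follower.go wiring.

// Illustrative sketch only: not the storage.IndexNotificationQueue shipped in this
// commit. A waiter registered via Add is released once Notify reports that the
// table's locally applied index has reached the requested revision.
package example

import (
	"context"
	"sync"
)

type waiter struct {
	table    string
	revision uint64
	done     chan error
}

// Queue resolves waiters once the local applied index catches up with a revision.
type Queue struct {
	mu      sync.Mutex
	applied map[string]uint64
	waiters []waiter
}

func NewQueue() *Queue { return &Queue{applied: make(map[string]uint64)} }

// Notify plays the role of the Table.AppliedIndexListener hook (assumed signature).
func (q *Queue) Notify(table string, index uint64) {
	q.mu.Lock()
	defer q.mu.Unlock()
	q.applied[table] = index
	kept := q.waiters[:0]
	for _, w := range q.waiters {
		if w.table == table && index >= w.revision {
			select {
			case w.done <- nil: // revision applied locally, release the caller
			default:
			}
			continue
		}
		kept = append(kept, w)
	}
	q.waiters = kept
}

// Add returns a channel that yields nil once table reaches revision, or the
// context error if ctx is cancelled first.
func (q *Queue) Add(ctx context.Context, table string, revision uint64) <-chan error {
	done := make(chan error, 1)
	q.mu.Lock()
	if q.applied[table] >= revision {
		done <- nil
		q.mu.Unlock()
		return done
	}
	q.waiters = append(q.waiters, waiter{table: table, revision: revision, done: done})
	q.mu.Unlock()

	go func() {
		<-ctx.Done() // in a gRPC handler the context ends with the request
		select {
		case done <- ctx.Err(): // caller stops waiting; the stale waiter is dropped on a later Notify
		default:
		}
	}()
	return done
}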
2 changes: 1 addition & 1 deletion storage/engine.go
@@ -56,7 +56,7 @@ func New(cfg Config) (*Engine, error) {
},
)
if cfg.LogCacheSize > 0 {
e.LogCache = &logreader.ShardCache{ShardCacheSize: cfg.LogCacheSize}
e.LogCache = logreader.NewShardCache(cfg.LogCacheSize)
e.LogReader = &logreader.Cached{LogQuerier: nh, ShardCache: e.LogCache}
} else {
e.LogReader = &logreader.Simple{LogQuerier: nh}
3 changes: 0 additions & 3 deletions storage/engine_events.go
@@ -17,9 +17,6 @@ func (e *Engine) dispatchEvents() {
case leaderUpdated, nodeUnloaded, membershipChanged, nodeHostShuttingDown:
e.Cluster.Notify()
case nodeReady:
if ev.ReplicaID == e.cfg.NodeID && e.LogCache != nil {
e.LogCache.NodeReady(ev.ShardID)
}
e.Cluster.Notify()
case nodeDeleted:
if ev.ReplicaID == e.cfg.NodeID && e.LogCache != nil {
15 changes: 8 additions & 7 deletions storage/logreader/logreader.go
@@ -41,20 +41,15 @@ func (l *Simple) QueryRaftLog(ctx context.Context, clusterID uint64, logRange dr
}

type ShardCache struct {
shardCache util.SyncMap[uint64, *shard]
ShardCacheSize int
shardCache *util.SyncMap[uint64, *shard]
}

func (l *ShardCache) NodeDeleted(shardID uint64) {
l.shardCache.Delete(shardID)
}

func (l *ShardCache) NodeReady(shardID uint64) {
l.shardCache.ComputeIfAbsent(shardID, func(shardId uint64) *shard { return &shard{cache: newCache(l.ShardCacheSize)} })
}

func (l *ShardCache) LogCompacted(shardID uint64) {
l.shardCache.Store(shardID, &shard{cache: newCache(l.ShardCacheSize)})
l.shardCache.Delete(shardID)
}

type Cached struct {
@@ -156,3 +151,9 @@ func fixSize(entries []raftpb.Entry, maxSize uint64) []raftpb.Entry {
}
return entries
}

func NewShardCache(size int) *ShardCache {
return &ShardCache{shardCache: util.NewSyncMap(func(k uint64) *shard {
return &shard{cache: newCache(size)}
})}
}
91 changes: 9 additions & 82 deletions storage/logreader/logreader_test.go
@@ -9,6 +9,7 @@ import (

serror "github.com/jamf/regatta/storage/errors"
"github.com/jamf/regatta/util"
"github.com/jamf/regatta/util/iter"
"github.com/lni/dragonboat/v4"
"github.com/lni/dragonboat/v4/raftio"
"github.com/lni/dragonboat/v4/raftpb"
@@ -69,32 +70,30 @@ func TestCached_NodeDeleted(t *testing.T) {
}{
{
name: "remove existing cache shard",
fields: fields{ShardCache: &ShardCache{}},
fields: fields{ShardCache: NewShardCache(1)},
args: args{info: raftio.NodeInfo{
ShardID: 1,
ReplicaID: 1,
}},
assert: func(t *testing.T, s *util.SyncMap[uint64, *shard]) {
_, ok := s.Load(uint64(1))
require.False(t, ok, "unexpected cache shard")
require.False(t, iter.Contains(s.Keys(), uint64(1)), "unexpected cache shard")
},
},
{
name: "remove non-existent cache shard",
fields: fields{ShardCache: &ShardCache{}},
fields: fields{ShardCache: NewShardCache(1)},
args: args{info: raftio.NodeInfo{
ShardID: 1,
ReplicaID: 1,
}},
assert: func(t *testing.T, s *util.SyncMap[uint64, *shard]) {
_, ok := s.Load(uint64(1))
require.False(t, ok, "unexpected cache shard")
require.False(t, iter.Contains(s.Keys(), uint64(1)), "unexpected cache shard")
},
},
{
name: "remove existent cache shard from list",
fields: fields{ShardCache: func() *ShardCache {
c := &ShardCache{}
c := NewShardCache(100)
for i := 1; i <= 4; i++ {
c.shardCache.Store(uint64(i), &shard{})
}
@@ -105,86 +104,15 @@ func TestCached_NodeDeleted(t *testing.T) {
ReplicaID: 1,
}},
assert: func(t *testing.T, s *util.SyncMap[uint64, *shard]) {
_, ok := s.Load(uint64(2))
require.False(t, ok, "unexpected cache shard")
require.False(t, iter.Contains(s.Keys(), uint64(2)), "unexpected cache shard")
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
l := &Cached{ShardCache: tt.fields.ShardCache}
l.NodeDeleted(tt.args.info.ShardID)
tt.assert(t, &l.shardCache)
})
}
}

func TestCached_NodeReady(t *testing.T) {
type fields struct {
ShardCache *ShardCache
}
type args struct {
info raftio.NodeInfo
}
tests := []struct {
name string
args args
fields fields
assert func(*testing.T, *util.SyncMap[uint64, *shard])
}{
{
name: "add ready node",
fields: fields{ShardCache: &ShardCache{}},
args: args{info: raftio.NodeInfo{
ShardID: 1,
ReplicaID: 1,
}},
assert: func(t *testing.T, s *util.SyncMap[uint64, *shard]) {
_, ok := s.Load(uint64(1))
require.True(t, ok, "missing cache shard")
},
},
{
name: "add existing node",
fields: fields{ShardCache: func() *ShardCache {
c := &ShardCache{}
c.shardCache.Store(uint64(1), &shard{})
return c
}()},
args: args{info: raftio.NodeInfo{
ShardID: 1,
ReplicaID: 1,
}},
assert: func(t *testing.T, s *util.SyncMap[uint64, *shard]) {
_, ok := s.Load(uint64(1))
require.True(t, ok, "missing cache shard")
},
},
{
name: "add ready node to list",
fields: fields{ShardCache: func() *ShardCache {
c := &ShardCache{}
c.shardCache.Store(uint64(1), &shard{})
c.shardCache.Store(uint64(3), &shard{})
c.shardCache.Store(uint64(5), &shard{})
c.shardCache.Store(uint64(6), &shard{})
return c
}()},
args: args{info: raftio.NodeInfo{
ShardID: 2,
ReplicaID: 1,
}},
assert: func(t *testing.T, s *util.SyncMap[uint64, *shard]) {
_, ok := s.Load(uint64(2))
require.True(t, ok, "missing cache shard")
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
l := &Cached{ShardCache: tt.fields.ShardCache}
l.NodeReady(tt.args.info.ShardID)
tt.assert(t, &l.shardCache)
tt.assert(t, l.shardCache)
})
}
}
@@ -483,10 +411,9 @@ func TestCached_QueryRaftLog(t *testing.T) {
tt.on(querier)
}
l := &Cached{
ShardCache: &ShardCache{ShardCacheSize: tt.fields.ShardCacheSize},
ShardCache: NewShardCache(tt.fields.ShardCacheSize),
LogQuerier: querier,
}
l.shardCache.ComputeIfAbsent(tt.args.clusterID, func(uint642 uint64) *shard { return &shard{cache: newCache(tt.fields.ShardCacheSize)} })
if len(tt.cacheContent) > 0 {
v, _ := l.shardCache.
Load(tt.args.clusterID)