Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feature: follower writes #247

Merged
merged 4 commits into from
Jan 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 24 additions & 23 deletions cmd/follower.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,11 @@ func follower(_ *cobra.Command, _ []string) error {
return fmt.Errorf("failed to parse raft.logdb: %w", err)
}

nQueue := storage.NewNotificationQueue()
go nQueue.Run()
defer func() {
_ = nQueue.Close()
}()
engine, err := storage.New(storage.Config{
Log: engineLog.Sugar(),
ClientAddress: viper.GetString("api.advertise-address"),
Expand All @@ -129,16 +134,17 @@ func follower(_ *cobra.Command, _ []string) error {
NodeName: viper.GetString("memberlist.node-name"),
},
Table: storage.TableConfig{
FS: vfs.Default,
ElectionRTT: viper.GetUint64("raft.election-rtt"),
HeartbeatRTT: viper.GetUint64("raft.heartbeat-rtt"),
SnapshotEntries: viper.GetUint64("raft.snapshot-entries"),
CompactionOverhead: viper.GetUint64("raft.compaction-overhead"),
MaxInMemLogSize: viper.GetUint64("raft.max-in-mem-log-size"),
DataDir: viper.GetString("raft.state-machine-dir"),
RecoveryType: toRecoveryType(viper.GetString("raft.snapshot-recovery-type")),
BlockCacheSize: viper.GetInt64("storage.block-cache-size"),
TableCacheSize: viper.GetInt("storage.table-cache-size"),
FS: vfs.Default,
ElectionRTT: viper.GetUint64("raft.election-rtt"),
HeartbeatRTT: viper.GetUint64("raft.heartbeat-rtt"),
SnapshotEntries: viper.GetUint64("raft.snapshot-entries"),
CompactionOverhead: viper.GetUint64("raft.compaction-overhead"),
MaxInMemLogSize: viper.GetUint64("raft.max-in-mem-log-size"),
DataDir: viper.GetString("raft.state-machine-dir"),
RecoveryType: toRecoveryType(viper.GetString("raft.snapshot-recovery-type")),
BlockCacheSize: viper.GetInt64("storage.block-cache-size"),
TableCacheSize: viper.GetInt("storage.table-cache-size"),
AppliedIndexListener: nQueue.Notify,
},
Meta: storage.MetaConfig{
ElectionRTT: viper.GetUint64("raft.election-rtt"),
Expand All @@ -158,15 +164,14 @@ func follower(_ *cobra.Command, _ []string) error {
defer engine.Close()

// Replication
conn, err := createReplicationConn()
defer func() {
_ = conn.Close()
}()
if err != nil {
return fmt.Errorf("cannot create replication conn: %w", err)
}
{
conn, err := createReplicationConn()
defer func() {
_ = conn.Close()
}()
if err != nil {
return fmt.Errorf("cannot create replication conn: %w", err)
}

d := replication.NewManager(engine.Manager, engine.NodeHost, conn, replication.Config{
ReconcileInterval: viper.GetDuration("replication.reconcile-interval"),
Workers: replication.WorkerConfig{
Expand All @@ -192,11 +197,7 @@ func follower(_ *cobra.Command, _ []string) error {
if err != nil {
return fmt.Errorf("failed to create API server: %w", err)
}
regattapb.RegisterKVServer(regatta, &regattaserver.ReadonlyKVServer{
KVServer: regattaserver.KVServer{
Storage: engine,
},
})
regattapb.RegisterKVServer(regatta, regattaserver.NewForwardingKVServer(engine, regattapb.NewKVClient(conn), nQueue))
regattapb.RegisterClusterServer(regatta, &regattaserver.ClusterServer{
Cluster: engine,
Config: viperConfigReader,
Expand Down
1 change: 1 addition & 0 deletions cmd/leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,7 @@ func leader(_ *cobra.Command, _ []string) error {
)
regattapb.RegisterMetadataServer(replication, &regattaserver.MetadataServer{Tables: engine})
regattapb.RegisterSnapshotServer(replication, &regattaserver.SnapshotServer{Tables: engine})
regattapb.RegisterKVServer(replication, &regattaserver.KVServer{Storage: engine})
regattapb.RegisterLogServer(replication, ls)
// Start server
go func() {
Expand Down
8 changes: 5 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ require (
github.com/benbjohnson/clock v1.3.5
github.com/cenkalti/backoff/v4 v4.2.1
github.com/cockroachdb/pebble v0.0.0-20221207173255-0f086d933dac
github.com/google/uuid v1.4.0
github.com/grpc-ecosystem/go-grpc-middleware v1.4.0
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
github.com/hashicorp/memberlist v0.5.0
Expand Down Expand Up @@ -48,14 +49,13 @@ require (
github.com/cockroachdb/redact v1.1.5 // indirect
github.com/cpuguy83/go-md2man/v2 v2.0.3 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/envoyproxy/protoc-gen-validate v1.0.2 // indirect
github.com/envoyproxy/protoc-gen-validate v1.0.4 // indirect
github.com/fsnotify/fsnotify v1.7.0 // indirect
github.com/getsentry/sentry-go v0.25.0 // indirect
github.com/getsentry/sentry-go v0.26.0 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/google/btree v1.1.2 // indirect
github.com/google/uuid v1.4.0 // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-immutable-radix v1.3.1 // indirect
github.com/hashicorp/go-msgpack v0.5.5 // indirect
Expand All @@ -64,11 +64,13 @@ require (
github.com/hashicorp/golang-lru v1.0.2 // indirect
github.com/hashicorp/hcl v1.0.0 // indirect
github.com/huandu/xstrings v1.4.0 // indirect
github.com/iancoleman/strcase v0.3.0 // indirect
github.com/imdario/mergo v0.3.13 // indirect
github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/kr/pretty v0.3.1 // indirect
github.com/kr/text v0.2.0 // indirect
github.com/lni/goutils v1.4.0 // indirect
github.com/lyft/protoc-gen-star/v2 v2.0.3 // indirect
github.com/magiconair/properties v1.8.7 // indirect
github.com/miekg/dns v1.1.56 // indirect
github.com/mitchellh/copystructure v1.2.0 // indirect
Expand Down
8 changes: 8 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,8 @@ github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1m
github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
github.com/envoyproxy/protoc-gen-validate v1.0.2 h1:QkIBuU5k+x7/QXPvPPnWXWlCdaBFApVqftFV6k087DA=
github.com/envoyproxy/protoc-gen-validate v1.0.2/go.mod h1:GpiZQP3dDbg4JouG/NNS7QWXpgx6x8QiMKdmN72jogE=
github.com/envoyproxy/protoc-gen-validate v1.0.4 h1:gVPz/FMfvh57HdSJQyvBtF00j8JU4zdyUgIUNhlgg0A=
github.com/envoyproxy/protoc-gen-validate v1.0.4/go.mod h1:qys6tmnRsYrQqIhm2bvKZH4Blx/1gTIZ2UKVY1M+Yew=
github.com/etcd-io/bbolt v1.3.3/go.mod h1:ZF2nL25h33cCyBtcyWeZ2/I3HQOfTP+0PIEvHjkjCrw=
github.com/fasthttp-contrib/websocket v0.0.0-20160511215533-1f3b11f56072/go.mod h1:duJ4Jxv5lDcvg4QuQr0oowTf7dz4/CR8NtyCooz9HL8=
github.com/fatih/structs v1.1.0/go.mod h1:9NiDSp5zOcgEDl+j00MP/WkGVPOlPRLejGD8Ga6PJ7M=
Expand All @@ -106,6 +108,8 @@ github.com/fsnotify/fsnotify v1.7.0/go.mod h1:40Bi/Hjc2AVfZrqy+aj+yEI+/bRxZnMJyT
github.com/gavv/httpexpect v2.0.0+incompatible/go.mod h1:x+9tiU1YnrOvnB725RkpoLv1M62hOWzwo5OXotisrKc=
github.com/getsentry/sentry-go v0.25.0 h1:q6Eo+hS+yoJlTO3uu/azhQadsD8V+jQn2D8VvX1eOyI=
github.com/getsentry/sentry-go v0.25.0/go.mod h1:lc76E2QywIyW8WuBnwl8Lc4bkmQH4+w1gwTf25trprY=
github.com/getsentry/sentry-go v0.26.0 h1:IX3++sF6/4B5JcevhdZfdKIHfyvMmAq/UnqcyT2H6mA=
github.com/getsentry/sentry-go v0.26.0/go.mod h1:lc76E2QywIyW8WuBnwl8Lc4bkmQH4+w1gwTf25trprY=
github.com/ghemawat/stream v0.0.0-20171120220530-696b145b53b9/go.mod h1:106OIgooyS7OzLDOpUGgm9fA3bQENb/cFSyyBmMoJDs=
github.com/gin-contrib/sse v0.0.0-20190301062529-5545eab6dad3/go.mod h1:VJ0WA2NBN22VlZ2dKZQPAPnyWw5XTlK1KymzLKsr59s=
github.com/gin-gonic/gin v1.4.0/go.mod h1:OW2EZn3DO8Ln9oIKOvM++LBO+5UPHJJDH72/q/3rZdM=
Expand Down Expand Up @@ -207,6 +211,8 @@ github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpO
github.com/huandu/xstrings v1.4.0 h1:D17IlohoQq4UcpqD7fDk80P7l+lwAmlFaBHgOipl2FU=
github.com/huandu/xstrings v1.4.0/go.mod h1:y5/lhBue+AyNmUVz9RLU9xbLR0o4KIIExikq4ovT0aE=
github.com/hydrogen18/memlistener v0.0.0-20141126152155-54553eb933fb/go.mod h1:qEIFzExnS6016fRpRfxrExeVn2gbClQA99gQhnIcdhE=
github.com/iancoleman/strcase v0.3.0 h1:nTXanmYxhfFAMjZL34Ov6gkzEsSJZ5DbhxWjvSASxEI=
github.com/iancoleman/strcase v0.3.0/go.mod h1:iwCmte+B7n89clKwxIoIXy/HfoL7AsD47ZCWhYzw7ho=
github.com/imdario/mergo v0.3.13 h1:lFzP57bqS/wsqKssCGmtLAb8A0wKjLGrve2q3PPVcBk=
github.com/imdario/mergo v0.3.13/go.mod h1:4lJ1jqUDcsbIECGy0RUJAXNIhg+6ocWgb1ALK2O4oXg=
github.com/imkira/go-interpol v1.1.0/go.mod h1:z0h2/2T3XF8kyEPpRgJ3kmNv+C43p+I/CoI+jC3w2iA=
Expand Down Expand Up @@ -257,6 +263,8 @@ github.com/lni/goutils v1.4.0 h1:e1tNN+4zsbTpNvhG5cxirkH9Pdz96QAZ2j6+5tmjvqg=
github.com/lni/goutils v1.4.0/go.mod h1:LIHvF0fflR+zyXUQFQOiHPpKANf3UIr7DFIv5CBPOoU=
github.com/lni/vfs v0.2.1-0.20220616104132-8852fd867376 h1:jX9CoRWNPwrZ2yY3RJFTSwa49qDQqtXglrCByGdQGZg=
github.com/lni/vfs v0.2.1-0.20220616104132-8852fd867376/go.mod h1:LOatfyR8Xeej1jbXybwYGVfCccR0u+BQRG9xg7BD7xo=
github.com/lyft/protoc-gen-star/v2 v2.0.3 h1:/3+/2sWyXeMLzKd1bX+ixWKgEMsULrIivpDsuaF441o=
github.com/lyft/protoc-gen-star/v2 v2.0.3/go.mod h1:amey7yeodaJhXSbf/TlLvWiqQfLOSpEk//mLlc+axEk=
github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ=
github.com/magiconair/properties v1.8.7 h1:IeQXZAiQcpL9mgcAe1Nu6cX9LLw6ExEHKjN0VQdvPDY=
github.com/magiconair/properties v1.8.7/go.mod h1:Dhd985XPs7jluiymwWYZ0G4Z61jb3vdS329zhj2hYo0=
Expand Down
54 changes: 46 additions & 8 deletions regattaserver/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@ package regattaserver
import (
"context"
"errors"
"fmt"

"github.com/jamf/regatta/regattapb"
"github.com/jamf/regatta/storage"
serrors "github.com/jamf/regatta/storage/errors"
"github.com/jamf/regatta/util/iter"
"google.golang.org/grpc/codes"
Expand Down Expand Up @@ -179,29 +181,65 @@ func (s *KVServer) Txn(ctx context.Context, req *regattapb.TxnRequest) (*regatta
return r, nil
}

// ReadonlyKVServer implements read part of KV service from proto/regatta.proto.
type ReadonlyKVServer struct {
func NewForwardingKVServer(storage KVService, client regattapb.KVClient, q *storage.IndexNotificationQueue) *ForwardingKVServer {
return &ForwardingKVServer{
KVServer: KVServer{Storage: storage},
client: client,
q: q,
}
}

type propagationQueue interface {
Add(ctx context.Context, table string, revision uint64) <-chan error
}

// ForwardingKVServer forwards the write operations to the leader cluster.
type ForwardingKVServer struct {
KVServer
client regattapb.KVClient
q propagationQueue
}

// Put implements proto/regatta.proto KV.Put method.
func (r *ReadonlyKVServer) Put(_ context.Context, _ *regattapb.PutRequest) (*regattapb.PutResponse, error) {
return nil, status.Error(codes.Unimplemented, "method Put not implemented for follower")
func (r *ForwardingKVServer) Put(ctx context.Context, req *regattapb.PutRequest) (*regattapb.PutResponse, error) {
put, err := r.client.Put(ctx, req)
if err != nil {
if s, ok := status.FromError(err); ok {
return nil, status.Error(s.Code(), fmt.Sprintf("leader error: %v", s.Err()))
}
return nil, status.Error(codes.FailedPrecondition, "forward error")
}

return put, <-r.q.Add(ctx, string(req.Table), put.Header.Revision)
}

// DeleteRange implements proto/regatta.proto KV.DeleteRange method.
func (r *ReadonlyKVServer) DeleteRange(_ context.Context, _ *regattapb.DeleteRangeRequest) (*regattapb.DeleteRangeResponse, error) {
return nil, status.Error(codes.Unimplemented, "method DeleteRange not implemented for follower")
func (r *ForwardingKVServer) DeleteRange(ctx context.Context, req *regattapb.DeleteRangeRequest) (*regattapb.DeleteRangeResponse, error) {
del, err := r.client.DeleteRange(ctx, req)
if err != nil {
if s, ok := status.FromError(err); ok {
return nil, status.Error(s.Code(), fmt.Sprintf("leader error: %v", s.Err()))
}
return nil, status.Error(codes.FailedPrecondition, "forward error")
}
return del, <-r.q.Add(ctx, string(req.Table), del.Header.Revision)
}

// Txn processes multiple requests in a single transaction.
// A txn request increments the revision of the key-value store
// and generates events with the same revision for every completed request.
// It is allowed to modify the same key several times within one txn (the result will be the last Op that modified the key).
// Readonly transactions allowed using follower API.
func (r *ReadonlyKVServer) Txn(ctx context.Context, req *regattapb.TxnRequest) (*regattapb.TxnResponse, error) {
func (r *ForwardingKVServer) Txn(ctx context.Context, req *regattapb.TxnRequest) (*regattapb.TxnResponse, error) {
if req.IsReadonly() {
return r.KVServer.Txn(ctx, req)
}
return nil, status.Error(codes.Unimplemented, "writable Txn not implemented for follower")
txn, err := r.client.Txn(ctx, req)
if err != nil {
if s, ok := status.FromError(err); ok {
return nil, status.Error(s.Code(), fmt.Sprintf("leader error: %v", s.Err()))
}
return nil, status.Error(codes.FailedPrecondition, "forward error")
}
return txn, <-r.q.Add(ctx, string(req.Table), txn.Header.Revision)
}
Loading
Loading