Skip to content

Commit

Permalink
bug: graceful handling of leader table deletion
Browse files Browse the repository at this point in the history
  • Loading branch information
coufalja committed Jan 17, 2024
1 parent e6538a7 commit dfa3654
Show file tree
Hide file tree
Showing 2 changed files with 83 additions and 38 deletions.
81 changes: 51 additions & 30 deletions replication/replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package replication
import (
"context"
"errors"
"slices"
"sync"
"time"

Expand Down Expand Up @@ -55,6 +56,7 @@ func NewManager(tm *table.Manager, nh *dragonboat.NodeHost, conn *grpc.ClientCon
tm: tm,
metadataClient: regattapb.NewMetadataClient(conn),
factory: &workerFactory{
reconcileInterval: cfg.ReconcileInterval,
pollInterval: cfg.Workers.PollInterval,
leaseInterval: cfg.Workers.LeaseInterval,
logTimeout: cfg.Workers.LogRPCTimeout,
Expand All @@ -73,7 +75,6 @@ func NewManager(tm *table.Manager, nh *dragonboat.NodeHost, conn *grpc.ClientCon
},
workers: struct {
registry map[string]*worker
mtx sync.RWMutex
wg sync.WaitGroup
}{
registry: make(map[string]*worker),
Expand All @@ -91,7 +92,6 @@ type Manager struct {
factory *workerFactory
workers struct {
registry map[string]*worker
mtx sync.RWMutex
wg sync.WaitGroup
}
log *zap.SugaredLogger
Expand Down Expand Up @@ -119,18 +119,19 @@ func (m *Manager) Start() {
t := time.NewTicker(m.reconcileInterval)
defer t.Stop()
for {
select {
case <-t.C:
case <-m.closer:
m.log.Info("replication stopped")
return
}
if err := m.reconcileTables(); err != nil {
m.log.Errorf("failed to reconcile tables: %v", err)
continue
}
if err := m.reconcileWorkers(); err != nil {
m.log.Errorf("failed to reconcile replication workers: %v", err)
}
select {
case <-t.C:
continue
case <-m.closer:
m.log.Info("replication stopped")
return
}
}
}()
Expand All @@ -143,8 +144,37 @@ func (m *Manager) reconcileTables() error {
if err != nil {
return err
}
for _, tabs := range response.GetTables() {
if err := m.tm.CreateTable(tabs.Name); err != nil && !errors.Is(err, serrors.ErrTableExists) {
leaderTables := response.GetTables()
followerTables, err := m.tm.GetTables()
if err != nil {
return err
}
var toCreate, toDelete []string

for _, ft := range followerTables {
if !slices.ContainsFunc(leaderTables, func(lt *regattapb.Table) bool {
return ft.Name == lt.Name
}) {
toDelete = append(toDelete, ft.Name)
}
}

for _, ft := range leaderTables {
if !slices.ContainsFunc(followerTables, func(lt table.Table) bool {
return ft.Name == lt.Name
}) {
toCreate = append(toCreate, ft.Name)
}
}

for _, name := range toDelete {
if err := m.tm.DeleteTable(name); err != nil && !errors.Is(err, serrors.ErrTableNotFound) {
return err
}
}

for _, name := range toCreate {
if _, err := m.tm.CreateTable(name); err != nil && !errors.Is(err, serrors.ErrTableExists) {
return err
}
}
Expand All @@ -163,20 +193,19 @@ func (m *Manager) reconcileWorkers() error {
}
}

m.workers.mtx.RLock()
defer m.workers.mtx.RUnlock()
for name, worker := range m.workers.registry {
found := false
for _, tbl := range tbs {
if tbl.Name == name {
found = true
break
}
}
if !found {
m.stopWorker(worker)
var toStop []*worker

for name, w := range m.workers.registry {
if !slices.ContainsFunc(tbs, func(t table.Table) bool {
return t.Name == name
}) {
toStop = append(toStop, w)
}
}

for _, w := range toStop {
m.stopWorker(w)
}
return nil
}

Expand All @@ -190,26 +219,18 @@ func (m *Manager) Close() {
}

// hasWorker reports whether a replication worker is currently registered
// for the table with the given name.
//
// NOTE(review): the registry is read without holding a lock — this assumes
// all registry access happens on the reconciler goroutine after the mutex
// removal in this commit; confirm no other goroutine touches it.
func (m *Manager) hasWorker(name string) bool {
	if _, found := m.workers.registry[name]; found {
		return true
	}
	return false
}

// startWorker records the worker in the registry under its table name,
// increments the wait group and launches the worker's replication loop.
//
// NOTE(review): the registry map is mutated without locking — assumes this
// is only ever invoked from the reconciler goroutine; verify against callers.
func (m *Manager) startWorker(w *worker) {
	m.log.Infof("launching replication for table %s", w.table)
	m.workers.registry[w.table] = w
	m.workers.wg.Add(1)
	w.Start()
}

func (m *Manager) stopWorker(worker *worker) {
m.workers.mtx.Lock()
defer m.workers.mtx.Unlock()

m.log.Infof("stopping replication for table %s", worker.table)
worker.Close()
m.workers.wg.Done()
Expand Down
40 changes: 32 additions & 8 deletions replication/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (

"github.com/jamf/regatta/regattapb"
"github.com/jamf/regatta/replication/snapshot"
serrors "github.com/jamf/regatta/storage/errors"
"github.com/jamf/regatta/storage/table"
"github.com/lni/dragonboat/v4"
"github.com/lni/dragonboat/v4/client"
Expand All @@ -23,6 +24,8 @@ import (
"golang.org/x/sync/semaphore"
"golang.org/x/time/rate"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

// TODO make configurable.
Expand All @@ -36,9 +39,11 @@ const (
resultLeaderAhead
resultFollowerLagging
resultFollowerTailing
resultTableNotExists
)

type workerFactory struct {
reconcileInterval time.Duration
pollInterval time.Duration
leaseInterval time.Duration
logTimeout time.Duration
Expand Down Expand Up @@ -144,6 +149,10 @@ func (w *worker) Start() {
t.Reset(w.pollInterval)
idx, sess, err := w.tableState()
if err != nil {
if errors.Is(err, serrors.ErrTableNotFound) {
w.log.Debugf("table not found: %v", err)
continue
}
w.log.Errorf("cannot query leader index: %v", err)
continue
}
Expand All @@ -153,17 +162,15 @@ func (w *worker) Start() {
continue
}
result, err := w.do(idx, sess)
if err != nil {
if errors.Is(err, context.DeadlineExceeded) {
w.log.Warnf("unable to read leader log in time: %v", err)
} else {
w.log.Warnf("unknown worker error: %v", err)
}
continue
}
switch result {
case resultTableNotExists:
w.log.Infof("the leader table dissapeared ... backing off")
// Give reconciler time to clean up the table.
t.Reset(2 * w.reconcileInterval)
case resultLeaderBehind:
w.log.Errorf("the leader log is behind ... backing off")
// Give leader time to catch up.
t.Reset(10 * w.pollInterval)
case resultLeaderAhead:
if w.recoverySemaphore.TryAcquire(1) {
func() {
Expand All @@ -181,6 +188,17 @@ func (w *worker) Start() {
case resultFollowerLagging:
// Trigger next loop immediately.
t.Reset(50 * time.Millisecond)
case resultFollowerTailing:
continue
case resultUnknown:
if err != nil {
if errors.Is(err, context.DeadlineExceeded) {
w.log.Warnf("unable to read leader log in time: %v", err)
} else {
w.log.Warnf("unknown worker error: %v", err)
}
continue
}
}
case <-w.closer:
return
Expand Down Expand Up @@ -212,6 +230,9 @@ func (w *worker) do(leaderIndex uint64, session *client.Session) (replicateResul
defer cancel()
stream, err := w.logClient.Replicate(ctx, replicateRequest, grpc.WaitForReady(true))
if err != nil {
if c, ok := status.FromError(err); ok && c.Code() == codes.Unavailable {
return resultTableNotExists, fmt.Errorf("could not open log stream: %w", err)
}
return resultUnknown, fmt.Errorf("could not open log stream: %w", err)
}
var applied uint64
Expand All @@ -221,6 +242,9 @@ func (w *worker) do(leaderIndex uint64, session *client.Session) (replicateResul
return resultUnknown, nil
}
if err != nil {
if c, ok := status.FromError(err); ok && c.Code() == codes.Unavailable {
return resultTableNotExists, fmt.Errorf("error reading replication stream: %w", err)
}
return resultUnknown, fmt.Errorf("error reading replication stream: %w", err)
}

Expand Down

0 comments on commit dfa3654

Please sign in to comment.