Merge pull request #268 from sgotti/handle_max_standby_lag
sentinel: don't choose keepers with db behind a defined lag as masters.
sgotti committed Apr 25, 2017
2 parents 2397b58 + 5587b86 commit 3db89c9
Showing 6 changed files with 116 additions and 3 deletions.
31 changes: 31 additions & 0 deletions cmd/sentinel/sentinel.go
@@ -350,6 +350,19 @@ func (s *Sentinel) isDifferentTimelineBranch(followedDB *cluster.DB, db *cluster
return false
}

// isLagBelowMax checks whether the lag of the db reported XLogPos behind the
// master reported XLogPos is below MaxStandbyLag
func (s *Sentinel) isLagBelowMax(cd *cluster.ClusterData, curMasterDB, db *cluster.DB) bool {
if !*cd.Cluster.DefSpec().SynchronousReplication {
log.Debug(fmt.Sprintf("curMasterDB.Status.XLogPos: %d, db.Status.XLogPos: %d, lag: %d", curMasterDB.Status.XLogPos, db.Status.XLogPos, int64(curMasterDB.Status.XLogPos-db.Status.XLogPos)))
if int64(curMasterDB.Status.XLogPos-db.Status.XLogPos) > int64(*cd.Cluster.DefSpec().MaxStandbyLag) {
log.Debug("ignoring keeper since its behind that maximum xlog position", zap.String("db", db.UID), zap.Uint64("dbXLogPos", db.Status.XLogPos), zap.Uint64("masterXLogPos", curMasterDB.Status.XLogPos))
return false
}
}
return true
}

func (s *Sentinel) freeKeepers(cd *cluster.ClusterData) []*cluster.Keeper {
freeKeepers := []*cluster.Keeper{}
K:
@@ -548,6 +561,15 @@ func (s *Sentinel) findBestStandbys(cd *cluster.ClusterData, masterDB *cluster.D
log.Debug("ignoring keeper since its pg timeline is different than master timeline", zap.String("db", db.UID), zap.Uint64("dbTimeline", db.Status.TimelineID), zap.Uint64("masterTimeline", masterDB.Status.TimelineID))
continue
}
// do this only when not using synchronous replication since in sync repl we
// have to ignore the last reported xlogpos or a valid sync standby would be
// skipped
if !*cd.Cluster.DefSpec().SynchronousReplication {
if !s.isLagBelowMax(cd, masterDB, db) {
log.Debug("ignoring keeper since its lag is above the max configured lag", zap.String("db", db.UID), zap.Uint64("dbXLogPos", db.Status.XLogPos), zap.Uint64("masterXLogPos", masterDB.Status.XLogPos))
continue
}
}
bestDBs = append(bestDBs, db)
}
// Sort by XLogPos
@@ -569,6 +591,15 @@ func (s *Sentinel) findBestNewMasters(cd *cluster.ClusterData, masterDB *cluster
log.Debug("ignoring keeper since its pg timeline is different than master timeline", zap.String("db", db.UID), zap.Uint64("dbTimeline", db.Status.TimelineID), zap.Uint64("masterTimeline", masterDB.Status.TimelineID))
continue
}
// do this only when not using synchronous replication since in sync repl we
// have to ignore the last reported xlogpos or a valid sync standby would be
// skipped
if !*cd.Cluster.DefSpec().SynchronousReplication {
if !s.isLagBelowMax(cd, masterDB, db) {
log.Debug("ignoring keeper since its lag is above the max configured lag", zap.String("db", db.UID), zap.Uint64("dbXLogPos", db.Status.XLogPos), zap.Uint64("masterXLogPos", masterDB.Status.XLogPos))
continue
}
}
bestNewMasters = append(bestNewMasters, db)
}
// Sort by XLogPos
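One detail worth noting from the sentinel.go hunk above: XLogPos values are uint64, so when a standby's reported position is ahead of the master's last reported one (possible with stale status reports), the unsigned subtraction wraps around to a huge value, and the int64 conversion in isLagBelowMax turns it into a negative lag, which stays below any positive MaxStandbyLag. A standalone sketch of that arithmetic, with illustrative values only:

package main

import "fmt"

func main() {
	// Illustrative positions only: the standby reports a position slightly
	// ahead of the master's last reported one.
	var masterXLogPos uint64 = 100
	var dbXLogPos uint64 = 150
	var maxStandbyLag int64 = 50 * 1024

	// The raw unsigned subtraction wraps around to a huge value...
	fmt.Println(masterXLogPos - dbXLogPos) // 18446744073709551566
	// ...while the int64 conversion used by isLagBelowMax yields a negative
	// lag, which is below any positive MaxStandbyLag.
	lag := int64(masterXLogPos - dbXLogPos)
	fmt.Println(lag, lag > maxStandbyLag) // -50 false
}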
2 changes: 1 addition & 1 deletion cmd/sentinel/sentinel_test.go
@@ -183,8 +183,8 @@ func TestUpdateCluster(t *testing.T) {
Proxy: &cluster.Proxy{},
},
},
// #2 cluster initialization, more than one keeper, the first will be chosen to be the new master.
{
// #2 cluster initialization, more than one keeper, the first will be chosen to be the new master.
cd: &cluster.ClusterData{
Cluster: &cluster.Cluster{
UID: "cluster1",
1 change: 1 addition & 0 deletions doc/cluster_spec.md
@@ -19,6 +19,7 @@ Some options in a running cluster specification can be changed to update the des
| failInterval | interval after the first fail to declare a keeper as not healthy. | no | string (duration) | 20s |
| maxStandbys | max number of standbys. This needs to be large enough to cover both the standbys managed by stolon and any additional standbys configured by the user. Its value affects different postgres parameters like max_replication_slots and max_wal_senders. Setting this to a number lower than the sum of stolon managed standbys and user managed standbys will have unpredictable effects due to problems creating replication slots or replication problems due to exhausted wal senders. | no | uint16 | 20 |
| maxStandbysPerSender | max number of standbys for every sender. A sender can be a master or another standby (with cascading replication). | no | uint16 | 3 |
| maxStandbyLag | maximum lag (from the last reported master state, in bytes) that an asynchronous standby can have to be elected in place of a failed master. | no | uint32 | 1MiB |
| synchronousReplication | use synchronous replication between the master and its standbys | no | bool | false |
| minSynchronousStandbys | minimum number of required synchronous standbys when synchronous replication is enabled (only set this to a value > 1 when using PostgreSQL >= 9.6) | no | int16 | 1 |
| maxSynchronousStandbys | maximum number of required synchronous standbys when synchronous replication is enabled (only set this to a value > 1 when using PostgreSQL >= 9.6) | no | int16 | 1 |
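A quick usage note, not part of the diff: the new option takes plain bytes, so raising the limit to 16MiB means setting maxStandbyLag to 16777216 (16 * 1024 * 1024) in the cluster specification. A sketch of how that could be applied, assuming the stolonctl update --patch invocation described in the project docs, with placeholder cluster name and store backend:

stolonctl --cluster-name mycluster --store-backend etcd update --patch '{ "maxStandbyLag": 16777216 }'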
10 changes: 10 additions & 0 deletions pkg/cluster/cluster.go
@@ -31,6 +31,9 @@ import (
func Uint16P(u uint16) *uint16 {
return &u
}
func Uint32P(u uint32) *uint32 {
return &u
}

func BoolP(b bool) *bool {
return &b
@@ -51,6 +54,7 @@ const (
DefaultFailInterval = 20 * time.Second
DefaultMaxStandbys uint16 = 20
DefaultMaxStandbysPerSender uint16 = 3
DefaultMaxStandbyLag = 1024 * 1024
DefaultSynchronousReplication = false
DefaultMaxSynchronousStandbys uint16 = 1
DefaultMinSynchronousStandbys uint16 = 1
@@ -184,6 +188,9 @@ type ClusterSpec struct {
// Max number of standbys for every sender. A sender can be a master or
// another standby (if/when implementing cascading replication).
MaxStandbysPerSender *uint16 `json:"maxStandbysPerSender,omitempty"`
// Max lag in bytes that an asynchronous standby can have to be elected in
// place of a failed master
MaxStandbyLag *uint32 `json:"maxStandbyLag,omitempty"`
// Use Synchronous replication between master and its standbys
SynchronousReplication *bool `json:"synchronousReplication,omitempty"`
// MinSynchronousStandbys is the minimum number of synchronous standbys
@@ -287,6 +294,9 @@ func (os *ClusterSpec) WithDefaults() *ClusterSpec {
if s.MaxStandbysPerSender == nil {
s.MaxStandbysPerSender = Uint16P(DefaultMaxStandbysPerSender)
}
if s.MaxStandbyLag == nil {
s.MaxStandbyLag = Uint32P(DefaultMaxStandbyLag)
}
if s.SynchronousReplication == nil {
s.SynchronousReplication = BoolP(DefaultSynchronousReplication)
}
71 changes: 71 additions & 0 deletions tests/integration/ha_test.go
@@ -125,6 +125,7 @@ func setupServers(t *testing.T, clusterName, dir string, numKeepers, numSentinel
SleepInterval: &cluster.Duration{Duration: 2 * time.Second},
FailInterval: &cluster.Duration{Duration: 5 * time.Second},
ConvergenceTimeout: &cluster.Duration{Duration: 30 * time.Second},
MaxStandbyLag: cluster.Uint32P(50 * 1024), // limit lag to 50kiB
SynchronousReplication: cluster.BoolP(syncRepl),
UsePgrewind: cluster.BoolP(usePgrewind),
PGParameters: make(cluster.PGParameters),
@@ -339,6 +340,9 @@ func testFailover(t *testing.T, syncRepl bool) {
t.Fatalf("unexpected err: %v", err)
}

// wait for the keepers to have reported their state (needed to know the instance XLogPos)
time.Sleep(5 * time.Second)

// Stop the keeper process on master, should also stop the database
t.Logf("Stopping current master keeper: %s", master.uid)
master.Stop()
@@ -399,6 +403,9 @@ func testFailoverFailed(t *testing.T, syncRepl bool) {
t.Fatalf("unexpected err: %v", err)
}

// wait for the keepers to have reported their state (needed to know the instance XLogPos)
time.Sleep(5 * time.Second)

// Stop the keeper process on master, should also stop the database
t.Logf("Stopping current master keeper: %s", master.uid)
master.Stop()
@@ -437,6 +444,58 @@ func TestFailoverFailedSyncRepl(t *testing.T) {
testFailoverFailed(t, true)
}

// test that a standby with a lag (reported) greater than MaxStandbyLag from the
// master (reported) xlogpos won't be elected as the new master. This test is
// valid only for asynchronous replication
func TestFailoverTooMuchLag(t *testing.T) {
t.Parallel()

dir, err := ioutil.TempDir("", "stolon")
if err != nil {
t.Fatalf("unexpected err: %v", err)
}
defer os.RemoveAll(dir)

clusterName := uuid.NewV4().String()

tks, tss, tstore := setupServers(t, clusterName, dir, 2, 1, false, false)
defer shutdown(tks, tss, tstore)

storePath := filepath.Join(common.StoreBasePath, clusterName)
sm := store.NewStoreManager(tstore.store, storePath)

master, standbys := waitMasterStandbysReady(t, sm, tks)
standby := standbys[0]

if err := populate(t, master); err != nil {
t.Fatalf("unexpected err: %v", err)
}

// stop the standby and write more than MaxStandbyLag data to the master
t.Logf("Stopping current standby keeper: %s", standby.uid)
standby.Stop()
for i := 1; i < 1000; i++ {
if err := write(t, master, i, i); err != nil {
t.Fatalf("unexpected err: %v", err)
}
}

// wait for the master to have reported its state
time.Sleep(5 * time.Second)

// Stop the keeper process on master, should also stop the database
t.Logf("Stopping current master keeper: %s", master.uid)
master.Stop()
// start the standby
t.Logf("Starting current standby keeper: %s", standby.uid)
standby.Start()

// standby shouldn't be elected as master since its lag is greater than MaxStandbyLag
if err := standby.WaitRole(common.RoleMaster, 30*time.Second); err == nil {
t.Fatalf("standby shouldn't be elected as master")
}
}

func testOldMasterRestart(t *testing.T, syncRepl, usePgrewind bool) {
dir, err := ioutil.TempDir("", "stolon")
if err != nil {
@@ -468,6 +527,9 @@ func testOldMasterRestart(t *testing.T, syncRepl, usePgrewind bool) {
t.Fatalf("unexpected err: %v", err)
}

// wait for the keepers to have reported their state (needed to know the instance XLogPos)
time.Sleep(5 * time.Second)

// Stop the keeper process on master, should also stop the database
t.Logf("Stopping current master keeper: %s", master.uid)
master.Stop()
@@ -573,6 +635,9 @@ func testPartition1(t *testing.T, syncRepl, usePgrewind bool) {
t.Fatalf("unexpected err: %v", err)
}

// wait for the keepers to have reported their state (needed to know the instance XLogPos)
time.Sleep(5 * time.Second)

// Freeze the keeper and postgres processes on the master
t.Logf("SIGSTOPping current master keeper: %s", master.uid)
if err := master.Signal(syscall.SIGSTOP); err != nil {
@@ -687,6 +752,9 @@ func testTimelineFork(t *testing.T, syncRepl, usePgrewind bool) {
t.Fatalf("unexpected err: %v", err)
}

// wait for the keepers to have reported their state (needed to know the instance XLogPos)
time.Sleep(5 * time.Second)

// Wait for replicated data to reach the standby
if err := waitLines(t, standbys[0], 1, 10*time.Second); err != nil {
t.Fatalf("unexpected err: %v", err)
@@ -827,6 +895,9 @@ func TestMasterChangedAddress(t *testing.T) {
t.Fatalf("unexpected err: %v", err)
}

// wait for the keepers to have reported their state (needed to know the instance XLogPos)
time.Sleep(5 * time.Second)

// Wait for the standby to sync with the master
if err := waitLines(t, master, 1, 60*time.Second); err != nil {
t.Fatalf("unexpected err: %v", err)
4 changes: 2 additions & 2 deletions tests/integration/utils.go
@@ -749,8 +749,8 @@ func WaitClusterDataKeeperInitialized(keeperUID string, e *store.StoreManager, t
}

// WaitClusterDataSynchronousStandbys waits for:
// * synchrnous standby defined in masterdb spec
// * synchrnous standby reported from masterdb status
// * synchronous standby defined in masterdb spec
// * synchronous standby reported from masterdb status
func WaitClusterDataSynchronousStandbys(synchronousStandbys []string, e *store.StoreManager, timeout time.Duration) error {
sort.Sort(sort.StringSlice(synchronousStandbys))
start := time.Now()
