diff --git a/cmd/sentinel/sentinel.go b/cmd/sentinel/sentinel.go
index 0b956fccc..9ee261d25 100644
--- a/cmd/sentinel/sentinel.go
+++ b/cmd/sentinel/sentinel.go
@@ -350,6 +350,19 @@ func (s *Sentinel) isDifferentTimelineBranch(followedDB *cluster.DB, db *cluster
 	return false
 }
 
+// isLagBelowMax checks if the db reported xlog position is within MaxStandbyLag
+// of the master reported xlog position
+func (s *Sentinel) isLagBelowMax(cd *cluster.ClusterData, curMasterDB, db *cluster.DB) bool {
+	if !*cd.Cluster.DefSpec().SynchronousReplication {
+		log.Debug(fmt.Sprintf("curMasterDB.Status.XLogPos: %d, db.Status.XLogPos: %d, lag: %d", curMasterDB.Status.XLogPos, db.Status.XLogPos, int64(curMasterDB.Status.XLogPos-db.Status.XLogPos)))
+		if int64(curMasterDB.Status.XLogPos-db.Status.XLogPos) > int64(*cd.Cluster.DefSpec().MaxStandbyLag) {
+			log.Debug("ignoring keeper since it's behind the maximum xlog position", zap.String("db", db.UID), zap.Uint64("dbXLogPos", db.Status.XLogPos), zap.Uint64("masterXLogPos", curMasterDB.Status.XLogPos))
+			return false
+		}
+	}
+	return true
+}
+
 func (s *Sentinel) freeKeepers(cd *cluster.ClusterData) []*cluster.Keeper {
 	freeKeepers := []*cluster.Keeper{}
 K:
@@ -548,6 +561,15 @@ func (s *Sentinel) findBestStandbys(cd *cluster.ClusterData, masterDB *cluster.D
 			log.Debug("ignoring keeper since its pg timeline is different than master timeline", zap.String("db", db.UID), zap.Uint64("dbTimeline", db.Status.TimelineID), zap.Uint64("masterTimeline", masterDB.Status.TimelineID))
 			continue
 		}
+		// do this only when not using synchronous replication since in sync repl we
+		// have to ignore the last reported xlogpos or valid sync standbys will be
+		// skipped
+		if !*cd.Cluster.DefSpec().SynchronousReplication {
+			if !s.isLagBelowMax(cd, masterDB, db) {
+				log.Debug("ignoring keeper since its lag is above the max configured lag", zap.String("db", db.UID), zap.Uint64("dbXLogPos", db.Status.XLogPos), zap.Uint64("masterXLogPos", masterDB.Status.XLogPos))
+				continue
+			}
+		}
 		bestDBs = append(bestDBs, db)
 	}
 	// Sort by XLogPos
@@ -569,6 +591,15 @@ func (s *Sentinel) findBestNewMasters(cd *cluster.ClusterData, masterDB *cluster
 			log.Debug("ignoring keeper since its pg timeline is different than master timeline", zap.String("db", db.UID), zap.Uint64("dbTimeline", db.Status.TimelineID), zap.Uint64("masterTimeline", masterDB.Status.TimelineID))
 			continue
 		}
+		// do this only when not using synchronous replication since in sync repl we
+		// have to ignore the last reported xlogpos or valid sync standbys will be
+		// skipped
+		if !*cd.Cluster.DefSpec().SynchronousReplication {
+			if !s.isLagBelowMax(cd, masterDB, db) {
+				log.Debug("ignoring keeper since its lag is above the max configured lag", zap.String("db", db.UID), zap.Uint64("dbXLogPos", db.Status.XLogPos), zap.Uint64("masterXLogPos", masterDB.Status.XLogPos))
+				continue
+			}
+		}
 		bestNewMasters = append(bestNewMasters, db)
 	}
 	// Sort by XLogPos
diff --git a/cmd/sentinel/sentinel_test.go b/cmd/sentinel/sentinel_test.go
index 3b94bb98a..1a708cd5a 100644
--- a/cmd/sentinel/sentinel_test.go
+++ b/cmd/sentinel/sentinel_test.go
@@ -183,8 +183,8 @@ func TestUpdateCluster(t *testing.T) {
 				Proxy: &cluster.Proxy{},
 			},
 		},
+		// #2 cluster initialization, more than one keeper, the first will be chosen to be the new master.
 		{
-			// #2 cluster initialization, more than one keeper, the first will be choosen to be the new master.
 			cd: &cluster.ClusterData{
 				Cluster: &cluster.Cluster{
 					UID: "cluster1",
diff --git a/doc/cluster_spec.md b/doc/cluster_spec.md
index ecd2d368b..be7a8cec5 100644
--- a/doc/cluster_spec.md
+++ b/doc/cluster_spec.md
@@ -19,6 +19,7 @@ Some options in a running cluster specification can be changed to update the des
 | failInterval | interval after the first fail to declare a keeper as not healthy. | no | string (duration) | 20s |
 | maxStandbys | max number of standbys. This needs to be greater enough to cover both standby managed by stolon and additional standbys configured by the user. Its value affect different postgres parameters like max_replication_slots and max_wal_senders. Setting this to a number lower than the sum of stolon managed standbys and user managed standbys will have unpredicatable effects due to problems creating replication slots or replication problems due to exhausted wal senders. | no | uint16 | 20 |
 | maxStandbysPerSender | max number of standbys for every sender. A sender can be a master or another standby (with cascading replication). | no | uint16 | 3 |
+| maxStandbyLag | maximum lag (from the last reported master state, in bytes) that an asynchronous standby can have to be elected in place of a failed master. | no | uint32 | 1MiB |
 | synchronousReplication | use synchronous replication between the master and its standbys | no | bool | false |
 | minSynchronousStandbys | minimum number of required synchronous standbys when synchronous replication is enabled (only set this to a value > 1 when using PostgreSQL >= 9.6) | no | int16 | 1 |
 | maxSynchronousStandbys | maximum number of required synchronous standbys when synchronous replication is enabled (only set this to a value > 1 when using PostgreSQL >= 9.6) | no | int16 | 1 |
diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go
index 9825c3c1e..5b3dc9fd9 100644
--- a/pkg/cluster/cluster.go
+++ b/pkg/cluster/cluster.go
@@ -31,6 +31,9 @@ import (
 func Uint16P(u uint16) *uint16 {
 	return &u
 }
+func Uint32P(u uint32) *uint32 {
+	return &u
+}
 
 func BoolP(b bool) *bool {
 	return &b
@@ -51,6 +54,7 @@ const (
 	DefaultFailInterval = 20 * time.Second
 	DefaultMaxStandbys uint16 = 20
 	DefaultMaxStandbysPerSender uint16 = 3
+	DefaultMaxStandbyLag = 1024 * 1024
 	DefaultSynchronousReplication = false
 	DefaultMaxSynchronousStandbys uint16 = 1
 	DefaultMinSynchronousStandbys uint16 = 1
@@ -184,6 +188,9 @@ type ClusterSpec struct {
 	// Max number of standbys for every sender. A sender can be a master or
 	// another standby (if/when implementing cascading replication).
 	MaxStandbysPerSender *uint16 `json:"maxStandbysPerSender,omitempty"`
+	// Max lag in bytes that an asynchronous standby can have to be elected in
+	// place of a failed master
+	MaxStandbyLag *uint32 `json:"maxStandbyLag,omitempty"`
 	// Use Synchronous replication between master and its standbys
 	SynchronousReplication *bool `json:"synchronousReplication,omitempty"`
 	// MinSynchronousStandbys is the mininum number if synchronous standbys
@@ -287,6 +294,9 @@ func (os *ClusterSpec) WithDefaults() *ClusterSpec {
 	if s.MaxStandbysPerSender == nil {
 		s.MaxStandbysPerSender = Uint16P(DefaultMaxStandbysPerSender)
 	}
+	if s.MaxStandbyLag == nil {
+		s.MaxStandbyLag = Uint32P(DefaultMaxStandbyLag)
+	}
 	if s.SynchronousReplication == nil {
 		s.SynchronousReplication = BoolP(DefaultSynchronousReplication)
 	}
diff --git a/tests/integration/ha_test.go b/tests/integration/ha_test.go
index 8e56101e9..40c32fbcf 100644
--- a/tests/integration/ha_test.go
+++ b/tests/integration/ha_test.go
@@ -125,6 +125,7 @@ func setupServers(t *testing.T, clusterName, dir string, numKeepers, numSentinel
 		SleepInterval:      &cluster.Duration{Duration: 2 * time.Second},
 		FailInterval:       &cluster.Duration{Duration: 5 * time.Second},
 		ConvergenceTimeout: &cluster.Duration{Duration: 30 * time.Second},
+		MaxStandbyLag:      cluster.Uint32P(50 * 1024), // limit lag to 50KiB
 		SynchronousReplication: cluster.BoolP(syncRepl),
 		UsePgrewind:            cluster.BoolP(usePgrewind),
 		PGParameters:           make(cluster.PGParameters),
@@ -339,6 +340,9 @@ func testFailover(t *testing.T, syncRepl bool) {
 		t.Fatalf("unexpected err: %v", err)
 	}
 
+	// wait for the keepers to have reported their state (needed to know the instance XLogPos)
+	time.Sleep(5 * time.Second)
+
 	// Stop the keeper process on master, should also stop the database
 	t.Logf("Stopping current master keeper: %s", master.uid)
 	master.Stop()
@@ -399,6 +403,9 @@ func testFailoverFailed(t *testing.T, syncRepl bool) {
 		t.Fatalf("unexpected err: %v", err)
 	}
 
+	// wait for the keepers to have reported their state (needed to know the instance XLogPos)
+	time.Sleep(5 * time.Second)
+
 	// Stop the keeper process on master, should also stop the database
 	t.Logf("Stopping current master keeper: %s", master.uid)
 	master.Stop()
@@ -437,6 +444,58 @@ func TestFailoverFailedSyncRepl(t *testing.T) {
 	testFailoverFailed(t, true)
 }
 
+// test that a standby with a lag (reported) greater than MaxStandbyLag from the
+// master (reported) xlogpos won't be elected as the new master. This test is
This test is +// valid only for asynchronous replication +func TestFailoverTooMuchLag(t *testing.T) { + t.Parallel() + + dir, err := ioutil.TempDir("", "stolon") + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + defer os.RemoveAll(dir) + + clusterName := uuid.NewV4().String() + + tks, tss, tstore := setupServers(t, clusterName, dir, 2, 1, false, false) + defer shutdown(tks, tss, tstore) + + storePath := filepath.Join(common.StoreBasePath, clusterName) + sm := store.NewStoreManager(tstore.store, storePath) + + master, standbys := waitMasterStandbysReady(t, sm, tks) + standby := standbys[0] + + if err := populate(t, master); err != nil { + t.Fatalf("unexpected err: %v", err) + } + + // stop the standby and write more than MaxStandbyLag data to the master + t.Logf("Stopping current standby keeper: %s", standby.uid) + standby.Stop() + for i := 1; i < 1000; i++ { + if err := write(t, master, i, i); err != nil { + t.Fatalf("unexpected err: %v", err) + } + } + + // wait for the master to have reported its state + time.Sleep(5 * time.Second) + + // Stop the keeper process on master, should also stop the database + t.Logf("Stopping current master keeper: %s", master.uid) + master.Stop() + // start the standby + t.Logf("Starting current standby keeper: %s", standby.uid) + standby.Start() + + // standby shouldn't be elected as master since its lag is greater than MaxStandbyLag + if err := standby.WaitRole(common.RoleMaster, 30*time.Second); err == nil { + t.Fatalf("standby shouldn't be elected as master") + } +} + func testOldMasterRestart(t *testing.T, syncRepl, usePgrewind bool) { dir, err := ioutil.TempDir("", "stolon") if err != nil { @@ -468,6 +527,9 @@ func testOldMasterRestart(t *testing.T, syncRepl, usePgrewind bool) { t.Fatalf("unexpected err: %v", err) } + // wait for the keepers to have reported their state (needed to know the instance XLogPos) + time.Sleep(5 * time.Second) + // Stop the keeper process on master, should also stop the database t.Logf("Stopping current master keeper: %s", master.uid) master.Stop() @@ -573,6 +635,9 @@ func testPartition1(t *testing.T, syncRepl, usePgrewind bool) { t.Fatalf("unexpected err: %v", err) } + // wait for the keepers to have reported their state (needed to know the instance XLogPos) + time.Sleep(5 * time.Second) + // Freeze the keeper and postgres processes on the master t.Logf("SIGSTOPping current master keeper: %s", master.uid) if err := master.Signal(syscall.SIGSTOP); err != nil { @@ -687,6 +752,9 @@ func testTimelineFork(t *testing.T, syncRepl, usePgrewind bool) { t.Fatalf("unexpected err: %v", err) } + // wait for the keepers to have reported their state (needed to know the instance XLogPos) + time.Sleep(5 * time.Second) + // Wait replicated data to standby if err := waitLines(t, standbys[0], 1, 10*time.Second); err != nil { t.Fatalf("unexpected err: %v", err) @@ -827,6 +895,9 @@ func TestMasterChangedAddress(t *testing.T) { t.Fatalf("unexpected err: %v", err) } + // wait for the keepers to have reported their state (needed to know the instance XLogPos) + time.Sleep(5 * time.Second) + // Wait standby synced with master if err := waitLines(t, master, 1, 60*time.Second); err != nil { t.Fatalf("unexpected err: %v", err) diff --git a/tests/integration/utils.go b/tests/integration/utils.go index 4e3895be7..8f075dad1 100644 --- a/tests/integration/utils.go +++ b/tests/integration/utils.go @@ -749,8 +749,8 @@ func WaitClusterDataKeeperInitialized(keeperUID string, e *store.StoreManager, t } // WaitClusterDataSynchronousStandbys waits for: 
-// * synchrnous standby defined in masterdb spec
-// * synchrnous standby reported from masterdb status
+// * synchronous standby defined in masterdb spec
+// * synchronous standby reported from masterdb status
 func WaitClusterDataSynchronousStandbys(synchronousStandbys []string, e *store.StoreManager, timeout time.Duration) error {
 	sort.Sort(sort.StringSlice(synchronousStandbys))
 	start := time.Now()
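
Not part of the patch: a minimal, standalone sketch of the lag comparison performed by isLagBelowMax above, assuming the default 1 MiB MaxStandbyLag. The function name lagBelowMax and the sample xlog positions are hypothetical, used only to illustrate why the unsigned difference is converted to int64: a standby that reports a position ahead of the last (possibly stale) master report yields a negative lag and stays eligible.

```go
package main

import "fmt"

// defaultMaxStandbyLag mirrors the DefaultMaxStandbyLag constant (1 MiB) added by the patch.
const defaultMaxStandbyLag uint32 = 1024 * 1024

// lagBelowMax reproduces the asynchronous-replication check: the uint64
// difference between the master and standby xlog positions is cast to int64,
// so a standby ahead of the stale master report (negative lag) passes the check.
func lagBelowMax(masterXLogPos, standbyXLogPos uint64, maxStandbyLag uint32) bool {
	lag := int64(masterXLogPos - standbyXLogPos)
	return lag <= int64(maxStandbyLag)
}

func main() {
	// Hypothetical reported positions, for illustration only.
	master := uint64(90 * 1024 * 1024)

	fmt.Println(lagBelowMax(master, master-64*1024, defaultMaxStandbyLag))      // true: 64 KiB behind
	fmt.Println(lagBelowMax(master, master-2*1024*1024, defaultMaxStandbyLag))  // false: 2 MiB behind
	fmt.Println(lagBelowMax(master, master+8*1024, defaultMaxStandbyLag))       // true: standby ahead of the last master report
}
```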