Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: implement standby cluster feature #283

Merged
merged 2 commits into from
May 30, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ For an introduction to stolon you can also take a look at [this post](https://sg
* Full cluster setup in minutes.
* Easy [cluster admininistration](doc/stolonctl.md)
* Can do point in time recovery integrating with your preferred backup/restore tool.
* [Standby cluster](doc/standbycluster.md) (for multi site replication and near zero downtime migration).
* Automatic service discovery and dynamic reconfiguration (handles postgres and stolon processes changing their addresses).
* Can use [pg_rewind](doc/pg_rewind.md) for fast instance resyncronization with current master.

Expand Down
183 changes: 131 additions & 52 deletions cmd/keeper/keeper.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,10 +284,15 @@ func (p *PostgresKeeper) createRecoveryParameters(standbySettings *cluster.Stand

if standbySettings != nil {
parameters["standby_mode"] = "on"
parameters["primary_slot_name"] = standbySettings.PrimarySlotName
if standbySettings.PrimaryConninfo != "" {
parameters["primary_conninfo"] = standbySettings.PrimaryConninfo
}
if standbySettings.PrimarySlotName != "" {
parameters["primary_slot_name"] = standbySettings.PrimarySlotName
}
if standbySettings.RecoveryMinApplyDelay != "" {
parameters["recovery_min_apply_delay"] = standbySettings.RecoveryMinApplyDelay
}

parameters["recovery_target_timeline"] = "latest"
}
Expand Down Expand Up @@ -744,6 +749,44 @@ func (p *PostgresKeeper) isDifferentTimelineBranch(followedDB *cluster.DB, pgSta
return false
}

func (p *PostgresKeeper) updateReplSlots(dbLocalState *DBLocalState, followersUIDs []string) error {
var replSlots []string
replSlots, err := p.pgm.GetReplicatinSlots()
log.Debug("replication slots", zap.Object("replSlots", replSlots))
if err != nil {
log.Error("err", zap.Error(err))
return err
}
// Drop replication slots
for _, slotName := range replSlots {
if !common.IsStolonName(slotName) {
continue
}
if !util.StringInSlice(followersUIDs, common.NameFromStolonName(slotName)) {
log.Info("dropping replication slot since db not marked as follower", zap.String("slot", slotName), zap.String("db", common.NameFromStolonName(slotName)))
if err = p.pgm.DropReplicationSlot(slotName); err != nil {
log.Error("err", zap.Error(err))
return err
}
}
}
// Create replication slots
for _, followerUID := range followersUIDs {
if followerUID == dbLocalState.UID {
continue
}
replSlot := common.StolonName(followerUID)
if !util.StringInSlice(replSlots, replSlot) {
log.Info("creating replication slot", zap.String("slot", replSlot), zap.String("db", followerUID))
if err = p.pgm.CreateReplicationSlot(replSlot); err != nil {
log.Error("err", zap.Error(err))
return err
}
}
}
return nil
}

func (p *PostgresKeeper) postgresKeeperSM(pctx context.Context) {
e := p.e
pgm := p.pgm
Expand Down Expand Up @@ -806,7 +849,7 @@ func (p *PostgresKeeper) postgresKeeperSM(pctx context.Context) {
dbls := p.dbLocalState
if dbls.Initializing {
// If we are here this means that the db initialization or
// resync as failed so we have to clean up stale data
// resync has failed so we have to clean up stale data
log.Error("db failed to initialize or resync")
// Clean up cluster db datadir
if err = pgm.RemoveAll(); err != nil {
Expand Down Expand Up @@ -964,7 +1007,11 @@ func (p *PostgresKeeper) postgresKeeperSM(pctx context.Context) {
log.Error("failed to restore postgres database cluster", zap.Error(err))
return
}
if err = pgm.WriteRecoveryConf(p.createRecoveryParameters(nil, db.Spec.PITRConfig.ArchiveRecoverySettings)); err != nil {
var standbySettings *cluster.StandbySettings
if db.Spec.FollowConfig != nil && db.Spec.FollowConfig.Type == cluster.FollowTypeExternal {
standbySettings = db.Spec.FollowConfig.StandbySettings
}
if err = pgm.WriteRecoveryConf(p.createRecoveryParameters(standbySettings, db.Spec.PITRConfig.ArchiveRecoverySettings)); err != nil {
log.Error("err", zap.Error(err))
return
}
Expand Down Expand Up @@ -1208,55 +1255,38 @@ func (p *PostgresKeeper) postgresKeeperSM(pctx context.Context) {
log.Info("already master")
}

var replSlots []string
replSlots, err = pgm.GetReplicatinSlots()
log.Debug("replication slots", zap.Object("replSlots", replSlots))
if err != nil {
if err = p.updateReplSlots(dbls, followersUIDs); err != nil {
log.Error("err", zap.Error(err))
return
}
// Drop replication slots
for _, slotName := range replSlots {
if !common.IsStolonName(slotName) {
continue
}
if !util.StringInSlice(followersUIDs, common.NameFromStolonName(slotName)) {
log.Info("dropping replication slot since db not marked as follower", zap.String("slot", slotName), zap.String("db", common.NameFromStolonName(slotName)))
if err = pgm.DropReplicationSlot(slotName); err != nil {
log.Error("err", zap.Error(err))
}
}
}
// Create replication slots
for _, followerUID := range followersUIDs {
if followerUID == dbls.UID {
continue
}
replSlot := common.StolonName(followerUID)
if !util.StringInSlice(replSlots, replSlot) {
log.Info("creating replication slot", zap.String("slot", replSlot), zap.String("db", followerUID))
if err = pgm.CreateReplicationSlot(replSlot); err != nil {
log.Error("err", zap.Error(err))
}
}
}

case common.RoleStandby:
// We are a standby
followedUID := db.Spec.FollowConfig.DBUID
log.Info("our db requested role is standby", zap.String("followedDB", followedUID))
followedDB, ok := cd.DBs[followedUID]
if !ok {
log.Error("no db data available for followed db", zap.String("followedDB", followedUID))
var standbySettings *cluster.StandbySettings
switch db.Spec.FollowConfig.Type {
case cluster.FollowTypeInternal:
followedUID := db.Spec.FollowConfig.DBUID
log.Info("our db requested role is standby", zap.String("followedDB", followedUID))
followedDB, ok := cd.DBs[followedUID]
if !ok {
log.Error("no db data available for followed db", zap.String("followedDB", followedUID))
return
}
replConnParams := p.getReplConnParams(db, followedDB)
standbySettings = &cluster.StandbySettings{PrimaryConninfo: replConnParams.ConnString(), PrimarySlotName: common.StolonName(db.UID)}
case cluster.FollowTypeExternal:
standbySettings = db.Spec.FollowConfig.StandbySettings
default:
log.Error("unknown follow type", zap.String("followType", string(db.Spec.FollowConfig.Type)))
return
}
switch localRole {
case common.RoleMaster:
log.Error("cannot move from master role to standby role")
return
case common.RoleStandby:
log.Info("already standby")
if !started {
replConnParams := p.getReplConnParams(db, followedDB)
standbySettings := &cluster.StandbySettings{PrimaryConninfo: replConnParams.ConnString(), PrimarySlotName: common.StolonName(db.UID)}
if err = pgm.WriteRecoveryConf(p.createRecoveryParameters(standbySettings, nil)); err != nil {
log.Error("err", zap.Error(err))
return
Expand All @@ -1271,30 +1301,79 @@ func (p *PostgresKeeper) postgresKeeperSM(pctx context.Context) {
// TODO(sgotti) Check that the followed instance has all the needed WAL segments

// Update our primary_conninfo if replConnString changed
var curReplConnParams postgresql.ConnParams
if db.Spec.FollowConfig.Type == cluster.FollowTypeInternal {
var curReplConnParams postgresql.ConnParams

curReplConnParams, err = pgm.GetPrimaryConninfo()
if err != nil {
log.Error("err", zap.Error(err))
return
curReplConnParams, err = pgm.GetPrimaryConninfo()
if err != nil {
log.Error("err", zap.Error(err))
return
}
log.Debug("curReplConnParams", zap.Object("curReplConnParams", curReplConnParams))

followedUID := db.Spec.FollowConfig.DBUID
followedDB, ok := cd.DBs[followedUID]
if !ok {
log.Error("no db data available for followed db", zap.String("followedDB", followedUID))
return
}
newReplConnParams := p.getReplConnParams(db, followedDB)
log.Debug("newReplConnParams", zap.Object("newReplConnParams", newReplConnParams))

if !curReplConnParams.Equals(newReplConnParams) {
log.Info("connection parameters changed. Reconfiguring.", zap.String("followedDB", followedUID), zap.Object("replConnParams", newReplConnParams))
standbySettings := &cluster.StandbySettings{PrimaryConninfo: newReplConnParams.ConnString(), PrimarySlotName: common.StolonName(db.UID)}
if err = pgm.WriteRecoveryConf(p.createRecoveryParameters(standbySettings, nil)); err != nil {
log.Error("err", zap.Error(err))
return
}
if err = pgm.Restart(true); err != nil {
log.Error("failed to restart postgres instance", zap.Error(err))
return
}
}
}
log.Debug("curReplConnParams", zap.Object("curReplConnParams", curReplConnParams))

newReplConnParams := p.getReplConnParams(db, followedDB)
log.Debug("newReplConnParams", zap.Object("newReplConnParams", newReplConnParams))
if db.Spec.FollowConfig.Type == cluster.FollowTypeExternal {
// Update recovery conf if our FollowConfig has changed
curReplConnParams, err := pgm.GetPrimaryConninfo()
if err != nil {
log.Error("err", zap.Error(err))
return
}
log.Debug("curReplConnParams", zap.Object("curReplConnParams", curReplConnParams))

if !curReplConnParams.Equals(newReplConnParams) {
log.Info("connection parameters changed. Reconfiguring.", zap.String("followedDB", followedUID), zap.Object("replConnParams", newReplConnParams))
standbySettings := &cluster.StandbySettings{PrimaryConninfo: newReplConnParams.ConnString(), PrimarySlotName: common.StolonName(db.UID)}
if err = pgm.WriteRecoveryConf(p.createRecoveryParameters(standbySettings, nil)); err != nil {
newReplConnParams, err := pg.ParseConnString(db.Spec.FollowConfig.StandbySettings.PrimaryConninfo)
if err != nil {
log.Error("err", zap.Error(err))
return
}
if err = pgm.Restart(true); err != nil {
log.Debug("newReplConnParams", zap.Object("newReplConnParams", newReplConnParams))

curPrimarySlotName, err := pgm.GetPrimarySlotName()
if err != nil {
log.Error("err", zap.Error(err))
return
}

if !curReplConnParams.Equals(newReplConnParams) || curPrimarySlotName != db.Spec.FollowConfig.StandbySettings.PrimarySlotName {
standbySettings := db.Spec.FollowConfig.StandbySettings
if err = pgm.WriteRecoveryConf(p.createRecoveryParameters(standbySettings, nil)); err != nil {
log.Error("err", zap.Error(err))
return
}
if err = pgm.Restart(true); err != nil {
log.Error("failed to restart postgres instance", zap.Error(err))
return
}
}

if err = p.updateReplSlots(dbls, followersUIDs); err != nil {
log.Error("err", zap.Error(err))
return
}
}

case common.RoleUndefined:
log.Info("our db role is none")
return
Expand Down
Loading