Skip to content

Commit

Permalink
keeper: improve db start logic
Browse files Browse the repository at this point in the history
Unfortunately pg_ctl start changed behavior in postgresql 10:

pg_ctl for postgres < 10 with -w will returns 0 also if the instance isn't ready
to accept connections.

Whle in postgres >= 10 it will return a non 0 exit code (1 like when the
instance fails to start) making it impossible to distinguish between problems
starting an instance (i.e. wrong parameters) or an instance started but not
ready to accept connections.

To work with all the versions and since we want to distinguish between a failed
start and a started but not ready instance we are forced to not use pg_ctl and
write part of its logic here (I hate to do this).

Now PGManager.start method directly starts the instance using the "postgres"
executable and waits 1 minute to see if the process starts (checking its pid file)
or exits due to an error. It doesn't check if the instance is ready to accept
connections. When needed this is done inside the keeper calling WaitReady.
  • Loading branch information
sgotti committed Jan 9, 2018
1 parent b66be00 commit da6ba1a
Show file tree
Hide file tree
Showing 3 changed files with 100 additions and 22 deletions.
22 changes: 19 additions & 3 deletions cmd/keeper/keeper.go
Original file line number Diff line number Diff line change
Expand Up @@ -1005,6 +1005,10 @@ func (p *PostgresKeeper) postgresKeeperSM(pctx context.Context) {
log.Errorw("failed to start instance", zap.Error(err))
return
}
if err = pgm.WaitReady(cluster.DefaultDBWaitReadyTimeout); err != nil {
log.Errorw("timeout waiting for instance to be ready", zap.Error(err))
return
}
pgParameters, err = pgm.GetConfigFilePGParameters()
if err != nil {
log.Errorw("failed to retrieve postgres parameters", zap.Error(err))
Expand All @@ -1018,6 +1022,10 @@ func (p *PostgresKeeper) postgresKeeperSM(pctx context.Context) {
log.Errorw("failed to start instance", zap.Error(err))
return
}
if err = pgm.WaitReady(cluster.DefaultDBWaitReadyTimeout); err != nil {
log.Errorw("timeout waiting for instance to be ready", zap.Error(err))
return
}
}

log.Infow("setting roles")
Expand Down Expand Up @@ -1083,7 +1091,7 @@ func (p *PostgresKeeper) postgresKeeperSM(pctx context.Context) {
}
// wait for the db having replyed all the wals
if err = pgm.WaitReady(cd.Cluster.DefSpec().SyncTimeout.Duration); err != nil {
log.Errorw("instance not ready", zap.Error(err))
log.Errorw("timeout waiting for instance to be ready", zap.Error(err))
return
}

Expand Down Expand Up @@ -1179,7 +1187,7 @@ func (p *PostgresKeeper) postgresKeeperSM(pctx context.Context) {
return
}
if err = pgm.Start(); err != nil {
log.Errorw("err", zap.Error(err))
log.Errorw("failed to start instance", zap.Error(err))
return
}
started = true
Expand All @@ -1188,7 +1196,7 @@ func (p *PostgresKeeper) postgresKeeperSM(pctx context.Context) {
fullResync := false
// if not accepting connection assume that it's blocked waiting for missing wal
// (see above TODO), so do a full resync using pg_basebackup.
if err = pgm.Ping(); err != nil {
if err = pgm.WaitReady(cluster.DefaultDBWaitReadyTimeout); err != nil {
log.Errorw("pg_rewinded standby is not accepting connection. it's probably waiting for unavailable wals. Forcing a full resync")
fullResync = true
} else {
Expand Down Expand Up @@ -1250,6 +1258,10 @@ func (p *PostgresKeeper) postgresKeeperSM(pctx context.Context) {
log.Errorw("failed to start instance", zap.Error(err))
return
}
if err = pgm.WaitReady(cluster.DefaultDBWaitReadyTimeout); err != nil {
log.Errorw("timeout waiting for instance to be ready", zap.Error(err))
return
}
pgParameters, err = pgm.GetConfigFilePGParameters()
if err != nil {
log.Errorw("failed to retrieve postgres parameters", zap.Error(err))
Expand All @@ -1263,6 +1275,10 @@ func (p *PostgresKeeper) postgresKeeperSM(pctx context.Context) {
log.Errorw("failed to start instance", zap.Error(err))
return
}
if err = pgm.WaitReady(cluster.DefaultDBWaitReadyTimeout); err != nil {
log.Errorw("timeout waiting for instance to be ready", zap.Error(err))
return
}
}
log.Infow("updating our db UID with the cluster data provided db UID")
// replace our current db uid with the required one.
Expand Down
2 changes: 2 additions & 0 deletions pkg/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ const (
DefaultProxyCheckInterval = 5 * time.Second
DefaultProxyTimeoutInterval = 15 * time.Second

DefaultDBWaitReadyTimeout = 60 * time.Second

DefaultDBNotIncreasingXLogPosTimes = 10

DefaultSleepInterval = 5 * time.Second
Expand Down
98 changes: 79 additions & 19 deletions pkg/postgresql/postgresql.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"path/filepath"
"regexp"
"sort"
"strconv"
"strings"
"syscall"
"time"
Expand All @@ -38,6 +39,8 @@ import (
const (
postgresConf = "postgresql.conf"
tmpPostgresConf = "stolon-temp-postgresql.conf"

startTimeout = 60 * time.Second
)

var log = slog.S()
Expand Down Expand Up @@ -156,7 +159,7 @@ func (p *Manager) Init(initConfig *InitConfig) error {
cmd.Args = append(cmd.Args, "--data-checksums")
}

//Pipe command's std[err|out] to parent.
// Pipe command's std[err|out] to parent.
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
if err = cmd.Run(); err != nil {
Expand All @@ -183,7 +186,7 @@ func (p *Manager) Restore(command string) error {
cmd = exec.Command("/bin/sh", "-c", command)
log.Debugw("execing cmd", "cmd", cmd)

//Pipe command's std[err|out] to parent.
// Pipe command's std[err|out] to parent.
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
if err = cmd.Run(); err != nil {
Expand All @@ -199,7 +202,7 @@ out:
return nil
}

func (p *Manager) StartTmpMerged(args ...string) error {
func (p *Manager) StartTmpMerged() error {
// start postgres with a conf file different then postgresql.conf so we don't have to touch it
f, err := os.Create(filepath.Join(p.dataDir, tmpPostgresConf))
if err != nil {
Expand Down Expand Up @@ -231,7 +234,7 @@ func (p *Manager) StartTmpMerged(args ...string) error {
if err := p.writePgHba(); err != nil {
return fmt.Errorf("error writing conf file: %v", err)
}
return p.start("-o", fmt.Sprintf("-c config_file=%s", f.Name()))
return p.start("-c", fmt.Sprintf("config_file=%s", f.Name()))
}

func (p *Manager) Start() error {
Expand All @@ -244,26 +247,83 @@ func (p *Manager) Start() error {
return p.start()
}

// start starts the instance. A success means that the instance has been
// successfully started BUT doesn't mean that the instance is ready to accept
// connections (i.e. it's waiting for some missing wals etc...).
// Note that also on error an instance may still be active and, if needed,
// should be manually stopped calling Stop.
func (p *Manager) start(args ...string) error {
// pg_ctl for postgres < 10 with -w will exit after the timeout and return 0
// also if the instance isn't ready to accept connections, while for
// postgres >= 10 it will return a non 0 exit code making it impossible to
// distinguish between problems starting an instance (i.e. wrong parameters)
// or an instance started but not ready to accept connections.
// To work with all the versions and since we want to distinguish between a
// failed start and a started but not ready instance we are forced to not
// use pg_ctl and write part of its logic here (I hate to do this).

// A difference between directly calling postgres instead of pg_ctl is that
// the instance parent is the keeper instead of the defined system reaper
// (since pg_ctl forks and then exits leaving the postmaster orphaned).

log.Infow("starting database")
name := filepath.Join(p.pgBinPath, "pg_ctl")
args = append([]string{"start", "-w", "--timeout", "60", "-D", p.dataDir, "-o", "-c unix_socket_directories=" + common.PgUnixSocketDirectories}, args...)
name := filepath.Join(p.pgBinPath, "postgres")
args = append([]string{"-D", p.dataDir, "-c", "unix_socket_directories=" + common.PgUnixSocketDirectories}, args...)
cmd := exec.Command(name, args...)
log.Debugw("execing cmd", "cmd", cmd)
//Pipe command's std[err|out] to parent.
// Pipe command's std[err|out] to parent.
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
if err := cmd.Run(); err != nil {

if err := cmd.Start(); err != nil {
return fmt.Errorf("error: %v", err)
}

// execute child wait in a goroutine so we'll wait for it to exit without
// leaving zombie childs
exited := make(chan struct{})
go func() {
cmd.Wait()
close(exited)
}()

pid := cmd.Process.Pid

// Wait for the correct pid file to appear or for the process to exit
ok := false
start := time.Now()
for time.Now().Add(-startTimeout).Before(start) {
fh, err := os.Open(filepath.Join(p.dataDir, "postmaster.pid"))
if err == nil {
scanner := bufio.NewScanner(fh)
scanner.Split(bufio.ScanLines)
if scanner.Scan() {
fpid := scanner.Text()
if fpid == strconv.Itoa(pid) {
ok = true
fh.Close()
break
}
}
}
fh.Close()

select {
case <-exited:
return fmt.Errorf("postgres exited unexpectedly")
default:
}

time.Sleep(200 * time.Millisecond)
}

if !ok {
return fmt.Errorf("instance still starting")
}

p.UpdateCurParameters()
p.UpdateCurHba()

// pg_ctl with -w will exit after the timeout and return 0 also if the
// instance isn't accepting connection because already in recovery (usually
// waiting for wals during a pitr or a pg_rewind)
// so a start doesn't mean the instance is ready.
return nil
}

Expand All @@ -276,7 +336,7 @@ func (p *Manager) Stop(fast bool) error {
}
log.Debugw("execing cmd", "cmd", cmd)

//Pipe command's std[err|out] to parent.
// Pipe command's std[err|out] to parent.
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
if err := cmd.Run(); err != nil {
Expand All @@ -287,7 +347,7 @@ func (p *Manager) Stop(fast bool) error {

func (p *Manager) IsStarted() (bool, error) {
name := filepath.Join(p.pgBinPath, "pg_ctl")
cmd := exec.Command(name, "status", "-w", "-D", p.dataDir, "-o", "-c unix_socket_directories="+common.PgUnixSocketDirectories)
cmd := exec.Command(name, "status", "-D", p.dataDir, "-o", "-c unix_socket_directories="+common.PgUnixSocketDirectories)
_, err := cmd.CombinedOutput()
if err != nil {
if _, ok := err.(*exec.ExitError); ok {
Expand All @@ -313,7 +373,7 @@ func (p *Manager) Reload() error {
cmd := exec.Command(name, "reload", "-D", p.dataDir, "-o", "-c unix_socket_directories="+common.PgUnixSocketDirectories)
log.Debugw("execing cmd", "cmd", cmd)

//Pipe command's std[err|out] to parent.
// Pipe command's std[err|out] to parent.
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
if err := cmd.Run(); err != nil {
Expand Down Expand Up @@ -343,7 +403,7 @@ func (p *Manager) WaitReady(timeout time.Duration) error {
if err := p.Ping(); err == nil {
return nil
}
time.Sleep(1 * time.Second)
time.Sleep(200 * time.Millisecond)
}
return fmt.Errorf("timeout waiting for db ready")
}
Expand All @@ -354,7 +414,7 @@ func (p *Manager) Promote() error {
cmd := exec.Command(name, "promote", "-w", "-D", p.dataDir)
log.Debugw("execing cmd", "cmd", cmd)

//Pipe command's std[err|out] to parent.
// Pipe command's std[err|out] to parent.
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
if err := cmd.Run(); err != nil {
Expand Down Expand Up @@ -707,7 +767,7 @@ func (p *Manager) SyncFromFollowedPGRewind(followedConnParams ConnParams, passwo
cmd.Env = append(os.Environ(), fmt.Sprintf("PGPASSFILE=%s", pgpass.Name()))
log.Debugw("execing cmd", "cmd", cmd)

//Pipe command's std[err|out] to parent.
// Pipe command's std[err|out] to parent.
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
if err := cmd.Run(); err != nil {
Expand Down Expand Up @@ -748,7 +808,7 @@ func (p *Manager) SyncFromFollowed(followedConnParams ConnParams) error {
cmd.Env = append(os.Environ(), fmt.Sprintf("PGPASSFILE=%s", pgpass.Name()))
log.Debugw("execing cmd", "cmd", cmd)

//Pipe command's std[err|out] to parent.
// Pipe command's std[err|out] to parent.
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
if err := cmd.Run(); err != nil {
Expand Down

0 comments on commit da6ba1a

Please sign in to comment.