From da6ba1aa9bf367dc66cb1abe747a1e4a04ddf743 Mon Sep 17 00:00:00 2001 From: Simone Gotti Date: Tue, 9 Jan 2018 13:43:43 +0100 Subject: [PATCH] keeper: improve db start logic Unfortunately pg_ctl start changed behavior in postgresql 10: pg_ctl for postgres < 10 with -w will returns 0 also if the instance isn't ready to accept connections. Whle in postgres >= 10 it will return a non 0 exit code (1 like when the instance fails to start) making it impossible to distinguish between problems starting an instance (i.e. wrong parameters) or an instance started but not ready to accept connections. To work with all the versions and since we want to distinguish between a failed start and a started but not ready instance we are forced to not use pg_ctl and write part of its logic here (I hate to do this). Now PGManager.start method directly starts the instance using the "postgres" executable and waits 1 minute to see if the process starts (checking its pid file) or exits due to an error. It doesn't check if the instance is ready to accept connections. When needed this is done inside the keeper calling WaitReady. --- cmd/keeper/keeper.go | 22 ++++++-- pkg/cluster/cluster.go | 2 + pkg/postgresql/postgresql.go | 98 +++++++++++++++++++++++++++++------- 3 files changed, 100 insertions(+), 22 deletions(-) diff --git a/cmd/keeper/keeper.go b/cmd/keeper/keeper.go index 2c106ddae..0dda31cec 100644 --- a/cmd/keeper/keeper.go +++ b/cmd/keeper/keeper.go @@ -1005,6 +1005,10 @@ func (p *PostgresKeeper) postgresKeeperSM(pctx context.Context) { log.Errorw("failed to start instance", zap.Error(err)) return } + if err = pgm.WaitReady(cluster.DefaultDBWaitReadyTimeout); err != nil { + log.Errorw("timeout waiting for instance to be ready", zap.Error(err)) + return + } pgParameters, err = pgm.GetConfigFilePGParameters() if err != nil { log.Errorw("failed to retrieve postgres parameters", zap.Error(err)) @@ -1018,6 +1022,10 @@ func (p *PostgresKeeper) postgresKeeperSM(pctx context.Context) { log.Errorw("failed to start instance", zap.Error(err)) return } + if err = pgm.WaitReady(cluster.DefaultDBWaitReadyTimeout); err != nil { + log.Errorw("timeout waiting for instance to be ready", zap.Error(err)) + return + } } log.Infow("setting roles") @@ -1083,7 +1091,7 @@ func (p *PostgresKeeper) postgresKeeperSM(pctx context.Context) { } // wait for the db having replyed all the wals if err = pgm.WaitReady(cd.Cluster.DefSpec().SyncTimeout.Duration); err != nil { - log.Errorw("instance not ready", zap.Error(err)) + log.Errorw("timeout waiting for instance to be ready", zap.Error(err)) return } @@ -1179,7 +1187,7 @@ func (p *PostgresKeeper) postgresKeeperSM(pctx context.Context) { return } if err = pgm.Start(); err != nil { - log.Errorw("err", zap.Error(err)) + log.Errorw("failed to start instance", zap.Error(err)) return } started = true @@ -1188,7 +1196,7 @@ func (p *PostgresKeeper) postgresKeeperSM(pctx context.Context) { fullResync := false // if not accepting connection assume that it's blocked waiting for missing wal // (see above TODO), so do a full resync using pg_basebackup. - if err = pgm.Ping(); err != nil { + if err = pgm.WaitReady(cluster.DefaultDBWaitReadyTimeout); err != nil { log.Errorw("pg_rewinded standby is not accepting connection. it's probably waiting for unavailable wals. Forcing a full resync") fullResync = true } else { @@ -1250,6 +1258,10 @@ func (p *PostgresKeeper) postgresKeeperSM(pctx context.Context) { log.Errorw("failed to start instance", zap.Error(err)) return } + if err = pgm.WaitReady(cluster.DefaultDBWaitReadyTimeout); err != nil { + log.Errorw("timeout waiting for instance to be ready", zap.Error(err)) + return + } pgParameters, err = pgm.GetConfigFilePGParameters() if err != nil { log.Errorw("failed to retrieve postgres parameters", zap.Error(err)) @@ -1263,6 +1275,10 @@ func (p *PostgresKeeper) postgresKeeperSM(pctx context.Context) { log.Errorw("failed to start instance", zap.Error(err)) return } + if err = pgm.WaitReady(cluster.DefaultDBWaitReadyTimeout); err != nil { + log.Errorw("timeout waiting for instance to be ready", zap.Error(err)) + return + } } log.Infow("updating our db UID with the cluster data provided db UID") // replace our current db uid with the required one. diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go index 5a3e06e4b..0094efe83 100644 --- a/pkg/cluster/cluster.go +++ b/pkg/cluster/cluster.go @@ -47,6 +47,8 @@ const ( DefaultProxyCheckInterval = 5 * time.Second DefaultProxyTimeoutInterval = 15 * time.Second + DefaultDBWaitReadyTimeout = 60 * time.Second + DefaultDBNotIncreasingXLogPosTimes = 10 DefaultSleepInterval = 5 * time.Second diff --git a/pkg/postgresql/postgresql.go b/pkg/postgresql/postgresql.go index 12d9ac41a..c60284593 100644 --- a/pkg/postgresql/postgresql.go +++ b/pkg/postgresql/postgresql.go @@ -23,6 +23,7 @@ import ( "path/filepath" "regexp" "sort" + "strconv" "strings" "syscall" "time" @@ -38,6 +39,8 @@ import ( const ( postgresConf = "postgresql.conf" tmpPostgresConf = "stolon-temp-postgresql.conf" + + startTimeout = 60 * time.Second ) var log = slog.S() @@ -156,7 +159,7 @@ func (p *Manager) Init(initConfig *InitConfig) error { cmd.Args = append(cmd.Args, "--data-checksums") } - //Pipe command's std[err|out] to parent. + // Pipe command's std[err|out] to parent. cmd.Stdout = os.Stdout cmd.Stderr = os.Stderr if err = cmd.Run(); err != nil { @@ -183,7 +186,7 @@ func (p *Manager) Restore(command string) error { cmd = exec.Command("/bin/sh", "-c", command) log.Debugw("execing cmd", "cmd", cmd) - //Pipe command's std[err|out] to parent. + // Pipe command's std[err|out] to parent. cmd.Stdout = os.Stdout cmd.Stderr = os.Stderr if err = cmd.Run(); err != nil { @@ -199,7 +202,7 @@ out: return nil } -func (p *Manager) StartTmpMerged(args ...string) error { +func (p *Manager) StartTmpMerged() error { // start postgres with a conf file different then postgresql.conf so we don't have to touch it f, err := os.Create(filepath.Join(p.dataDir, tmpPostgresConf)) if err != nil { @@ -231,7 +234,7 @@ func (p *Manager) StartTmpMerged(args ...string) error { if err := p.writePgHba(); err != nil { return fmt.Errorf("error writing conf file: %v", err) } - return p.start("-o", fmt.Sprintf("-c config_file=%s", f.Name())) + return p.start("-c", fmt.Sprintf("config_file=%s", f.Name())) } func (p *Manager) Start() error { @@ -244,26 +247,83 @@ func (p *Manager) Start() error { return p.start() } +// start starts the instance. A success means that the instance has been +// successfully started BUT doesn't mean that the instance is ready to accept +// connections (i.e. it's waiting for some missing wals etc...). +// Note that also on error an instance may still be active and, if needed, +// should be manually stopped calling Stop. func (p *Manager) start(args ...string) error { + // pg_ctl for postgres < 10 with -w will exit after the timeout and return 0 + // also if the instance isn't ready to accept connections, while for + // postgres >= 10 it will return a non 0 exit code making it impossible to + // distinguish between problems starting an instance (i.e. wrong parameters) + // or an instance started but not ready to accept connections. + // To work with all the versions and since we want to distinguish between a + // failed start and a started but not ready instance we are forced to not + // use pg_ctl and write part of its logic here (I hate to do this). + + // A difference between directly calling postgres instead of pg_ctl is that + // the instance parent is the keeper instead of the defined system reaper + // (since pg_ctl forks and then exits leaving the postmaster orphaned). + log.Infow("starting database") - name := filepath.Join(p.pgBinPath, "pg_ctl") - args = append([]string{"start", "-w", "--timeout", "60", "-D", p.dataDir, "-o", "-c unix_socket_directories=" + common.PgUnixSocketDirectories}, args...) + name := filepath.Join(p.pgBinPath, "postgres") + args = append([]string{"-D", p.dataDir, "-c", "unix_socket_directories=" + common.PgUnixSocketDirectories}, args...) cmd := exec.Command(name, args...) log.Debugw("execing cmd", "cmd", cmd) - //Pipe command's std[err|out] to parent. + // Pipe command's std[err|out] to parent. cmd.Stdout = os.Stdout cmd.Stderr = os.Stderr - if err := cmd.Run(); err != nil { + + if err := cmd.Start(); err != nil { return fmt.Errorf("error: %v", err) } + // execute child wait in a goroutine so we'll wait for it to exit without + // leaving zombie childs + exited := make(chan struct{}) + go func() { + cmd.Wait() + close(exited) + }() + + pid := cmd.Process.Pid + + // Wait for the correct pid file to appear or for the process to exit + ok := false + start := time.Now() + for time.Now().Add(-startTimeout).Before(start) { + fh, err := os.Open(filepath.Join(p.dataDir, "postmaster.pid")) + if err == nil { + scanner := bufio.NewScanner(fh) + scanner.Split(bufio.ScanLines) + if scanner.Scan() { + fpid := scanner.Text() + if fpid == strconv.Itoa(pid) { + ok = true + fh.Close() + break + } + } + } + fh.Close() + + select { + case <-exited: + return fmt.Errorf("postgres exited unexpectedly") + default: + } + + time.Sleep(200 * time.Millisecond) + } + + if !ok { + return fmt.Errorf("instance still starting") + } + p.UpdateCurParameters() p.UpdateCurHba() - // pg_ctl with -w will exit after the timeout and return 0 also if the - // instance isn't accepting connection because already in recovery (usually - // waiting for wals during a pitr or a pg_rewind) - // so a start doesn't mean the instance is ready. return nil } @@ -276,7 +336,7 @@ func (p *Manager) Stop(fast bool) error { } log.Debugw("execing cmd", "cmd", cmd) - //Pipe command's std[err|out] to parent. + // Pipe command's std[err|out] to parent. cmd.Stdout = os.Stdout cmd.Stderr = os.Stderr if err := cmd.Run(); err != nil { @@ -287,7 +347,7 @@ func (p *Manager) Stop(fast bool) error { func (p *Manager) IsStarted() (bool, error) { name := filepath.Join(p.pgBinPath, "pg_ctl") - cmd := exec.Command(name, "status", "-w", "-D", p.dataDir, "-o", "-c unix_socket_directories="+common.PgUnixSocketDirectories) + cmd := exec.Command(name, "status", "-D", p.dataDir, "-o", "-c unix_socket_directories="+common.PgUnixSocketDirectories) _, err := cmd.CombinedOutput() if err != nil { if _, ok := err.(*exec.ExitError); ok { @@ -313,7 +373,7 @@ func (p *Manager) Reload() error { cmd := exec.Command(name, "reload", "-D", p.dataDir, "-o", "-c unix_socket_directories="+common.PgUnixSocketDirectories) log.Debugw("execing cmd", "cmd", cmd) - //Pipe command's std[err|out] to parent. + // Pipe command's std[err|out] to parent. cmd.Stdout = os.Stdout cmd.Stderr = os.Stderr if err := cmd.Run(); err != nil { @@ -343,7 +403,7 @@ func (p *Manager) WaitReady(timeout time.Duration) error { if err := p.Ping(); err == nil { return nil } - time.Sleep(1 * time.Second) + time.Sleep(200 * time.Millisecond) } return fmt.Errorf("timeout waiting for db ready") } @@ -354,7 +414,7 @@ func (p *Manager) Promote() error { cmd := exec.Command(name, "promote", "-w", "-D", p.dataDir) log.Debugw("execing cmd", "cmd", cmd) - //Pipe command's std[err|out] to parent. + // Pipe command's std[err|out] to parent. cmd.Stdout = os.Stdout cmd.Stderr = os.Stderr if err := cmd.Run(); err != nil { @@ -707,7 +767,7 @@ func (p *Manager) SyncFromFollowedPGRewind(followedConnParams ConnParams, passwo cmd.Env = append(os.Environ(), fmt.Sprintf("PGPASSFILE=%s", pgpass.Name())) log.Debugw("execing cmd", "cmd", cmd) - //Pipe command's std[err|out] to parent. + // Pipe command's std[err|out] to parent. cmd.Stdout = os.Stdout cmd.Stderr = os.Stderr if err := cmd.Run(); err != nil { @@ -748,7 +808,7 @@ func (p *Manager) SyncFromFollowed(followedConnParams ConnParams) error { cmd.Env = append(os.Environ(), fmt.Sprintf("PGPASSFILE=%s", pgpass.Name())) log.Debugw("execing cmd", "cmd", cmd) - //Pipe command's std[err|out] to parent. + // Pipe command's std[err|out] to parent. cmd.Stdout = os.Stdout cmd.Stderr = os.Stderr if err := cmd.Run(); err != nil {