From 5fe441c1beb4d78b590580774fe401951e0e234d Mon Sep 17 00:00:00 2001 From: Simone Gotti Date: Wed, 13 Jun 2018 14:54:59 +0200 Subject: [PATCH] *: overcome postgres sync repl limit causing lost transactions under some events. Postgres synchronous replication has a downside explained in the docs: https://www.postgresql.org/docs/current/static/warm-standby.html `If primary restarts while commits are waiting for acknowledgement, those waiting transactions will be marked fully committed once the primary database recovers. There is no way to be certain that all standbys have received all outstanding WAL data at time of the crash of the primary. Some transactions may not show as committed on the standby, even though they show as committed on the primary. The guarantee we offer is that the application will not receive explicit acknowledgement of the successful commit of a transaction until the WAL data is known to be safely received by all the synchronous standbys.` Under some events this will cause lost transactions. For example: * Sync standby goes down. * A client commits a transaction, it blocks waiting for acknowledgement. * Primary restart, it'll mark the above transaction as fully committed. All the clients will now see that transaction. * Primary dies * Standby comes back. * The sentinel will elect the standby as the new master since it's in the synchronous_standby_names list. * The above transaction will be lost despite synchronous replication being enabled. So there can be some conditions where a syncstandby could be elected also if it's missing the last transactions if it was down at the commit time. It's not easy to fix this issue since these events cannot be resolved by the sentinel because it's not possible to know if a sync standby is really in sync when the master is down (since we cannot query its last wal position and the reporting from the keeper is asynchronous). But with stolon we have the power to overcome this issue by noticing when a primary restarts (since we control it), allow only "internal" connections until all the defined synchronous standbys are really in sync. Allowing only "internal" connections means not adding the default rules or the user defined pgHBA rules but only the rules needed for replication (and local communication from the keeper). Since "internal" rules accepts the defined superuser and replication users, client should not use these roles for normal operation or the above solution won't work (but they shouldn't do it anyway since this could cause exhaustion of reserved superuser connections needed by the keeper to check the instance). --- cmd/keeper/cmd/keeper.go | 70 +++++++++++---- doc/faq.md | 2 +- doc/syncrepl.md | 29 +++++++ internal/postgresql/postgresql.go | 6 ++ internal/postgresql/utils.go | 28 ++++++ tests/integration/ha_test.go | 136 +++++++++++++++++++++++++++++- tests/integration/utils.go | 7 +- 7 files changed, 256 insertions(+), 22 deletions(-) diff --git a/cmd/keeper/cmd/keeper.go b/cmd/keeper/cmd/keeper.go index 36ecd07e3..f289ccb67 100644 --- a/cmd/keeper/cmd/keeper.go +++ b/cmd/keeper/cmd/keeper.go @@ -444,6 +444,8 @@ type PostgresKeeper struct { pgStateMutex sync.Mutex getPGStateMutex sync.Mutex lastPGState *cluster.PostgresState + + waitSyncStandbysSynced bool } func NewPostgresKeeper(cfg *config, end chan error) (*PostgresKeeper, error) { @@ -1019,8 +1021,8 @@ func (p *PostgresKeeper) postgresKeeperSM(pctx context.Context) { return } - // Dynamicly generate hba auth from clusterData - pgm.SetHba(p.generateHBA(cd, db)) + // Generate hba auth from clusterData + pgm.SetHba(p.generateHBA(cd, db, false)) var pgParameters common.Parameters @@ -1436,6 +1438,13 @@ func (p *PostgresKeeper) postgresKeeperSM(pctx context.Context) { return } if !started { + // if we have syncrepl enabled and the postgres instance is stopped, before opening connections to normal users wait for having the defined synchronousStandbys in sync state. + if db.Spec.SynchronousReplication { + p.waitSyncStandbysSynced = true + log.Infow("not allowing connection as normal users since synchronous replication is enabled and instance was down") + pgm.SetHba(p.generateHBA(cd, db, p.waitSyncStandbysSynced)) + } + if err = pgm.Start(); err != nil { log.Errorw("failed to start postgres", zap.Error(err)) return @@ -1620,8 +1629,31 @@ func (p *PostgresKeeper) postgresKeeperSM(pctx context.Context) { log.Infow("postgres parameters not changed") } - // Dynamicly generate hba auth from clusterData - newHBA := p.generateHBA(cd, db) + // Generate hba auth from clusterData + + // if we have syncrepl enabled and the postgres instance is stopped, before opening connections to normal users wait for having the defined synchronousStandbys in sync state. + if db.Spec.SynchronousReplication && p.waitSyncStandbysSynced { + curSyncStandbysFullName, err := pgm.GetSyncStandbys() + if err != nil { + log.Errorw("failed to retrieve current sync standbys status from instance", zap.Error(err)) + return + } + + curSyncStandbys := []string{} + for _, s := range curSyncStandbysFullName { + if common.IsStolonName(s) { + curSyncStandbys = append(curSyncStandbys, common.NameFromStolonName(s)) + } + } + if !util.CompareStringSliceNoOrder(curSyncStandbys, db.Spec.SynchronousStandbys) { + log.Infow("not allowing connection as normal users since synchronous replication is enabled, instance was down and not all sync standbys are synced") + } else { + p.waitSyncStandbysSynced = false + } + } else { + p.waitSyncStandbysSynced = false + } + newHBA := p.generateHBA(cd, db, p.waitSyncStandbysSynced) if !reflect.DeepEqual(newHBA, pgm.CurHba()) { log.Infow("postgres hba entries changed, reloading postgres instance") pgm.SetHba(newHBA) @@ -1726,8 +1758,12 @@ func IsMaster(db *cluster.DB) bool { } } -// generateHBA generates the instance hba entries depending on the value of DefaultSUReplAccessMode. -func (p *PostgresKeeper) generateHBA(cd *cluster.ClusterData, db *cluster.DB) []string { +// generateHBA generates the instance hba entries depending on the value of +// DefaultSUReplAccessMode. +// When onlyInternal is true only rules needed for replication will be setup +// and the traffic should be permitted only for pgSUUsername standard +// connections and pgReplUsername replication connections. +func (p *PostgresKeeper) generateHBA(cd *cluster.ClusterData, db *cluster.DB, onlyInternal bool) []string { // Minimal entries for local normal and replication connections needed by the stolon keeper // Matched local connections are for postgres database and suUsername user with md5 auth // Matched local replication connections are for replUsername user with md5 auth @@ -1761,16 +1797,18 @@ func (p *PostgresKeeper) generateHBA(cd *cluster.ClusterData, db *cluster.DB) [] } } - // By default, if no custom pg_hba entries are provided, accept - // connections for all databases and users with md5 auth - if db.Spec.PGHBA != nil { - computedHBA = append(computedHBA, db.Spec.PGHBA...) - } else { - computedHBA = append( - computedHBA, - "host all all 0.0.0.0/0 md5", - "host all all ::0/0 md5", - ) + if !onlyInternal { + // By default, if no custom pg_hba entries are provided, accept + // connections for all databases and users with md5 auth + if db.Spec.PGHBA != nil { + computedHBA = append(computedHBA, db.Spec.PGHBA...) + } else { + computedHBA = append( + computedHBA, + "host all all 0.0.0.0/0 md5", + "host all all ::0/0 md5", + ) + } } // return generated Hba merged with user Hba diff --git a/doc/faq.md b/doc/faq.md index 287c46ce5..b3cc6c263 100644 --- a/doc/faq.md +++ b/doc/faq.md @@ -58,7 +58,7 @@ stolon let you easily integrate with any backup/restore solution. See the [point When using async replication the leader sentinel tries to find the best standby using a valid standby with the (last reported) nearest xlog location to the master latest knows xlog location. If a master is down there's no way to know its latest xlog position (stolon get and save it at some intervals) so there's no way to guarantee that the standby is not behind but just that the best standby of the ones available will be choosen. -When using synchronous replication only synchronous standbys will be choosen so standbys behind the master won't be choosen (be aware of postgresql synchronous replication limits explaned in the [postgresql documentation](https://www.postgresql.org/docs/9.6/static/warm-standby.html#SYNCHRONOUS-REPLICATION), for example, when a master restarts while no synchronous standbys are available, the transactions waiting for acknowledgement on the master will be marked as fully committed. We are thinking of a way to avoid this using stolon). +When using synchronous replication only synchronous standbys will be choosen so standbys behind the master won't be choosen (be aware of postgresql synchronous replication limits explaned in the [postgresql documentation](https://www.postgresql.org/docs/9.6/static/warm-standby.html#SYNCHRONOUS-REPLICATION), for example, when a master restarts while no synchronous standbys are available, the transactions waiting for acknowledgement on the master will be marked as fully committed. This is "fixed" by stolon. See the [synchronous replication doc](syncrepl.md). ## Does stolon uses postgres sync replication [quorum methods](https://www.postgresql.org/docs/10/static/runtime-config-replication.html#RUNTIME-CONFIG-REPLICATION-MASTER) (FIRST or ANY)? diff --git a/doc/syncrepl.md b/doc/syncrepl.md index 2c821eb32..275c046a1 100644 --- a/doc/syncrepl.md +++ b/doc/syncrepl.md @@ -29,3 +29,32 @@ Set MinSynchronousStandbys/MaxSynchronousStandbys to a value different than 1 on ``` stolonctl --cluster-name=mycluster --store-backend=etcd update --patch '{ "synchronousReplication" : true, "minSynchronousStandbys": 2, "maxSynchronousStandbys": 3 }' ``` + +## Handling postgresql sync repl limits under such circumstances + +Postgres synchronous replication has a downside explained in the [docs](https://www.postgresql.org/docs/current/static/warm-standby.html) + +`If primary restarts while commits are waiting for acknowledgement, those waiting transactions will be marked fully committed once the primary database recovers. There is no way to be certain that all standbys have received all outstanding WAL data at time of the crash of the primary. Some transactions may not show as committed on the standby, even though they show as committed on the primary. The guarantee we offer is that the application will not receive explicit acknowledgement of the successful commit of a transaction until the WAL data is known to be safely received by all the synchronous standbys.` + +Under some events this will cause lost transactions. For example: + +* Sync standby goes down. +* A client commits a transaction, it blocks waiting for acknowledgement. +* Primary restart, it'll mark the above transaction as fully committed. All the +clients will now see that transaction. +* Primary dies +* Standby comes back. +* The sentinel will elect the standby as the new master since it's in the +synchronous_standby_names list. +* The above transaction will be lost despite synchronous replication being +enabled. + +So there can be some conditions where a syncstandby could be elected also if it's missing the last transactions if it was down at the commit time. + +It's not easy to fix this issue since these events cannot be resolved by the sentinel because it's not possible to know if a sync standby is really in sync when the master is down (since we cannot query its last wal position and the reporting from the keeper is asynchronous). + +But with stolon we have the power to overcome this issue by noticing when a primary restarts (since we control it), allow only "internal" connections until all the defined synchronous standbys are really in sync. + +Allowing only "internal" connections means not adding the default rules or the user defined pgHBA rules but only the rules needed for replication (and local communication from the keeper). + +Since "internal" rules accepts the defined superuser and replication users, client should not use these roles for normal operation or the above solution won't work (but they shouldn't do it anyway since this could cause exhaustion of reserved superuser connections needed by the keeper to check the instance). diff --git a/internal/postgresql/postgresql.go b/internal/postgresql/postgresql.go index b9ad7d401..9dbe31790 100644 --- a/internal/postgresql/postgresql.go +++ b/internal/postgresql/postgresql.go @@ -532,6 +532,12 @@ func (p *Manager) SetupRoles() error { return nil } +func (p *Manager) GetSyncStandbys() ([]string, error) { + ctx, cancel := context.WithTimeout(context.Background(), p.requestTimeout) + defer cancel() + return getSyncStandbys(ctx, p.localConnParams) +} + func (p *Manager) GetReplicationSlots() ([]string, error) { maj, _, err := p.PGDataVersion() if err != nil { diff --git a/internal/postgresql/utils.go b/internal/postgresql/utils.go index c045c349c..6153ee018 100644 --- a/internal/postgresql/utils.go +++ b/internal/postgresql/utils.go @@ -231,6 +231,34 @@ func getRole(ctx context.Context, connParams ConnParams) (common.Role, error) { return "", fmt.Errorf("no rows returned") } +func getSyncStandbys(ctx context.Context, connParams ConnParams) ([]string, error) { + db, err := sql.Open("postgres", connParams.ConnString()) + if err != nil { + return nil, err + } + defer db.Close() + + rows, err := query(ctx, db, "select application_name, sync_state from pg_stat_replication") + if err != nil { + return nil, err + } + defer rows.Close() + + syncStandbys := []string{} + for rows.Next() { + var application_name, sync_state string + if err := rows.Scan(&application_name, &sync_state); err != nil { + return nil, err + } + + if sync_state == "sync" { + syncStandbys = append(syncStandbys, application_name) + } + } + + return syncStandbys, nil +} + func PGLsnToInt(lsn string) (uint64, error) { parts := strings.Split(lsn, "/") if len(parts) != 2 { diff --git a/tests/integration/ha_test.go b/tests/integration/ha_test.go index ee8fd1766..d967c5a5b 100644 --- a/tests/integration/ha_test.go +++ b/tests/integration/ha_test.go @@ -16,10 +16,12 @@ package integration import ( "context" + "database/sql" "fmt" "io/ioutil" "os" "path/filepath" + "strings" "syscall" "testing" "time" @@ -27,6 +29,7 @@ import ( "github.com/satori/go.uuid" "github.com/sorintlab/stolon/internal/cluster" "github.com/sorintlab/stolon/internal/common" + pg "github.com/sorintlab/stolon/internal/postgresql" "github.com/sorintlab/stolon/internal/store" ) @@ -1552,7 +1555,7 @@ func testKeeperRemovalStolonCtl(t *testing.T, syncRepl bool) { t.Fatalf("unexpected err: %v", err) } - // get current stanbdys[0] db uid + // get current standbys[0] db uid cd, _, err := sm.GetClusterData(context.TODO()) if err != nil { t.Fatalf("unexpected err: %v", err) @@ -1638,7 +1641,7 @@ func TestStandbyCantSync(t *testing.T) { t.Fatalf("unexpected err: %v", err) } - // get current stanbdys[0] db uid + // get current standbys[0] db uid cd, _, err := sm.GetClusterData(context.TODO()) if err != nil { t.Fatalf("unexpected err: %v", err) @@ -1690,7 +1693,7 @@ func TestStandbyCantSync(t *testing.T) { t.Fatalf("unexpected err: %v", err) } - // check that the current stanbdys[0] db uid is different. This means the + // check that the current standbys[0] db uid is different. This means the // sentinel found that standbys[0] won't sync due to missing wals and asked // the keeper to resync (defining e new db in the cluster data) cd, _, err = sm.GetClusterData(context.TODO()) @@ -1710,3 +1713,130 @@ func TestStandbyCantSync(t *testing.T) { t.Fatalf("expected different dbuid for standbys[0]: got the same: %q", newStandby0DBUID) } } + +// TestSyncStandbyNotInSync tests that, when using synchronous replication, a +// normal user cannot connect to primary db after it has restarted until all +// defined synchronous standbys are in sync. +func TestSyncStandbyNotInSync(t *testing.T) { + t.Parallel() + + dir, err := ioutil.TempDir("", "stolon") + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + defer os.RemoveAll(dir) + + clusterName := uuid.NewV4().String() + + tks, tss, tp, tstore := setupServers(t, clusterName, dir, 2, 1, true, false, nil) + defer shutdown(tks, tss, tp, tstore) + + storePath := filepath.Join(common.StorePrefix, clusterName) + sm := store.NewKVBackedStore(tstore.store, storePath) + + master, standbys := waitMasterStandbysReady(t, sm, tks) + standby := standbys[0] + + if err := WaitClusterDataSynchronousStandbys([]string{standby.uid}, sm, 30*time.Second); err != nil { + t.Fatalf("expected synchronous standby on keeper %q in cluster data", standby.uid) + } + + if err := populate(t, master); err != nil { + t.Fatalf("unexpected err: %v", err) + } + if err := write(t, master, 1, 1); err != nil { + t.Fatalf("unexpected err: %v", err) + } + + // create a normal user + if _, err := master.Exec("CREATE USER user01 PASSWORD 'password'"); err != nil { + t.Fatalf("unexpected err: %v", err) + } + if _, err := master.Exec("GRANT ALL ON DATABASE postgres TO user01"); err != nil { + t.Fatalf("unexpected err: %v", err) + } + if _, err := master.Exec("GRANT ALL ON TABLE table01 TO user01"); err != nil { + t.Fatalf("unexpected err: %v", err) + } + connParams := pg.ConnParams{ + "user": "user01", + "password": "password", + "host": master.pgListenAddress, + "port": master.pgPort, + "dbname": "postgres", + "sslmode": "disable", + } + + connString := connParams.ConnString() + user01db, err := sql.Open("postgres", connString) + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + if _, err := user01db.Exec("SELECT * from table01"); err != nil { + t.Fatalf("unexpected err: %v", err) + } + + // get the master XLogPos + xLogPos, err := GetXLogPos(master) + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + // wait for the keepers to have reported their state + if err := WaitClusterSyncedXLogPos([]*TestKeeper{master, standby}, xLogPos, sm, 20*time.Second); err != nil { + t.Fatalf("unexpected err: %v", err) + } + + // the proxy should connect to the right master + if err := tp.WaitRightMaster(master, 3*cluster.DefaultProxyCheckInterval); err != nil { + t.Fatalf("unexpected err: %v", err) + } + + // Stop the standby keeper, should also stop the database + t.Logf("Stopping current standby keeper: %s", standby.uid) + standby.Stop() + + // this call will block and then exit with an error when the master is restarted + go func() { + write(t, master, 2, 2) + }() + time.Sleep(1 * time.Second) + + // restart master + t.Logf("Restarting current master keeper: %s", master.uid) + master.Stop() + master.Start() + waitKeeperReady(t, sm, master) + + // The transaction should be fully committed on master + c, err := getLines(t, master) + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + if c != 2 { + t.Fatalf("wrong number of lines, want: %d, got: %d", 2, c) + } + + // The normal user shouldn't be able to connect + if _, err := user01db.Exec("SELECT * from table01"); err != nil { + exp := `pq: no pg_hba.conf entry for host "127.0.0.1", user "user01", database "postgres"` + if !strings.HasPrefix(err.Error(), exp) { + t.Fatalf("expected error when connecting to db as user01 starting with %q, got err: %q", exp, err.Error()) + } + } else { + t.Fatalf("expected error connecting to db as user01, got no err") + } + + // Starting the standby keeper + t.Logf("Starting current standby keeper: %s", standby.uid) + standby.Start() + + time.Sleep(10 * time.Second) + // The normal user should now be able to connect and see 2 lines + c, err = getLines(t, user01db) + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + if c != 2 { + t.Fatalf("wrong number of lines, want: %d, got: %d", 2, c) + } +} diff --git a/tests/integration/utils.go b/tests/integration/utils.go index e90a83cbb..ac5dff547 100644 --- a/tests/integration/utils.go +++ b/tests/integration/utils.go @@ -72,6 +72,9 @@ func pgParametersWithDefaults(p cluster.PGParameters) cluster.PGParameters { type Querier interface { Exec(query string, args ...interface{}) (sql.Result, error) Query(query string, args ...interface{}) (*sql.Rows, error) +} + +type ReplQuerier interface { ReplQuery(query string, args ...interface{}) (*sql.Rows, error) } @@ -94,7 +97,7 @@ func GetPGParameters(q Querier) (common.Parameters, error) { return pgParameters, nil } -func GetSystemData(q Querier) (*pg.SystemData, error) { +func GetSystemData(q ReplQuerier) (*pg.SystemData, error) { rows, err := q.ReplQuery("IDENTIFY_SYSTEM") if err != nil { return nil, err @@ -116,7 +119,7 @@ func GetSystemData(q Querier) (*pg.SystemData, error) { return nil, fmt.Errorf("query returned 0 rows") } -func GetXLogPos(q Querier) (uint64, error) { +func GetXLogPos(q ReplQuerier) (uint64, error) { // get the current master XLogPos systemData, err := GetSystemData(q) if err != nil {