From 2cdee7ca147372e820e4d678a62b7cbc34e225f2 Mon Sep 17 00:00:00 2001 From: avallete Date: Wed, 20 Nov 2024 14:03:22 +0100 Subject: [PATCH 1/4] fix(reset): ensure _supabase connections disconnect before reset Closes #2903 --- internal/db/reset/reset.go | 20 +++++++++++--------- internal/db/reset/reset_test.go | 7 +++++++ 2 files changed, 18 insertions(+), 9 deletions(-) diff --git a/internal/db/reset/reset.go b/internal/db/reset/reset.go index a9d2f4f4f..933405117 100644 --- a/internal/db/reset/reset.go +++ b/internal/db/reset/reset.go @@ -166,16 +166,18 @@ func recreateDatabase(ctx context.Context, options ...func(*pgx.ConnConfig)) err func DisconnectClients(ctx context.Context, conn *pgx.Conn) error { // Must be executed separately because running in transaction is unsupported - disconn := "ALTER DATABASE postgres ALLOW_CONNECTIONS false;" - if _, err := conn.Exec(ctx, disconn); err != nil { - var pgErr *pgconn.PgError - if errors.As(err, &pgErr) && pgErr.Code != pgerrcode.InvalidCatalogName { - return errors.Errorf("failed to disconnect clients: %w", err) + for _, dbName := range []string{"postgres", "_supabase"} { + disconn := fmt.Sprintf("ALTER DATABASE %s ALLOW_CONNECTIONS false;", dbName) + if _, err := conn.Exec(ctx, disconn); err != nil { + var pgErr *pgconn.PgError + if errors.As(err, &pgErr) && pgErr.Code != pgerrcode.InvalidCatalogName { + return errors.Errorf("failed to disconnect clients from %s: %w", dbName, err) + } + } + term := fmt.Sprintf(utils.TerminateDbSqlFmt, dbName) + if _, err := conn.Exec(ctx, term); err != nil { + return errors.Errorf("failed to terminate backend for %s: %w", dbName, err) } - } - term := fmt.Sprintf(utils.TerminateDbSqlFmt, "postgres") - if _, err := conn.Exec(ctx, term); err != nil { - return errors.Errorf("failed to terminate backend: %w", err) } return nil } diff --git a/internal/db/reset/reset_test.go b/internal/db/reset/reset_test.go index c65aa0ef9..9e5c0721d 100644 --- a/internal/db/reset/reset_test.go +++ b/internal/db/reset/reset_test.go @@ -206,6 +206,9 @@ func TestRecreateDatabase(t *testing.T) { Reply("ALTER DATABASE"). Query(fmt.Sprintf(utils.TerminateDbSqlFmt, "postgres")). Reply("DO"). + Query("ALTER DATABASE _supabase ALLOW_CONNECTIONS false;"). + Reply("ALTER DATABASE"). + Query(fmt.Sprintf(utils.TerminateDbSqlFmt, "_supabase")). Query("DROP DATABASE IF EXISTS postgres WITH (FORCE)"). Reply("DROP DATABASE"). Query("CREATE DATABASE postgres WITH OWNER postgres"). @@ -260,6 +263,10 @@ func TestRecreateDatabase(t *testing.T) { Reply("ALTER DATABASE"). Query(fmt.Sprintf(utils.TerminateDbSqlFmt, "postgres")). Reply("DO"). + Query("ALTER DATABASE _supabase ALLOW_CONNECTIONS false;"). + Reply("ALTER DATABASE"). + Query(fmt.Sprintf(utils.TerminateDbSqlFmt, "_supabase")). + Reply("DO"). Query("DROP DATABASE IF EXISTS postgres WITH (FORCE)"). ReplyError(pgerrcode.ObjectInUse, `database "postgres" is used by an active logical replication slot`). Query("CREATE DATABASE postgres WITH OWNER postgres"). From 48cea100973c801cc3a6267ce29564c0e51d27e1 Mon Sep 17 00:00:00 2001 From: avallete Date: Wed, 20 Nov 2024 14:15:43 +0100 Subject: [PATCH 2/4] fix: switch tests --- internal/db/branch/switch_/switch__test.go | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/internal/db/branch/switch_/switch__test.go b/internal/db/branch/switch_/switch__test.go index 429606627..bc832d4a6 100644 --- a/internal/db/branch/switch_/switch__test.go +++ b/internal/db/branch/switch_/switch__test.go @@ -46,6 +46,10 @@ func TestSwitchCommand(t *testing.T) { Reply("ALTER DATABASE"). Query(fmt.Sprintf(utils.TerminateDbSqlFmt, "postgres")). Reply("DO"). + Query("ALTER DATABASE _supabase ALLOW_CONNECTIONS false;"). + Reply("ALTER DATABASE"). + Query(fmt.Sprintf(utils.TerminateDbSqlFmt, "_supabase")). + Reply("DO"). Query("ALTER DATABASE postgres RENAME TO main;"). Reply("ALTER DATABASE"). Query("ALTER DATABASE " + branch + " RENAME TO postgres;"). @@ -238,6 +242,10 @@ func TestSwitchDatabase(t *testing.T) { Reply("ALTER DATABASE"). Query(fmt.Sprintf(utils.TerminateDbSqlFmt, "postgres")). Reply("DO"). + Query("ALTER DATABASE _supabase ALLOW_CONNECTIONS false;"). + Reply("ALTER DATABASE"). + Query(fmt.Sprintf(utils.TerminateDbSqlFmt, "_supabase")). + Reply("DO"). Query("ALTER DATABASE postgres RENAME TO main;"). ReplyError(pgerrcode.DuplicateDatabase, `database "main" already exists`) // Setup mock docker @@ -264,6 +272,10 @@ func TestSwitchDatabase(t *testing.T) { Reply("ALTER DATABASE"). Query(fmt.Sprintf(utils.TerminateDbSqlFmt, "postgres")). Reply("DO"). + Query("ALTER DATABASE _supabase ALLOW_CONNECTIONS false;"). + Reply("ALTER DATABASE"). + Query(fmt.Sprintf(utils.TerminateDbSqlFmt, "_supabase")). + Reply("DO"). Query("ALTER DATABASE postgres RENAME TO main;"). Reply("ALTER DATABASE"). Query("ALTER DATABASE target RENAME TO postgres;"). From 210ec70ecb4f00b7a6197be33d0de95dffa3477f Mon Sep 17 00:00:00 2001 From: avallete Date: Wed, 20 Nov 2024 15:07:40 +0100 Subject: [PATCH 3/4] fix: refactor reset for multiples db --- internal/db/branch/switch_/switch__test.go | 32 +++++++--------------- internal/db/reset/reset.go | 20 ++++++-------- internal/db/reset/reset_test.go | 20 +++++--------- internal/utils/misc.go | 5 ++-- 4 files changed, 29 insertions(+), 48 deletions(-) diff --git a/internal/db/branch/switch_/switch__test.go b/internal/db/branch/switch_/switch__test.go index bc832d4a6..839a1fb99 100644 --- a/internal/db/branch/switch_/switch__test.go +++ b/internal/db/branch/switch_/switch__test.go @@ -42,13 +42,9 @@ func TestSwitchCommand(t *testing.T) { // Setup mock postgres conn := pgtest.NewConn() defer conn.Close(t) - conn.Query("ALTER DATABASE postgres ALLOW_CONNECTIONS false;"). - Reply("ALTER DATABASE"). - Query(fmt.Sprintf(utils.TerminateDbSqlFmt, "postgres")). - Reply("DO"). - Query("ALTER DATABASE _supabase ALLOW_CONNECTIONS false;"). - Reply("ALTER DATABASE"). - Query(fmt.Sprintf(utils.TerminateDbSqlFmt, "_supabase")). + conn.Query("ALTER DATABASE postgres ALLOW_CONNECTIONS false; ALTER DATABASE _supabase ALLOW_CONNECTIONS false;"). + ReplyError(pgerrcode.InvalidCatalogName, `database "postgres" does not exist`). + Query(fmt.Sprintf(utils.TerminateDbSqlFmt, "postgres", "_supabase")). Reply("DO"). Query("ALTER DATABASE postgres RENAME TO main;"). Reply("ALTER DATABASE"). @@ -222,7 +218,7 @@ func TestSwitchDatabase(t *testing.T) { // Setup mock postgres conn := pgtest.NewConn() defer conn.Close(t) - conn.Query("ALTER DATABASE postgres ALLOW_CONNECTIONS false;"). + conn.Query("ALTER DATABASE postgres ALLOW_CONNECTIONS false; ALTER DATABASE _supabase ALLOW_CONNECTIONS false;"). ReplyError(pgerrcode.InvalidParameterValue, `cannot disallow connections for current database`) // Run test err := switchDatabase(context.Background(), "main", "target", conn.Intercept) @@ -238,13 +234,9 @@ func TestSwitchDatabase(t *testing.T) { // Setup mock postgres conn := pgtest.NewConn() defer conn.Close(t) - conn.Query("ALTER DATABASE postgres ALLOW_CONNECTIONS false;"). - Reply("ALTER DATABASE"). - Query(fmt.Sprintf(utils.TerminateDbSqlFmt, "postgres")). - Reply("DO"). - Query("ALTER DATABASE _supabase ALLOW_CONNECTIONS false;"). - Reply("ALTER DATABASE"). - Query(fmt.Sprintf(utils.TerminateDbSqlFmt, "_supabase")). + conn.Query("ALTER DATABASE postgres ALLOW_CONNECTIONS false; ALTER DATABASE _supabase ALLOW_CONNECTIONS false;"). + ReplyError(pgerrcode.InvalidCatalogName, `database "postgres" does not exist`). + Query(fmt.Sprintf(utils.TerminateDbSqlFmt, "postgres", "_supabase")). Reply("DO"). Query("ALTER DATABASE postgres RENAME TO main;"). ReplyError(pgerrcode.DuplicateDatabase, `database "main" already exists`) @@ -268,13 +260,9 @@ func TestSwitchDatabase(t *testing.T) { // Setup mock postgres conn := pgtest.NewConn() defer conn.Close(t) - conn.Query("ALTER DATABASE postgres ALLOW_CONNECTIONS false;"). - Reply("ALTER DATABASE"). - Query(fmt.Sprintf(utils.TerminateDbSqlFmt, "postgres")). - Reply("DO"). - Query("ALTER DATABASE _supabase ALLOW_CONNECTIONS false;"). - Reply("ALTER DATABASE"). - Query(fmt.Sprintf(utils.TerminateDbSqlFmt, "_supabase")). + conn.Query("ALTER DATABASE postgres ALLOW_CONNECTIONS false; ALTER DATABASE _supabase ALLOW_CONNECTIONS false;"). + ReplyError(pgerrcode.InvalidCatalogName, `database "postgres" does not exist`). + Query(fmt.Sprintf(utils.TerminateDbSqlFmt, "postgres", "_supabase")). Reply("DO"). Query("ALTER DATABASE postgres RENAME TO main;"). Reply("ALTER DATABASE"). diff --git a/internal/db/reset/reset.go b/internal/db/reset/reset.go index 933405117..3cab54135 100644 --- a/internal/db/reset/reset.go +++ b/internal/db/reset/reset.go @@ -166,19 +166,17 @@ func recreateDatabase(ctx context.Context, options ...func(*pgx.ConnConfig)) err func DisconnectClients(ctx context.Context, conn *pgx.Conn) error { // Must be executed separately because running in transaction is unsupported - for _, dbName := range []string{"postgres", "_supabase"} { - disconn := fmt.Sprintf("ALTER DATABASE %s ALLOW_CONNECTIONS false;", dbName) - if _, err := conn.Exec(ctx, disconn); err != nil { - var pgErr *pgconn.PgError - if errors.As(err, &pgErr) && pgErr.Code != pgerrcode.InvalidCatalogName { - return errors.Errorf("failed to disconnect clients from %s: %w", dbName, err) - } - } - term := fmt.Sprintf(utils.TerminateDbSqlFmt, dbName) - if _, err := conn.Exec(ctx, term); err != nil { - return errors.Errorf("failed to terminate backend for %s: %w", dbName, err) + disconn := "ALTER DATABASE postgres ALLOW_CONNECTIONS false; ALTER DATABASE _supabase ALLOW_CONNECTIONS false;" + if _, err := conn.Exec(ctx, disconn); err != nil { + var pgErr *pgconn.PgError + if errors.As(err, &pgErr) && pgErr.Code != pgerrcode.InvalidCatalogName { + return errors.Errorf("failed to disconnect clients: %w", err) } } + term := fmt.Sprintf(utils.TerminateDbSqlFmt, "postgres", "_supabase") + if _, err := conn.Exec(ctx, term); err != nil { + return errors.Errorf("failed to terminate backend for %s: %w", "postgres or _supabase", err) + } return nil } diff --git a/internal/db/reset/reset_test.go b/internal/db/reset/reset_test.go index 9e5c0721d..60d381407 100644 --- a/internal/db/reset/reset_test.go +++ b/internal/db/reset/reset_test.go @@ -202,13 +202,10 @@ func TestRecreateDatabase(t *testing.T) { // Setup mock postgres conn := pgtest.NewConn() defer conn.Close(t) - conn.Query("ALTER DATABASE postgres ALLOW_CONNECTIONS false;"). + conn.Query("ALTER DATABASE postgres ALLOW_CONNECTIONS false; ALTER DATABASE _supabase ALLOW_CONNECTIONS false;"). Reply("ALTER DATABASE"). - Query(fmt.Sprintf(utils.TerminateDbSqlFmt, "postgres")). + Query(fmt.Sprintf(utils.TerminateDbSqlFmt, "postgres", "_supabase")). Reply("DO"). - Query("ALTER DATABASE _supabase ALLOW_CONNECTIONS false;"). - Reply("ALTER DATABASE"). - Query(fmt.Sprintf(utils.TerminateDbSqlFmt, "_supabase")). Query("DROP DATABASE IF EXISTS postgres WITH (FORCE)"). Reply("DROP DATABASE"). Query("CREATE DATABASE postgres WITH OWNER postgres"). @@ -231,9 +228,9 @@ func TestRecreateDatabase(t *testing.T) { // Setup mock postgres conn := pgtest.NewConn() defer conn.Close(t) - conn.Query("ALTER DATABASE postgres ALLOW_CONNECTIONS false;"). + conn.Query("ALTER DATABASE postgres ALLOW_CONNECTIONS false; ALTER DATABASE _supabase ALLOW_CONNECTIONS false;"). ReplyError(pgerrcode.InvalidCatalogName, `database "postgres" does not exist`). - Query(fmt.Sprintf(utils.TerminateDbSqlFmt, "postgres")). + Query(fmt.Sprintf(utils.TerminateDbSqlFmt, "postgres", "_supabase")). ReplyError(pgerrcode.UndefinedTable, `relation "pg_stat_activity" does not exist`) // Run test err := recreateDatabase(context.Background(), conn.Intercept) @@ -246,7 +243,7 @@ func TestRecreateDatabase(t *testing.T) { // Setup mock postgres conn := pgtest.NewConn() defer conn.Close(t) - conn.Query("ALTER DATABASE postgres ALLOW_CONNECTIONS false;"). + conn.Query("ALTER DATABASE postgres ALLOW_CONNECTIONS false; ALTER DATABASE _supabase ALLOW_CONNECTIONS false;"). ReplyError(pgerrcode.InvalidParameterValue, `cannot disallow connections for current database`) // Run test err := recreateDatabase(context.Background(), conn.Intercept) @@ -259,13 +256,10 @@ func TestRecreateDatabase(t *testing.T) { // Setup mock postgres conn := pgtest.NewConn() defer conn.Close(t) - conn.Query("ALTER DATABASE postgres ALLOW_CONNECTIONS false;"). + conn.Query("ALTER DATABASE postgres ALLOW_CONNECTIONS false; ALTER DATABASE _supabase ALLOW_CONNECTIONS false;"). Reply("ALTER DATABASE"). - Query(fmt.Sprintf(utils.TerminateDbSqlFmt, "postgres")). + Query(fmt.Sprintf(utils.TerminateDbSqlFmt, "postgres", "_supabase")). Reply("DO"). - Query("ALTER DATABASE _supabase ALLOW_CONNECTIONS false;"). - Reply("ALTER DATABASE"). - Query(fmt.Sprintf(utils.TerminateDbSqlFmt, "_supabase")). Reply("DO"). Query("DROP DATABASE IF EXISTS postgres WITH (FORCE)"). ReplyError(pgerrcode.ObjectInUse, `database "postgres" is used by an active logical replication slot`). diff --git a/internal/utils/misc.go b/internal/utils/misc.go index 0993ae806..07bf680ec 100644 --- a/internal/utils/misc.go +++ b/internal/utils/misc.go @@ -35,11 +35,12 @@ const ( // https://dba.stackexchange.com/a/11895 // Args: dbname TerminateDbSqlFmt = ` -SELECT pg_terminate_backend(pid) FROM pg_stat_activity WHERE datname = '%[1]s'; +SELECT pg_terminate_backend(pid) FROM pg_stat_activity WHERE datname IN ('%[1]s', '%[2]s'); -- Wait for WAL sender to drop replication slot. DO 'BEGIN WHILE ( - SELECT COUNT(*) FROM pg_replication_slots WHERE database = ''%[1]s'' + SELECT COUNT(*) FROM pg_replication_slots WHERE database IN (''%[1]s'', ''%[2]s'') ) > 0 LOOP END LOOP; END';` + SuggestDebugFlag = "Try rerunning the command with --debug to troubleshoot the error." ) From 2eddca00181853b367065aa73fb48381738d234c Mon Sep 17 00:00:00 2001 From: Qiao Han Date: Mon, 25 Nov 2024 15:36:25 +0800 Subject: [PATCH 4/4] chore: refactor terminate backend query --- internal/db/branch/switch_/switch__test.go | 44 ++++++++++++++-------- internal/db/diff/diff.go | 4 +- internal/db/reset/reset.go | 35 +++++++++++++---- internal/db/reset/reset_test.go | 41 ++++++++++++-------- internal/db/start/start.go | 13 +++++-- internal/utils/misc.go | 13 +------ 6 files changed, 95 insertions(+), 55 deletions(-) diff --git a/internal/db/branch/switch_/switch__test.go b/internal/db/branch/switch_/switch__test.go index 839a1fb99..7c70959ce 100644 --- a/internal/db/branch/switch_/switch__test.go +++ b/internal/db/branch/switch_/switch__test.go @@ -2,7 +2,6 @@ package switch_ import ( "context" - "fmt" "net/http" "os" "path/filepath" @@ -14,6 +13,7 @@ import ( "github.com/spf13/afero" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/supabase/cli/internal/db/reset" "github.com/supabase/cli/internal/testing/apitest" "github.com/supabase/cli/internal/utils" "github.com/supabase/cli/pkg/pgtest" @@ -42,10 +42,14 @@ func TestSwitchCommand(t *testing.T) { // Setup mock postgres conn := pgtest.NewConn() defer conn.Close(t) - conn.Query("ALTER DATABASE postgres ALLOW_CONNECTIONS false; ALTER DATABASE _supabase ALLOW_CONNECTIONS false;"). - ReplyError(pgerrcode.InvalidCatalogName, `database "postgres" does not exist`). - Query(fmt.Sprintf(utils.TerminateDbSqlFmt, "postgres", "_supabase")). - Reply("DO"). + conn.Query("ALTER DATABASE postgres ALLOW_CONNECTIONS false"). + Reply("ALTER DATABASE"). + Query("ALTER DATABASE _supabase ALLOW_CONNECTIONS false"). + Reply("ALTER DATABASE"). + Query(reset.TERMINATE_BACKENDS). + Reply("SELECT 1"). + Query(reset.COUNT_REPLICATION_SLOTS). + Reply("SELECT 1", []interface{}{0}). Query("ALTER DATABASE postgres RENAME TO main;"). Reply("ALTER DATABASE"). Query("ALTER DATABASE " + branch + " RENAME TO postgres;"). @@ -218,8 +222,10 @@ func TestSwitchDatabase(t *testing.T) { // Setup mock postgres conn := pgtest.NewConn() defer conn.Close(t) - conn.Query("ALTER DATABASE postgres ALLOW_CONNECTIONS false; ALTER DATABASE _supabase ALLOW_CONNECTIONS false;"). - ReplyError(pgerrcode.InvalidParameterValue, `cannot disallow connections for current database`) + conn.Query("ALTER DATABASE postgres ALLOW_CONNECTIONS false"). + ReplyError(pgerrcode.InvalidParameterValue, `cannot disallow connections for current database`). + Query("ALTER DATABASE _supabase ALLOW_CONNECTIONS false"). + Query(reset.TERMINATE_BACKENDS) // Run test err := switchDatabase(context.Background(), "main", "target", conn.Intercept) // Check error @@ -234,10 +240,14 @@ func TestSwitchDatabase(t *testing.T) { // Setup mock postgres conn := pgtest.NewConn() defer conn.Close(t) - conn.Query("ALTER DATABASE postgres ALLOW_CONNECTIONS false; ALTER DATABASE _supabase ALLOW_CONNECTIONS false;"). - ReplyError(pgerrcode.InvalidCatalogName, `database "postgres" does not exist`). - Query(fmt.Sprintf(utils.TerminateDbSqlFmt, "postgres", "_supabase")). - Reply("DO"). + conn.Query("ALTER DATABASE postgres ALLOW_CONNECTIONS false"). + Reply("ALTER DATABASE"). + Query("ALTER DATABASE _supabase ALLOW_CONNECTIONS false"). + Reply("ALTER DATABASE"). + Query(reset.TERMINATE_BACKENDS). + Reply("SELECT 1"). + Query(reset.COUNT_REPLICATION_SLOTS). + Reply("SELECT 1", []interface{}{0}). Query("ALTER DATABASE postgres RENAME TO main;"). ReplyError(pgerrcode.DuplicateDatabase, `database "main" already exists`) // Setup mock docker @@ -260,10 +270,14 @@ func TestSwitchDatabase(t *testing.T) { // Setup mock postgres conn := pgtest.NewConn() defer conn.Close(t) - conn.Query("ALTER DATABASE postgres ALLOW_CONNECTIONS false; ALTER DATABASE _supabase ALLOW_CONNECTIONS false;"). - ReplyError(pgerrcode.InvalidCatalogName, `database "postgres" does not exist`). - Query(fmt.Sprintf(utils.TerminateDbSqlFmt, "postgres", "_supabase")). - Reply("DO"). + conn.Query("ALTER DATABASE postgres ALLOW_CONNECTIONS false"). + Reply("ALTER DATABASE"). + Query("ALTER DATABASE _supabase ALLOW_CONNECTIONS false"). + Reply("ALTER DATABASE"). + Query(reset.TERMINATE_BACKENDS). + Reply("SELECT 1"). + Query(reset.COUNT_REPLICATION_SLOTS). + Reply("SELECT 1", []interface{}{0}). Query("ALTER DATABASE postgres RENAME TO main;"). Reply("ALTER DATABASE"). Query("ALTER DATABASE target RENAME TO postgres;"). diff --git a/internal/db/diff/diff.go b/internal/db/diff/diff.go index 38a3bed58..6c5faa892 100644 --- a/internal/db/diff/diff.go +++ b/internal/db/diff/diff.go @@ -146,12 +146,12 @@ func CreateShadowDatabase(ctx context.Context, port uint16) (string, error) { func ConnectShadowDatabase(ctx context.Context, timeout time.Duration, options ...func(*pgx.ConnConfig)) (conn *pgx.Conn, err error) { // Retry until connected, cancelled, or timeout - policy := backoff.WithMaxRetries(backoff.NewConstantBackOff(time.Second), uint64(timeout.Seconds())) + policy := start.NewBackoffPolicy(ctx, timeout) config := pgconn.Config{Port: utils.Config.Db.ShadowPort} connect := func() (*pgx.Conn, error) { return utils.ConnectLocalPostgres(ctx, config, options...) } - return backoff.RetryWithData(connect, backoff.WithContext(policy, ctx)) + return backoff.RetryWithData(connect, policy) } func MigrateShadowDatabase(ctx context.Context, container string, fsys afero.Fs, options ...func(*pgx.ConnConfig)) error { diff --git a/internal/db/reset/reset.go b/internal/db/reset/reset.go index 3cab54135..a475ca80d 100644 --- a/internal/db/reset/reset.go +++ b/internal/db/reset/reset.go @@ -10,6 +10,7 @@ import ( "strings" "time" + "github.com/cenkalti/backoff/v4" "github.com/docker/docker/api/types" "github.com/docker/docker/api/types/container" "github.com/docker/docker/api/types/network" @@ -164,20 +165,40 @@ func recreateDatabase(ctx context.Context, options ...func(*pgx.ConnConfig)) err return sql.ExecBatch(ctx, conn) } +const ( + TERMINATE_BACKENDS = "SELECT pg_terminate_backend(pid) FROM pg_stat_activity WHERE datname IN ('postgres', '_supabase')" + COUNT_REPLICATION_SLOTS = "SELECT COUNT(*) FROM pg_replication_slots WHERE database IN ('postgres', '_supabase')" +) + func DisconnectClients(ctx context.Context, conn *pgx.Conn) error { - // Must be executed separately because running in transaction is unsupported - disconn := "ALTER DATABASE postgres ALLOW_CONNECTIONS false; ALTER DATABASE _supabase ALLOW_CONNECTIONS false;" - if _, err := conn.Exec(ctx, disconn); err != nil { + // Must be executed separately because looping in transaction is unsupported + // https://dba.stackexchange.com/a/11895 + disconn := migration.MigrationFile{ + Statements: []string{ + "ALTER DATABASE postgres ALLOW_CONNECTIONS false", + "ALTER DATABASE _supabase ALLOW_CONNECTIONS false", + TERMINATE_BACKENDS, + }, + } + if err := disconn.ExecBatch(ctx, conn); err != nil { var pgErr *pgconn.PgError if errors.As(err, &pgErr) && pgErr.Code != pgerrcode.InvalidCatalogName { return errors.Errorf("failed to disconnect clients: %w", err) } } - term := fmt.Sprintf(utils.TerminateDbSqlFmt, "postgres", "_supabase") - if _, err := conn.Exec(ctx, term); err != nil { - return errors.Errorf("failed to terminate backend for %s: %w", "postgres or _supabase", err) + // Wait for WAL senders to drop their replication slots + policy := start.NewBackoffPolicy(ctx, 10*time.Second) + waitForDrop := func() error { + var count int + if err := conn.QueryRow(ctx, COUNT_REPLICATION_SLOTS).Scan(&count); err != nil { + err = errors.Errorf("failed to count replication slots: %w", err) + return &backoff.PermanentError{Err: err} + } else if count > 0 { + return errors.Errorf("replication slots still active: %d", count) + } + return nil } - return nil + return backoff.Retry(waitForDrop, policy) } func RestartDatabase(ctx context.Context, w io.Writer) error { diff --git a/internal/db/reset/reset_test.go b/internal/db/reset/reset_test.go index 60d381407..4e3558be3 100644 --- a/internal/db/reset/reset_test.go +++ b/internal/db/reset/reset_test.go @@ -3,7 +3,6 @@ package reset import ( "context" "errors" - "fmt" "io" "net/http" "path/filepath" @@ -202,10 +201,14 @@ func TestRecreateDatabase(t *testing.T) { // Setup mock postgres conn := pgtest.NewConn() defer conn.Close(t) - conn.Query("ALTER DATABASE postgres ALLOW_CONNECTIONS false; ALTER DATABASE _supabase ALLOW_CONNECTIONS false;"). + conn.Query("ALTER DATABASE postgres ALLOW_CONNECTIONS false"). Reply("ALTER DATABASE"). - Query(fmt.Sprintf(utils.TerminateDbSqlFmt, "postgres", "_supabase")). - Reply("DO"). + Query("ALTER DATABASE _supabase ALLOW_CONNECTIONS false"). + Reply("ALTER DATABASE"). + Query(TERMINATE_BACKENDS). + Reply("SELECT 1"). + Query(COUNT_REPLICATION_SLOTS). + Reply("SELECT 1", []interface{}{0}). Query("DROP DATABASE IF EXISTS postgres WITH (FORCE)"). Reply("DROP DATABASE"). Query("CREATE DATABASE postgres WITH OWNER postgres"). @@ -228,14 +231,17 @@ func TestRecreateDatabase(t *testing.T) { // Setup mock postgres conn := pgtest.NewConn() defer conn.Close(t) - conn.Query("ALTER DATABASE postgres ALLOW_CONNECTIONS false; ALTER DATABASE _supabase ALLOW_CONNECTIONS false;"). - ReplyError(pgerrcode.InvalidCatalogName, `database "postgres" does not exist`). - Query(fmt.Sprintf(utils.TerminateDbSqlFmt, "postgres", "_supabase")). - ReplyError(pgerrcode.UndefinedTable, `relation "pg_stat_activity" does not exist`) + conn.Query("ALTER DATABASE postgres ALLOW_CONNECTIONS false"). + Reply("ALTER DATABASE"). + Query("ALTER DATABASE _supabase ALLOW_CONNECTIONS false"). + ReplyError(pgerrcode.InvalidCatalogName, `database "_supabase" does not exist`). + Query(TERMINATE_BACKENDS). + Query(COUNT_REPLICATION_SLOTS). + ReplyError(pgerrcode.UndefinedTable, `relation "pg_replication_slots" does not exist`) // Run test err := recreateDatabase(context.Background(), conn.Intercept) // Check error - assert.ErrorContains(t, err, `ERROR: relation "pg_stat_activity" does not exist (SQLSTATE 42P01)`) + assert.ErrorContains(t, err, `ERROR: relation "pg_replication_slots" does not exist (SQLSTATE 42P01)`) }) t.Run("throws error on failure to disconnect", func(t *testing.T) { @@ -243,8 +249,10 @@ func TestRecreateDatabase(t *testing.T) { // Setup mock postgres conn := pgtest.NewConn() defer conn.Close(t) - conn.Query("ALTER DATABASE postgres ALLOW_CONNECTIONS false; ALTER DATABASE _supabase ALLOW_CONNECTIONS false;"). - ReplyError(pgerrcode.InvalidParameterValue, `cannot disallow connections for current database`) + conn.Query("ALTER DATABASE postgres ALLOW_CONNECTIONS false"). + ReplyError(pgerrcode.InvalidParameterValue, `cannot disallow connections for current database`). + Query("ALTER DATABASE _supabase ALLOW_CONNECTIONS false"). + Query(TERMINATE_BACKENDS) // Run test err := recreateDatabase(context.Background(), conn.Intercept) // Check error @@ -256,11 +264,14 @@ func TestRecreateDatabase(t *testing.T) { // Setup mock postgres conn := pgtest.NewConn() defer conn.Close(t) - conn.Query("ALTER DATABASE postgres ALLOW_CONNECTIONS false; ALTER DATABASE _supabase ALLOW_CONNECTIONS false;"). + conn.Query("ALTER DATABASE postgres ALLOW_CONNECTIONS false"). + Reply("ALTER DATABASE"). + Query("ALTER DATABASE _supabase ALLOW_CONNECTIONS false"). Reply("ALTER DATABASE"). - Query(fmt.Sprintf(utils.TerminateDbSqlFmt, "postgres", "_supabase")). - Reply("DO"). - Reply("DO"). + Query(TERMINATE_BACKENDS). + Reply("SELECT 1"). + Query(COUNT_REPLICATION_SLOTS). + Reply("SELECT 1", []interface{}{0}). Query("DROP DATABASE IF EXISTS postgres WITH (FORCE)"). ReplyError(pgerrcode.ObjectInUse, `database "postgres" is used by an active logical replication slot`). Query("CREATE DATABASE postgres WITH OWNER postgres"). diff --git a/internal/db/start/start.go b/internal/db/start/start.go index a300f5594..f0fc488d2 100644 --- a/internal/db/start/start.go +++ b/internal/db/start/start.go @@ -160,6 +160,14 @@ EOF`} return initCurrentBranch(fsys) } +func NewBackoffPolicy(ctx context.Context, timeout time.Duration) backoff.BackOff { + policy := backoff.WithMaxRetries( + backoff.NewConstantBackOff(time.Second), + uint64(timeout.Seconds()), + ) + return backoff.WithContext(policy, ctx) +} + func WaitForHealthyService(ctx context.Context, timeout time.Duration, started ...string) error { probe := func() error { var errHealth []error @@ -173,10 +181,7 @@ func WaitForHealthyService(ctx context.Context, timeout time.Duration, started . started = unhealthy return errors.Join(errHealth...) } - policy := backoff.WithContext(backoff.WithMaxRetries( - backoff.NewConstantBackOff(time.Second), - uint64(timeout.Seconds()), - ), ctx) + policy := NewBackoffPolicy(ctx, timeout) err := backoff.Retry(probe, policy) if err != nil && !errors.Is(err, context.Canceled) { // Print container logs for easier debugging diff --git a/internal/utils/misc.go b/internal/utils/misc.go index 07bf680ec..202d80d0a 100644 --- a/internal/utils/misc.go +++ b/internal/utils/misc.go @@ -31,18 +31,7 @@ func ShortContainerImageName(imageName string) string { return matches[1] } -const ( - // https://dba.stackexchange.com/a/11895 - // Args: dbname - TerminateDbSqlFmt = ` -SELECT pg_terminate_backend(pid) FROM pg_stat_activity WHERE datname IN ('%[1]s', '%[2]s'); --- Wait for WAL sender to drop replication slot. -DO 'BEGIN WHILE ( - SELECT COUNT(*) FROM pg_replication_slots WHERE database IN (''%[1]s'', ''%[2]s'') -) > 0 LOOP END LOOP; END';` - - SuggestDebugFlag = "Try rerunning the command with --debug to troubleshoot the error." -) +const SuggestDebugFlag = "Try rerunning the command with --debug to troubleshoot the error." var ( CmdSuggestion string