diff --git a/internal/db/branch/switch_/switch__test.go b/internal/db/branch/switch_/switch__test.go index 429606627..7c70959ce 100644 --- a/internal/db/branch/switch_/switch__test.go +++ b/internal/db/branch/switch_/switch__test.go @@ -2,7 +2,6 @@ package switch_ import ( "context" - "fmt" "net/http" "os" "path/filepath" @@ -14,6 +13,7 @@ import ( "github.com/spf13/afero" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/supabase/cli/internal/db/reset" "github.com/supabase/cli/internal/testing/apitest" "github.com/supabase/cli/internal/utils" "github.com/supabase/cli/pkg/pgtest" @@ -42,10 +42,14 @@ func TestSwitchCommand(t *testing.T) { // Setup mock postgres conn := pgtest.NewConn() defer conn.Close(t) - conn.Query("ALTER DATABASE postgres ALLOW_CONNECTIONS false;"). + conn.Query("ALTER DATABASE postgres ALLOW_CONNECTIONS false"). Reply("ALTER DATABASE"). - Query(fmt.Sprintf(utils.TerminateDbSqlFmt, "postgres")). - Reply("DO"). + Query("ALTER DATABASE _supabase ALLOW_CONNECTIONS false"). + Reply("ALTER DATABASE"). + Query(reset.TERMINATE_BACKENDS). + Reply("SELECT 1"). + Query(reset.COUNT_REPLICATION_SLOTS). + Reply("SELECT 1", []interface{}{0}). Query("ALTER DATABASE postgres RENAME TO main;"). Reply("ALTER DATABASE"). Query("ALTER DATABASE " + branch + " RENAME TO postgres;"). @@ -218,8 +222,10 @@ func TestSwitchDatabase(t *testing.T) { // Setup mock postgres conn := pgtest.NewConn() defer conn.Close(t) - conn.Query("ALTER DATABASE postgres ALLOW_CONNECTIONS false;"). - ReplyError(pgerrcode.InvalidParameterValue, `cannot disallow connections for current database`) + conn.Query("ALTER DATABASE postgres ALLOW_CONNECTIONS false"). + ReplyError(pgerrcode.InvalidParameterValue, `cannot disallow connections for current database`). + Query("ALTER DATABASE _supabase ALLOW_CONNECTIONS false"). + Query(reset.TERMINATE_BACKENDS) // Run test err := switchDatabase(context.Background(), "main", "target", conn.Intercept) // Check error @@ -234,10 +240,14 @@ func TestSwitchDatabase(t *testing.T) { // Setup mock postgres conn := pgtest.NewConn() defer conn.Close(t) - conn.Query("ALTER DATABASE postgres ALLOW_CONNECTIONS false;"). + conn.Query("ALTER DATABASE postgres ALLOW_CONNECTIONS false"). + Reply("ALTER DATABASE"). + Query("ALTER DATABASE _supabase ALLOW_CONNECTIONS false"). Reply("ALTER DATABASE"). - Query(fmt.Sprintf(utils.TerminateDbSqlFmt, "postgres")). - Reply("DO"). + Query(reset.TERMINATE_BACKENDS). + Reply("SELECT 1"). + Query(reset.COUNT_REPLICATION_SLOTS). + Reply("SELECT 1", []interface{}{0}). Query("ALTER DATABASE postgres RENAME TO main;"). ReplyError(pgerrcode.DuplicateDatabase, `database "main" already exists`) // Setup mock docker @@ -260,10 +270,14 @@ func TestSwitchDatabase(t *testing.T) { // Setup mock postgres conn := pgtest.NewConn() defer conn.Close(t) - conn.Query("ALTER DATABASE postgres ALLOW_CONNECTIONS false;"). + conn.Query("ALTER DATABASE postgres ALLOW_CONNECTIONS false"). + Reply("ALTER DATABASE"). + Query("ALTER DATABASE _supabase ALLOW_CONNECTIONS false"). Reply("ALTER DATABASE"). - Query(fmt.Sprintf(utils.TerminateDbSqlFmt, "postgres")). - Reply("DO"). + Query(reset.TERMINATE_BACKENDS). + Reply("SELECT 1"). + Query(reset.COUNT_REPLICATION_SLOTS). + Reply("SELECT 1", []interface{}{0}). Query("ALTER DATABASE postgres RENAME TO main;"). Reply("ALTER DATABASE"). Query("ALTER DATABASE target RENAME TO postgres;"). diff --git a/internal/db/diff/diff.go b/internal/db/diff/diff.go index 38a3bed58..6c5faa892 100644 --- a/internal/db/diff/diff.go +++ b/internal/db/diff/diff.go @@ -146,12 +146,12 @@ func CreateShadowDatabase(ctx context.Context, port uint16) (string, error) { func ConnectShadowDatabase(ctx context.Context, timeout time.Duration, options ...func(*pgx.ConnConfig)) (conn *pgx.Conn, err error) { // Retry until connected, cancelled, or timeout - policy := backoff.WithMaxRetries(backoff.NewConstantBackOff(time.Second), uint64(timeout.Seconds())) + policy := start.NewBackoffPolicy(ctx, timeout) config := pgconn.Config{Port: utils.Config.Db.ShadowPort} connect := func() (*pgx.Conn, error) { return utils.ConnectLocalPostgres(ctx, config, options...) } - return backoff.RetryWithData(connect, backoff.WithContext(policy, ctx)) + return backoff.RetryWithData(connect, policy) } func MigrateShadowDatabase(ctx context.Context, container string, fsys afero.Fs, options ...func(*pgx.ConnConfig)) error { diff --git a/internal/db/reset/reset.go b/internal/db/reset/reset.go index a9d2f4f4f..a475ca80d 100644 --- a/internal/db/reset/reset.go +++ b/internal/db/reset/reset.go @@ -10,6 +10,7 @@ import ( "strings" "time" + "github.com/cenkalti/backoff/v4" "github.com/docker/docker/api/types" "github.com/docker/docker/api/types/container" "github.com/docker/docker/api/types/network" @@ -164,20 +165,40 @@ func recreateDatabase(ctx context.Context, options ...func(*pgx.ConnConfig)) err return sql.ExecBatch(ctx, conn) } +const ( + TERMINATE_BACKENDS = "SELECT pg_terminate_backend(pid) FROM pg_stat_activity WHERE datname IN ('postgres', '_supabase')" + COUNT_REPLICATION_SLOTS = "SELECT COUNT(*) FROM pg_replication_slots WHERE database IN ('postgres', '_supabase')" +) + func DisconnectClients(ctx context.Context, conn *pgx.Conn) error { - // Must be executed separately because running in transaction is unsupported - disconn := "ALTER DATABASE postgres ALLOW_CONNECTIONS false;" - if _, err := conn.Exec(ctx, disconn); err != nil { + // Must be executed separately because looping in transaction is unsupported + // https://dba.stackexchange.com/a/11895 + disconn := migration.MigrationFile{ + Statements: []string{ + "ALTER DATABASE postgres ALLOW_CONNECTIONS false", + "ALTER DATABASE _supabase ALLOW_CONNECTIONS false", + TERMINATE_BACKENDS, + }, + } + if err := disconn.ExecBatch(ctx, conn); err != nil { var pgErr *pgconn.PgError if errors.As(err, &pgErr) && pgErr.Code != pgerrcode.InvalidCatalogName { return errors.Errorf("failed to disconnect clients: %w", err) } } - term := fmt.Sprintf(utils.TerminateDbSqlFmt, "postgres") - if _, err := conn.Exec(ctx, term); err != nil { - return errors.Errorf("failed to terminate backend: %w", err) + // Wait for WAL senders to drop their replication slots + policy := start.NewBackoffPolicy(ctx, 10*time.Second) + waitForDrop := func() error { + var count int + if err := conn.QueryRow(ctx, COUNT_REPLICATION_SLOTS).Scan(&count); err != nil { + err = errors.Errorf("failed to count replication slots: %w", err) + return &backoff.PermanentError{Err: err} + } else if count > 0 { + return errors.Errorf("replication slots still active: %d", count) + } + return nil } - return nil + return backoff.Retry(waitForDrop, policy) } func RestartDatabase(ctx context.Context, w io.Writer) error { diff --git a/internal/db/reset/reset_test.go b/internal/db/reset/reset_test.go index c65aa0ef9..4e3558be3 100644 --- a/internal/db/reset/reset_test.go +++ b/internal/db/reset/reset_test.go @@ -3,7 +3,6 @@ package reset import ( "context" "errors" - "fmt" "io" "net/http" "path/filepath" @@ -202,10 +201,14 @@ func TestRecreateDatabase(t *testing.T) { // Setup mock postgres conn := pgtest.NewConn() defer conn.Close(t) - conn.Query("ALTER DATABASE postgres ALLOW_CONNECTIONS false;"). + conn.Query("ALTER DATABASE postgres ALLOW_CONNECTIONS false"). Reply("ALTER DATABASE"). - Query(fmt.Sprintf(utils.TerminateDbSqlFmt, "postgres")). - Reply("DO"). + Query("ALTER DATABASE _supabase ALLOW_CONNECTIONS false"). + Reply("ALTER DATABASE"). + Query(TERMINATE_BACKENDS). + Reply("SELECT 1"). + Query(COUNT_REPLICATION_SLOTS). + Reply("SELECT 1", []interface{}{0}). Query("DROP DATABASE IF EXISTS postgres WITH (FORCE)"). Reply("DROP DATABASE"). Query("CREATE DATABASE postgres WITH OWNER postgres"). @@ -228,14 +231,17 @@ func TestRecreateDatabase(t *testing.T) { // Setup mock postgres conn := pgtest.NewConn() defer conn.Close(t) - conn.Query("ALTER DATABASE postgres ALLOW_CONNECTIONS false;"). - ReplyError(pgerrcode.InvalidCatalogName, `database "postgres" does not exist`). - Query(fmt.Sprintf(utils.TerminateDbSqlFmt, "postgres")). - ReplyError(pgerrcode.UndefinedTable, `relation "pg_stat_activity" does not exist`) + conn.Query("ALTER DATABASE postgres ALLOW_CONNECTIONS false"). + Reply("ALTER DATABASE"). + Query("ALTER DATABASE _supabase ALLOW_CONNECTIONS false"). + ReplyError(pgerrcode.InvalidCatalogName, `database "_supabase" does not exist`). + Query(TERMINATE_BACKENDS). + Query(COUNT_REPLICATION_SLOTS). + ReplyError(pgerrcode.UndefinedTable, `relation "pg_replication_slots" does not exist`) // Run test err := recreateDatabase(context.Background(), conn.Intercept) // Check error - assert.ErrorContains(t, err, `ERROR: relation "pg_stat_activity" does not exist (SQLSTATE 42P01)`) + assert.ErrorContains(t, err, `ERROR: relation "pg_replication_slots" does not exist (SQLSTATE 42P01)`) }) t.Run("throws error on failure to disconnect", func(t *testing.T) { @@ -243,8 +249,10 @@ func TestRecreateDatabase(t *testing.T) { // Setup mock postgres conn := pgtest.NewConn() defer conn.Close(t) - conn.Query("ALTER DATABASE postgres ALLOW_CONNECTIONS false;"). - ReplyError(pgerrcode.InvalidParameterValue, `cannot disallow connections for current database`) + conn.Query("ALTER DATABASE postgres ALLOW_CONNECTIONS false"). + ReplyError(pgerrcode.InvalidParameterValue, `cannot disallow connections for current database`). + Query("ALTER DATABASE _supabase ALLOW_CONNECTIONS false"). + Query(TERMINATE_BACKENDS) // Run test err := recreateDatabase(context.Background(), conn.Intercept) // Check error @@ -256,10 +264,14 @@ func TestRecreateDatabase(t *testing.T) { // Setup mock postgres conn := pgtest.NewConn() defer conn.Close(t) - conn.Query("ALTER DATABASE postgres ALLOW_CONNECTIONS false;"). + conn.Query("ALTER DATABASE postgres ALLOW_CONNECTIONS false"). + Reply("ALTER DATABASE"). + Query("ALTER DATABASE _supabase ALLOW_CONNECTIONS false"). Reply("ALTER DATABASE"). - Query(fmt.Sprintf(utils.TerminateDbSqlFmt, "postgres")). - Reply("DO"). + Query(TERMINATE_BACKENDS). + Reply("SELECT 1"). + Query(COUNT_REPLICATION_SLOTS). + Reply("SELECT 1", []interface{}{0}). Query("DROP DATABASE IF EXISTS postgres WITH (FORCE)"). ReplyError(pgerrcode.ObjectInUse, `database "postgres" is used by an active logical replication slot`). Query("CREATE DATABASE postgres WITH OWNER postgres"). diff --git a/internal/db/start/start.go b/internal/db/start/start.go index a300f5594..f0fc488d2 100644 --- a/internal/db/start/start.go +++ b/internal/db/start/start.go @@ -160,6 +160,14 @@ EOF`} return initCurrentBranch(fsys) } +func NewBackoffPolicy(ctx context.Context, timeout time.Duration) backoff.BackOff { + policy := backoff.WithMaxRetries( + backoff.NewConstantBackOff(time.Second), + uint64(timeout.Seconds()), + ) + return backoff.WithContext(policy, ctx) +} + func WaitForHealthyService(ctx context.Context, timeout time.Duration, started ...string) error { probe := func() error { var errHealth []error @@ -173,10 +181,7 @@ func WaitForHealthyService(ctx context.Context, timeout time.Duration, started . started = unhealthy return errors.Join(errHealth...) } - policy := backoff.WithContext(backoff.WithMaxRetries( - backoff.NewConstantBackOff(time.Second), - uint64(timeout.Seconds()), - ), ctx) + policy := NewBackoffPolicy(ctx, timeout) err := backoff.Retry(probe, policy) if err != nil && !errors.Is(err, context.Canceled) { // Print container logs for easier debugging diff --git a/internal/utils/misc.go b/internal/utils/misc.go index 0993ae806..202d80d0a 100644 --- a/internal/utils/misc.go +++ b/internal/utils/misc.go @@ -31,17 +31,7 @@ func ShortContainerImageName(imageName string) string { return matches[1] } -const ( - // https://dba.stackexchange.com/a/11895 - // Args: dbname - TerminateDbSqlFmt = ` -SELECT pg_terminate_backend(pid) FROM pg_stat_activity WHERE datname = '%[1]s'; --- Wait for WAL sender to drop replication slot. -DO 'BEGIN WHILE ( - SELECT COUNT(*) FROM pg_replication_slots WHERE database = ''%[1]s'' -) > 0 LOOP END LOOP; END';` - SuggestDebugFlag = "Try rerunning the command with --debug to troubleshoot the error." -) +const SuggestDebugFlag = "Try rerunning the command with --debug to troubleshoot the error." var ( CmdSuggestion string