Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion cmd/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ var (
}
}
ctx, _ := signal.NotifyContext(cmd.Context(), os.Interrupt)
return reset.Run(ctx, dbConfig, fsys)
return reset.Run(ctx, version, dbConfig, fsys)
},
}

Expand Down Expand Up @@ -283,6 +283,7 @@ func init() {
// Build reset command
resetFlags := dbResetCmd.Flags()
resetFlags.BoolVar(&linked, "linked", false, "Resets the linked project to current migrations.")
resetFlags.StringVar(&version, "version", "", "Reset up to the specified version.")
dbCmd.AddCommand(dbResetCmd)
// Build lint command
lintFlags := dbLintCmd.Flags()
Expand Down
3 changes: 1 addition & 2 deletions internal/db/push/push.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"github.com/jackc/pgconn"
"github.com/jackc/pgx/v4"
"github.com/spf13/afero"
"github.com/supabase/cli/internal/db/reset"
"github.com/supabase/cli/internal/migration/apply"
"github.com/supabase/cli/internal/migration/up"
"github.com/supabase/cli/internal/utils"
Expand Down Expand Up @@ -51,7 +50,7 @@ func Run(ctx context.Context, dryRun, ignoreVersionMismatch bool, includeRoles,
}
// Seed database
if !dryRun && includeSeed {
if err := reset.SeedDatabase(ctx, conn, fsys); err != nil {
if err := apply.SeedDatabase(ctx, conn, fsys); err != nil {
return err
}
}
Expand Down
53 changes: 25 additions & 28 deletions internal/db/reset/reset.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"fmt"
"io"
"os"
"strconv"
"strings"
"time"

Expand Down Expand Up @@ -38,10 +39,17 @@ var (
dropObjects string
)

func Run(ctx context.Context, config pgconn.Config, fsys afero.Fs, options ...func(*pgx.ConnConfig)) error {
func Run(ctx context.Context, version string, config pgconn.Config, fsys afero.Fs, options ...func(*pgx.ConnConfig)) error {
if len(version) > 0 {
if _, err := strconv.Atoi(version); err != nil {
return repair.ErrInvalidVersion
}
if _, err := repair.GetMigrationFile(version, fsys); err != nil {
return err
}
}
if len(config.Password) > 0 {
fmt.Fprintln(os.Stderr, "Resetting remote database...")
return resetRemote(ctx, config, fsys, options...)
return resetRemote(ctx, version, config, fsys, options...)
}

// Sanity checks.
Expand All @@ -55,7 +63,7 @@ func Run(ctx context.Context, config pgconn.Config, fsys afero.Fs, options ...fu
}

// Reset postgres database because extensions (pg_cron, pg_net) require postgres
if err := resetDatabase(ctx, fsys, options...); err != nil {
if err := resetDatabase(ctx, version, fsys, options...); err != nil {
return err
}

Expand All @@ -64,8 +72,8 @@ func Run(ctx context.Context, config pgconn.Config, fsys afero.Fs, options ...fu
return nil
}

func resetDatabase(ctx context.Context, fsys afero.Fs, options ...func(*pgx.ConnConfig)) error {
fmt.Fprintln(os.Stderr, "Resetting local database...")
func resetDatabase(ctx context.Context, version string, fsys afero.Fs, options ...func(*pgx.ConnConfig)) error {
fmt.Fprintln(os.Stderr, "Resetting local database"+toLogMessage(version))
if err := recreateDatabase(ctx, options...); err != nil {
return err
}
Expand All @@ -85,7 +93,14 @@ func resetDatabase(ctx context.Context, fsys afero.Fs, options ...func(*pgx.Conn
return err
}
defer conn.Close(context.Background())
return InitialiseDatabase(ctx, conn, fsys)
return apply.MigrateAndSeed(ctx, version, conn, fsys)
}

func toLogMessage(version string) string {
if len(version) > 0 {
return " to version: " + version
}
return "..."
}

func initDatabase(ctx context.Context, options ...func(*pgx.ConnConfig)) error {
Expand All @@ -97,13 +112,6 @@ func initDatabase(ctx context.Context, options ...func(*pgx.ConnConfig)) error {
return apply.BatchExecDDL(ctx, conn, strings.NewReader(utils.InitialSchemaSql))
}

func InitialiseDatabase(ctx context.Context, conn *pgx.Conn, fsys afero.Fs) error {
if err := apply.MigrateDatabase(ctx, conn, fsys); err != nil {
return err
}
return SeedDatabase(ctx, conn, fsys)
}

// Recreate postgres database by connecting to template1
func recreateDatabase(ctx context.Context, options ...func(*pgx.ConnConfig)) error {
conn, err := utils.ConnectLocalPostgres(ctx, pgconn.Config{User: "supabase_admin", Database: "template1"}, options...)
Expand All @@ -124,18 +132,6 @@ func recreateDatabase(ctx context.Context, options ...func(*pgx.ConnConfig)) err
return sql.ExecBatch(ctx, conn)
}

func SeedDatabase(ctx context.Context, conn *pgx.Conn, fsys afero.Fs) error {
seed, err := repair.NewMigrationFromFile(utils.SeedDataPath, fsys)
if errors.Is(err, os.ErrNotExist) {
return nil
} else if err != nil {
return err
}
fmt.Fprintln(os.Stderr, "Seeding data "+utils.Bold(utils.SeedDataPath)+"...")
// Batch seed commands, safe to use statement cache
return seed.ExecBatchWithCache(ctx, conn)
}

func DisconnectClients(ctx context.Context, conn *pgx.Conn) error {
// Must be executed separately because running in transaction is unsupported
disconn := "ALTER DATABASE postgres ALLOW_CONNECTIONS false;"
Expand Down Expand Up @@ -225,7 +221,8 @@ func WaitForServiceReady(ctx context.Context, started []string) error {
return nil
}

func resetRemote(ctx context.Context, config pgconn.Config, fsys afero.Fs, options ...func(*pgx.ConnConfig)) error {
func resetRemote(ctx context.Context, version string, config pgconn.Config, fsys afero.Fs, options ...func(*pgx.ConnConfig)) error {
fmt.Fprintln(os.Stderr, "Resetting remote database"+toLogMessage(version))
conn, err := utils.ConnectRemotePostgres(ctx, config, options...)
if err != nil {
return err
Expand All @@ -248,7 +245,7 @@ func resetRemote(ctx context.Context, config pgconn.Config, fsys afero.Fs, optio
if err := migration.ExecBatch(ctx, conn); err != nil {
return err
}
return InitialiseDatabase(ctx, conn, fsys)
return apply.MigrateAndSeed(ctx, version, conn, fsys)
}

func ListSchemas(ctx context.Context, conn *pgx.Conn, exclude ...string) ([]string, error) {
Expand Down
74 changes: 8 additions & 66 deletions internal/db/reset/reset_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/supabase/cli/internal/testing/apitest"
"github.com/supabase/cli/internal/testing/fstest"
"github.com/supabase/cli/internal/testing/pgtest"
"github.com/supabase/cli/internal/utils"
"gopkg.in/h2non/gock.v1"
Expand All @@ -29,13 +28,13 @@ func TestResetCommand(t *testing.T) {
// Setup in-memory fs
fsys := afero.NewMemMapFs()
// Run test
err := Run(context.Background(), pgconn.Config{Password: "postgres"}, fsys)
err := Run(context.Background(), "", pgconn.Config{Password: "postgres"}, fsys)
// Check error
assert.ErrorContains(t, err, "invalid port (outside range)")
})

t.Run("throws error on missing config", func(t *testing.T) {
err := Run(context.Background(), pgconn.Config{}, afero.NewMemMapFs())
err := Run(context.Background(), "", pgconn.Config{}, afero.NewMemMapFs())
assert.ErrorIs(t, err, os.ErrNotExist)
})

Expand All @@ -50,7 +49,7 @@ func TestResetCommand(t *testing.T) {
Get("/v" + utils.Docker.ClientVersion() + "/containers").
Reply(http.StatusServiceUnavailable)
// Run test
err := Run(context.Background(), pgconn.Config{}, fsys)
err := Run(context.Background(), "", pgconn.Config{}, fsys)
// Check error
assert.ErrorIs(t, err, utils.ErrNotRunning)
assert.Empty(t, apitest.ListUnmatchedRequests())
Expand All @@ -73,7 +72,7 @@ func TestResetCommand(t *testing.T) {
conn.Query("ALTER DATABASE postgres ALLOW_CONNECTIONS false;").
ReplyError(pgerrcode.InvalidParameterValue, `cannot disallow connections for current database`)
// Run test
err := Run(context.Background(), pgconn.Config{}, fsys, conn.Intercept)
err := Run(context.Background(), "", pgconn.Config{}, fsys, conn.Intercept)
// Check error
assert.ErrorContains(t, err, "ERROR: cannot disallow connections for current database (SQLSTATE 22023)")
assert.Empty(t, apitest.ListUnmatchedRequests())
Expand Down Expand Up @@ -116,63 +115,6 @@ func TestInitDatabase(t *testing.T) {
})
}

func TestSeedDatabase(t *testing.T) {
t.Run("seeds from file", func(t *testing.T) {
// Setup in-memory fs
fsys := afero.NewMemMapFs()
// Setup seed file
sql := "INSERT INTO employees(name) VALUES ('Alice')"
require.NoError(t, afero.WriteFile(fsys, utils.SeedDataPath, []byte(sql), 0644))
// Setup mock postgres
conn := pgtest.NewConn()
defer conn.Close(t)
conn.Query(sql).
Reply("INSERT 0 1")
// Connect to mock
ctx := context.Background()
mock, err := utils.ConnectLocalPostgres(ctx, pgconn.Config{Port: 5432}, conn.Intercept)
require.NoError(t, err)
defer mock.Close(ctx)
// Run test
assert.NoError(t, SeedDatabase(ctx, mock, fsys))
})

t.Run("ignores missing seed", func(t *testing.T) {
assert.NoError(t, SeedDatabase(context.Background(), nil, afero.NewMemMapFs()))
})

t.Run("throws error on read failure", func(t *testing.T) {
// Setup in-memory fs
fsys := &fstest.OpenErrorFs{DenyPath: utils.SeedDataPath}
// Run test
err := SeedDatabase(context.Background(), nil, fsys)
// Check error
assert.ErrorIs(t, err, os.ErrPermission)
})

t.Run("throws error on insert failure", func(t *testing.T) {
// Setup in-memory fs
fsys := afero.NewMemMapFs()
// Setup seed file
sql := "INSERT INTO employees(name) VALUES ('Alice')"
require.NoError(t, afero.WriteFile(fsys, utils.SeedDataPath, []byte(sql), 0644))
// Setup mock postgres
conn := pgtest.NewConn()
defer conn.Close(t)
conn.Query(sql).
ReplyError(pgerrcode.NotNullViolation, `null value in column "age" of relation "employees"`)
// Connect to mock
ctx := context.Background()
mock, err := utils.ConnectLocalPostgres(ctx, pgconn.Config{Port: 5432}, conn.Intercept)
require.NoError(t, err)
defer mock.Close(ctx)
// Run test
err = SeedDatabase(ctx, mock, fsys)
// Check error
assert.ErrorContains(t, err, `ERROR: null value in column "age" of relation "employees" (SQLSTATE 23502)`)
})
}

func TestRecreateDatabase(t *testing.T) {
t.Run("resets postgres database", func(t *testing.T) {
utils.Config.Db.Port = 54322
Expand Down Expand Up @@ -371,7 +313,7 @@ func TestResetRemote(t *testing.T) {
Query(dropObjects).
Reply("INSERT 0")
// Run test
err := resetRemote(context.Background(), dbConfig, fsys, conn.Intercept)
err := resetRemote(context.Background(), "", dbConfig, fsys, conn.Intercept)
// Check error
assert.NoError(t, err)
})
Expand All @@ -380,7 +322,7 @@ func TestResetRemote(t *testing.T) {
// Setup in-memory fs
fsys := afero.NewMemMapFs()
// Run test
err := resetRemote(context.Background(), pgconn.Config{}, fsys)
err := resetRemote(context.Background(), "", pgconn.Config{}, fsys)
// Check error
assert.ErrorContains(t, err, "invalid port (outside range)")
})
Expand All @@ -394,7 +336,7 @@ func TestResetRemote(t *testing.T) {
conn.Query(strings.ReplaceAll(LIST_SCHEMAS, "$1", "'{public,auth,extensions,pgbouncer,realtime,\"\\\\_realtime\",storage,\"\\\\_analytics\",\"supabase\\\\_functions\",\"supabase\\\\_migrations\",\"information\\\\_schema\",\"pg\\\\_%\",cron,graphql,\"graphql\\\\_public\",net,pgsodium,\"pgsodium\\\\_masks\",pgtle,repack,tiger,\"tiger\\\\_data\",\"timescaledb\\\\_%\",\"\\\\_timescaledb\\\\_%\",topology,vault}'")).
ReplyError(pgerrcode.InsufficientPrivilege, "permission denied for relation information_schema")
// Run test
err := resetRemote(context.Background(), dbConfig, fsys, conn.Intercept)
err := resetRemote(context.Background(), "", dbConfig, fsys, conn.Intercept)
// Check error
assert.ErrorContains(t, err, "ERROR: permission denied for relation information_schema (SQLSTATE 42501)")
})
Expand All @@ -411,7 +353,7 @@ func TestResetRemote(t *testing.T) {
ReplyError(pgerrcode.InsufficientPrivilege, "permission denied for relation supabase_migrations").
Query(dropObjects)
// Run test
err := resetRemote(context.Background(), dbConfig, fsys, conn.Intercept)
err := resetRemote(context.Background(), "", dbConfig, fsys, conn.Intercept)
// Check error
assert.ErrorContains(t, err, "ERROR: permission denied for relation supabase_migrations (SQLSTATE 42501)")
})
Expand Down
2 changes: 1 addition & 1 deletion internal/db/start/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ func setupDatabase(ctx context.Context, fsys afero.Fs, w io.Writer, options ...f
if err := SetupDatabase(ctx, conn, utils.DbId, w, fsys); err != nil {
return err
}
return reset.InitialiseDatabase(ctx, conn, fsys)
return apply.MigrateAndSeed(ctx, "", conn, fsys)
}

func SetupDatabase(ctx context.Context, conn *pgx.Conn, host string, w io.Writer, fsys afero.Fs) error {
Expand Down
22 changes: 19 additions & 3 deletions internal/migration/apply/apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package apply

import (
"context"
"errors"
"fmt"
"io"
"os"
Expand All @@ -14,12 +15,27 @@ import (
"github.com/supabase/cli/internal/utils"
)

func MigrateDatabase(ctx context.Context, conn *pgx.Conn, fsys afero.Fs) error {
migrations, err := list.LoadLocalMigrations(fsys)
func MigrateAndSeed(ctx context.Context, version string, conn *pgx.Conn, fsys afero.Fs) error {
migrations, err := list.LoadPartialMigrations(version, fsys)
if err != nil {
return err
}
return MigrateUp(ctx, conn, migrations, fsys)
if err := MigrateUp(ctx, conn, migrations, fsys); err != nil {
return err
}
return SeedDatabase(ctx, conn, fsys)
}

func SeedDatabase(ctx context.Context, conn *pgx.Conn, fsys afero.Fs) error {
seed, err := repair.NewMigrationFromFile(utils.SeedDataPath, fsys)
if errors.Is(err, os.ErrNotExist) {
return nil
} else if err != nil {
return err
}
fmt.Fprintln(os.Stderr, "Seeding data "+utils.Bold(utils.SeedDataPath)+"...")
// Batch seed commands, safe to use statement cache
return seed.ExecBatchWithCache(ctx, conn)
}

func MigrateUp(ctx context.Context, conn *pgx.Conn, pending []string, fsys afero.Fs) error {
Expand Down
Loading