Skip to content

Commit

Permalink
feat: use binary protocol when connecting to remote (#2021)
Browse files Browse the repository at this point in the history
* fix: use binary protocol for connecting to remote

* chore: update unit tests

* fix: disable prepared statement when using transaction mode
  • Loading branch information
sweatybridge authored Mar 7, 2024
1 parent 4a64560 commit eb08493
Show file tree
Hide file tree
Showing 18 changed files with 276 additions and 133 deletions.
39 changes: 32 additions & 7 deletions internal/db/diff/diff_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,10 @@ package diff
import (
"context"
"errors"
"fmt"
"io"
"net/http"
"os"
"path/filepath"
"strings"
"testing"
"time"

Expand Down Expand Up @@ -36,6 +34,33 @@ var dbConfig = pgconn.Config{
Database: "postgres",
}

var escapedSchemas = []string{
"auth",
"pgbouncer",
"realtime",
`\_realtime`,
"storage",
`\_analytics`,
`supabase\_functions`,
`supabase\_migrations`,
`information\_schema`,
`pg\_%`,
"cron",
"graphql",
`graphql\_public`,
"net",
"pgsodium",
`pgsodium\_masks`,
"pgtle",
"repack",
"tiger",
`tiger\_data`,
`timescaledb\_%`,
`\_timescaledb\_%`,
"topology",
"vault",
}

func TestRun(t *testing.T) {
t.Run("runs migra diff", func(t *testing.T) {
// Setup in-memory fs
Expand Down Expand Up @@ -102,7 +127,7 @@ func TestRun(t *testing.T) {
// Setup mock postgres
conn := pgtest.NewConn()
defer conn.Close(t)
conn.Query(strings.ReplaceAll(reset.LIST_SCHEMAS, "$1", `('{auth,pgbouncer,realtime,"\\_realtime",storage,"\\_analytics","supabase\\_functions","supabase\\_migrations","information\\_schema","pg\\_%",cron,graphql,"graphql\\_public",net,pgsodium,"pgsodium\\_masks",pgtle,repack,tiger,"tiger\\_data","timescaledb\\_%","\\_timescaledb\\_%",topology,vault}')`)).
conn.Query(reset.LIST_SCHEMAS, escapedSchemas).
ReplyError(pgerrcode.DuplicateTable, `relation "test" already exists`)
// Run test
err := Run(context.Background(), []string{}, "", dbConfig, DiffSchemaMigra, fsys, conn.Intercept)
Expand Down Expand Up @@ -152,7 +177,7 @@ func TestMigrateShadow(t *testing.T) {
pgtest.MockMigrationHistory(conn)
conn.Query(sql).
Reply("CREATE SCHEMA").
Query(history.INSERT_MIGRATION_VERSION, "0", "test", fmt.Sprintf("{%s}", sql)).
Query(history.INSERT_MIGRATION_VERSION, "0", "test", []string{sql}).
Reply("INSERT 0 1")
// Run test
err := MigrateShadowDatabase(context.Background(), "test-shadow-db", fsys, conn.Intercept)
Expand Down Expand Up @@ -317,7 +342,7 @@ At statement 0: create schema public`)
pgtest.MockMigrationHistory(conn)
conn.Query(sql).
Reply("CREATE SCHEMA").
Query(history.INSERT_MIGRATION_VERSION, "0", "test", fmt.Sprintf("{%s}", sql)).
Query(history.INSERT_MIGRATION_VERSION, "0", "test", []string{sql}).
Reply("INSERT 0 1")
// Run test
diff, err := DiffDatabase(context.Background(), []string{"public"}, dbConfig, io.Discard, fsys, DiffSchemaMigra, conn.Intercept)
Expand All @@ -332,11 +357,11 @@ func TestUserSchema(t *testing.T) {
// Setup mock postgres
conn := pgtest.NewConn()
defer conn.Close(t)
conn.Query(strings.ReplaceAll(reset.LIST_SCHEMAS, "$1", `('{auth,pgbouncer,realtime,"\\_realtime",storage,"\\_analytics","supabase\\_functions","supabase\\_migrations","information\\_schema","pg\\_%",cron,graphql,"graphql\\_public",net,pgsodium,"pgsodium\\_masks",pgtle,repack,tiger,"tiger\\_data","timescaledb\\_%","\\_timescaledb\\_%",topology,vault}')`)).
conn.Query(reset.LIST_SCHEMAS, escapedSchemas).
Reply("SELECT 1", []interface{}{"test"})
// Connect to mock
ctx := context.Background()
mock, err := utils.ConnectRemotePostgres(ctx, dbConfig, conn.Intercept)
mock, err := utils.ConnectByConfig(ctx, dbConfig, conn.Intercept)
require.NoError(t, err)
defer mock.Close(ctx)
// Run test
Expand Down
8 changes: 4 additions & 4 deletions internal/db/lint/lint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ func TestLintDatabase(t *testing.T) {
Query("rollback").Reply("ROLLBACK")
// Connect to mock
ctx := context.Background()
mock, err := utils.ConnectLocalPostgres(ctx, dbConfig, conn.Intercept)
mock, err := utils.ConnectByConfig(ctx, dbConfig, conn.Intercept)
require.NoError(t, err)
defer mock.Close(ctx)
// Run test
Expand Down Expand Up @@ -169,7 +169,7 @@ func TestLintDatabase(t *testing.T) {
Query("rollback").Reply("ROLLBACK")
// Connect to mock
ctx := context.Background()
mock, err := utils.ConnectLocalPostgres(ctx, dbConfig, conn.Intercept)
mock, err := utils.ConnectByConfig(ctx, dbConfig, conn.Intercept)
require.NoError(t, err)
defer mock.Close(ctx)
// Run test
Expand All @@ -189,7 +189,7 @@ func TestLintDatabase(t *testing.T) {
Query("rollback").Reply("ROLLBACK")
// Connect to mock
ctx := context.Background()
mock, err := utils.ConnectLocalPostgres(ctx, dbConfig, conn.Intercept)
mock, err := utils.ConnectByConfig(ctx, dbConfig, conn.Intercept)
require.NoError(t, err)
defer mock.Close(ctx)
// Run test
Expand All @@ -209,7 +209,7 @@ func TestLintDatabase(t *testing.T) {
Query("rollback").Reply("ROLLBACK")
// Connect to mock
ctx := context.Background()
mock, err := utils.ConnectLocalPostgres(ctx, dbConfig, conn.Intercept)
mock, err := utils.ConnectByConfig(ctx, dbConfig, conn.Intercept)
require.NoError(t, err)
defer mock.Close(ctx)
// Run test
Expand Down
44 changes: 35 additions & 9 deletions internal/db/pull/pull_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"net/http"
"os"
"path/filepath"
"strings"
"testing"

"github.com/jackc/pgconn"
Expand All @@ -31,6 +30,33 @@ var dbConfig = pgconn.Config{
Database: "postgres",
}

var escapedSchemas = []string{
"auth",
"pgbouncer",
"realtime",
`\_realtime`,
"storage",
`\_analytics`,
`supabase\_functions`,
`supabase\_migrations`,
`information\_schema`,
`pg\_%`,
"cron",
"graphql",
`graphql\_public`,
"net",
"pgsodium",
`pgsodium\_masks`,
"pgtle",
"repack",
"tiger",
`tiger\_data`,
`timescaledb\_%`,
`\_timescaledb\_%`,
"topology",
"vault",
}

func TestPullCommand(t *testing.T) {
t.Run("throws error on missing docker", func(t *testing.T) {
// Setup in-memory fs
Expand Down Expand Up @@ -144,7 +170,7 @@ func TestPullSchema(t *testing.T) {
Reply("SELECT 0")
// Connect to mock
ctx := context.Background()
mock, err := utils.ConnectRemotePostgres(ctx, dbConfig, conn.Intercept)
mock, err := utils.ConnectByConfig(ctx, dbConfig, conn.Intercept)
require.NoError(t, err)
defer mock.Close(ctx)
// Run test
Expand All @@ -169,11 +195,11 @@ func TestPullSchema(t *testing.T) {
defer conn.Close(t)
conn.Query(list.LIST_MIGRATION_VERSION).
Reply("SELECT 1", []interface{}{"0"}).
Query(strings.ReplaceAll(reset.LIST_SCHEMAS, "$1", `('{auth,pgbouncer,realtime,"\\_realtime",storage,"\\_analytics","supabase\\_functions","supabase\\_migrations","information\\_schema","pg\\_%",cron,graphql,"graphql\\_public",net,pgsodium,"pgsodium\\_masks",pgtle,repack,tiger,"tiger\\_data","timescaledb\\_%","\\_timescaledb\\_%",topology,vault}')`)).
Query(reset.LIST_SCHEMAS, escapedSchemas).
ReplyError(pgerrcode.DuplicateTable, `relation "test" already exists`)
// Connect to mock
ctx := context.Background()
mock, err := utils.ConnectRemotePostgres(ctx, dbConfig, conn.Intercept)
mock, err := utils.ConnectByConfig(ctx, dbConfig, conn.Intercept)
require.NoError(t, err)
defer mock.Close(ctx)
// Run test
Expand Down Expand Up @@ -202,7 +228,7 @@ func TestPullSchema(t *testing.T) {
Reply("SELECT 1", []interface{}{"0"})
// Connect to mock
ctx := context.Background()
mock, err := utils.ConnectRemotePostgres(ctx, dbConfig, conn.Intercept)
mock, err := utils.ConnectByConfig(ctx, dbConfig, conn.Intercept)
require.NoError(t, err)
defer mock.Close(ctx)
// Run test
Expand All @@ -226,7 +252,7 @@ func TestSyncRemote(t *testing.T) {
Reply("SELECT 0")
// Connect to mock
ctx := context.Background()
mock, err := utils.ConnectRemotePostgres(ctx, dbConfig, conn.Intercept)
mock, err := utils.ConnectByConfig(ctx, dbConfig, conn.Intercept)
require.NoError(t, err)
defer mock.Close(ctx)
// Run test
Expand All @@ -248,7 +274,7 @@ func TestSyncRemote(t *testing.T) {
Reply("SELECT 0")
// Connect to mock
ctx := context.Background()
mock, err := utils.ConnectRemotePostgres(ctx, dbConfig, conn.Intercept)
mock, err := utils.ConnectByConfig(ctx, dbConfig, conn.Intercept)
require.NoError(t, err)
defer mock.Close(ctx)
// Run test
Expand All @@ -270,7 +296,7 @@ func TestSyncRemote(t *testing.T) {
Reply("SELECT 1", []interface{}{"20220727064247"})
// Connect to mock
ctx := context.Background()
mock, err := utils.ConnectRemotePostgres(ctx, dbConfig, conn.Intercept)
mock, err := utils.ConnectByConfig(ctx, dbConfig, conn.Intercept)
require.NoError(t, err)
defer mock.Close(ctx)
// Run test
Expand All @@ -290,7 +316,7 @@ func TestSyncRemote(t *testing.T) {
Reply("SELECT 0")
// Connect to mock
ctx := context.Background()
mock, err := utils.ConnectRemotePostgres(ctx, dbConfig, conn.Intercept)
mock, err := utils.ConnectByConfig(ctx, dbConfig, conn.Intercept)
require.NoError(t, err)
defer mock.Close(ctx)
// Run test
Expand Down
2 changes: 1 addition & 1 deletion internal/db/push/push_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func TestMigrationPush(t *testing.T) {
conn.Query(list.LIST_MIGRATION_VERSION).
Reply("SELECT 0")
pgtest.MockMigrationHistory(conn)
conn.Query(history.INSERT_MIGRATION_VERSION, "0", "test", "{}").
conn.Query(history.INSERT_MIGRATION_VERSION, "0", "test", nil).
ReplyError(pgerrcode.NotNullViolation, `null value in column "version" of relation "schema_migrations"`)
// Run test
err := Run(context.Background(), false, false, false, false, dbConfig, fsys, conn.Intercept)
Expand Down
24 changes: 13 additions & 11 deletions internal/db/remote/changes/changes.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package changes

import (
"context"
"io"

"github.com/jackc/pgconn"
"github.com/spf13/afero"
Expand Down Expand Up @@ -33,23 +34,24 @@ func Run(ctx context.Context, schema []string, config pgconn.Config, fsys afero.

func run(p utils.Program, ctx context.Context, schema []string, config pgconn.Config, fsys afero.Fs) (err error) {
// 1. Assert `supabase/migrations` and `schema_migrations` are in sync.
{
p.Send(utils.StatusMsg("Connecting to remote database..."))
conn, err := utils.ConnectRemotePostgres(ctx, config)
w := utils.StatusWriter{Program: p}
if len(schema) == 0 {
schema, err = loadSchema(ctx, config, w)
if err != nil {
return err
}
defer conn.Close(context.Background())
if len(schema) == 0 {
schema, err = diff.LoadUserSchemas(ctx, conn)
if err != nil {
return err
}
}
}

w := utils.StatusWriter{Program: p}
// 2. Diff remote db (source) & shadow db (target) and print it.
output, err = diff.DiffDatabase(ctx, schema, config, w, fsys, diff.DiffSchemaMigra)
return err
}

func loadSchema(ctx context.Context, config pgconn.Config, w io.Writer) ([]string, error) {
conn, err := utils.ConnectByConfigStream(ctx, config, w)
if err != nil {
return nil, err
}
defer conn.Close(context.Background())
return diff.LoadUserSchemas(ctx, conn)
}
10 changes: 3 additions & 7 deletions internal/db/remote/commit/commit.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"path/filepath"

"github.com/jackc/pgconn"
"github.com/jackc/pgerrcode"
"github.com/jackc/pgx/v4"
"github.com/spf13/afero"
"github.com/supabase/cli/internal/db/diff"
Expand Down Expand Up @@ -41,8 +40,8 @@ func Run(ctx context.Context, schema []string, config pgconn.Config, fsys afero.

func run(p utils.Program, ctx context.Context, schema []string, config pgconn.Config, fsys afero.Fs) error {
// 1. Assert `supabase/migrations` and `schema_migrations` are in sync.
p.Send(utils.StatusMsg("Connecting to remote database..."))
conn, err := utils.ConnectRemotePostgres(ctx, config)
w := utils.StatusWriter{Program: p}
conn, err := utils.ConnectByConfigStream(ctx, config, w)
if err != nil {
return err
}
Expand Down Expand Up @@ -92,10 +91,7 @@ func fetchRemote(p utils.Program, ctx context.Context, schema []string, timestam
func assertRemoteInSync(ctx context.Context, conn *pgx.Conn, fsys afero.Fs) error {
remoteMigrations, err := list.LoadRemoteMigrations(ctx, conn)
if err != nil {
var pgErr *pgconn.PgError
if !errors.As(err, &pgErr) || pgErr.Code != pgerrcode.UndefinedTable {
return err
}
return err
}
localMigrations, err := list.LoadLocalMigrations(fsys)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion internal/db/reset/reset.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ func WaitForServiceReady(ctx context.Context, started []string) error {

func resetRemote(ctx context.Context, version string, config pgconn.Config, fsys afero.Fs, options ...func(*pgx.ConnConfig)) error {
fmt.Fprintln(os.Stderr, "Resetting remote database"+toLogMessage(version))
conn, err := utils.ConnectRemotePostgres(ctx, config, options...)
conn, err := utils.ConnectByConfigStream(ctx, config, io.Discard, options...)
if err != nil {
return err
}
Expand Down
35 changes: 31 additions & 4 deletions internal/db/reset/reset_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"fmt"
"io"
"net/http"
"strings"
"testing"
"time"

Expand Down Expand Up @@ -293,6 +292,34 @@ func TestRestartDatabase(t *testing.T) {
})
}

var escapedSchemas = []string{
"public",
"auth",
"extensions",
"pgbouncer",
"realtime",
`\_realtime`,
"storage",
`\_analytics`,
`supabase\_functions`,
`information\_schema`,
`pg\_%`,
"cron",
"graphql",
`graphql\_public`,
"net",
"pgsodium",
`pgsodium\_masks`,
"pgtle",
"repack",
"tiger",
`tiger\_data`,
`timescaledb\_%`,
`\_timescaledb\_%`,
"topology",
"vault",
}

func TestResetRemote(t *testing.T) {
dbConfig := pgconn.Config{
Host: "db.supabase.co",
Expand All @@ -308,7 +335,7 @@ func TestResetRemote(t *testing.T) {
// Setup mock postgres
conn := pgtest.NewConn()
defer conn.Close(t)
conn.Query(strings.ReplaceAll(LIST_SCHEMAS, "$1", `('{public,auth,extensions,pgbouncer,realtime,"\\_realtime",storage,"\\_analytics","supabase\\_functions","information\\_schema","pg\\_%",cron,graphql,"graphql\\_public",net,pgsodium,"pgsodium\\_masks",pgtle,repack,tiger,"tiger\\_data","timescaledb\\_%","\\_timescaledb\\_%",topology,vault}')`)).
conn.Query(LIST_SCHEMAS, escapedSchemas).
Reply("SELECT 1", []interface{}{"private"}).
Query("DROP SCHEMA IF EXISTS private CASCADE").
Reply("DROP SCHEMA").
Expand All @@ -335,7 +362,7 @@ func TestResetRemote(t *testing.T) {
// Setup mock postgres
conn := pgtest.NewConn()
defer conn.Close(t)
conn.Query(strings.ReplaceAll(LIST_SCHEMAS, "$1", `('{public,auth,extensions,pgbouncer,realtime,"\\_realtime",storage,"\\_analytics","supabase\\_functions","information\\_schema","pg\\_%",cron,graphql,"graphql\\_public",net,pgsodium,"pgsodium\\_masks",pgtle,repack,tiger,"tiger\\_data","timescaledb\\_%","\\_timescaledb\\_%",topology,vault}')`)).
conn.Query(LIST_SCHEMAS, escapedSchemas).
ReplyError(pgerrcode.InsufficientPrivilege, "permission denied for relation information_schema")
// Run test
err := resetRemote(context.Background(), "", dbConfig, fsys, conn.Intercept)
Expand All @@ -349,7 +376,7 @@ func TestResetRemote(t *testing.T) {
// Setup mock postgres
conn := pgtest.NewConn()
defer conn.Close(t)
conn.Query(strings.ReplaceAll(LIST_SCHEMAS, "$1", `('{public,auth,extensions,pgbouncer,realtime,"\\_realtime",storage,"\\_analytics","supabase\\_functions","information\\_schema","pg\\_%",cron,graphql,"graphql\\_public",net,pgsodium,"pgsodium\\_masks",pgtle,repack,tiger,"tiger\\_data","timescaledb\\_%","\\_timescaledb\\_%",topology,vault}')`)).
conn.Query(LIST_SCHEMAS, escapedSchemas).
Reply("SELECT 0").
Query(dropObjects).
ReplyError(pgerrcode.InsufficientPrivilege, "permission denied for relation supabase_migrations")
Expand Down
Loading

0 comments on commit eb08493

Please sign in to comment.