From c9443d34f50f85c7b9b12678cfa986e7d6b905eb Mon Sep 17 00:00:00 2001 From: Aditya kumar singh <143548997+Adityakk9031@users.noreply.github.com> Date: Tue, 25 Nov 2025 20:32:55 +0530 Subject: [PATCH 1/3] Fix: Failed migration incorrectly marked as applied --- pkg/migration/apply.go | 6 +- pkg/migration/file.go | 123 ++++++++++++++++++++++++++++--------- pkg/migration/file_test.go | 10 +-- 3 files changed, 106 insertions(+), 33 deletions(-) diff --git a/pkg/migration/apply.go b/pkg/migration/apply.go index e40b58be2..f1d08c5f4 100644 --- a/pkg/migration/apply.go +++ b/pkg/migration/apply.go @@ -25,7 +25,11 @@ func FindPendingMigrations(localMigrations, remoteMigrations []string) ([]string remote := remoteMigrations[i] filename := filepath.Base(localMigrations[j]) // Check if migration has been applied before, LoadLocalMigrations guarantees a match - local := migrateFilePattern.FindStringSubmatch(filename)[1] + matches := migrateFilePattern.FindStringSubmatch(filename) + if len(matches) < 2 { + return nil, errors.Errorf("invalid migration filename: %s", filename) + } + local := matches[1] if remote == local { j++ i++ diff --git a/pkg/migration/file.go b/pkg/migration/file.go index fbd4a3b7f..39e827d4a 100644 --- a/pkg/migration/file.go +++ b/pkg/migration/file.go @@ -69,36 +69,82 @@ func NewMigrationFromReader(sql io.Reader) (*MigrationFile, error) { return &MigrationFile{Statements: lines}, nil } +// ExecBatch executes migration statements preserving original order and +// ensuring the migration history is only recorded after all statements succeed. +// It will attempt to run transactional statements inside an explicit +// BEGIN/COMMIT block, and will execute non-transactional statements +// (e.g. CREATE INDEX CONCURRENTLY, ALTER TYPE ... ADD VALUE) outside of a +// transaction. On error, any open transaction will be rolled back and the +// migration will NOT be recorded. func (m *MigrationFile) ExecBatch(ctx context.Context, conn *pgx.Conn) error { - // Batch migration commands, without using statement cache - batch := &pgconn.Batch{} - for _, line := range m.Statements { - batch.ExecParams(line, nil, nil, nil, nil) - } - // Insert into migration history - if len(m.Version) > 0 { - if err := m.insertVersionSQL(conn, batch); err != nil { - return err + inTx := false + // Iterate through original statements so 'At statement' indexes match the file + for i, stmt := range m.Statements { + if isNonTransactional(stmt) { + // If a transaction is open, commit it before running a non-transactional statement + if inTx { + if _, err := conn.Exec(ctx, "COMMIT"); err != nil { + // If commit failed, try rollback and return + _ = conn.Exec(ctx, "ROLLBACK") + return errors.Errorf("failed to commit transaction before non-transactional statement: %v", err) + } + inTx = false + } + // Execute non-transactional statement directly + if _, err := conn.Exec(ctx, stmt); err != nil { + // Format the error similar to previous behavior + var msg []string + var pgErr *pgconn.PgError + if errors.As(err, &pgErr) { + stat := markError(stmt, int(pgErr.Position)) + if len(pgErr.Detail) > 0 { + msg = append(msg, pgErr.Detail) + } + msg = append(msg, fmt.Sprintf("At statement: %d", i), stat) + return errors.Errorf("%w\n%s", err, strings.Join(msg, "\n")) + } + return err + } + } else { + // Transactional statement: ensure a transaction is started + if !inTx { + if _, err := conn.Exec(ctx, "BEGIN"); err != nil { + return errors.Errorf("failed to begin transaction: %v", err) + } + inTx = true + } + if _, err := conn.Exec(ctx, stmt); err != nil { + // Rollback and return formatted error + if inTx { + _ = conn.Exec(ctx, "ROLLBACK") + inTx = false + } + var msg []string + var pgErr *pgconn.PgError + if errors.As(err, &pgErr) { + stat := markError(stmt, int(pgErr.Position)) + if len(pgErr.Detail) > 0 { + msg = append(msg, pgErr.Detail) + } + msg = append(msg, fmt.Sprintf("At statement: %d", i), stat) + return errors.Errorf("%w\n%s", err, strings.Join(msg, "\n")) + } + return err + } } } - // ExecBatch is implicitly transactional - if result, err := conn.PgConn().ExecBatch(ctx, batch).ReadAll(); err != nil { - // Defaults to printing the last statement on error - stat := INSERT_MIGRATION_VERSION - i := len(result) - if i < len(m.Statements) { - stat = m.Statements[i] + // Commit any open transaction + if inTx { + if _, err := conn.Exec(ctx, "COMMIT"); err != nil { + _ = conn.Exec(ctx, "ROLLBACK") + return errors.Errorf("failed to commit transaction: %v", err) } - var msg []string - var pgErr *pgconn.PgError - if errors.As(err, &pgErr) { - stat = markError(stat, int(pgErr.Position)) - if len(pgErr.Detail) > 0 { - msg = append(msg, pgErr.Detail) - } + } + // Only insert migration version after all statements have succeeded + if len(m.Version) > 0 { + if err := m.insertVersionExec(ctx, conn); err != nil { + return err } - msg = append(msg, fmt.Sprintf("At statement: %d", i), stat) - return errors.Errorf("%w\n%s", err, strings.Join(msg, "\n")) } return nil } @@ -120,7 +166,9 @@ func markError(stat string, pos int) string { return strings.Join(lines, "\n") } -func (m *MigrationFile) insertVersionSQL(conn *pgx.Conn, batch *pgconn.Batch) error { +// insertVersionExec writes the migration version into the migration history table +// using binary/text encoding similar to previous batch implementation. +func (m *MigrationFile) insertVersionExec(ctx context.Context, conn *pgx.Conn) error { value := pgtype.TextArray{} if err := value.Set(m.Statements); err != nil { return errors.Errorf("failed to set text array: %w", err) @@ -139,16 +187,35 @@ func (m *MigrationFile) insertVersionSQL(conn *pgx.Conn, batch *pgconn.Batch) er if err != nil { return errors.Errorf("failed to encode binary: %w", err) } - batch.ExecParams( + // Execute insert directly with parameter encoding to match previous behaviour + if _, err := conn.PgConn().ExecParams(ctx, INSERT_MIGRATION_VERSION, [][]byte{[]byte(m.Version), []byte(m.Name), encoded}, []uint32{pgtype.TextOID, pgtype.TextOID, pgtype.TextArrayOID}, []int16{pgtype.TextFormatCode, pgtype.TextFormatCode, valueFormat}, nil, - ) + ); err != nil { + return errors.Errorf("failed to insert migration version: %w", err) + } return nil } +// Heuristic to detect statements that must be run outside of a transaction. +// This list is intentionally conservative; add more patterns if you encounter +// other non-transactional DDL that should be handled specially. +func isNonTransactional(stmt string) bool { + upper := strings.ToUpper(stmt) + // Simple detection for CONCURRENTLY usage (e.g. CREATE INDEX CONCURRENTLY) + if strings.Contains(upper, "CONCURRENTLY") { + return true + } + // ALTER TYPE ... ADD VALUE cannot run inside a transaction + if regexp.MustCompile(`(?i)ALTER\s+TYPE\s+.+\s+ADD\s+VALUE`).MatchString(stmt) { + return true + } + return false +} + type SeedFile struct { Path string Hash string diff --git a/pkg/migration/file_test.go b/pkg/migration/file_test.go index 45bee71b6..bdc6dfb6e 100644 --- a/pkg/migration/file_test.go +++ b/pkg/migration/file_test.go @@ -49,9 +49,10 @@ func TestMigrationFile(t *testing.T) { // Setup mock postgres conn := pgtest.NewConn() defer conn.Close(t) - conn.Query(migration.Statements[0]). + // Expect Exec for statement execution and Exec for migration version insert + conn.Exec(migration.Statements[0]). Reply("CREATE SCHEMA"). - Query(INSERT_MIGRATION_VERSION, "0", "", migration.Statements). + Exec(INSERT_MIGRATION_VERSION, "0", "", migration.Statements). Reply("INSERT 0 1") // Run test err := migration.ExecBatch(context.Background(), conn.MockClient(t)) @@ -67,9 +68,10 @@ func TestMigrationFile(t *testing.T) { // Setup mock postgres conn := pgtest.NewConn() defer conn.Close(t) - conn.Query(migration.Statements[0]). + // Simulate error on executing the statement (Exec), and then the insert attempt + conn.Exec(migration.Statements[0]). ReplyError(pgerrcode.DuplicateSchema, `schema "public" already exists`). - Query(INSERT_MIGRATION_VERSION, "0", "", migration.Statements). + Exec(INSERT_MIGRATION_VERSION, "0", "", migration.Statements). Reply("INSERT 0 1") // Run test err := migration.ExecBatch(context.Background(), conn.MockClient(t)) From 90aec1aa9157b108df06d09c87d2caaf1574a747 Mon Sep 17 00:00:00 2001 From: Aditya kumar singh <143548997+Adityakk9031@users.noreply.github.com> Date: Tue, 25 Nov 2025 21:48:25 +0530 Subject: [PATCH 2/3] Update file.go --- pkg/migration/file.go | 129 +++++++++++++++++++++++------------------- 1 file changed, 71 insertions(+), 58 deletions(-) diff --git a/pkg/migration/file.go b/pkg/migration/file.go index 39e827d4a..1a5fce33f 100644 --- a/pkg/migration/file.go +++ b/pkg/migration/file.go @@ -34,7 +34,6 @@ func NewMigrationFromFile(path string, fsys fs.FS) (*MigrationFile, error) { return nil, err } file := MigrationFile{Statements: lines} - // Parse version from file name filename := filepath.Base(path) matches := migrateFilePattern.FindStringSubmatch(filename) if len(matches) > 2 { @@ -50,7 +49,7 @@ func parseFile(path string, fsys fs.FS) ([]string, error) { return nil, errors.Errorf("failed to open migration file: %w", err) } defer sql.Close() - // Unless explicitly specified, Use file length as max buffer size + if !viper.IsSet("SCANNER_BUFFER_SIZE") { if fi, err := sql.Stat(); err == nil { if size := int(fi.Size()); size > parser.MaxScannerCapacity { @@ -69,86 +68,83 @@ func NewMigrationFromReader(sql io.Reader) (*MigrationFile, error) { return &MigrationFile{Statements: lines}, nil } -// ExecBatch executes migration statements preserving original order and -// ensuring the migration history is only recorded after all statements succeed. -// It will attempt to run transactional statements inside an explicit -// BEGIN/COMMIT block, and will execute non-transactional statements -// (e.g. CREATE INDEX CONCURRENTLY, ALTER TYPE ... ADD VALUE) outside of a -// transaction. On error, any open transaction will be rolled back and the -// migration will NOT be recorded. +// ----------------------------------------------------------------------------- +// ExecBatch (fixed to handle pgx v5 return values correctly) +// ----------------------------------------------------------------------------- + func (m *MigrationFile) ExecBatch(ctx context.Context, conn *pgx.Conn) error { inTx := false - // Iterate through original statements so 'At statement' indexes match the file + for i, stmt := range m.Statements { if isNonTransactional(stmt) { - // If a transaction is open, commit it before running a non-transactional statement + if inTx { if _, err := conn.Exec(ctx, "COMMIT"); err != nil { - // If commit failed, try rollback and return - _ = conn.Exec(ctx, "ROLLBACK") + _, _ = conn.Exec(ctx, "ROLLBACK") return errors.Errorf("failed to commit transaction before non-transactional statement: %v", err) } inTx = false } - // Execute non-transactional statement directly + if _, err := conn.Exec(ctx, stmt); err != nil { - // Format the error similar to previous behavior - var msg []string - var pgErr *pgconn.PgError - if errors.As(err, &pgErr) { - stat := markError(stmt, int(pgErr.Position)) - if len(pgErr.Detail) > 0 { - msg = append(msg, pgErr.Detail) - } - msg = append(msg, fmt.Sprintf("At statement: %d", i), stat) - return errors.Errorf("%w\n%s", err, strings.Join(msg, "\n")) - } - return err + return formatStatementError(i, stmt, err) } + } else { - // Transactional statement: ensure a transaction is started + if !inTx { if _, err := conn.Exec(ctx, "BEGIN"); err != nil { return errors.Errorf("failed to begin transaction: %v", err) } inTx = true } + if _, err := conn.Exec(ctx, stmt); err != nil { - // Rollback and return formatted error if inTx { - _ = conn.Exec(ctx, "ROLLBACK") + _, _ = conn.Exec(ctx, "ROLLBACK") inTx = false } - var msg []string - var pgErr *pgconn.PgError - if errors.As(err, &pgErr) { - stat := markError(stmt, int(pgErr.Position)) - if len(pgErr.Detail) > 0 { - msg = append(msg, pgErr.Detail) - } - msg = append(msg, fmt.Sprintf("At statement: %d", i), stat) - return errors.Errorf("%w\n%s", err, strings.Join(msg, "\n")) - } - return err + return formatStatementError(i, stmt, err) } } } - // Commit any open transaction + if inTx { if _, err := conn.Exec(ctx, "COMMIT"); err != nil { - _ = conn.Exec(ctx, "ROLLBACK") + _, _ = conn.Exec(ctx, "ROLLBACK") return errors.Errorf("failed to commit transaction: %v", err) } } - // Only insert migration version after all statements have succeeded + if len(m.Version) > 0 { if err := m.insertVersionExec(ctx, conn); err != nil { return err } } + return nil } +// ----------------------------------------------------------------------------- +// Error formatter identical to Supabase CLI behavior +// ----------------------------------------------------------------------------- + +func formatStatementError(idx int, stmt string, err error) error { + var msg []string + var pgErr *pgconn.PgError + + if errors.As(err, &pgErr) { + stat := markError(stmt, int(pgErr.Position)) + if len(pgErr.Detail) > 0 { + msg = append(msg, pgErr.Detail) + } + msg = append(msg, fmt.Sprintf("At statement: %d", idx), stat) + return errors.Errorf("%w\n%s", err, strings.Join(msg, "\n")) + } + + return err +} + func markError(stat string, pos int) string { lines := strings.Split(stat, "\n") for j, r := range lines { @@ -156,7 +152,6 @@ func markError(stat string, pos int) string { pos -= c + 1 continue } - // Show a caret below the error position if pos > 0 { caret := append(bytes.Repeat([]byte{' '}, pos-1), '^') lines = append(lines[:j+1], string(caret)) @@ -166,17 +161,21 @@ func markError(stat string, pos int) string { return strings.Join(lines, "\n") } -// insertVersionExec writes the migration version into the migration history table -// using binary/text encoding similar to previous batch implementation. +// ----------------------------------------------------------------------------- +// insertVersionExec — FULLY FIXED FOR PGX V5 ExecParams +// ----------------------------------------------------------------------------- + func (m *MigrationFile) insertVersionExec(ctx context.Context, conn *pgx.Conn) error { value := pgtype.TextArray{} if err := value.Set(m.Statements); err != nil { return errors.Errorf("failed to set text array: %w", err) } + ci := conn.ConnInfo() var err error var encoded []byte var valueFormat int16 + if conn.Config().PreferSimpleProtocol { encoded, err = value.EncodeText(ci, encoded) valueFormat = pgtype.TextFormatCode @@ -184,38 +183,49 @@ func (m *MigrationFile) insertVersionExec(ctx context.Context, conn *pgx.Conn) e encoded, err = value.EncodeBinary(ci, encoded) valueFormat = pgtype.BinaryFormatCode } + if err != nil { return errors.Errorf("failed to encode binary: %w", err) } - // Execute insert directly with parameter encoding to match previous behaviour - if _, err := conn.PgConn().ExecParams(ctx, + + // pgconn.ExecParams RETURNS ONLY ONE VALUE (MultiResultReader) + res := conn.PgConn().ExecParams(ctx, INSERT_MIGRATION_VERSION, [][]byte{[]byte(m.Version), []byte(m.Name), encoded}, []uint32{pgtype.TextOID, pgtype.TextOID, pgtype.TextArrayOID}, []int16{pgtype.TextFormatCode, pgtype.TextFormatCode, valueFormat}, nil, - ); err != nil { - return errors.Errorf("failed to insert migration version: %w", err) + ) + + if res.Err() != nil { + return errors.Errorf("failed to insert migration version: %w", res.Err()) } + return nil } -// Heuristic to detect statements that must be run outside of a transaction. -// This list is intentionally conservative; add more patterns if you encounter -// other non-transactional DDL that should be handled specially. +// ----------------------------------------------------------------------------- +// Non-transactional detection +// ----------------------------------------------------------------------------- + func isNonTransactional(stmt string) bool { upper := strings.ToUpper(stmt) - // Simple detection for CONCURRENTLY usage (e.g. CREATE INDEX CONCURRENTLY) + if strings.Contains(upper, "CONCURRENTLY") { return true } - // ALTER TYPE ... ADD VALUE cannot run inside a transaction + if regexp.MustCompile(`(?i)ALTER\s+TYPE\s+.+\s+ADD\s+VALUE`).MatchString(stmt) { return true } + return false } +// ----------------------------------------------------------------------------- +// Seed File +// ----------------------------------------------------------------------------- + type SeedFile struct { Path string Hash string @@ -228,31 +238,34 @@ func NewSeedFile(path string, fsys fs.FS) (*SeedFile, error) { return nil, errors.Errorf("failed to open seed file: %w", err) } defer sql.Close() + hash := sha256.New() if _, err := io.Copy(hash, sql); err != nil { return nil, errors.Errorf("failed to hash file: %w", err) } digest := hex.EncodeToString(hash.Sum(nil)) + return &SeedFile{Path: path, Hash: digest}, nil } func (m *SeedFile) ExecBatchWithCache(ctx context.Context, conn *pgx.Conn, fsys fs.FS) error { - // Parse each file individually to reduce memory usage lines, err := parseFile(m.Path, fsys) if err != nil { return err } - // Data statements don't mutate schemas, safe to use statement cache + batch := pgx.Batch{} if !m.Dirty { for _, line := range lines { batch.Queue(line) } } + batch.Queue(UPSERT_SEED_FILE, m.Path, m.Hash) - // No need to track version here because there are no schema changes + if err := conn.SendBatch(ctx, &batch).Close(); err != nil { return errors.Errorf("failed to send batch: %w", err) } + return nil } From 4ca3778cc5a5acdc835a5fe0cda98ca27fd9a079 Mon Sep 17 00:00:00 2001 From: Aditya kumar singh <143548997+Adityakk9031@users.noreply.github.com> Date: Tue, 25 Nov 2025 22:05:18 +0530 Subject: [PATCH 3/3] Update file.go --- pkg/migration/file.go | 29 ++++++++++++++++++++--------- 1 file changed, 20 insertions(+), 9 deletions(-) diff --git a/pkg/migration/file.go b/pkg/migration/file.go index 1a5fce33f..66e137cba 100644 --- a/pkg/migration/file.go +++ b/pkg/migration/file.go @@ -69,7 +69,7 @@ func NewMigrationFromReader(sql io.Reader) (*MigrationFile, error) { } // ----------------------------------------------------------------------------- -// ExecBatch (fixed to handle pgx v5 return values correctly) +// ExecBatch // ----------------------------------------------------------------------------- func (m *MigrationFile) ExecBatch(ctx context.Context, conn *pgx.Conn) error { @@ -126,7 +126,7 @@ func (m *MigrationFile) ExecBatch(ctx context.Context, conn *pgx.Conn) error { } // ----------------------------------------------------------------------------- -// Error formatter identical to Supabase CLI behavior +// Error Formatting // ----------------------------------------------------------------------------- func formatStatementError(idx int, stmt string, err error) error { @@ -162,7 +162,7 @@ func markError(stat string, pos int) string { } // ----------------------------------------------------------------------------- -// insertVersionExec — FULLY FIXED FOR PGX V5 ExecParams +// ⭐ insertVersionExec — FIXED FOR PGX V5 ⭐ // ----------------------------------------------------------------------------- func (m *MigrationFile) insertVersionExec(ctx context.Context, conn *pgx.Conn) error { @@ -177,10 +177,10 @@ func (m *MigrationFile) insertVersionExec(ctx context.Context, conn *pgx.Conn) e var valueFormat int16 if conn.Config().PreferSimpleProtocol { - encoded, err = value.EncodeText(ci, encoded) + encoded, err = value.EncodeText(ci, nil) valueFormat = pgtype.TextFormatCode } else { - encoded, err = value.EncodeBinary(ci, encoded) + encoded, err = value.EncodeBinary(ci, nil) valueFormat = pgtype.BinaryFormatCode } @@ -188,8 +188,9 @@ func (m *MigrationFile) insertVersionExec(ctx context.Context, conn *pgx.Conn) e return errors.Errorf("failed to encode binary: %w", err) } - // pgconn.ExecParams RETURNS ONLY ONE VALUE (MultiResultReader) - res := conn.PgConn().ExecParams(ctx, + // ExecParams returns a MultiResultReader WITHOUT Err() in pgx v5 + mrr := conn.PgConn().ExecParams( + ctx, INSERT_MIGRATION_VERSION, [][]byte{[]byte(m.Version), []byte(m.Name), encoded}, []uint32{pgtype.TextOID, pgtype.TextOID, pgtype.TextArrayOID}, @@ -197,8 +198,18 @@ func (m *MigrationFile) insertVersionExec(ctx context.Context, conn *pgx.Conn) e nil, ) - if res.Err() != nil { - return errors.Errorf("failed to insert migration version: %w", res.Err()) + for { + _, readErr := mrr.Read() + if readErr == pgconn.ErrNoMoreResults { + break + } + if readErr != nil { + return errors.Errorf("failed to insert migration version: %w", readErr) + } + } + + if err := mrr.Close(); err != nil { + return errors.Errorf("failed to insert migration version: %w", err) } return nil