Skip to content

Commit

Permalink
Separate DDL operations from DML (backfills) on migration start (#289)
Browse files Browse the repository at this point in the history
Separate the DDL operations required during migration start from the DML
operations (backfills). Complete all DDL steps before starting DML.

Each `Start` operation returns the name of the table that requires
backfill, if any. After all operations have started, backfills are run
on each table that requires one.

Separating DDL and DML during migration start like this will allow for
setting Postgres options that should apply only for the DDL phase of
migration start (as in #290).
  • Loading branch information
andrew-farries committed Feb 29, 2024
1 parent 937f65c commit 0746ba6
Show file tree
Hide file tree
Showing 21 changed files with 87 additions and 111 deletions.
2 changes: 1 addition & 1 deletion pkg/migrations/backfill.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (
// 2. Get the first batch of rows from the table, ordered by the primary key.
// 3. Update each row in the batch, setting the value of the primary key column to itself.
// 4. Repeat steps 2 and 3 until no more rows are returned.
func backfill(ctx context.Context, conn *sql.DB, table *schema.Table, cbs ...CallbackFn) error {
func Backfill(ctx context.Context, conn *sql.DB, table *schema.Table, cbs ...CallbackFn) error {
// get the backfill column
identityColumn := getIdentityColumn(table)
if identityColumn == nil {
Expand Down
3 changes: 2 additions & 1 deletion pkg/migrations/migrations.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ type Operation interface {
// Start will apply the required changes to enable supporting the new schema
// version in the database (through a view)
// update the given views to expose the new schema version
Start(ctx context.Context, conn *sql.DB, stateSchema string, s *schema.Schema, cbs ...CallbackFn) error
// Returns the table that requires backfilling, if any.
Start(ctx context.Context, conn *sql.DB, stateSchema string, s *schema.Schema, cbs ...CallbackFn) (*schema.Table, error)

// Complete will update the database schema to match the current version
// after calling Start.
Expand Down
19 changes: 9 additions & 10 deletions pkg/migrations/op_add_column.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,31 +15,32 @@ import (

var _ Operation = (*OpAddColumn)(nil)

func (o *OpAddColumn) Start(ctx context.Context, conn *sql.DB, stateSchema string, s *schema.Schema, cbs ...CallbackFn) error {
func (o *OpAddColumn) Start(ctx context.Context, conn *sql.DB, stateSchema string, s *schema.Schema, cbs ...CallbackFn) (*schema.Table, error) {
table := s.GetTable(o.Table)

if err := addColumn(ctx, conn, *o, table); err != nil {
return fmt.Errorf("failed to start add column operation: %w", err)
return nil, fmt.Errorf("failed to start add column operation: %w", err)
}

if o.Column.Comment != nil {
if err := addCommentToColumn(ctx, conn, o.Table, TemporaryName(o.Column.Name), *o.Column.Comment); err != nil {
return fmt.Errorf("failed to add comment to column: %w", err)
return nil, fmt.Errorf("failed to add comment to column: %w", err)
}
}

if !o.Column.IsNullable() && o.Column.Default == nil {
if err := addNotNullConstraint(ctx, conn, o.Table, o.Column.Name, TemporaryName(o.Column.Name)); err != nil {
return fmt.Errorf("failed to add not null constraint: %w", err)
return nil, fmt.Errorf("failed to add not null constraint: %w", err)
}
}

if o.Column.Check != nil {
if err := o.addCheckConstraint(ctx, conn); err != nil {
return fmt.Errorf("failed to add check constraint: %w", err)
return nil, fmt.Errorf("failed to add check constraint: %w", err)
}
}

var tableToBackfill *schema.Table
if o.Up != nil {
err := createTrigger(ctx, conn, triggerConfig{
Name: TriggerName(o.Table, o.Column.Name),
Expand All @@ -52,18 +53,16 @@ func (o *OpAddColumn) Start(ctx context.Context, conn *sql.DB, stateSchema strin
SQL: *o.Up,
})
if err != nil {
return fmt.Errorf("failed to create trigger: %w", err)
}
if err := backfill(ctx, conn, table, cbs...); err != nil {
return fmt.Errorf("failed to backfill column: %w", err)
return nil, fmt.Errorf("failed to create trigger: %w", err)
}
tableToBackfill = table
}

table.AddColumn(o.Column.Name, schema.Column{
Name: TemporaryName(o.Column.Name),
})

return nil
return tableToBackfill, nil
}

func (o *OpAddColumn) Complete(ctx context.Context, conn *sql.DB, s *schema.Schema) error {
Expand Down
2 changes: 1 addition & 1 deletion pkg/migrations/op_alter_column.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (

var _ Operation = (*OpAlterColumn)(nil)

func (o *OpAlterColumn) Start(ctx context.Context, conn *sql.DB, stateSchema string, s *schema.Schema, cbs ...CallbackFn) error {
func (o *OpAlterColumn) Start(ctx context.Context, conn *sql.DB, stateSchema string, s *schema.Schema, cbs ...CallbackFn) (*schema.Table, error) {
op := o.innerOperation()

return op.Start(ctx, conn, stateSchema, s, cbs...)
Expand Down
15 changes: 5 additions & 10 deletions pkg/migrations/op_change_type.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,14 @@ type OpChangeType struct {

var _ Operation = (*OpChangeType)(nil)

func (o *OpChangeType) Start(ctx context.Context, conn *sql.DB, stateSchema string, s *schema.Schema, cbs ...CallbackFn) error {
func (o *OpChangeType) Start(ctx context.Context, conn *sql.DB, stateSchema string, s *schema.Schema, cbs ...CallbackFn) (*schema.Table, error) {
table := s.GetTable(o.Table)
column := table.GetColumn(o.Column)

// Create a copy of the column on the underlying table.
d := NewColumnDuplicator(conn, table, column).WithType(o.Type)
if err := d.Duplicate(ctx); err != nil {
return fmt.Errorf("failed to duplicate column: %w", err)
return nil, fmt.Errorf("failed to duplicate column: %w", err)
}

// Add a trigger to copy values from the old column to the new, rewriting values using the `up` SQL.
Expand All @@ -43,12 +43,7 @@ func (o *OpChangeType) Start(ctx context.Context, conn *sql.DB, stateSchema stri
SQL: o.Up,
})
if err != nil {
return fmt.Errorf("failed to create up trigger: %w", err)
}

// Backfill the new column with values from the old column.
if err := backfill(ctx, conn, table, cbs...); err != nil {
return fmt.Errorf("failed to backfill column: %w", err)
return nil, fmt.Errorf("failed to create up trigger: %w", err)
}

// Add the new column to the internal schema representation. This is done
Expand All @@ -70,10 +65,10 @@ func (o *OpChangeType) Start(ctx context.Context, conn *sql.DB, stateSchema stri
SQL: o.Down,
})
if err != nil {
return fmt.Errorf("failed to create down trigger: %w", err)
return nil, fmt.Errorf("failed to create down trigger: %w", err)
}

return nil
return table, nil
}

func (o *OpChangeType) Complete(ctx context.Context, conn *sql.DB, s *schema.Schema) error {
Expand Down
4 changes: 2 additions & 2 deletions pkg/migrations/op_create_index.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,13 @@ import (

var _ Operation = (*OpCreateIndex)(nil)

func (o *OpCreateIndex) Start(ctx context.Context, conn *sql.DB, stateSchema string, s *schema.Schema, cbs ...CallbackFn) error {
func (o *OpCreateIndex) Start(ctx context.Context, conn *sql.DB, stateSchema string, s *schema.Schema, cbs ...CallbackFn) (*schema.Table, error) {
// create index concurrently
_, err := conn.ExecContext(ctx, fmt.Sprintf("CREATE INDEX CONCURRENTLY IF NOT EXISTS %s ON %s (%s)",
pq.QuoteIdentifier(o.Name),
pq.QuoteIdentifier(o.Table),
strings.Join(quoteColumnNames(o.Columns), ", ")))
return err
return nil, err
}

func (o *OpCreateIndex) Complete(ctx context.Context, conn *sql.DB, s *schema.Schema) error {
Expand Down
10 changes: 5 additions & 5 deletions pkg/migrations/op_create_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,28 +13,28 @@ import (

var _ Operation = (*OpCreateTable)(nil)

func (o *OpCreateTable) Start(ctx context.Context, conn *sql.DB, stateSchema string, s *schema.Schema, cbs ...CallbackFn) error {
func (o *OpCreateTable) Start(ctx context.Context, conn *sql.DB, stateSchema string, s *schema.Schema, cbs ...CallbackFn) (*schema.Table, error) {
tempName := TemporaryName(o.Name)
_, err := conn.ExecContext(ctx, fmt.Sprintf("CREATE TABLE %s (%s)",
pq.QuoteIdentifier(tempName),
columnsToSQL(o.Columns)))
if err != nil {
return err
return nil, err
}

// Add comments to any columns that have them
for _, col := range o.Columns {
if col.Comment != nil {
if err := addCommentToColumn(ctx, conn, tempName, col.Name, *col.Comment); err != nil {
return fmt.Errorf("failed to add comment to column: %w", err)
return nil, fmt.Errorf("failed to add comment to column: %w", err)
}
}
}

// Add comment to the table itself
if o.Comment != nil {
if err := addCommentToTable(ctx, conn, tempName, *o.Comment); err != nil {
return fmt.Errorf("failed to add comment to table: %w", err)
return nil, fmt.Errorf("failed to add comment to table: %w", err)
}
}

Expand All @@ -50,7 +50,7 @@ func (o *OpCreateTable) Start(ctx context.Context, conn *sql.DB, stateSchema str
Columns: columns,
})

return nil
return nil, nil
}

func (o *OpCreateTable) Complete(ctx context.Context, conn *sql.DB, s *schema.Schema) error {
Expand Down
6 changes: 3 additions & 3 deletions pkg/migrations/op_drop_column.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (

var _ Operation = (*OpDropColumn)(nil)

func (o *OpDropColumn) Start(ctx context.Context, conn *sql.DB, stateSchema string, s *schema.Schema, cbs ...CallbackFn) error {
func (o *OpDropColumn) Start(ctx context.Context, conn *sql.DB, stateSchema string, s *schema.Schema, cbs ...CallbackFn) (*schema.Table, error) {
if o.Down != nil {
err := createTrigger(ctx, conn, triggerConfig{
Name: TriggerName(o.Table, o.Column),
Expand All @@ -26,12 +26,12 @@ func (o *OpDropColumn) Start(ctx context.Context, conn *sql.DB, stateSchema stri
SQL: *o.Down,
})
if err != nil {
return err
return nil, err
}
}

s.GetTable(o.Table).RemoveColumn(o.Column)
return nil
return nil, nil
}

func (o *OpDropColumn) Complete(ctx context.Context, conn *sql.DB, s *schema.Schema) error {
Expand Down
15 changes: 5 additions & 10 deletions pkg/migrations/op_drop_constraint.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,14 @@ import (

var _ Operation = (*OpDropConstraint)(nil)

func (o *OpDropConstraint) Start(ctx context.Context, conn *sql.DB, stateSchema string, s *schema.Schema, cbs ...CallbackFn) error {
func (o *OpDropConstraint) Start(ctx context.Context, conn *sql.DB, stateSchema string, s *schema.Schema, cbs ...CallbackFn) (*schema.Table, error) {
table := s.GetTable(o.Table)
column := table.GetColumn(o.Column)

// Create a copy of the column on the underlying table.
d := NewColumnDuplicator(conn, table, column).WithoutConstraint(o.Name)
if err := d.Duplicate(ctx); err != nil {
return fmt.Errorf("failed to duplicate column: %w", err)
return nil, fmt.Errorf("failed to duplicate column: %w", err)
}

// Add a trigger to copy values from the old column to the new, rewriting values using the `up` SQL.
Expand All @@ -35,12 +35,7 @@ func (o *OpDropConstraint) Start(ctx context.Context, conn *sql.DB, stateSchema
SQL: o.upSQL(),
})
if err != nil {
return fmt.Errorf("failed to create up trigger: %w", err)
}

// Backfill the new column with values from the old column.
if err := backfill(ctx, conn, table, cbs...); err != nil {
return fmt.Errorf("failed to backfill column: %w", err)
return nil, fmt.Errorf("failed to create up trigger: %w", err)
}

// Add the new column to the internal schema representation. This is done
Expand All @@ -62,9 +57,9 @@ func (o *OpDropConstraint) Start(ctx context.Context, conn *sql.DB, stateSchema
SQL: o.Down,
})
if err != nil {
return fmt.Errorf("failed to create down trigger: %w", err)
return nil, fmt.Errorf("failed to create down trigger: %w", err)
}
return nil
return table, nil
}

func (o *OpDropConstraint) Complete(ctx context.Context, conn *sql.DB, s *schema.Schema) error {
Expand Down
4 changes: 2 additions & 2 deletions pkg/migrations/op_drop_index.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ import (

var _ Operation = (*OpDropIndex)(nil)

func (o *OpDropIndex) Start(ctx context.Context, conn *sql.DB, stateSchema string, s *schema.Schema, cbs ...CallbackFn) error {
func (o *OpDropIndex) Start(ctx context.Context, conn *sql.DB, stateSchema string, s *schema.Schema, cbs ...CallbackFn) (*schema.Table, error) {
// no-op
return nil
return nil, nil
}

func (o *OpDropIndex) Complete(ctx context.Context, conn *sql.DB, s *schema.Schema) error {
Expand Down
15 changes: 5 additions & 10 deletions pkg/migrations/op_drop_not_null.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,14 @@ type OpDropNotNull struct {

var _ Operation = (*OpDropNotNull)(nil)

func (o *OpDropNotNull) Start(ctx context.Context, conn *sql.DB, stateSchema string, s *schema.Schema, cbs ...CallbackFn) error {
func (o *OpDropNotNull) Start(ctx context.Context, conn *sql.DB, stateSchema string, s *schema.Schema, cbs ...CallbackFn) (*schema.Table, error) {
table := s.GetTable(o.Table)
column := table.GetColumn(o.Column)

// Create a copy of the column on the underlying table.
d := NewColumnDuplicator(conn, table, column).WithoutNotNull()
if err := d.Duplicate(ctx); err != nil {
return fmt.Errorf("failed to duplicate column: %w", err)
return nil, fmt.Errorf("failed to duplicate column: %w", err)
}

// Add a trigger to copy values from the old column to the new, rewriting values using the `up` SQL.
Expand All @@ -42,12 +42,7 @@ func (o *OpDropNotNull) Start(ctx context.Context, conn *sql.DB, stateSchema str
SQL: o.upSQL(),
})
if err != nil {
return fmt.Errorf("failed to create up trigger: %w", err)
}

// Backfill the new column with values from the old column.
if err := backfill(ctx, conn, table, cbs...); err != nil {
return fmt.Errorf("failed to backfill column: %w", err)
return nil, fmt.Errorf("failed to create up trigger: %w", err)
}

// Add the new column to the internal schema representation. This is done
Expand All @@ -69,10 +64,10 @@ func (o *OpDropNotNull) Start(ctx context.Context, conn *sql.DB, stateSchema str
SQL: o.Down,
})
if err != nil {
return fmt.Errorf("failed to create down trigger: %w", err)
return nil, fmt.Errorf("failed to create down trigger: %w", err)
}

return nil
return table, nil
}

func (o *OpDropNotNull) Complete(ctx context.Context, conn *sql.DB, s *schema.Schema) error {
Expand Down
4 changes: 2 additions & 2 deletions pkg/migrations/op_drop_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@ import (

var _ Operation = (*OpDropTable)(nil)

func (o *OpDropTable) Start(ctx context.Context, conn *sql.DB, stateSchema string, s *schema.Schema, cbs ...CallbackFn) error {
func (o *OpDropTable) Start(ctx context.Context, conn *sql.DB, stateSchema string, s *schema.Schema, cbs ...CallbackFn) (*schema.Table, error) {
s.RemoveTable(o.Name)
return nil
return nil, nil
}

func (o *OpDropTable) Complete(ctx context.Context, conn *sql.DB, s *schema.Schema) error {
Expand Down
6 changes: 3 additions & 3 deletions pkg/migrations/op_raw_sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,12 @@ import (

var _ Operation = (*OpRawSQL)(nil)

func (o *OpRawSQL) Start(ctx context.Context, conn *sql.DB, stateSchema string, s *schema.Schema, cbs ...CallbackFn) error {
func (o *OpRawSQL) Start(ctx context.Context, conn *sql.DB, stateSchema string, s *schema.Schema, cbs ...CallbackFn) (*schema.Table, error) {
if !o.OnComplete {
_, err := conn.ExecContext(ctx, o.Up)
return err
return nil, err
}
return nil
return nil, nil
}

func (o *OpRawSQL) Complete(ctx context.Context, conn *sql.DB, s *schema.Schema) error {
Expand Down
4 changes: 2 additions & 2 deletions pkg/migrations/op_rename_column.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@ type OpRenameColumn struct {

var _ Operation = (*OpRenameColumn)(nil)

func (o *OpRenameColumn) Start(ctx context.Context, conn *sql.DB, stateSchema string, s *schema.Schema, cbs ...CallbackFn) error {
func (o *OpRenameColumn) Start(ctx context.Context, conn *sql.DB, stateSchema string, s *schema.Schema, cbs ...CallbackFn) (*schema.Table, error) {
table := s.GetTable(o.Table)
table.RenameColumn(o.From, o.To)
return nil
return nil, nil
}

func (o *OpRenameColumn) Complete(ctx context.Context, conn *sql.DB, s *schema.Schema) error {
Expand Down
4 changes: 2 additions & 2 deletions pkg/migrations/op_rename_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ import (

var _ Operation = (*OpRenameTable)(nil)

func (o *OpRenameTable) Start(ctx context.Context, conn *sql.DB, stateSchema string, s *schema.Schema, cbs ...CallbackFn) error {
return s.RenameTable(o.From, o.To)
func (o *OpRenameTable) Start(ctx context.Context, conn *sql.DB, stateSchema string, s *schema.Schema, cbs ...CallbackFn) (*schema.Table, error) {
return nil, s.RenameTable(o.From, o.To)
}

func (o *OpRenameTable) Complete(ctx context.Context, conn *sql.DB, s *schema.Schema) error {
Expand Down

0 comments on commit 0746ba6

Please sign in to comment.