Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make set_unique operations respect the contract for old schema versions #118

Merged
merged 3 commits into from
Sep 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion examples/15_set_column_unique.json
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@
"column": "review",
"unique": {
"name": "reviews_review_unique"
}
},
"up": "review || '-' || (random()*1000000)::integer",
"down": "review"
}
}
]
Expand Down
4 changes: 3 additions & 1 deletion pkg/migrations/op_alter_column.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func (o *OpAlterColumn) Validate(ctx context.Context, s *schema.Schema) error {
// Apply any special validation rules for the inner operation
op := o.innerOperation()
switch op.(type) {
case *OpRenameColumn, *OpSetUnique:
case *OpRenameColumn:
if o.Up != "" {
return NoUpSQLAllowedError{}
}
Expand Down Expand Up @@ -126,6 +126,8 @@ func (o *OpAlterColumn) innerOperation() Operation {
Table: o.Table,
Column: o.Column,
Name: o.Unique.Name,
Up: o.Up,
Down: o.Down,
}
}
return nil
Expand Down
132 changes: 123 additions & 9 deletions pkg/migrations/op_set_unique.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,32 +13,136 @@ type OpSetUnique struct {
Name string `json:"name"`
Table string `json:"table"`
Column string `json:"column"`
Up string `json:"up"`
Down string `json:"down"`
}

var _ Operation = (*OpSetUnique)(nil)

func (o *OpSetUnique) Start(ctx context.Context, conn *sql.DB, stateSchema string, s *schema.Schema) error {
// create unique index concurrently
_, err := conn.ExecContext(ctx, fmt.Sprintf("CREATE UNIQUE INDEX CONCURRENTLY IF NOT EXISTS %s ON %s (%s)",
pq.QuoteIdentifier(o.Name),
pq.QuoteIdentifier(o.Table),
pq.QuoteIdentifier(o.Column)))
return err
table := s.GetTable(o.Table)
column := table.GetColumn(o.Column)

// create a copy of the column on the underlying table.
if err := duplicateColumn(ctx, conn, table, *column); err != nil {
return fmt.Errorf("failed to duplicate column: %w", err)
}

// Add a unique index to the new column
if err := o.addUniqueIndex(ctx, conn); err != nil {
return fmt.Errorf("failed to add unique index: %w", err)
}

// Add a trigger to copy values from the old column to the new, rewriting values using the `up` SQL.
err := createTrigger(ctx, conn, triggerConfig{
Name: TriggerName(o.Table, o.Column),
Direction: TriggerDirectionUp,
Columns: table.Columns,
SchemaName: s.Name,
TableName: o.Table,
PhysicalColumn: TemporaryName(o.Column),
StateSchema: stateSchema,
SQL: o.Up,
})
if err != nil {
return fmt.Errorf("failed to create up trigger: %w", err)
}

// Backfill the new column with values from the old column.
if err := backFill(ctx, conn, o.Table, TemporaryName(o.Column)); err != nil {
return fmt.Errorf("failed to backfill column: %w", err)
}

// Add the new column to the internal schema representation. This is done
// here, before creation of the down trigger, so that the trigger can declare
// a variable for the new column.
table.AddColumn(o.Column, schema.Column{
Name: TemporaryName(o.Column),
})

// Add a trigger to copy values from the new column to the old, rewriting values using the `down` SQL.
err = createTrigger(ctx, conn, triggerConfig{
Name: TriggerName(o.Table, TemporaryName(o.Column)),
Direction: TriggerDirectionDown,
Columns: table.Columns,
SchemaName: s.Name,
TableName: o.Table,
PhysicalColumn: o.Column,
StateSchema: stateSchema,
SQL: o.Down,
})
if err != nil {
return fmt.Errorf("failed to create down trigger: %w", err)
}

return nil
}

func (o *OpSetUnique) Complete(ctx context.Context, conn *sql.DB) error {
// create a unique constraint using the unique index
// Create a unique constraint using the unique index
_, err := conn.ExecContext(ctx, fmt.Sprintf("ALTER TABLE IF EXISTS %s ADD CONSTRAINT %s UNIQUE USING INDEX %s",
pq.QuoteIdentifier(o.Table),
pq.QuoteIdentifier(o.Name),
pq.QuoteIdentifier(o.Name)))
if err != nil {
return err
}

// Remove the up function and trigger
_, err = conn.ExecContext(ctx, fmt.Sprintf("DROP FUNCTION IF EXISTS %s CASCADE",
pq.QuoteIdentifier(TriggerFunctionName(o.Table, o.Column)),
))
if err != nil {
return err
}

// Remove the down function and trigger
_, err = conn.ExecContext(ctx, fmt.Sprintf("DROP FUNCTION IF EXISTS %s CASCADE",
pq.QuoteIdentifier(TriggerFunctionName(o.Table, TemporaryName(o.Column))),
))
if err != nil {
return err
}

// Drop the old column
_, err = conn.ExecContext(ctx, fmt.Sprintf("ALTER TABLE IF EXISTS %s DROP COLUMN IF EXISTS %s",
pq.QuoteIdentifier(o.Table),
pq.QuoteIdentifier(o.Column)))
if err != nil {
return err
}

// Rename the new column to the old column name
_, err = conn.ExecContext(ctx, fmt.Sprintf("ALTER TABLE IF EXISTS %s RENAME COLUMN %s TO %s",
pq.QuoteIdentifier(o.Table),
pq.QuoteIdentifier(TemporaryName(o.Column)),
pq.QuoteIdentifier(o.Column)))

return err
}

func (o *OpSetUnique) Rollback(ctx context.Context, conn *sql.DB) error {
// drop the index concurrently
_, err := conn.ExecContext(ctx, fmt.Sprintf("DROP INDEX CONCURRENTLY IF EXISTS %s", o.Name))
// Drop the new column, taking the unique index on the column with it
_, err := conn.ExecContext(ctx, fmt.Sprintf("ALTER TABLE %s DROP COLUMN IF EXISTS %s",
pq.QuoteIdentifier(o.Table),
pq.QuoteIdentifier(TemporaryName(o.Column)),
))
if err != nil {
return err
}

// Remove the up function and trigger
_, err = conn.ExecContext(ctx, fmt.Sprintf("DROP FUNCTION IF EXISTS %s CASCADE",
pq.QuoteIdentifier(TriggerFunctionName(o.Table, o.Column)),
))
if err != nil {
return err
}

// Remove the down function and trigger
_, err = conn.ExecContext(ctx, fmt.Sprintf("DROP FUNCTION IF EXISTS %s CASCADE",
pq.QuoteIdentifier(TriggerFunctionName(o.Table, TemporaryName(o.Column))),
))

return err
}
Expand All @@ -59,3 +163,13 @@ func (o *OpSetUnique) Validate(ctx context.Context, s *schema.Schema) error {

return nil
}

func (o *OpSetUnique) addUniqueIndex(ctx context.Context, conn *sql.DB) error {
// create unique index concurrently
_, err := conn.ExecContext(ctx, fmt.Sprintf("CREATE UNIQUE INDEX CONCURRENTLY IF NOT EXISTS %s ON %s (%s)",
pq.QuoteIdentifier(o.Name),
pq.QuoteIdentifier(o.Table),
pq.QuoteIdentifier(TemporaryName(o.Column))))

return err
}
37 changes: 28 additions & 9 deletions pkg/migrations/op_set_unique_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,19 +52,18 @@ func TestSetColumnUnique(t *testing.T) {
Unique: &migrations.UniqueConstraint{
Name: "reviews_review_unique",
},
Up: "review || '-' || (random()*1000000)::integer",
Down: "review",
},
},
},
},
afterStart: func(t *testing.T, db *sql.DB) {
// The unique index has been created on the underlying table.
IndexMustExist(t, db, "public", "reviews", "reviews_review_unique")

// Inserting values into the old schema that violate uniqueness should fail.
// Inserting values into the old schema that violate uniqueness should succeed.
MustInsert(t, db, "public", "01_add_table", "reviews", map[string]string{
"username": "alice", "product": "apple", "review": "good",
})
MustNotInsert(t, db, "public", "01_add_table", "reviews", map[string]string{
MustInsert(t, db, "public", "01_add_table", "reviews", map[string]string{
"username": "bob", "product": "banana", "review": "good",
})

Expand All @@ -77,12 +76,32 @@ func TestSetColumnUnique(t *testing.T) {
})
},
afterRollback: func(t *testing.T, db *sql.DB) {
// The unique index has been dropped from the the underlying table.
IndexMustNotExist(t, db, "public", "reviews", "reviews_review_unique")
// The new (temporary) `review` column should not exist on the underlying table.
ColumnMustNotExist(t, db, "public", "reviews", migrations.TemporaryName("review"))

// The up function no longer exists.
FunctionMustNotExist(t, db, "public", migrations.TriggerFunctionName("reviews", "review"))
// The down function no longer exists.
FunctionMustNotExist(t, db, "public", migrations.TriggerFunctionName("reviews", migrations.TemporaryName("review")))

// The up trigger no longer exists.
TriggerMustNotExist(t, db, "public", "reviews", migrations.TriggerName("reviews", "review"))
// The down trigger no longer exists.
TriggerMustNotExist(t, db, "public", "reviews", migrations.TriggerName("reviews", migrations.TemporaryName("review")))
},
afterComplete: func(t *testing.T, db *sql.DB) {
// The unique constraint has been created on the underlying table.
ConstraintMustExist(t, db, "public", "reviews", "reviews_review_unique")
// The new (temporary) `review` column should not exist on the underlying table.
ColumnMustNotExist(t, db, "public", "reviews", migrations.TemporaryName("review"))

// The up function no longer exists.
FunctionMustNotExist(t, db, "public", migrations.TriggerFunctionName("reviews", "review"))
// The down function no longer exists.
FunctionMustNotExist(t, db, "public", migrations.TriggerFunctionName("reviews", migrations.TemporaryName("review")))

// The up trigger no longer exists.
TriggerMustNotExist(t, db, "public", "reviews", migrations.TriggerName("reviews", "review"))
// The down trigger no longer exists.
TriggerMustNotExist(t, db, "public", "reviews", migrations.TriggerName("reviews", migrations.TemporaryName("review")))

// Inserting values into the new schema that violate uniqueness should fail.
MustInsert(t, db, "public", "02_set_unique", "reviews", map[string]string{
Expand Down