Skip to content

Commit

Permalink
Rollback on backfill failure (#317)
Browse files Browse the repository at this point in the history
Ensure that migrations that fail during backfill are rolled back.

Add testcases to ensure rollback occurs when an error occurs:
* during the DDL phase of migration start
* during the backfill phase of migration start

Fixes #316
  • Loading branch information
andrew-farries committed Mar 12, 2024
1 parent 547ac3e commit 631d642
Show file tree
Hide file tree
Showing 2 changed files with 84 additions and 1 deletion.
6 changes: 5 additions & 1 deletion pkg/roll/execute.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,11 @@ func (m *Roll) ensureView(ctx context.Context, version, name string, table schem
func (m *Roll) performBackfills(ctx context.Context, tables []*schema.Table) error {
for _, table := range tables {
if err := migrations.Backfill(ctx, m.pgConn, table); err != nil {
return fmt.Errorf("unable to backfill table %q: %w", table.Name, err)
errRollback := m.Rollback(ctx)

return errors.Join(
fmt.Errorf("unable to backfill table %q: %w", table.Name, err),
errRollback)
}
}

Expand Down
79 changes: 79 additions & 0 deletions pkg/roll/execute_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,85 @@ func TestSchemaIsDroppedAfterMigrationRollback(t *testing.T) {
})
}

func TestRollbackOnMigrationStartFailure(t *testing.T) {
t.Parallel()

t.Run("when the DDL phase fails", func(t *testing.T) {
t.Parallel()

testutils.WithMigratorAndConnectionToContainer(t, func(mig *roll.Roll, db *sql.DB) {
ctx := context.Background()

// start a migration that will fail during the DDL phase
err := mig.Start(ctx, &migrations.Migration{
Name: "01_create_table",
Operations: migrations.Operations{
&migrations.OpCreateTable{
Name: "table1",
Columns: []migrations.Column{
{
Name: "id",
Type: "invalid",
},
},
},
},
})
assert.Error(t, err)

// ensure that there is no active migration
status, err := mig.Status(ctx, "public")
assert.NoError(t, err)
assert.Equal(t, state.NoneMigrationStatus, status.Status)
})
})

t.Run("when the backfill phase fails", func(t *testing.T) {
t.Parallel()

testutils.WithMigratorAndConnectionToContainer(t, func(mig *roll.Roll, db *sql.DB) {
ctx := context.Background()

// run an initial migration to create the table
err := mig.Start(ctx, &migrations.Migration{
Name: "01_create_table",
Operations: migrations.Operations{createTableOp("table1")},
})
assert.NoError(t, err)

// complete the migration
err = mig.Complete(ctx)
assert.NoError(t, err)

// insert some data into the table
_, err = db.ExecContext(ctx, "INSERT INTO table1 (id, name) VALUES (1, 'alice'), (2, 'bob')")
assert.NoError(t, err)

// Start a migration that will fail during the backfill phase
// Change the type of the `name` column but provide invalid up and down SQL
err = mig.Start(ctx, &migrations.Migration{
Name: "02_add_column",
Operations: migrations.Operations{
&migrations.OpAlterColumn{
Table: "table1",
Column: "name",
Type: ptr("text"),
Up: ptr("invalid"),
Down: ptr("invalid"),
},
},
})
assert.Error(t, err)

// Ensure that there is no active migration
status, err := mig.Status(ctx, "public")
assert.NoError(t, err)
assert.Equal(t, "01_create_table", status.Version)
assert.Equal(t, state.CompleteMigrationStatus, status.Status)
})
})
}

func TestSchemaOptionIsRespected(t *testing.T) {
t.Parallel()

Expand Down

0 comments on commit 631d642

Please sign in to comment.