Skip to content

Commit

Permalink
Relax backfill requirements so it works with unique columns (#288)
Browse files Browse the repository at this point in the history
UNIQUE NOT NULL columns should also work in order to perform backfills.
This change relaxes the check on backfill requirements to use those
columns if a primary key is not available.

Validation will still error out if no suitable column is found

Note: this also fixes the `unique` retrieval from schema, where we were
failing to understand composed unique indices, resulting in columns
flagged as unique where they weren't really unique.
  • Loading branch information
exekias committed Feb 27, 2024
1 parent 16377ca commit 937f65c
Show file tree
Hide file tree
Showing 8 changed files with 339 additions and 88 deletions.
61 changes: 44 additions & 17 deletions pkg/migrations/backfill.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,18 @@ import (
// 3. Update each row in the batch, setting the value of the primary key column to itself.
// 4. Repeat steps 2 and 3 until no more rows are returned.
func backfill(ctx context.Context, conn *sql.DB, table *schema.Table, cbs ...CallbackFn) error {
// Get the primary key column for the table
pks := table.GetPrimaryKey()
if len(pks) != 1 {
return errors.New("table must have a single primary key column")
// get the backfill column
identityColumn := getIdentityColumn(table)
if identityColumn == nil {
return BackfillNotPossibleError{Table: table.Name}
}
pk := pks[0]

// Create a batcher for the table.
b := batcher{
table: table,
pkColumn: pk,
lastPK: nil,
batchSize: 1000,
table: table,
identityColumn: identityColumn,
lastValue: nil,
batchSize: 1000,
}

// Update each batch of rows, invoking callbacks for each one.
Expand All @@ -51,11 +50,39 @@ func backfill(ctx context.Context, conn *sql.DB, table *schema.Table, cbs ...Cal
return nil
}

// checkBackfill will return an error if the backfill operation is not supported.
func checkBackfill(table *schema.Table) error {
col := getIdentityColumn(table)
if col == nil {
return BackfillNotPossibleError{Table: table.Name}
}

return nil
}

// getIdentityColumn will return a column suitable for use in a backfill operation.
func getIdentityColumn(table *schema.Table) *schema.Column {
pks := table.GetPrimaryKey()
if len(pks) == 1 {
return pks[0]
}

// If there is no primary key, look for a unique not null column
for _, col := range table.Columns {
if col.Unique && !col.Nullable {
return &col
}
}

// no suitable column found
return nil
}

type batcher struct {
table *schema.Table
pkColumn *schema.Column
lastPK *string
batchSize int
table *schema.Table
identityColumn *schema.Column
lastValue *string
batchSize int
}

// updateBatch updates the next batch of rows in the table.
Expand All @@ -72,7 +99,7 @@ func (b *batcher) updateBatch(ctx context.Context, conn *sql.DB) error {

// Execute the query to update the next batch of rows and update the last PK
// value for the next batch
err = tx.QueryRowContext(ctx, query).Scan(&b.lastPK)
err = tx.QueryRowContext(ctx, query).Scan(&b.lastValue)
if err != nil {
return err
}
Expand All @@ -84,8 +111,8 @@ func (b *batcher) updateBatch(ctx context.Context, conn *sql.DB) error {
// buildQuery builds the query used to update the next batch of rows.
func (b *batcher) buildQuery() string {
whereClause := ""
if b.lastPK != nil {
whereClause = fmt.Sprintf("WHERE %s > %v", pq.QuoteIdentifier(b.pkColumn.Name), pq.QuoteLiteral(*b.lastPK))
if b.lastValue != nil {
whereClause = fmt.Sprintf("WHERE %s > %v", pq.QuoteIdentifier(b.identityColumn.Name), pq.QuoteLiteral(*b.lastValue))
}

return fmt.Sprintf(`
Expand All @@ -96,7 +123,7 @@ func (b *batcher) buildQuery() string {
)
SELECT LAST_VALUE(%[1]s) OVER() FROM update
`,
pq.QuoteIdentifier(b.pkColumn.Name),
pq.QuoteIdentifier(b.identityColumn.Name),
pq.QuoteIdentifier(b.table.Name),
b.batchSize,
whereClause)
Expand Down
9 changes: 4 additions & 5 deletions pkg/migrations/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,13 +157,12 @@ func (e MultipleAlterColumnChangesError) Error() string {
return fmt.Sprintf("alter column operations require exactly one change, found %d", e.Changes)
}

type InvalidPrimaryKeyError struct {
Table string
Fields int
type BackfillNotPossibleError struct {
Table string
}

func (e InvalidPrimaryKeyError) Error() string {
return fmt.Sprintf("primary key on table %q must be defined on exactly one column, found %d", e.Table, e.Fields)
func (e BackfillNotPossibleError) Error() string {
return fmt.Sprintf("a backfill is required but table %q doesn't have a single column primary key or a UNIQUE, NOT NULL column", e.Table)
}

type InvalidReplicaIdentityError struct {
Expand Down
8 changes: 4 additions & 4 deletions pkg/migrations/op_add_column.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,11 +164,11 @@ func (o *OpAddColumn) Validate(ctx context.Context, s *schema.Schema) error {
}
}

// Ensure backfill is possible
if o.Up != nil {
// needs backfill, ensure that the table has a primary key defined on exactly one column.
pk := table.GetPrimaryKey()
if len(pk) != 1 {
return InvalidPrimaryKeyError{Table: o.Table, Fields: len(pk)}
err := checkBackfill(table)
if err != nil {
return err
}
}

Expand Down

0 comments on commit 937f65c

Please sign in to comment.