Skip to content

Commit

Permalink
fix: migration fails with error pq: VACUUM cannot run inside a transa…
Browse files Browse the repository at this point in the history
…ction block (#3464)
  • Loading branch information
atzoum committed Jun 8, 2023
1 parent 63a24eb commit 9e32802
Show file tree
Hide file tree
Showing 4 changed files with 335 additions and 201 deletions.
3 changes: 2 additions & 1 deletion jobsdb/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,8 @@ func (jd *HandleT) backupDSLoop(ctx context.Context) {
// backupDS writes both jobs and job_staus table to JOBS_BACKUP_STORAGE_PROVIDER
func (jd *HandleT) backupDS(ctx context.Context, backupDSRange *dataSetRangeT) error {
if err := jd.WithTx(func(tx *Tx) error {
return jd.cleanStatusTable(ctx, tx, backupDSRange.ds.JobStatusTable, true)
_, err := jd.cleanStatusTable(ctx, tx, backupDSRange.ds.JobStatusTable, false)
return err
}); err != nil {
return fmt.Errorf("error while cleaning status table: %w", err)
}
Expand Down
65 changes: 37 additions & 28 deletions jobsdb/migration.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,51 +252,62 @@ func (jd *HandleT) getCleanUpCandidates(ctx context.Context, dsList []dataSetT)

// based on an estimate cleans up the status tables
func (jd *HandleT) cleanupStatusTables(ctx context.Context, dsList []dataSetT) error {
toCompact, err := jd.getCleanUpCandidates(ctx, dsList)
if err != nil {
return err
}

var toVacuum []string
toVacuumFull, err := jd.getVacuumFullCandidates(ctx, dsList)
if err != nil {
return err
}
toVacuumFullMap := lo.Associate(toVacuumFull, func(k string) (string, struct{}) {
return k, struct{}{}
})
toCompact, err := jd.getCleanUpCandidates(ctx, dsList)
if err != nil {
return err
}
start := time.Now()
defer stats.Default.NewTaggedStat(
"jobsdb_compact_status_tables",
stats.TimerType,
stats.Tags{"customVal": jd.tablePrefix},
).Since(start)

return jd.WithTx(func(tx *Tx) error {
if err := jd.WithTx(func(tx *Tx) error {
for _, statusTable := range toCompact {
table := statusTable.JobStatusTable
// clean up and vacuum if not present in toVacuumFullMap
_, ok := toVacuumFullMap[table]
if err := jd.cleanStatusTable(
ctx,
tx,
table,
!ok,
); err != nil {
vacuum, err := jd.cleanStatusTable(ctx, tx, table, !ok)
if err != nil {
return err
}
}
// vacuum full if present in toVacuumFull
for _, table := range toVacuumFull {
if _, err := tx.ExecContext(ctx, fmt.Sprintf(`VACUUM FULL %[1]q`, table)); err != nil {
return err
if vacuum {
toVacuum = append(toVacuum, table)
}
}

return nil
})
}); err != nil {
return err
}
// vacuum full
for _, table := range toVacuumFull {
jd.logger.Infof("vacuuming full %q", table)
if _, err := jd.dbHandle.ExecContext(ctx, fmt.Sprintf(`VACUUM FULL %[1]q`, table)); err != nil {
return err
}
}
// vacuum analyze
for _, table := range toVacuum {
jd.logger.Infof("vacuuming %q", table)
if _, err := jd.dbHandle.ExecContext(ctx, fmt.Sprintf(`VACUUM ANALYZE %[1]q`, table)); err != nil {
return err
}
}
return nil
}

// cleanStatusTable deletes all rows except for the latest status for each job
func (*HandleT) cleanStatusTable(ctx context.Context, tx *Tx, table string, canBeVacuumed bool) error {
func (*HandleT) cleanStatusTable(ctx context.Context, tx *Tx, table string, canBeVacuumed bool) (vacuum bool, err error) {
result, err := tx.ExecContext(
ctx,
fmt.Sprintf(`DELETE FROM %[1]q
Expand All @@ -305,23 +316,21 @@ func (*HandleT) cleanStatusTable(ctx context.Context, tx *Tx, table string, canB
)`, table),
)
if err != nil {
return err
return false, err
}

numJobStatusDeleted, err := result.RowsAffected()
if err != nil {
return err
return false, err
}

query := fmt.Sprintf(`ANALYZE %q`, table)
if numJobStatusDeleted > vacuumAnalyzeStatusTableThreshold && canBeVacuumed {
query = fmt.Sprintf(`VACUUM ANALYZE %q`, table)
vacuum = true
} else {
_, err = tx.ExecContext(ctx, fmt.Sprintf(`ANALYZE %q`, table))
}
_, err = tx.ExecContext(
ctx,
query,
)
return err

return
}

// getMigrationList returns the list of datasets to migrate from,
Expand Down
Loading

0 comments on commit 9e32802

Please sign in to comment.