chore: drop failing backups after few retries (#3695)
cisse21 committed Aug 3, 2023
1 parent 3cca234 commit c81c66f
52 changes: 30 additions & 22 deletions jobsdb/backup.go
@@ -56,7 +56,7 @@ func (jd *HandleT) backupDSLoop(ctx context.Context) {
}
loop := func() error {
jd.logger.Debugf("backupDSLoop backup enabled %s", jd.tablePrefix)
- backupDSRange, err := jd.getBackupDSRange()
+ backupDSRange, err := jd.getBackupDSRange(ctx)
if err != nil {
return fmt.Errorf("[JobsDB] :: Failed to get backup dataset range. Err: %w", err)
}
@@ -145,13 +145,18 @@ func (jd *HandleT) uploadDumps(ctx context.Context, dumps map[string]string) err
for workspaceID, filePath := range dumps {
wrkId := workspaceID
path := filePath
- g.Go(misc.WithBugsnag(func() error {
+ operation := func() error {
if err := jd.uploadTableDump(ctx, wrkId, path); err != nil {
jd.logger.Errorf("[JobsDB] :: Failed to upload workspaceId %v. Error: %s", wrkId, err.Error())
stats.Default.NewTaggedStat("backup_ds_failed", stats.CountType, stats.Tags{"customVal": jd.tablePrefix, "workspaceId": wrkId}).Increment()
return err
}
return nil
+ }
+ g.Go(misc.WithBugsnag(func() error {
+ return backoff.RetryNotify(operation, backoff.WithMaxRetries(backoff.NewExponentialBackOff(), uint64(config.GetInt("JobsDB.backup.maxRetries", 100))), func(err error, d time.Duration) {
+ jd.logger.Errorf("[JobsDB] :: Retrying upload workspaceId %v. Error: %s", wrkId, err.Error())
+ })
}))
}
return g.Wait()
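
The heart of this change is visible in the hunk above: each per-workspace upload is now wrapped in backoff.RetryNotify with an exponential backoff capped by the JobsDB.backup.maxRetries config key (default 100), so a failing upload is retried a bounded number of times and logged on each retry. A minimal, self-contained sketch of that pattern from github.com/cenkalti/backoff/v4 (the operation, retry cap, and logging below are illustrative stand-ins, not the repository's code):

```go
package main

import (
	"errors"
	"fmt"
	"time"

	"github.com/cenkalti/backoff/v4"
)

func main() {
	attempts := 0
	// operation stands in for jd.uploadTableDump: it fails twice, then succeeds.
	operation := func() error {
		attempts++
		if attempts < 3 {
			return errors.New("transient upload failure")
		}
		return nil
	}

	// Stand-in for config.GetInt("JobsDB.backup.maxRetries", 100).
	maxRetries := uint64(100)

	// RetryNotify retries operation with exponential backoff, invoking the
	// notify callback before each wait; WithMaxRetries caps the number of
	// retries so a permanently failing upload eventually returns its last error.
	err := backoff.RetryNotify(
		operation,
		backoff.WithMaxRetries(backoff.NewExponentialBackOff(), maxRetries),
		func(err error, d time.Duration) {
			fmt.Printf("retrying in %s: %v\n", d, err)
		},
	)
	fmt.Println("final result:", err)
}
```

Together with the callers further down, which now log and swallow the upload error instead of returning it, this appears to be how the commit title is realized: a backup that still fails after the retries are exhausted is dropped rather than re-attempted indefinitely.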
@@ -160,15 +165,15 @@ func (jd *HandleT) uploadDumps(ctx context.Context, dumps map[string]string) err
func (jd *HandleT) failedOnlyBackup(ctx context.Context, backupDSRange *dataSetRangeT) error {
tableName := backupDSRange.ds.JobStatusTable

- getRowCount := func() (totalCount int64, err error) {
+ getRowCount := func(ctx context.Context) (totalCount int64, err error) {
countStmt := fmt.Sprintf(`SELECT COUNT(*) from %q where job_state in ('%s', '%s')`, tableName, Failed.State, Aborted.State)
- if err = jd.dbHandle.QueryRow(countStmt).Scan(&totalCount); err != nil {
+ if err = jd.dbHandle.QueryRowContext(ctx, countStmt).Scan(&totalCount); err != nil {
return 0, fmt.Errorf("error while getting row count: %w", err)
}
return totalCount, nil
}

- totalCount, err := getRowCount()
+ totalCount, err := getRowCount(ctx)
if err != nil {
return err
}
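
These hunks also thread the backup loop's context through the row-count helpers and switch QueryRow to QueryRowContext, so an in-flight COUNT is cancelled when the loop's context is cancelled. A minimal sketch of that database/sql pattern; the package name, connection, and table name are assumptions for illustration:

```go
package sketch

import (
	"context"
	"database/sql"
	"fmt"
)

// countRows mirrors the getRowCount(ctx) helpers in this diff: the caller's
// context flows into the query, so cancelling the backup loop aborts the
// COUNT instead of leaving it running against the database.
func countRows(ctx context.Context, db *sql.DB, table string) (int64, error) {
	var total int64
	// %q double-quotes the identifier, matching the style used in backup.go.
	stmt := fmt.Sprintf(`SELECT COUNT(*) FROM %q`, table)
	if err := db.QueryRowContext(ctx, stmt).Scan(&total); err != nil {
		return 0, fmt.Errorf("error while getting row count: %w", err)
	}
	return total, nil
}
```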
@@ -190,7 +195,7 @@ func (jd *HandleT) failedOnlyBackup(ctx context.Context, backupDSRange *dataSetR
return fmt.Sprintf(`%v%v_%v.%v.gz`, tmpDirPath+backupPathDirName, pathPrefix, Aborted.State, workspaceID), nil
}

- dumps, err := jd.createTableDumps(getFailedOnlyBackupQueryFn(backupDSRange), getFileName, totalCount)
+ dumps, err := jd.createTableDumps(ctx, getFailedOnlyBackupQueryFn(backupDSRange), getFileName, totalCount)
if err != nil {
return fmt.Errorf("error while creating table dump: %w", err)
}
@@ -201,7 +206,8 @@ func (jd *HandleT) failedOnlyBackup(ctx context.Context, backupDSRange *dataSetR
}()
err = jd.uploadDumps(ctx, dumps)
if err != nil {
return fmt.Errorf("error while uploading dumps for table: %s: %w", tableName, err)
jd.logger.Errorf("[JobsDB] :: Failed to upload dumps for table: %s. Error: %v", tableName, err)
return nil
}
stats.Default.NewTaggedStat("total_TableDump_TimeStat", stats.TimerType, stats.Tags{"customVal": jd.tablePrefix}).Since(start)
return nil
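
Note the error-handling change here (repeated in backupJobsTable and backupStatusTable below): an upload failure is now logged and swallowed rather than returned, so the caller sees a nil error once uploadDumps has exhausted its retries. A tiny sketch of that control flow, with hypothetical helper names:

```go
package sketch

// dropOnUploadFailure is a hypothetical stand-in for the tail of the backup
// functions after this commit: once the retries inside uploadDumps are spent,
// the error is logged and discarded so the backup loop is not stuck
// re-running the same dataset.
func dropOnUploadFailure(upload func() error, logf func(format string, args ...interface{})) error {
	if err := upload(); err != nil {
		logf("[JobsDB] :: Failed to upload dumps. Error: %v", err)
		return nil // previously the error was wrapped and returned
	}
	// success path: record timing stats, then return nil as before
	return nil
}
```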
@@ -210,15 +216,15 @@ func (jd *HandleT) backupJobsTable(ctx context.Context, backupDSRange *dataSetRa
func (jd *HandleT) backupJobsTable(ctx context.Context, backupDSRange *dataSetRangeT) error {
tableName := backupDSRange.ds.JobTable

- getRowCount := func() (totalCount int64, err error) {
+ getRowCount := func(ctx context.Context) (totalCount int64, err error) {
countStmt := fmt.Sprintf(`SELECT COUNT(*) from %q`, tableName)
- if err = jd.dbHandle.QueryRow(countStmt).Scan(&totalCount); err != nil {
+ if err = jd.dbHandle.QueryRowContext(ctx, countStmt).Scan(&totalCount); err != nil {
return 0, fmt.Errorf("error while getting row count: %w", err)
}
return totalCount, nil
}

- totalCount, err := getRowCount()
+ totalCount, err := getRowCount(ctx)
if err != nil {
return err
}
@@ -249,7 +255,7 @@ func (jd *HandleT) backupJobsTable(ctx context.Context, backupDSRange *dataSetRa
), nil
}

- dumps, err := jd.createTableDumps(getJobsBackupQueryFn(backupDSRange), getFileName, totalCount)
+ dumps, err := jd.createTableDumps(ctx, getJobsBackupQueryFn(backupDSRange), getFileName, totalCount)
if err != nil {
return fmt.Errorf("error while creating table dump: %w", err)
}
@@ -260,7 +266,8 @@ func (jd *HandleT) backupJobsTable(ctx context.Context, backupDSRange *dataSetRa
}()
err = jd.uploadDumps(ctx, dumps)
if err != nil {
return fmt.Errorf("error while uploading dumps for table: %s: %w", tableName, err)
jd.logger.Errorf("[JobsDB] :: Failed to upload dumps for table: %s. Error: %v", tableName, err)
return nil
}

// Do not record stat in error case as error case time might be low and skew stats
@@ -271,15 +278,15 @@ func (jd *HandleT) backupJobsTable(ctx context.Context, backupDSRange *dataSetRa
func (jd *HandleT) backupStatusTable(ctx context.Context, backupDSRange *dataSetRangeT) error {
tableName := backupDSRange.ds.JobStatusTable

- getRowCount := func() (totalCount int64, err error) {
+ getRowCount := func(ctx context.Context) (totalCount int64, err error) {
countStmt := fmt.Sprintf(`SELECT COUNT(*) from %q`, tableName)
- if err = jd.dbHandle.QueryRow(countStmt).Scan(&totalCount); err != nil {
+ if err = jd.dbHandle.QueryRowContext(ctx, countStmt).Scan(&totalCount); err != nil {
return 0, fmt.Errorf("error while getting row count: %w", err)
}
return totalCount, nil
}

- totalCount, err := getRowCount()
+ totalCount, err := getRowCount(ctx)
if err != nil {
return err
}
@@ -302,7 +309,7 @@ func (jd *HandleT) backupStatusTable(ctx context.Context, backupDSRange *dataSet
return fmt.Sprintf(`%v%v.%v.gz`, tmpDirPath+backupPathDirName, pathPrefix, workspaceID), nil
}

- dumps, err := jd.createTableDumps(getStatusBackupQueryFn(backupDSRange), getFileName, totalCount)
+ dumps, err := jd.createTableDumps(ctx, getStatusBackupQueryFn(backupDSRange), getFileName, totalCount)
if err != nil {
return fmt.Errorf("error while creating table dump: %w", err)
}
@@ -313,7 +320,8 @@ func (jd *HandleT) backupStatusTable(ctx context.Context, backupDSRange *dataSet
}()
err = jd.uploadDumps(ctx, dumps)
if err != nil {
return fmt.Errorf("error while uploading dumps for table: %s: %w", tableName, err)
jd.logger.Errorf("[JobsDB] :: Failed to upload dumps for table: %s. Error: %v", tableName, err)
return nil
}

// Do not record stat in error case as error case time might be low and skew stats
@@ -529,7 +537,7 @@ func getStatusBackupQueryFn(backupDSRange *dataSetRangeT) func(int64) string {
}
}

- func (jd *HandleT) createTableDumps(queryFunc func(int64) string, pathFunc func(string) (string, error), totalCount int64) (map[string]string, error) {
+ func (jd *HandleT) createTableDumps(ctx context.Context, queryFunc func(int64) string, pathFunc func(string) (string, error), totalCount int64) (map[string]string, error) {
defer jd.getTimerStat(
"table_FileDump_TimeStat",
&statTags{CustomValFilters: []string{jd.tablePrefix}},
@@ -542,7 +550,7 @@ func (jd *HandleT) createTableDumps(queryFunc func(int64) string, pathFunc func(
stmt := queryFunc(offset)
var rawJSONRows json.RawMessage
var workspaceID string
- rows, err := jd.dbHandle.Query(stmt)
+ rows, err := jd.dbHandle.QueryContext(ctx, stmt)
if err != nil {
return fmt.Errorf("error while getting rows: %w", err)
}
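
createTableDumps gets the same context treatment: it now takes ctx and runs the paged dump queries with QueryContext. A small sketch of the multi-row variant of the pattern, including the row iteration and cleanup that the displayed hunk cuts off; the SELECT shape and the handle callback are illustrative assumptions, though the scan targets mirror the rawJSONRows and workspaceID variables above:

```go
package sketch

import (
	"context"
	"database/sql"
	"encoding/json"
	"fmt"
)

// dumpRows shows QueryContext in the style createTableDumps now uses: the
// statement comes from a query function, the context makes the paged query
// cancellable, and rows are drained, checked, and closed.
func dumpRows(ctx context.Context, db *sql.DB, stmt string, handle func(workspaceID string, row json.RawMessage) error) error {
	rows, err := db.QueryContext(ctx, stmt)
	if err != nil {
		return fmt.Errorf("error while getting rows: %w", err)
	}
	defer func() { _ = rows.Close() }()

	for rows.Next() {
		var rawJSONRows json.RawMessage
		var workspaceID string
		if err := rows.Scan(&rawJSONRows, &workspaceID); err != nil {
			return fmt.Errorf("scanning row: %w", err)
		}
		if err := handle(workspaceID, rawJSONRows); err != nil {
			return err
		}
	}
	return rows.Err()
}
```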
@@ -655,7 +663,7 @@ func (jd *HandleT) backupUploadWithExponentialBackoff(ctx context.Context, file
return output, err
}

- func (jd *HandleT) getBackupDSRange() (*dataSetRangeT, error) {
+ func (jd *HandleT) getBackupDSRange(ctx context.Context) (*dataSetRangeT, error) {
var backupDS dataSetT
var backupDSRange dataSetRangeT

@@ -688,15 +696,15 @@ func (jd *HandleT) getBackupDSRange() (*dataSetRangeT, error) {

var minID, maxID sql.NullInt64
jobIDSQLStatement := fmt.Sprintf(`SELECT MIN(job_id), MAX(job_id) from %q`, backupDS.JobTable)
- row := jd.dbHandle.QueryRow(jobIDSQLStatement)
+ row := jd.dbHandle.QueryRowContext(ctx, jobIDSQLStatement)
err = row.Scan(&minID, &maxID)
if err != nil {
return nil, fmt.Errorf("getting min and max job_id: %w", err)
}

var minCreatedAt, maxCreatedAt time.Time
jobTimeSQLStatement := fmt.Sprintf(`SELECT MIN(created_at), MAX(created_at) from %q`, backupDS.JobTable)
- row = jd.dbHandle.QueryRow(jobTimeSQLStatement)
+ row = jd.dbHandle.QueryRowContext(ctx, jobTimeSQLStatement)
err = row.Scan(&minCreatedAt, &maxCreatedAt)
if err != nil {
return nil, fmt.Errorf("getting min and max created_at: %w", err)
