Commit cfa6132
chore(jobsdb): tuning and improvements (#3584)
atzoum committed Jul 6, 2023
1 parent a8abd1d commit cfa6132
Showing 3 changed files with 11 additions and 18 deletions.
9 changes: 4 additions & 5 deletions jobsdb/jobsdb.go
@@ -647,7 +647,6 @@ var (
 	backupRowsBatchSize          int64
 	backupMaxTotalPayloadSize    int64
 	pkgLogger                    logger.Logger
-	jobStatusCountMigrationCheck bool // TODO: Remove this in next release
 )

 // Loads db config and migration related config from config file
@@ -667,7 +666,7 @@ func loadConfig() {
 	refreshDSListLoopSleepDuration: How often is the loop (which refreshes DSList) run
 	maxTableSizeInMB: Maximum Table size in MB
 	*/
-	config.RegisterFloat64ConfigVariable(0.8, &jobDoneMigrateThres, true, "JobsDB.jobDoneMigrateThres")
+	config.RegisterFloat64ConfigVariable(0.7, &jobDoneMigrateThres, true, "JobsDB.jobDoneMigrateThres")
 	config.RegisterFloat64ConfigVariable(3, &jobStatusMigrateThres, true, "JobsDB.jobStatusMigrateThres")
 	config.RegisterFloat64ConfigVariable(0.2, &jobMinRowsMigrateThres, true, "JobsDB.jobMinRowsMigrateThres")
 	config.RegisterIntConfigVariable(100000, &maxDSSize, true, 1, "JobsDB.maxDSSize")
@@ -680,11 +679,10 @@ func loadConfig() {
 	config.RegisterInt64ConfigVariable(64*bytesize.MB, &backupMaxTotalPayloadSize, true, 1, "JobsDB.maxBackupTotalPayloadSize")
 	config.RegisterDurationConfigVariable(30, &migrateDSLoopSleepDuration, true, time.Second, []string{"JobsDB.migrateDSLoopSleepDuration", "JobsDB.migrateDSLoopSleepDurationInS"}...)
 	config.RegisterDurationConfigVariable(5, &addNewDSLoopSleepDuration, true, time.Second, []string{"JobsDB.addNewDSLoopSleepDuration", "JobsDB.addNewDSLoopSleepDurationInS"}...)
-	config.RegisterDurationConfigVariable(5, &refreshDSListLoopSleepDuration, true, time.Second, []string{"JobsDB.refreshDSListLoopSleepDuration", "JobsDB.refreshDSListLoopSleepDurationInS"}...)
+	config.RegisterDurationConfigVariable(10, &refreshDSListLoopSleepDuration, true, time.Second, []string{"JobsDB.refreshDSListLoopSleepDuration", "JobsDB.refreshDSListLoopSleepDurationInS"}...)
 	config.RegisterDurationConfigVariable(5, &backupCheckSleepDuration, true, time.Second, []string{"JobsDB.backupCheckSleepDuration", "JobsDB.backupCheckSleepDurationIns"}...)
-	config.RegisterDurationConfigVariable(60, &cacheExpiration, true, time.Minute, []string{"JobsDB.cacheExpiration"}...)
+	config.RegisterDurationConfigVariable(120, &cacheExpiration, true, time.Minute, []string{"JobsDB.cacheExpiration"}...)
 	config.RegisterDurationConfigVariable(24, &jobCleanupFrequency, true, time.Hour, []string{"JobsDB.jobCleanupFrequency"}...)
-	config.RegisterBoolConfigVariable(false, &jobStatusCountMigrationCheck, true, "JobsDB.jobStatusCountMigrationCheck")
 }

 func Init2() {
@@ -2666,6 +2664,7 @@ func (jd *HandleT) refreshDSList(ctx context.Context) error {
 	if previousLastDS.Index == nextLastDS.Index {
 		return nil
 	}
+	defer stats.Default.NewTaggedStat("refresh_ds_loop_lock", stats.TimerType, stats.Tags{"customVal": jd.tablePrefix}).RecordDuration()()
 	err = jd.dsListLock.WithLockInCtx(ctx, func(l lock.LockToken) error {
 		return jd.doRefreshDSRangeList(l)
 	})
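
The stat added to refreshDSList relies on a common Go idiom: RecordDuration() runs immediately at the defer statement and returns a function, and it is that returned function which defer schedules, so the time from that point until refreshDSList returns is what gets recorded. A minimal, self-contained sketch of the same idiom, using a hypothetical startTimer helper instead of the actual stats package:

package main

import (
	"fmt"
	"time"
)

// startTimer starts a timer and returns a function that, when called,
// reports (here: prints) the elapsed time. Deferring the returned
// function is what the extra trailing () in the diff accomplishes.
func startTimer(name string) func() {
	start := time.Now()
	return func() {
		fmt.Printf("%s took %s\n", name, time.Since(start))
	}
}

func refreshDSList() {
	defer startTimer("refresh_ds_loop_lock")() // note the trailing ()
	time.Sleep(50 * time.Millisecond)          // stands in for lock acquisition + refresh
}

func main() {
	refreshDSList()
}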
18 changes: 6 additions & 12 deletions jobsdb/migration.go
@@ -77,6 +77,8 @@ func (jd *HandleT) doMigrateDS(ctx context.Context) error {
 	}
 	var l lock.LockToken
 	var lockChan chan<- lock.LockToken
+
+	lockStart := time.Now()
 	err = jd.WithTx(func(tx *Tx) error {
 		return jd.withDistributedSharedLock(ctx, tx, "schema_migrate", func() error { // cannot run while schema migration is running
 			// Take the lock and run actual migration
@@ -165,6 +167,7 @@ func (jd *HandleT) doMigrateDS(ctx context.Context) error {
 		})
 	})
 	if l != nil {
+		defer stats.Default.NewTaggedStat("migration_loop_lock", stats.TimerType, stats.Tags{"customVal": jd.tablePrefix}).Since(lockStart)
 		defer func() { lockChan <- l }()
 		if err == nil {
 			if err = jd.doRefreshDSRangeList(l); err != nil {
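
The lockStart / Since(lockStart) pair measures from just before the migration transaction takes the lock until doMigrateDS returns: the receiver and argument of the deferred call are evaluated where the defer statement appears, but the call itself only runs on return. A small illustration of that evaluation order, with a stand-in recorder type rather than the real stats timer:

package main

import (
	"fmt"
	"time"
)

// recorder stands in for the stats timer used in the diff; Since prints
// the elapsed duration instead of recording a metric.
type recorder struct{ name string }

func (r recorder) Since(start time.Time) {
	fmt.Printf("%s: %s\n", r.name, time.Since(start))
}

func doMigrateDS() {
	lockStart := time.Now()
	time.Sleep(20 * time.Millisecond) // stands in for acquiring the lock and migrating

	r := recorder{name: "migration_loop_lock"}
	// lockStart is read here, but Since only executes when doMigrateDS returns,
	// so the reported duration also covers everything after this line.
	defer r.Since(lockStart)

	time.Sleep(30 * time.Millisecond) // post-migration work, still included in the measurement
}

func main() {
	doMigrateDS() // prints roughly 50ms
}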
@@ -526,7 +529,7 @@ func (jd *HandleT) checkIfMigrateDS(ds dataSetT) (
 		&statTags{CustomValFilters: []string{jd.tablePrefix}},
 	).RecordDuration()()

-	var delCount, totalCount, statusCount int
+	var delCount, totalCount int
 	sqlStatement := fmt.Sprintf(`SELECT COUNT(*) from %q`, ds.JobTable)
 	if err = jd.dbHandle.QueryRow(sqlStatement).Scan(&totalCount); err != nil {
 		return false, false, 0, fmt.Errorf("error getting count of jobs in %s: %w", ds.JobTable, err)
@@ -541,15 +544,6 @@ func (jd *HandleT) checkIfMigrateDS(ds dataSetT) (
 		return false, false, 0, fmt.Errorf("error getting count of jobs in %s: %w", ds.JobStatusTable, err)
 	}

-	if jobStatusCountMigrationCheck {
-		// Total number of job status. If this table grows too big (e.g. a lot of retries)
-		// we migrate to a new table and get rid of old job status
-		sqlStatement = fmt.Sprintf(`SELECT COUNT(*) from %q`, ds.JobStatusTable)
-		if err = jd.dbHandle.QueryRow(sqlStatement).Scan(&statusCount); err != nil {
-			return false, false, 0, fmt.Errorf("error getting count of jobs in %s: %w", ds.JobStatusTable, err)
-		}
-	}
-
 	recordsLeft = totalCount - delCount

 	if jd.MinDSRetentionPeriod > 0 {
@@ -581,10 +575,10 @@ func (jd *HandleT) checkIfMigrateDS(ds dataSetT) (

 	smallThreshold := jobMinRowsMigrateThres * float64(*jd.MaxDSSize)
 	isSmall := func() bool {
-		return float64(totalCount) < smallThreshold && float64(statusCount) < smallThreshold
+		return float64(totalCount) < smallThreshold
 	}

-	if float64(delCount)/float64(totalCount) > jobDoneMigrateThres || (float64(statusCount)/float64(totalCount) > jobStatusMigrateThres) {
+	if float64(delCount)/float64(totalCount) > jobDoneMigrateThres {
 		return true, isSmall(), recordsLeft, nil
 	}

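
Taken together, the migration.go changes reduce the per-dataset migrate decision to two counts: total rows in the job table and jobs that are already done (delCount). A standalone sketch of that simplified check, using the defaults from this commit (jobDoneMigrateThres 0.7, jobMinRowsMigrateThres 0.2, maxDSSize 100000) and ignoring the retention-period handling that surrounds it; the function name and signature are illustrative, not the actual jobsdb API:

package main

import "fmt"

const (
	jobDoneMigrateThres    = 0.7 // new default in this commit
	jobMinRowsMigrateThres = 0.2
	maxDSSize              = 100000
)

// shouldMigrate mirrors the simplified check: a dataset becomes a migration
// candidate once the share of completed (deletable) jobs crosses
// jobDoneMigrateThres; it counts as "small" while it holds fewer rows than
// jobMinRowsMigrateThres * maxDSSize.
func shouldMigrate(delCount, totalCount int) (migrate, small bool) {
	if totalCount == 0 {
		return false, true
	}
	small = float64(totalCount) < jobMinRowsMigrateThres*float64(maxDSSize)
	migrate = float64(delCount)/float64(totalCount) > jobDoneMigrateThres
	return migrate, small
}

func main() {
	fmt.Println(shouldMigrate(80000, 100000)) // true false: 80% done, full-sized DS
	fmt.Println(shouldMigrate(5000, 15000))   // false true: only ~33% done, small DS
}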
2 changes: 1 addition & 1 deletion router/handle_lifecycle.go
@@ -88,7 +88,7 @@ func (rt *Handle) Setup(
 	config.RegisterDurationConfigVariable(300, &rt.reloadableConfig.maxRetryBackoff, true, time.Second, []string{"Router.maxRetryBackoff", "Router.maxRetryBackoffInS"}...)
 	config.RegisterStringConfigVariable("", &rt.reloadableConfig.toAbortDestinationIDs, true, "Router.toAbortDestinationIDs")
 	config.RegisterDurationConfigVariable(2, &rt.reloadableConfig.pickupFlushInterval, true, time.Second, "Router.pickupFlushInterval")
-	config.RegisterDurationConfigVariable(1000, &rt.reloadableConfig.failingJobsPenaltySleep, true, time.Millisecond, []string{"Router.failingJobsPenaltySleep"}...)
+	config.RegisterDurationConfigVariable(2000, &rt.reloadableConfig.failingJobsPenaltySleep, true, time.Millisecond, []string{"Router.failingJobsPenaltySleep"}...)
 	config.RegisterFloat64ConfigVariable(0.6, &rt.reloadableConfig.failingJobsPenaltyThreshold, true, []string{"Router.failingJobsPenaltyThreshold"}...)

 	config.RegisterDurationConfigVariable(60, &rt.diagnosisTickerTime, false, time.Second, []string{"Diagnostics.routerTimePeriod", "Diagnostics.routerTimePeriodInS"}...)
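
The duration defaults in this changeset are expressed as a count plus a time unit (the fourth argument of RegisterDurationConfigVariable), so the effective default is presumably the product of the two. Assuming that count-times-unit interpretation (it matches the call shape here, but check the config package), a quick sanity check of the new values:

package main

import (
	"fmt"
	"time"
)

func main() {
	// New defaults introduced by this commit, written as count * unit.
	refreshDSListLoopSleep := 10 * time.Second         // was 5s
	cacheExpiration := 120 * time.Minute               // was 60m
	failingJobsPenaltySleep := 2000 * time.Millisecond // was 1000ms

	fmt.Println(refreshDSListLoopSleep)  // 10s
	fmt.Println(cacheExpiration)         // 2h0m0s
	fmt.Println(failingJobsPenaltySleep) // 2s
}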
