diff --git a/jobsdb/backup.go b/jobsdb/backup.go
index cfa297d81e..71fa3ca16b 100644
--- a/jobsdb/backup.go
+++ b/jobsdb/backup.go
@@ -22,11 +22,11 @@ import (
 )

 func (jd *Handle) isBackupEnabled() bool {
-	return jd.conf.backup.masterBackupEnabled && jd.conf.backup.instanceBackupEnabled
+	return jd.conf.backup.masterBackupEnabled.Load() && jd.conf.backup.instanceBackupEnabled.Load()
 }

 func (jd *Handle) IsMasterBackupEnabled() bool {
-	return jd.conf.backup.masterBackupEnabled
+	return jd.conf.backup.masterBackupEnabled.Load()
 }

 func (jd *Handle) backupDSLoop(ctx context.Context) {
@@ -36,7 +36,7 @@ func (jd *Handle) backupDSLoop(ctx context.Context) {

 	for {
 		select {
-		case <-time.After(sleepMultiplier * jd.conf.backup.backupCheckSleepDuration):
+		case <-time.After(sleepMultiplier * jd.conf.backup.backupCheckSleepDuration.Load()):
 			if !jd.isBackupEnabled() {
 				jd.logger.Debugf("backupDSLoop backup disabled %s", jd.tablePrefix)
 				continue
@@ -200,7 +200,7 @@ func (jd *Handle) failedOnlyBackup(ctx context.Context, backupDSRange *dataSetRa
 		return fmt.Sprintf(`%v%v_%v.%v.gz`, tmpDirPath+backupPathDirName, pathPrefix, Aborted.State, workspaceID), nil
 	}

-	dumps, err := jd.createTableDumps(ctx, getFailedOnlyBackupQueryFn(backupDSRange, jd.conf.backup.backupRowsBatchSize, jd.conf.backup.backupMaxTotalPayloadSize), getFileName, totalCount)
+	dumps, err := jd.createTableDumps(ctx, getFailedOnlyBackupQueryFn(backupDSRange, jd.conf.backup.backupRowsBatchSize.Load(), jd.conf.backup.backupMaxTotalPayloadSize.Load()), getFileName, totalCount)
 	if err != nil {
 		return fmt.Errorf("error while creating table dump: %w", err)
 	}
@@ -268,7 +268,7 @@ func (jd *Handle) backupJobsTable(ctx context.Context, backupDSRange *dataSetRan
 		), nil
 	}

-	dumps, err := jd.createTableDumps(ctx, getJobsBackupQueryFn(backupDSRange, jd.conf.backup.backupRowsBatchSize, jd.conf.backup.backupMaxTotalPayloadSize), getFileName, totalCount)
+	dumps, err := jd.createTableDumps(ctx, getJobsBackupQueryFn(backupDSRange, jd.conf.backup.backupRowsBatchSize.Load(), jd.conf.backup.backupMaxTotalPayloadSize.Load()), getFileName, totalCount)
 	if err != nil {
 		return fmt.Errorf("error while creating table dump: %w", err)
 	}
@@ -326,7 +326,7 @@ func (jd *Handle) backupStatusTable(ctx context.Context, backupDSRange *dataSetR
 		return fmt.Sprintf(`%v%v.%v.gz`, tmpDirPath+backupPathDirName, pathPrefix, workspaceID), nil
 	}

-	dumps, err := jd.createTableDumps(ctx, getStatusBackupQueryFn(backupDSRange, jd.conf.backup.backupRowsBatchSize), getFileName, totalCount)
+	dumps, err := jd.createTableDumps(ctx, getStatusBackupQueryFn(backupDSRange, jd.conf.backup.backupRowsBatchSize.Load()), getFileName, totalCount)
 	if err != nil {
 		return fmt.Errorf("error while creating table dump: %w", err)
 	}
@@ -680,7 +680,7 @@ func (jd *Handle) backupUploadWithExponentialBackoff(ctx context.Context, file *
 	}
 	bo := backoff.NewExponentialBackOff()
 	bo.MaxInterval = time.Minute
-	bo.MaxElapsedTime = jd.conf.backup.maxBackupRetryTime
+	bo.MaxElapsedTime = jd.conf.backup.maxBackupRetryTime.Load()
 	boRetries := backoff.WithMaxRetries(bo, uint64(jd.config.GetInt64("MAX_BACKOFF_RETRIES", 3)))
 	boCtx := backoff.WithContext(boRetries, ctx)

diff --git a/jobsdb/jobsdb.go b/jobsdb/jobsdb.go
index 2edfaf0515..7c82fbce3d 100644
--- a/jobsdb/jobsdb.go
+++ b/jobsdb/jobsdb.go
@@ -483,14 +483,14 @@ type Handle struct {
 	config *config.Config

 	conf struct {
-		maxTableSize                   int64
-		cacheExpiration                time.Duration
-		addNewDSLoopSleepDuration      time.Duration
-		refreshDSListLoopSleepDuration time.Duration
-		jobCleanupFrequency            time.Duration
-		minDSRetentionPeriod           time.Duration
-		maxDSRetentionPeriod           time.Duration
-		refreshDSTimeout               time.Duration
+		maxTableSize                   *config.Reloadable[int64]
+		cacheExpiration                *config.Reloadable[time.Duration]
+		addNewDSLoopSleepDuration      *config.Reloadable[time.Duration]
+		refreshDSListLoopSleepDuration *config.Reloadable[time.Duration]
+		jobCleanupFrequency            *config.Reloadable[time.Duration]
+		minDSRetentionPeriod           *config.Reloadable[time.Duration]
+		maxDSRetentionPeriod           *config.Reloadable[time.Duration]
+		refreshDSTimeout               *config.Reloadable[time.Duration]
 		jobMaxAge                      func() time.Duration
 		writeCapacity                  chan struct{}
 		readCapacity                   chan struct{}
@@ -502,28 +502,28 @@ type Handle struct {
 		maxReaders         int
 		maxWriters         int
 		maxOpenConnections int
-		analyzeThreshold   int
-		MaxDSSize          *int
+		analyzeThreshold   *config.Reloadable[int]
+		MaxDSSize          *config.Reloadable[int]
 		migration          struct {
-			maxMigrateOnce, maxMigrateDSProbe int
+			maxMigrateOnce, maxMigrateDSProbe *config.Reloadable[int]
 			vacuumFullStatusTableThreshold    func() int64
 			vacuumAnalyzeStatusTableThreshold func() int64
 			jobDoneMigrateThres, jobStatusMigrateThres func() float64
 			jobMinRowsMigrateThres                     func() float64
-			migrateDSLoopSleepDuration time.Duration
-			migrateDSTimeout           time.Duration
+			migrateDSLoopSleepDuration *config.Reloadable[time.Duration]
+			migrateDSTimeout           *config.Reloadable[time.Duration]
 		}
 		backup struct {
-			masterBackupEnabled       bool
-			maxBackupRetryTime        time.Duration
-			backupCheckSleepDuration  time.Duration
+			masterBackupEnabled       *config.Reloadable[bool]
+			maxBackupRetryTime        *config.Reloadable[time.Duration]
+			backupCheckSleepDuration  *config.Reloadable[time.Duration]
 			preBackupHandlers         []prebackup.Handler
 			fileUploaderProvider      fileuploader.Provider
-			instanceBackupEnabled     bool
+			instanceBackupEnabled     *config.Reloadable[bool]
 			FailedOnly                bool
 			PathPrefix                string
-			backupRowsBatchSize       int64
-			backupMaxTotalPayloadSize int64
+			backupRowsBatchSize       *config.Reloadable[int64]
+			backupMaxTotalPayloadSize *config.Reloadable[int64]
 		}
 	}
 }
@@ -845,7 +845,7 @@ func (jd *Handle) workersAndAuxSetup() {

 	jd.noResultsCache = cache.NewNoResultsCache[ParameterFilterT](
 		cacheParameterFilters,
-		func() time.Duration { return jd.conf.cacheExpiration },
+		func() time.Duration { return jd.conf.cacheExpiration.Load() },
 	)

 	jd.logger.Infof("Connected to %s DB", jd.tablePrefix)
@@ -855,40 +855,47 @@ func (jd *Handle) workersAndAuxSetup() {
 	jd.statDropDSPeriod = stats.Default.NewTaggedStat("jobsdb.drop_ds_period", stats.TimerType, stats.Tags{"customVal": jd.tablePrefix})
 }

-// nolint:staticcheck // SA1019: config Register reloadable functions are deprecated
 func (jd *Handle) loadConfig() {
 	// maxTableSizeInMB: Maximum Table size in MB
-	jd.config.RegisterInt64ConfigVariable(300, &jd.conf.maxTableSize, true, 1000000, "JobsDB.maxTableSizeInMB")
-	jd.config.RegisterDurationConfigVariable(120, &jd.conf.cacheExpiration, true, time.Minute, []string{"JobsDB.cacheExpiration"}...)
+	jd.conf.maxTableSize = jd.config.GetReloadableInt64Var(300, 1000000, "JobsDB.maxTableSizeInMB")
+	jd.conf.cacheExpiration = jd.config.GetReloadableDurationVar(120, time.Minute, []string{"JobsDB.cacheExpiration"}...)
 	// addNewDSLoopSleepDuration: How often is the loop (which checks for adding new DS) run
-	jd.config.RegisterDurationConfigVariable(5, &jd.conf.addNewDSLoopSleepDuration, true, time.Second, []string{"JobsDB.addNewDSLoopSleepDuration", "JobsDB.addNewDSLoopSleepDurationInS"}...)
+	jd.conf.addNewDSLoopSleepDuration = jd.config.GetReloadableDurationVar(5, time.Second, []string{"JobsDB.addNewDSLoopSleepDuration", "JobsDB.addNewDSLoopSleepDurationInS"}...)
 	// refreshDSListLoopSleepDuration: How often is the loop (which refreshes DSList) run
-	jd.config.RegisterDurationConfigVariable(10, &jd.conf.refreshDSListLoopSleepDuration, true, time.Second, []string{"JobsDB.refreshDSListLoopSleepDuration", "JobsDB.refreshDSListLoopSleepDurationInS"}...)
-	jd.config.RegisterDurationConfigVariable(24, &jd.conf.jobCleanupFrequency, true, time.Hour, []string{"JobsDB.jobCleanupFrequency"}...)
+	jd.conf.refreshDSListLoopSleepDuration = jd.config.GetReloadableDurationVar(10, time.Second, []string{"JobsDB.refreshDSListLoopSleepDuration", "JobsDB.refreshDSListLoopSleepDurationInS"}...)
+	jd.conf.jobCleanupFrequency = jd.config.GetReloadableDurationVar(24, time.Hour, []string{"JobsDB.jobCleanupFrequency"}...)

 	enableWriterQueueKeys := []string{"JobsDB." + jd.tablePrefix + "." + "enableWriterQueue", "JobsDB." + "enableWriterQueue"}
-	jd.config.RegisterBoolConfigVariable(true, &jd.conf.enableWriterQueue, true, enableWriterQueueKeys...)
+	jd.conf.enableWriterQueue = jd.config.GetBoolVar(true, enableWriterQueueKeys...)
 	enableReaderQueueKeys := []string{"JobsDB." + jd.tablePrefix + "." + "enableReaderQueue", "JobsDB." + "enableReaderQueue"}
-	jd.config.RegisterBoolConfigVariable(true, &jd.conf.enableReaderQueue, true, enableReaderQueueKeys...)
+	jd.conf.enableReaderQueue = jd.config.GetBoolVar(true, enableReaderQueueKeys...)
 	maxWritersKeys := []string{"JobsDB." + jd.tablePrefix + "." + "maxWriters", "JobsDB." + "maxWriters"}
-	jd.config.RegisterIntConfigVariable(3, &jd.conf.maxWriters, false, 1, maxWritersKeys...)
+	jd.conf.maxWriters = jd.config.GetIntVar(3, 1, maxWritersKeys...)
 	maxReadersKeys := []string{"JobsDB." + jd.tablePrefix + "." + "maxReaders", "JobsDB." + "maxReaders"}
-	jd.config.RegisterIntConfigVariable(6, &jd.conf.maxReaders, false, 1, maxReadersKeys...)
+	jd.conf.maxReaders = jd.config.GetIntVar(6, 1, maxReadersKeys...)
 	maxOpenConnectionsKeys := []string{"JobsDB." + jd.tablePrefix + "." + "maxOpenConnections", "JobsDB." + "maxOpenConnections"}
-	jd.config.RegisterIntConfigVariable(20, &jd.conf.maxOpenConnections, false, 1, maxOpenConnectionsKeys...)
+	jd.conf.maxOpenConnections = jd.config.GetIntVar(20, 1, maxOpenConnectionsKeys...)
 	analyzeThresholdKeys := []string{"JobsDB." + jd.tablePrefix + "." + "analyzeThreshold", "JobsDB." + "analyzeThreshold"}
-	jd.config.RegisterIntConfigVariable(30000, &jd.conf.analyzeThreshold, false, 1, analyzeThresholdKeys...)
+	jd.conf.analyzeThreshold = jd.config.GetReloadableIntVar(30000, 1, analyzeThresholdKeys...)
 	minDSRetentionPeriodKeys := []string{"JobsDB." + jd.tablePrefix + "." + "minDSRetention", "JobsDB." + "minDSRetention"}
-	jd.config.RegisterDurationConfigVariable(0, &jd.conf.minDSRetentionPeriod, true, time.Minute, minDSRetentionPeriodKeys...)
+	jd.conf.minDSRetentionPeriod = jd.config.GetReloadableDurationVar(0, time.Minute, minDSRetentionPeriodKeys...)
 	maxDSRetentionPeriodKeys := []string{"JobsDB." + jd.tablePrefix + "." + "maxDSRetention", "JobsDB." + "maxDSRetention"}
-	jd.config.RegisterDurationConfigVariable(90, &jd.conf.maxDSRetentionPeriod, true, time.Minute, maxDSRetentionPeriodKeys...)
-	jd.config.RegisterDurationConfigVariable(10, &jd.conf.refreshDSTimeout, true, time.Minute, "JobsDB.refreshDS.timeout")
+	jd.conf.maxDSRetentionPeriod = jd.config.GetReloadableDurationVar(90, time.Minute, maxDSRetentionPeriodKeys...)
+	jd.conf.refreshDSTimeout = jd.config.GetReloadableDurationVar(10, time.Minute, "JobsDB.refreshDS.timeout")

 	// migrationConfig
 	// migrateDSLoopSleepDuration: How often is the loop (which checks for migrating DS) run
-	jd.config.RegisterDurationConfigVariable(30, &jd.conf.migration.migrateDSLoopSleepDuration, true, time.Second, []string{"JobsDB.migrateDSLoopSleepDuration", "JobsDB.migrateDSLoopSleepDurationInS"}...)
-	jd.config.RegisterDurationConfigVariable(10, &jd.conf.migration.migrateDSTimeout, true, time.Minute, "JobsDB.migrateDS.timeout")
+	jd.conf.migration.migrateDSLoopSleepDuration = jd.config.GetReloadableDurationVar(
+		30, time.Second,
+		[]string{
+			"JobsDB.migrateDSLoopSleepDuration",
+			"JobsDB.migrateDSLoopSleepDurationInS",
+		}...,
+	)
+	jd.conf.migration.migrateDSTimeout = jd.config.GetReloadableDurationVar(
+		10, time.Minute, "JobsDB.migrateDS.timeout",
+	)
 	// jobDoneMigrateThres: A DS is migrated when this fraction of the jobs have been processed
 	jd.conf.migration.jobDoneMigrateThres = func() float64 { return jd.config.GetFloat64("JobsDB.jobDoneMigrateThreshold", 0.7) }
 	// jobStatusMigrateThres: A DS is migrated if the job_status exceeds this (* no_of_jobs)
@@ -898,9 +905,13 @@ func (jd *Handle) loadConfig() {
 	// then DSs that have less than 5% of maxDSSize are eligible for migration)
 	jd.conf.migration.jobMinRowsMigrateThres = func() float64 { return jd.config.GetFloat64("JobsDB.jobMinRowsMigrateThreshold", 0.2) }
 	// maxMigrateOnce: Maximum number of DSs that are migrated together into one destination
-	jd.config.RegisterIntConfigVariable(10, &jd.conf.migration.maxMigrateOnce, true, 1, "JobsDB.maxMigrateOnce")
+	jd.conf.migration.maxMigrateOnce = jd.config.GetReloadableIntVar(
+		10, 1, "JobsDB.maxMigrateOnce",
+	)
 	// maxMigrateDSProbe: Maximum number of DSs that are checked from left to right if they are eligible for migration
-	jd.config.RegisterIntConfigVariable(10, &jd.conf.migration.maxMigrateDSProbe, true, 1, "JobsDB.maxMigrateDSProbe")
+	jd.conf.migration.maxMigrateDSProbe = jd.config.GetReloadableIntVar(
+		10, 1, "JobsDB.maxMigrateDSProbe",
+	)
 	jd.conf.migration.vacuumFullStatusTableThreshold = func() int64 {
 		return jd.config.GetInt64("JobsDB.vacuumFullStatusTableThreshold", 500*bytesize.MB)
 	}
@@ -911,46 +922,59 @@ func (jd *Handle) loadConfig() {

 	// backupConfig
 	// masterBackupEnabled = true => all the jobsdb are eligible for backup
-	jd.config.RegisterBoolConfigVariable(true, &jd.conf.backup.masterBackupEnabled, true, "JobsDB.backup.enabled")
+	jd.conf.backup.masterBackupEnabled = jd.config.GetReloadableBoolVar(
+		true, "JobsDB.backup.enabled",
+	)
 	// instanceBackupEnabled = true => the individual jobsdb too is eligible for backup
-	jd.config.RegisterBoolConfigVariable(false, &jd.conf.backup.instanceBackupEnabled, true, fmt.Sprintf("JobsDB.backup.%v.enabled", jd.tablePrefix))
-	jd.config.RegisterBoolConfigVariable(false, &jd.conf.backup.FailedOnly, false, fmt.Sprintf("JobsDB.backup.%v.failedOnly", jd.tablePrefix))
-	jd.config.RegisterDurationConfigVariable(10, &jd.conf.backup.maxBackupRetryTime, false, time.Minute, "JobsDB.backup.maxRetry")
-	jd.config.RegisterInt64ConfigVariable(10000, &jd.conf.backup.backupRowsBatchSize, true, 1, "JobsDB.backupRowsBatchSize")
-	jd.config.RegisterInt64ConfigVariable(64*bytesize.MB, &jd.conf.backup.backupMaxTotalPayloadSize, true, 1, "JobsDB.maxBackupTotalPayloadSize")
-	jd.config.RegisterDurationConfigVariable(5, &jd.conf.backup.backupCheckSleepDuration, true, time.Second, []string{"JobsDB.backupCheckSleepDuration", "JobsDB.backupCheckSleepDurationIns"}...)
-	jd.config.RegisterStringConfigVariable(jd.tablePrefix, &jd.conf.backup.PathPrefix, false, fmt.Sprintf("JobsDB.backup.%v.pathPrefix", jd.tablePrefix))
+	jd.conf.backup.instanceBackupEnabled = jd.config.GetReloadableBoolVar(
+		false, fmt.Sprintf("JobsDB.backup.%v.enabled", jd.tablePrefix),
+	)
+	jd.conf.backup.FailedOnly = jd.config.GetBool(
+		fmt.Sprintf("JobsDB.backup.%v.failedOnly", jd.tablePrefix),
+		false,
+	)
+	jd.conf.backup.maxBackupRetryTime = jd.config.GetReloadableDurationVar(
+		10, time.Minute, "JobsDB.backup.maxRetry",
+	)
+	jd.conf.backup.backupRowsBatchSize = jd.config.GetReloadableInt64Var(
+		10000, 1, "JobsDB.backupRowsBatchSize",
+	)
+	jd.conf.backup.backupMaxTotalPayloadSize = jd.config.GetReloadableInt64Var(
+		64*bytesize.MB, 1, "JobsDB.maxBackupTotalPayloadSize",
+	)
+	jd.conf.backup.backupCheckSleepDuration = jd.config.GetReloadableDurationVar(
+		5, time.Second, []string{"JobsDB.backupCheckSleepDuration", "JobsDB.backupCheckSleepDurationIns"}...,
+	)
+	jd.conf.backup.PathPrefix = jd.config.GetStringVar(
+		jd.tablePrefix, fmt.Sprintf("JobsDB.backup.%v.pathPrefix", jd.tablePrefix),
+	)

 	// maxDSSize: Maximum size of a DS. The process which adds new DS runs in the background
 	// (every few seconds) so a DS may go beyond this size
-	if jd.conf.MaxDSSize == nil {
-		// passing `maxDSSize` by reference, so it can be hot reloaded
-		var maxDSSize int
-		jd.config.RegisterIntConfigVariable(100000, &maxDSSize, true, 1, "JobsDB.maxDSSize")
-		jd.conf.MaxDSSize = &maxDSSize
-	}
+	// `MaxDSSize` is a reloadable config variable, so it can be hot reloaded
+	jd.conf.MaxDSSize = jd.config.GetReloadableIntVar(100000, 1, "JobsDB.maxDSSize")

 	if jd.TriggerAddNewDS == nil {
 		jd.TriggerAddNewDS = func() <-chan time.Time {
-			return time.After(jd.conf.addNewDSLoopSleepDuration)
+			return time.After(jd.conf.addNewDSLoopSleepDuration.Load())
 		}
 	}
 	if jd.TriggerMigrateDS == nil {
 		jd.TriggerMigrateDS = func() <-chan time.Time {
-			return time.After(jd.conf.migration.migrateDSLoopSleepDuration)
+			return time.After(jd.conf.migration.migrateDSLoopSleepDuration.Load())
 		}
 	}
 	if jd.TriggerRefreshDS == nil {
 		jd.TriggerRefreshDS = func() <-chan time.Time {
-			return time.After(jd.conf.refreshDSListLoopSleepDuration)
+			return time.After(jd.conf.refreshDSListLoopSleepDuration.Load())
 		}
 	}
 	if jd.TriggerJobCleanUp == nil {
 		jd.TriggerJobCleanUp = func() <-chan time.Time {
-			return time.After(jd.conf.jobCleanupFrequency)
+			return time.After(jd.conf.jobCleanupFrequency.Load())
 		}
 	}
@@ -1198,7 +1222,7 @@ func (jd *Handle) getTableSize(jobTable string) int64 {
 }

 func (jd *Handle) checkIfFullDSInTx(tx *Tx, ds dataSetT) (bool, error) {
-	if jd.conf.maxDSRetentionPeriod > 0 {
+	if jd.conf.maxDSRetentionPeriod.Load() > 0 {
 		var minJobCreatedAt sql.NullTime
 		sqlStatement := fmt.Sprintf(`SELECT MIN(created_at) FROM %q`, ds.JobTable)
 		row := tx.QueryRow(sqlStatement)
@@ -1206,19 +1230,19 @@ func (jd *Handle) checkIfFullDSInTx(tx *Tx, ds dataSetT) (bool, error) {
 		if err != nil && err != sql.ErrNoRows {
 			return false, err
 		}
-		if err == nil && minJobCreatedAt.Valid && time.Since(minJobCreatedAt.Time) > jd.conf.maxDSRetentionPeriod {
+		if err == nil && minJobCreatedAt.Valid && time.Since(minJobCreatedAt.Time) > jd.conf.maxDSRetentionPeriod.Load() {
 			return true, nil
 		}
 	}

 	tableSize := jd.getTableSize(ds.JobTable)
-	if tableSize > jd.conf.maxTableSize {
+	if tableSize > jd.conf.maxTableSize.Load() {
 		jd.logger.Infof("[JobsDB] %s is full in size. Count: %v, Size: %v", ds.JobTable, jd.getTableRowCount(ds.JobTable), tableSize)
 		return true, nil
 	}

 	totalCount := jd.getTableRowCount(ds.JobTable)
-	if totalCount > *jd.conf.MaxDSSize {
+	if totalCount > jd.conf.MaxDSSize.Load() {
 		jd.logger.Infof("[JobsDB] %s is full by rows. Count: %v, Size: %v", ds.JobTable, totalCount, jd.getTableSize(ds.JobTable))
 		return true, nil
 	}
@@ -2105,7 +2129,7 @@ func (jd *Handle) doStoreJobsInTx(ctx context.Context, tx *Tx, ds dataSetT, jobL
 		if _, err = stmt.ExecContext(ctx); err != nil {
 			return err
 		}
-		if len(jobList) > jd.conf.analyzeThreshold {
+		if len(jobList) > jd.conf.analyzeThreshold.Load() {
 			_, err = tx.ExecContext(ctx, fmt.Sprintf(`ANALYZE %q`, ds.JobTable))
 		}

@@ -2417,7 +2441,7 @@ func (jd *Handle) updateJobStatusDSInTx(ctx context.Context, tx *Tx, ds dataSetT
 		return err
 	}

-	if len(statusList) > jd.conf.analyzeThreshold {
+	if len(statusList) > jd.conf.analyzeThreshold.Load() {
 		_, err = tx.ExecContext(ctx, fmt.Sprintf(`ANALYZE %q`, ds.JobStatusTable))
 	}

@@ -2581,7 +2605,7 @@ func (jd *Handle) refreshDSListLoop(ctx context.Context) {
 		case <-ctx.Done():
 			return
 		}
-		timeoutCtx, cancel := context.WithTimeout(ctx, jd.conf.refreshDSTimeout)
+		timeoutCtx, cancel := context.WithTimeout(ctx, jd.conf.refreshDSTimeout.Load())
 		if err := jd.refreshDSList(timeoutCtx); err != nil {
 			cancel()
 			if !jd.conf.skipMaintenanceError && ctx.Err() == nil {
diff --git a/jobsdb/jobsdb_test.go b/jobsdb/jobsdb_test.go
index 2ffcdff2ed..50c6109245 100644
--- a/jobsdb/jobsdb_test.go
+++ b/jobsdb/jobsdb_test.go
@@ -778,19 +778,22 @@ func TestCacheScenarios(t *testing.T) {

 	checkDSLimitJobs := func(t *testing.T, limit int) []*JobT {
 		maxDSSize := 1
+		c := config.New()
+		c.Set("JobsDB.maxDSSize", maxDSSize)
 		var dbWithOneLimit *Handle
 		triggerAddNewDS := make(chan time.Time)
 		if limit > 0 {
 			dbWithOneLimit = NewForReadWrite(
 				"cache",
 				WithDSLimit(&limit),
+				WithConfig(c),
 			)
 		} else {
 			dbWithOneLimit = NewForReadWrite(
 				"cache",
+				WithConfig(c),
 			)
 		}
-		dbWithOneLimit.conf.MaxDSSize = &maxDSSize
 		dbWithOneLimit.TriggerAddNewDS = func() <-chan time.Time {
 			return triggerAddNewDS
 		}
diff --git a/jobsdb/migration.go b/jobsdb/migration.go
index 9f36154087..4c957df034 100644
--- a/jobsdb/migration.go
+++ b/jobsdb/migration.go
@@ -39,7 +39,7 @@ func (jd *Handle) migrateDSLoop(ctx context.Context) {
 	migrate := func() error {
 		start := time.Now()
 		jd.logger.Debugw("Start", "operation", "migrateDSLoop")
-		timeoutCtx, cancel := context.WithTimeout(ctx, jd.conf.migration.migrateDSTimeout)
+		timeoutCtx, cancel := context.WithTimeout(ctx, jd.conf.migration.migrateDSTimeout.Load())
 		defer cancel()
 		err := jd.doMigrateDS(timeoutCtx)
 		stats.Default.NewTaggedStat("migration_loop", stats.TimerType, stats.Tags{"customVal": jd.tablePrefix, "error": strconv.FormatBool(err != nil)}).Since(start)
@@ -267,12 +267,12 @@ func (jd *Handle) getCleanUpCandidates(ctx context.Context, dsList []dataSetT) (
 		statuses := estimates[ds.JobStatusTable]
 		jobs := estimates[ds.JobTable]
 		if jobs == 0 { // using max ds size if we have no stats for the number of jobs
-			jobs = float64(*jd.conf.MaxDSSize)
+			jobs = float64(jd.conf.MaxDSSize.Load())
 		}
 		return statuses/jobs > jd.conf.migration.jobStatusMigrateThres()
 	})

-	return lo.Slice(datasets, 0, jd.conf.migration.maxMigrateDSProbe), nil
+	return lo.Slice(datasets, 0, jd.conf.migration.maxMigrateDSProbe.Load()), nil
 }

 // based on an estimate cleans up the status tables
@@ -365,7 +365,7 @@ func (jd *Handle) getMigrationList(dsList []dataSetT) (migrateFrom []dataSetT, p
 	var (
 		liveDSCount, migrateDSProbeCount int
 		// we don't want `maxDSSize` value to change, during dsList loop
-		maxDSSize = *jd.conf.MaxDSSize
+		maxDSSize = jd.conf.MaxDSSize.Load()
 		waiting   *smallDS
 	)
@@ -381,7 +381,7 @@ func (jd *Handle) getMigrationList(dsList []dataSetT) (migrateFrom []dataSetT, p
 			idxCheck = idx == len(dsList)-1
 		}

-		if liveDSCount >= jd.conf.migration.maxMigrateOnce || pendingJobsCount >= maxDSSize || idxCheck {
+		if liveDSCount >= jd.conf.migration.maxMigrateOnce.Load() || pendingJobsCount >= maxDSSize || idxCheck {
 			break
 		}
@@ -412,7 +412,7 @@ func (jd *Handle) getMigrationList(dsList []dataSetT) (migrateFrom []dataSetT, p
 			}
 		} else {
 			waiting = nil // if there was a small DS waiting, we should remove it since its next dataset is not eligible for migration
-			if liveDSCount > 0 || migrateDSProbeCount > jd.conf.migration.maxMigrateDSProbe {
+			if liveDSCount > 0 || migrateDSProbeCount > jd.conf.migration.maxMigrateDSProbe.Load() {
 				// DS is not eligible for migration. But there are data sets on the left eligible to migrate, so break.
 				break
 			}
@@ -546,26 +546,26 @@ func (jd *Handle) checkIfMigrateDS(ds dataSetT) (

 	recordsLeft = totalCount - delCount

-	if jd.conf.minDSRetentionPeriod > 0 {
+	if jd.conf.minDSRetentionPeriod.Load() > 0 {
 		var maxCreatedAt time.Time
 		sqlStatement = fmt.Sprintf(`SELECT MAX(created_at) from %q`, ds.JobTable)
 		if err = jd.dbHandle.QueryRow(sqlStatement).Scan(&maxCreatedAt); err != nil {
 			return false, false, 0, fmt.Errorf("error getting max created_at from %s: %w", ds.JobTable, err)
 		}

-		if time.Since(maxCreatedAt) < jd.conf.minDSRetentionPeriod {
+		if time.Since(maxCreatedAt) < jd.conf.minDSRetentionPeriod.Load() {
 			return false, false, recordsLeft, nil
 		}
 	}

-	if jd.conf.maxDSRetentionPeriod > 0 {
+	if jd.conf.maxDSRetentionPeriod.Load() > 0 {
 		var terminalJobsExist bool
 		sqlStatement = fmt.Sprintf(`SELECT EXISTS (
 			SELECT id
 				FROM %q
 				WHERE job_state = ANY($1) and exec_time < $2)`,
 			ds.JobStatusTable)
-		if err = jd.dbHandle.QueryRow(sqlStatement, pq.Array(validTerminalStates), time.Now().Add(-1*jd.conf.maxDSRetentionPeriod)).Scan(&terminalJobsExist); err != nil {
+		if err = jd.dbHandle.QueryRow(sqlStatement, pq.Array(validTerminalStates), time.Now().Add(-1*jd.conf.maxDSRetentionPeriod.Load())).Scan(&terminalJobsExist); err != nil {
 			return false, false, 0, fmt.Errorf("checking terminalJobsExist %s: %w", ds.JobStatusTable, err)
 		}
 		if terminalJobsExist {
@@ -573,7 +573,7 @@ func (jd *Handle) checkIfMigrateDS(ds dataSetT) (
 		}
 	}

-	smallThreshold := jd.conf.migration.jobMinRowsMigrateThres() * float64(*jd.conf.MaxDSSize)
+	smallThreshold := jd.conf.migration.jobMinRowsMigrateThres() * float64(jd.conf.MaxDSSize.Load())
 	isSmall := func() bool {
 		return float64(totalCount) < smallThreshold
 	}
diff --git a/jobsdb/migration_test.go b/jobsdb/migration_test.go
index 67c3b84fc9..bfa8b88d24 100644
--- a/jobsdb/migration_test.go
+++ b/jobsdb/migration_test.go
@@ -46,7 +46,7 @@ func TestMigration(t *testing.T) {
 		require.NoError(t, err)
 		defer jobDB.TearDown()

-		jobDB.conf.maxDSRetentionPeriod = time.Millisecond
+		c.Set("JobsDB."+tablePrefix+"."+"maxDSRetention", "1ms")

 		customVal := rand.String(5)
 		jobs := genJobs(defaultWorkspaceID, customVal, 30, 1)
@@ -232,7 +232,7 @@ func TestMigration(t *testing.T) {
 		))
 		defer jobDB.TearDown()

-		jobDB.conf.maxDSRetentionPeriod = time.Millisecond
+		c.Set("JobsDB."+tablePrefix+"."+"maxDSRetention", "1ms")

 		// 3 datasets with 10 jobs each, 1 dataset with 0 jobs
 		for i := 0; i < 3; i++ {
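
Illustration: a minimal, self-contained sketch of the reloadable-config pattern this diff migrates JobsDB to, assuming the rudder-go-kit config package already used by jobsdb. The key names and defaults below are examples only, and the Set-triggers-reload behaviour is the same one the migration_test.go changes above rely on.

package main

import (
	"fmt"
	"time"

	"github.com/rudderlabs/rudder-go-kit/config"
)

func main() {
	c := config.New()

	// Deprecated style removed by this diff: register a pointer and let the
	// config package write through it on reload, e.g.
	//   c.RegisterDurationConfigVariable(5, &dst, true, time.Second, "JobsDB.addNewDSLoopSleepDuration")

	// New style: keep the *config.Reloadable handle and call Load() at every
	// point of use, so a changed value is picked up without a restart.
	sleep := c.GetReloadableDurationVar(5, time.Second, "JobsDB.addNewDSLoopSleepDuration")
	fmt.Println(sleep.Load()) // 5s until the underlying key changes

	c.Set("JobsDB.addNewDSLoopSleepDuration", "10s")
	fmt.Println(sleep.Load()) // now 10s, without re-registering anything

	// Values that are not meant to be hot-reloaded are read once with the
	// non-reloadable accessors, as done for maxWriters/maxReaders in the diff.
	maxWriters := c.GetIntVar(3, 1, "JobsDB.maxWriters")
	fmt.Println(maxWriters)
}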