Skip to content

Commit

Permalink
feat(jobsdb): made backup related config hot-reloadable (#1844)
Browse files Browse the repository at this point in the history
* made backup config hot reloadable
  • Loading branch information
kuldeep0020 committed Apr 20, 2022
1 parent 486c1d1 commit 5f69442
Showing 1 changed file with 33 additions and 28 deletions.
61 changes: 33 additions & 28 deletions jobsdb/jobsdb.go
Expand Up @@ -57,9 +57,13 @@ import (
// configuration from the config/env files to
// instantiate jobdb correctly
type BackupSettingsT struct {
BackupEnabled bool
FailedOnly bool
PathPrefix string
instanceBackupEnabled bool
FailedOnly bool
PathPrefix string
}

func (b *BackupSettingsT) IsBackupEnabled() bool {
return masterBackupEnabled && b.instanceBackupEnabled
}

// GetQueryParamsT is a struct to hold jobsdb query params.
Expand Down Expand Up @@ -138,7 +142,7 @@ const (
)

var globalDBHandle *sql.DB
var masterBackupEnabled, instanceBackupEnabled, instanceBackupFailedAndAborted bool
var masterBackupEnabled bool
var pathPrefix string

//initGlobalDBHandle inits a sql.DB handle to be used across jobsdb instances
Expand Down Expand Up @@ -387,22 +391,18 @@ var dbErrorMap = map[string]string{
"Invalid Escape Character": "22019",
}

// return backup settings depending on jobdb type
// the gateway, the router and the processor
// BackupEnabled = true => all the jobsdb are eligible for backup
// registers the backup settings depending on jobdb type the gateway, the router and the processor
// masterBackupEnabled = true => all the jobsdb are eligible for backup
// instanceBackupEnabled = true => the individual jobsdb too is eligible for backup
// instanceBackupFailedAndAborted = true => the individual jobdb backsup failed and aborted jobs only
// pathPrefix = by default is the jobsdb table prefix, is the path appended before instanceID in s3 folder structure
func (jd *HandleT) getBackUpSettings() *BackupSettingsT {
config.RegisterBoolConfigVariable(true, &masterBackupEnabled, false, "JobsDB.backup.enabled")
config.RegisterBoolConfigVariable(false, &instanceBackupEnabled, false, fmt.Sprintf("JobsDB.backup.%v.enabled", jd.tablePrefix))
config.RegisterBoolConfigVariable(false, &instanceBackupFailedAndAborted, false, fmt.Sprintf("JobsDB.backup.%v.failedOnly", jd.tablePrefix))
func (jd *HandleT) registerBackUpSettings() {
config.RegisterBoolConfigVariable(true, &masterBackupEnabled, true, "JobsDB.backup.enabled")
config.RegisterBoolConfigVariable(false, &jd.BackupSettings.instanceBackupEnabled, true, fmt.Sprintf("JobsDB.backup.%v.enabled", jd.tablePrefix))
config.RegisterBoolConfigVariable(false, &jd.BackupSettings.FailedOnly, false, fmt.Sprintf("JobsDB.backup.%v.failedOnly", jd.tablePrefix))
config.RegisterStringConfigVariable(jd.tablePrefix, &pathPrefix, false, fmt.Sprintf("JobsDB.backup.%v.pathPrefix", jd.tablePrefix))
config.RegisterDurationConfigVariable(10, &jd.maxBackupRetryTime, false, time.Minute, "JobsDB.backup.maxRetry")
backupSettings := BackupSettingsT{BackupEnabled: masterBackupEnabled && instanceBackupEnabled,
FailedOnly: instanceBackupFailedAndAborted, PathPrefix: strings.TrimSpace(pathPrefix)}

return &backupSettings
jd.BackupSettings.PathPrefix = strings.TrimSpace(pathPrefix)
}

//Some helper functions
Expand Down Expand Up @@ -443,7 +443,7 @@ func (jd *HandleT) Status() interface{} {
statusObj := map[string]interface{}{
"dataset-list": jd.getDSList(false),
"dataset-ranges": jd.getDSRangeList(false),
"backups-enabled": jd.BackupSettings.BackupEnabled,
"backups-enabled": jd.BackupSettings.IsBackupEnabled(),
}
emptyResults := make(map[string]interface{})
for ds, entry := range jd.dsEmptyResultCache {
Expand Down Expand Up @@ -747,8 +747,8 @@ func (jd *HandleT) workersAndAuxSetup() {
if jd.registerStatusHandler {
admin.RegisterStatusHandler(jd.tablePrefix+"-jobsdb", jd)
}

jd.BackupSettings = jd.getBackUpSettings()
jd.BackupSettings = &BackupSettingsT{}
jd.registerBackUpSettings()

jd.logger.Infof("Connected to %s DB", jd.tablePrefix)

Expand Down Expand Up @@ -820,14 +820,15 @@ func (jd *HandleT) setUpForOwnerType(ctx context.Context, ownerType OwnerType, c

func (jd *HandleT) startBackupDSLoop(ctx context.Context) {
var err error
if jd.BackupSettings.BackupEnabled {
jd.jobsFileUploader, err = jd.getFileUploader()
jd.assertError(err)
jd.backgroundGroup.Go(misc.WithBugsnag(func() error {
jd.backupDSLoop(ctx)
return nil
}))
jd.jobsFileUploader, err = jd.getFileUploader()
if err != nil {
jd.logger.Errorf("failed to get a file uploader for %s", jd.tablePrefix)
return
}
jd.backgroundGroup.Go(misc.WithBugsnag(func() error {
jd.backupDSLoop(ctx)
return nil
}))
}

func (jd *HandleT) startMigrateDSLoop(ctx context.Context) {
Expand Down Expand Up @@ -1762,7 +1763,7 @@ func (jd *HandleT) postMigrateHandleDS(migrateFrom []dataSetT) error {

//Rename datasets before dropping them, so that they can be uploaded to s3
for _, ds := range migrateFrom {
if jd.BackupSettings.BackupEnabled && isBackupConfigured() {
if jd.BackupSettings.IsBackupEnabled() && isBackupConfigured() {
jd.renameDS(ds, false)
} else {
jd.dropDS(ds, false)
Expand Down Expand Up @@ -2759,10 +2760,14 @@ func (jd *HandleT) backupDSLoop(ctx context.Context) {
for {
select {
case <-time.After(sleepMultiplier * backupCheckSleepDuration):
if !jd.BackupSettings.IsBackupEnabled() {
jd.logger.Debugf("backupDSLoop backup disabled %s", jd.tablePrefix)
continue
}
case <-ctx.Done():
return
}

jd.logger.Debugf("backupDSLoop backup enabled %s", jd.tablePrefix)
backupDSRange := jd.getBackupDSRange()
// check if non empty dataset is present to backup
// else continue
Expand Down Expand Up @@ -3279,7 +3284,7 @@ func (jd *HandleT) recoverFromCrash(owner OwnerType, goRoutineType string) {
//Some of the source datasets would have been
migrateSrc := opPayloadJSON.From
for _, ds := range migrateSrc {
if jd.BackupSettings.BackupEnabled {
if jd.BackupSettings.IsBackupEnabled() {
jd.renameDS(ds, true)
} else {
jd.dropDS(ds, true)
Expand Down

0 comments on commit 5f69442

Please sign in to comment.