Skip to content

Commit

Permalink
chore: new reloadable config in jobsdb (#3868)
Browse files Browse the repository at this point in the history
  • Loading branch information
Sidddddarth authored Sep 15, 2023
1 parent c7ab148 commit 2c655f9
Show file tree
Hide file tree
Showing 5 changed files with 112 additions and 85 deletions.
14 changes: 7 additions & 7 deletions jobsdb/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,11 @@ import (
)

func (jd *Handle) isBackupEnabled() bool {
return jd.conf.backup.masterBackupEnabled && jd.conf.backup.instanceBackupEnabled
return jd.conf.backup.masterBackupEnabled.Load() && jd.conf.backup.instanceBackupEnabled.Load()
}

func (jd *Handle) IsMasterBackupEnabled() bool {
return jd.conf.backup.masterBackupEnabled
return jd.conf.backup.masterBackupEnabled.Load()
}

func (jd *Handle) backupDSLoop(ctx context.Context) {
Expand All @@ -36,7 +36,7 @@ func (jd *Handle) backupDSLoop(ctx context.Context) {

for {
select {
case <-time.After(sleepMultiplier * jd.conf.backup.backupCheckSleepDuration):
case <-time.After(sleepMultiplier * jd.conf.backup.backupCheckSleepDuration.Load()):
if !jd.isBackupEnabled() {
jd.logger.Debugf("backupDSLoop backup disabled %s", jd.tablePrefix)
continue
Expand Down Expand Up @@ -200,7 +200,7 @@ func (jd *Handle) failedOnlyBackup(ctx context.Context, backupDSRange *dataSetRa
return fmt.Sprintf(`%v%v_%v.%v.gz`, tmpDirPath+backupPathDirName, pathPrefix, Aborted.State, workspaceID), nil
}

dumps, err := jd.createTableDumps(ctx, getFailedOnlyBackupQueryFn(backupDSRange, jd.conf.backup.backupRowsBatchSize, jd.conf.backup.backupMaxTotalPayloadSize), getFileName, totalCount)
dumps, err := jd.createTableDumps(ctx, getFailedOnlyBackupQueryFn(backupDSRange, jd.conf.backup.backupRowsBatchSize.Load(), jd.conf.backup.backupMaxTotalPayloadSize.Load()), getFileName, totalCount)
if err != nil {
return fmt.Errorf("error while creating table dump: %w", err)
}
Expand Down Expand Up @@ -268,7 +268,7 @@ func (jd *Handle) backupJobsTable(ctx context.Context, backupDSRange *dataSetRan
), nil
}

dumps, err := jd.createTableDumps(ctx, getJobsBackupQueryFn(backupDSRange, jd.conf.backup.backupRowsBatchSize, jd.conf.backup.backupMaxTotalPayloadSize), getFileName, totalCount)
dumps, err := jd.createTableDumps(ctx, getJobsBackupQueryFn(backupDSRange, jd.conf.backup.backupRowsBatchSize.Load(), jd.conf.backup.backupMaxTotalPayloadSize.Load()), getFileName, totalCount)
if err != nil {
return fmt.Errorf("error while creating table dump: %w", err)
}
Expand Down Expand Up @@ -326,7 +326,7 @@ func (jd *Handle) backupStatusTable(ctx context.Context, backupDSRange *dataSetR
return fmt.Sprintf(`%v%v.%v.gz`, tmpDirPath+backupPathDirName, pathPrefix, workspaceID), nil
}

dumps, err := jd.createTableDumps(ctx, getStatusBackupQueryFn(backupDSRange, jd.conf.backup.backupRowsBatchSize), getFileName, totalCount)
dumps, err := jd.createTableDumps(ctx, getStatusBackupQueryFn(backupDSRange, jd.conf.backup.backupRowsBatchSize.Load()), getFileName, totalCount)
if err != nil {
return fmt.Errorf("error while creating table dump: %w", err)
}
Expand Down Expand Up @@ -680,7 +680,7 @@ func (jd *Handle) backupUploadWithExponentialBackoff(ctx context.Context, file *
}
bo := backoff.NewExponentialBackOff()
bo.MaxInterval = time.Minute
bo.MaxElapsedTime = jd.conf.backup.maxBackupRetryTime
bo.MaxElapsedTime = jd.conf.backup.maxBackupRetryTime.Load()
boRetries := backoff.WithMaxRetries(bo, uint64(jd.config.GetInt64("MAX_BACKOFF_RETRIES", 3)))
boCtx := backoff.WithContext(boRetries, ctx)

Expand Down
152 changes: 88 additions & 64 deletions jobsdb/jobsdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -483,14 +483,14 @@ type Handle struct {

config *config.Config
conf struct {
maxTableSize int64
cacheExpiration time.Duration
addNewDSLoopSleepDuration time.Duration
refreshDSListLoopSleepDuration time.Duration
jobCleanupFrequency time.Duration
minDSRetentionPeriod time.Duration
maxDSRetentionPeriod time.Duration
refreshDSTimeout time.Duration
maxTableSize *config.Reloadable[int64]
cacheExpiration *config.Reloadable[time.Duration]
addNewDSLoopSleepDuration *config.Reloadable[time.Duration]
refreshDSListLoopSleepDuration *config.Reloadable[time.Duration]
jobCleanupFrequency *config.Reloadable[time.Duration]
minDSRetentionPeriod *config.Reloadable[time.Duration]
maxDSRetentionPeriod *config.Reloadable[time.Duration]
refreshDSTimeout *config.Reloadable[time.Duration]
jobMaxAge func() time.Duration
writeCapacity chan struct{}
readCapacity chan struct{}
Expand All @@ -502,28 +502,28 @@ type Handle struct {
maxReaders int
maxWriters int
maxOpenConnections int
analyzeThreshold int
MaxDSSize *int
analyzeThreshold *config.Reloadable[int]
MaxDSSize *config.Reloadable[int]
migration struct {
maxMigrateOnce, maxMigrateDSProbe int
maxMigrateOnce, maxMigrateDSProbe *config.Reloadable[int]
vacuumFullStatusTableThreshold func() int64
vacuumAnalyzeStatusTableThreshold func() int64
jobDoneMigrateThres, jobStatusMigrateThres func() float64
jobMinRowsMigrateThres func() float64
migrateDSLoopSleepDuration time.Duration
migrateDSTimeout time.Duration
migrateDSLoopSleepDuration *config.Reloadable[time.Duration]
migrateDSTimeout *config.Reloadable[time.Duration]
}
backup struct {
masterBackupEnabled bool
maxBackupRetryTime time.Duration
backupCheckSleepDuration time.Duration
masterBackupEnabled *config.Reloadable[bool]
maxBackupRetryTime *config.Reloadable[time.Duration]
backupCheckSleepDuration *config.Reloadable[time.Duration]
preBackupHandlers []prebackup.Handler
fileUploaderProvider fileuploader.Provider
instanceBackupEnabled bool
instanceBackupEnabled *config.Reloadable[bool]
FailedOnly bool
PathPrefix string
backupRowsBatchSize int64
backupMaxTotalPayloadSize int64
backupRowsBatchSize *config.Reloadable[int64]
backupMaxTotalPayloadSize *config.Reloadable[int64]
}
}
}
Expand Down Expand Up @@ -845,7 +845,7 @@ func (jd *Handle) workersAndAuxSetup() {

jd.noResultsCache = cache.NewNoResultsCache[ParameterFilterT](
cacheParameterFilters,
func() time.Duration { return jd.conf.cacheExpiration },
func() time.Duration { return jd.conf.cacheExpiration.Load() },
)

jd.logger.Infof("Connected to %s DB", jd.tablePrefix)
Expand All @@ -855,40 +855,47 @@ func (jd *Handle) workersAndAuxSetup() {
jd.statDropDSPeriod = stats.Default.NewTaggedStat("jobsdb.drop_ds_period", stats.TimerType, stats.Tags{"customVal": jd.tablePrefix})
}

// nolint:staticcheck // SA1019: config Register reloadable functions are deprecated
func (jd *Handle) loadConfig() {
// maxTableSizeInMB: Maximum Table size in MB
jd.config.RegisterInt64ConfigVariable(300, &jd.conf.maxTableSize, true, 1000000, "JobsDB.maxTableSizeInMB")
jd.config.RegisterDurationConfigVariable(120, &jd.conf.cacheExpiration, true, time.Minute, []string{"JobsDB.cacheExpiration"}...)
jd.conf.maxTableSize = jd.config.GetReloadableInt64Var(1000000, 300, "JobsDB.maxTableSizeInMB")
jd.conf.cacheExpiration = jd.config.GetReloadableDurationVar(120, time.Minute, []string{"JobsDB.cacheExpiration"}...)
// addNewDSLoopSleepDuration: How often is the loop (which checks for adding new DS) run
jd.config.RegisterDurationConfigVariable(5, &jd.conf.addNewDSLoopSleepDuration, true, time.Second, []string{"JobsDB.addNewDSLoopSleepDuration", "JobsDB.addNewDSLoopSleepDurationInS"}...)
jd.conf.addNewDSLoopSleepDuration = jd.config.GetReloadableDurationVar(5, time.Second, []string{"JobsDB.addNewDSLoopSleepDuration", "JobsDB.addNewDSLoopSleepDurationInS"}...)
// refreshDSListLoopSleepDuration: How often is the loop (which refreshes DSList) run
jd.config.RegisterDurationConfigVariable(10, &jd.conf.refreshDSListLoopSleepDuration, true, time.Second, []string{"JobsDB.refreshDSListLoopSleepDuration", "JobsDB.refreshDSListLoopSleepDurationInS"}...)
jd.config.RegisterDurationConfigVariable(24, &jd.conf.jobCleanupFrequency, true, time.Hour, []string{"JobsDB.jobCleanupFrequency"}...)
jd.conf.refreshDSListLoopSleepDuration = jd.config.GetReloadableDurationVar(10, time.Second, []string{"JobsDB.refreshDSListLoopSleepDuration", "JobsDB.refreshDSListLoopSleepDurationInS"}...)
jd.conf.jobCleanupFrequency = jd.config.GetReloadableDurationVar(24, time.Hour, []string{"JobsDB.jobCleanupFrequency"}...)

enableWriterQueueKeys := []string{"JobsDB." + jd.tablePrefix + "." + "enableWriterQueue", "JobsDB." + "enableWriterQueue"}
jd.config.RegisterBoolConfigVariable(true, &jd.conf.enableWriterQueue, true, enableWriterQueueKeys...)
jd.conf.enableWriterQueue = jd.config.GetBoolVar(true, enableWriterQueueKeys...)
enableReaderQueueKeys := []string{"JobsDB." + jd.tablePrefix + "." + "enableReaderQueue", "JobsDB." + "enableReaderQueue"}
jd.config.RegisterBoolConfigVariable(true, &jd.conf.enableReaderQueue, true, enableReaderQueueKeys...)
jd.conf.enableReaderQueue = jd.config.GetBoolVar(true, enableReaderQueueKeys...)
maxWritersKeys := []string{"JobsDB." + jd.tablePrefix + "." + "maxWriters", "JobsDB." + "maxWriters"}
jd.config.RegisterIntConfigVariable(3, &jd.conf.maxWriters, false, 1, maxWritersKeys...)
jd.conf.maxWriters = jd.config.GetIntVar(3, 1, maxWritersKeys...)
maxReadersKeys := []string{"JobsDB." + jd.tablePrefix + "." + "maxReaders", "JobsDB." + "maxReaders"}
jd.config.RegisterIntConfigVariable(6, &jd.conf.maxReaders, false, 1, maxReadersKeys...)
jd.conf.maxReaders = jd.config.GetIntVar(6, 1, maxReadersKeys...)
maxOpenConnectionsKeys := []string{"JobsDB." + jd.tablePrefix + "." + "maxOpenConnections", "JobsDB." + "maxOpenConnections"}
jd.config.RegisterIntConfigVariable(20, &jd.conf.maxOpenConnections, false, 1, maxOpenConnectionsKeys...)
jd.conf.maxOpenConnections = jd.config.GetIntVar(20, 1, maxOpenConnectionsKeys...)
analyzeThresholdKeys := []string{"JobsDB." + jd.tablePrefix + "." + "analyzeThreshold", "JobsDB." + "analyzeThreshold"}
jd.config.RegisterIntConfigVariable(30000, &jd.conf.analyzeThreshold, false, 1, analyzeThresholdKeys...)
jd.conf.analyzeThreshold = jd.config.GetReloadableIntVar(30000, 1, analyzeThresholdKeys...)
minDSRetentionPeriodKeys := []string{"JobsDB." + jd.tablePrefix + "." + "minDSRetention", "JobsDB." + "minDSRetention"}
jd.config.RegisterDurationConfigVariable(0, &jd.conf.minDSRetentionPeriod, true, time.Minute, minDSRetentionPeriodKeys...)
jd.conf.minDSRetentionPeriod = jd.config.GetReloadableDurationVar(0, time.Minute, minDSRetentionPeriodKeys...)
maxDSRetentionPeriodKeys := []string{"JobsDB." + jd.tablePrefix + "." + "maxDSRetention", "JobsDB." + "maxDSRetention"}
jd.config.RegisterDurationConfigVariable(90, &jd.conf.maxDSRetentionPeriod, true, time.Minute, maxDSRetentionPeriodKeys...)
jd.config.RegisterDurationConfigVariable(10, &jd.conf.refreshDSTimeout, true, time.Minute, "JobsDB.refreshDS.timeout")
jd.conf.maxDSRetentionPeriod = jd.config.GetReloadableDurationVar(90, time.Minute, maxDSRetentionPeriodKeys...)
jd.conf.refreshDSTimeout = jd.config.GetReloadableDurationVar(10, time.Minute, "JobsDB.refreshDS.timeout")

// migrationConfig

// migrateDSLoopSleepDuration: How often is the loop (which checks for migrating DS) run
jd.config.RegisterDurationConfigVariable(30, &jd.conf.migration.migrateDSLoopSleepDuration, true, time.Second, []string{"JobsDB.migrateDSLoopSleepDuration", "JobsDB.migrateDSLoopSleepDurationInS"}...)
jd.config.RegisterDurationConfigVariable(10, &jd.conf.migration.migrateDSTimeout, true, time.Minute, "JobsDB.migrateDS.timeout")
jd.conf.migration.migrateDSLoopSleepDuration = jd.config.GetReloadableDurationVar(
30, time.Second,
[]string{
"JobsDB.migrateDSLoopSleepDuration",
"JobsDB.migrateDSLoopSleepDurationInS",
}...,
)
jd.conf.migration.migrateDSTimeout = jd.config.GetReloadableDurationVar(
10, time.Minute, "JobsDB.migrateDS.timeout",
)
// jobDoneMigrateThres: A DS is migrated when this fraction of the jobs have been processed
jd.conf.migration.jobDoneMigrateThres = func() float64 { return jd.config.GetFloat64("JobsDB.jobDoneMigrateThreshold", 0.7) }
// jobStatusMigrateThres: A DS is migrated if the job_status exceeds this (* no_of_jobs)
Expand All @@ -898,9 +905,13 @@ func (jd *Handle) loadConfig() {
// then DSs that have less than 5% of maxDSSize are eligible for migration)
jd.conf.migration.jobMinRowsMigrateThres = func() float64 { return jd.config.GetFloat64("JobsDB.jobMinRowsMigrateThreshold", 0.2) }
// maxMigrateOnce: Maximum number of DSs that are migrated together into one destination
jd.config.RegisterIntConfigVariable(10, &jd.conf.migration.maxMigrateOnce, true, 1, "JobsDB.maxMigrateOnce")
jd.conf.migration.maxMigrateOnce = jd.config.GetReloadableIntVar(
10, 1, "JobsDB.maxMigrateOnce",
)
// maxMigrateDSProbe: Maximum number of DSs that are checked from left to right if they are eligible for migration
jd.config.RegisterIntConfigVariable(10, &jd.conf.migration.maxMigrateDSProbe, true, 1, "JobsDB.maxMigrateDSProbe")
jd.conf.migration.maxMigrateDSProbe = jd.config.GetReloadableIntVar(
10, 1, "JobsDB.maxMigrateDSProbe",
)
jd.conf.migration.vacuumFullStatusTableThreshold = func() int64 {
return jd.config.GetInt64("JobsDB.vacuumFullStatusTableThreshold", 500*bytesize.MB)
}
Expand All @@ -911,46 +922,59 @@ func (jd *Handle) loadConfig() {
// backupConfig

// masterBackupEnabled = true => all the jobsdb are eligible for backup
jd.config.RegisterBoolConfigVariable(true, &jd.conf.backup.masterBackupEnabled, true, "JobsDB.backup.enabled")
jd.conf.backup.masterBackupEnabled = jd.config.GetReloadableBoolVar(
true, "JobsDB.backup.enabled",
)
// instanceBackupEnabled = true => the individual jobsdb too is eligible for backup
jd.config.RegisterBoolConfigVariable(false, &jd.conf.backup.instanceBackupEnabled, true, fmt.Sprintf("JobsDB.backup.%v.enabled", jd.tablePrefix))
jd.config.RegisterBoolConfigVariable(false, &jd.conf.backup.FailedOnly, false, fmt.Sprintf("JobsDB.backup.%v.failedOnly", jd.tablePrefix))
jd.config.RegisterDurationConfigVariable(10, &jd.conf.backup.maxBackupRetryTime, false, time.Minute, "JobsDB.backup.maxRetry")
jd.config.RegisterInt64ConfigVariable(10000, &jd.conf.backup.backupRowsBatchSize, true, 1, "JobsDB.backupRowsBatchSize")
jd.config.RegisterInt64ConfigVariable(64*bytesize.MB, &jd.conf.backup.backupMaxTotalPayloadSize, true, 1, "JobsDB.maxBackupTotalPayloadSize")
jd.config.RegisterDurationConfigVariable(5, &jd.conf.backup.backupCheckSleepDuration, true, time.Second, []string{"JobsDB.backupCheckSleepDuration", "JobsDB.backupCheckSleepDurationIns"}...)
jd.config.RegisterStringConfigVariable(jd.tablePrefix, &jd.conf.backup.PathPrefix, false, fmt.Sprintf("JobsDB.backup.%v.pathPrefix", jd.tablePrefix))
jd.conf.backup.instanceBackupEnabled = jd.config.GetReloadableBoolVar(
false, fmt.Sprintf("JobsDB.backup.%v.enabled", jd.tablePrefix),
)
jd.conf.backup.FailedOnly = jd.config.GetBool(
fmt.Sprintf("JobsDB.backup.%v.failedOnly", jd.tablePrefix),
false,
)
jd.conf.backup.maxBackupRetryTime = jd.config.GetReloadableDurationVar(
10, time.Minute, "JobsDB.backup.maxRetry",
)
jd.conf.backup.backupRowsBatchSize = jd.config.GetReloadableInt64Var(
10000, 1, "JobsDB.backupRowsBatchSize",
)
jd.conf.backup.backupMaxTotalPayloadSize = jd.config.GetReloadableInt64Var(
64*bytesize.MB, 1, "JobsDB.backupMaxTotalPayloadSize",
)
jd.conf.backup.backupCheckSleepDuration = jd.config.GetReloadableDurationVar(
5, time.Second, []string{"JobsDB.backupCheckSleepDuration", "JobsDB.backupCheckSleepDurationIns"}...,
)
jd.conf.backup.PathPrefix = jd.config.GetString(
jd.tablePrefix, fmt.Sprintf("JobsDB.backup.%v.pathPrefix", jd.tablePrefix),
)

// maxDSSize: Maximum size of a DS. The process which adds new DS runs in the background
// (every few seconds) so a DS may go beyond this size
if jd.conf.MaxDSSize == nil {
// passing `maxDSSize` by reference, so it can be hot reloaded
var maxDSSize int
jd.config.RegisterIntConfigVariable(100000, &maxDSSize, true, 1, "JobsDB.maxDSSize")
jd.conf.MaxDSSize = &maxDSSize
}
// passing `maxDSSize` by reference, so it can be hot reloaded
jd.conf.MaxDSSize = jd.config.GetReloadableIntVar(100000, 1, "JobsDB.maxDSSize")

if jd.TriggerAddNewDS == nil {
jd.TriggerAddNewDS = func() <-chan time.Time {
return time.After(jd.conf.addNewDSLoopSleepDuration)
return time.After(jd.conf.addNewDSLoopSleepDuration.Load())
}
}

if jd.TriggerMigrateDS == nil {
jd.TriggerMigrateDS = func() <-chan time.Time {
return time.After(jd.conf.migration.migrateDSLoopSleepDuration)
return time.After(jd.conf.migration.migrateDSLoopSleepDuration.Load())
}
}

if jd.TriggerRefreshDS == nil {
jd.TriggerRefreshDS = func() <-chan time.Time {
return time.After(jd.conf.refreshDSListLoopSleepDuration)
return time.After(jd.conf.refreshDSListLoopSleepDuration.Load())
}
}

if jd.TriggerJobCleanUp == nil {
jd.TriggerJobCleanUp = func() <-chan time.Time {
return time.After(jd.conf.jobCleanupFrequency)
return time.After(jd.conf.jobCleanupFrequency.Load())
}
}

Expand Down Expand Up @@ -1198,27 +1222,27 @@ func (jd *Handle) getTableSize(jobTable string) int64 {
}

func (jd *Handle) checkIfFullDSInTx(tx *Tx, ds dataSetT) (bool, error) {
if jd.conf.maxDSRetentionPeriod > 0 {
if jd.conf.maxDSRetentionPeriod.Load() > 0 {
var minJobCreatedAt sql.NullTime
sqlStatement := fmt.Sprintf(`SELECT MIN(created_at) FROM %q`, ds.JobTable)
row := tx.QueryRow(sqlStatement)
err := row.Scan(&minJobCreatedAt)
if err != nil && err != sql.ErrNoRows {
return false, err
}
if err == nil && minJobCreatedAt.Valid && time.Since(minJobCreatedAt.Time) > jd.conf.maxDSRetentionPeriod {
if err == nil && minJobCreatedAt.Valid && time.Since(minJobCreatedAt.Time) > jd.conf.maxDSRetentionPeriod.Load() {
return true, nil
}
}

tableSize := jd.getTableSize(ds.JobTable)
if tableSize > jd.conf.maxTableSize {
if tableSize > jd.conf.maxTableSize.Load() {
jd.logger.Infof("[JobsDB] %s is full in size. Count: %v, Size: %v", ds.JobTable, jd.getTableRowCount(ds.JobTable), tableSize)
return true, nil
}

totalCount := jd.getTableRowCount(ds.JobTable)
if totalCount > *jd.conf.MaxDSSize {
if totalCount > jd.conf.MaxDSSize.Load() {
jd.logger.Infof("[JobsDB] %s is full by rows. Count: %v, Size: %v", ds.JobTable, totalCount, jd.getTableSize(ds.JobTable))
return true, nil
}
Expand Down Expand Up @@ -2105,7 +2129,7 @@ func (jd *Handle) doStoreJobsInTx(ctx context.Context, tx *Tx, ds dataSetT, jobL
if _, err = stmt.ExecContext(ctx); err != nil {
return err
}
if len(jobList) > jd.conf.analyzeThreshold {
if len(jobList) > jd.conf.analyzeThreshold.Load() {
_, err = tx.ExecContext(ctx, fmt.Sprintf(`ANALYZE %q`, ds.JobTable))
}

Expand Down Expand Up @@ -2417,7 +2441,7 @@ func (jd *Handle) updateJobStatusDSInTx(ctx context.Context, tx *Tx, ds dataSetT
return err
}

if len(statusList) > jd.conf.analyzeThreshold {
if len(statusList) > jd.conf.analyzeThreshold.Load() {
_, err = tx.ExecContext(ctx, fmt.Sprintf(`ANALYZE %q`, ds.JobStatusTable))
}

Expand Down Expand Up @@ -2581,7 +2605,7 @@ func (jd *Handle) refreshDSListLoop(ctx context.Context) {
case <-ctx.Done():
return
}
timeoutCtx, cancel := context.WithTimeout(ctx, jd.conf.refreshDSTimeout)
timeoutCtx, cancel := context.WithTimeout(ctx, jd.conf.refreshDSTimeout.Load())
if err := jd.refreshDSList(timeoutCtx); err != nil {
cancel()
if !jd.conf.skipMaintenanceError && ctx.Err() == nil {
Expand Down
5 changes: 4 additions & 1 deletion jobsdb/jobsdb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -778,19 +778,22 @@ func TestCacheScenarios(t *testing.T) {

checkDSLimitJobs := func(t *testing.T, limit int) []*JobT {
maxDSSize := 1
c := config.New()
c.Set("JobsDB.maxDSSize", maxDSSize)
var dbWithOneLimit *Handle
triggerAddNewDS := make(chan time.Time)
if limit > 0 {
dbWithOneLimit = NewForReadWrite(
"cache",
WithDSLimit(&limit),
WithConfig(c),
)
} else {
dbWithOneLimit = NewForReadWrite(
"cache",
WithConfig(c),
)
}
dbWithOneLimit.conf.MaxDSSize = &maxDSSize
dbWithOneLimit.TriggerAddNewDS = func() <-chan time.Time {
return triggerAddNewDS
}
Expand Down
Loading

0 comments on commit 2c655f9

Please sign in to comment.