Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: change stash defaults #3136

Merged
merged 10 commits into from
Apr 24, 2023
19 changes: 13 additions & 6 deletions processor/stash/stash.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (

var (
errorStashEnabled bool
errReadLoopSleep time.Duration
errDBReadBatchSize int
noOfErrStashWorkers int
maxFailedCountForErrJob int
Expand All @@ -40,7 +39,6 @@ func Init() {

func loadConfig() {
config.RegisterBoolConfigVariable(true, &errorStashEnabled, true, "Processor.errorStashEnabled")
config.RegisterDurationConfigVariable(30, &errReadLoopSleep, true, time.Second, []string{"Processor.errReadLoopSleep", "errReadLoopSleepInS"}...)
config.RegisterIntConfigVariable(1000, &errDBReadBatchSize, true, 1, "Processor.errDBReadBatchSize")
config.RegisterIntConfigVariable(2, &noOfErrStashWorkers, true, 1, "Processor.noOfErrStashWorkers")
config.RegisterIntConfigVariable(3, &maxFailedCountForErrJob, true, 1, "Processor.maxFailedCountForErrJob")
Expand Down Expand Up @@ -303,15 +301,15 @@ func (st *HandleT) setErrJobStatus(jobs []*jobsdb.JobT, output StoreErrorOutputT

func (st *HandleT) readErrJobsLoop(ctx context.Context) {
st.logger.Info("Processor errors stash loop started")

var sleepTime time.Duration
for {
select {
case <-ctx.Done():
close(st.errProcessQ)
return
case <-time.After(errReadLoopSleep):
case <-time.After(sleepTime):
start := time.Now()

var limitReached bool
// NOTE: sending custom val filters array of size 1 to take advantage of cache in jobsdb.
queryParams := jobsdb.GetQueryParamsT{
CustomValFilters: []string{""},
Expand All @@ -328,6 +326,7 @@ func (st *HandleT) readErrJobsLoop(ctx context.Context) {
}

combinedList := toRetry.Jobs
limitReached = toRetry.LimitsReached
if !toRetry.LimitsReached {
queryParams.JobsLimit -= len(toRetry.Jobs)
if queryParams.PayloadSizeLimit > 0 {
Expand All @@ -341,8 +340,8 @@ func (st *HandleT) readErrJobsLoop(ctx context.Context) {
panic(err)
}
combinedList = append(combinedList, unprocessed.Jobs...)
limitReached = unprocessed.LimitsReached
}

st.statErrDBR.Since(start)

if len(combinedList) == 0 {
Expand Down Expand Up @@ -400,6 +399,14 @@ func (st *HandleT) readErrJobsLoop(ctx context.Context) {
if canUpload && len(filteredJobList) > 0 {
st.errProcessQ <- filteredJobList
}
sleepTime = st.calculateSleepTime(limitReached)
}
}
}

func (st *HandleT) calculateSleepTime(limitReached bool) time.Duration {
if limitReached {
return config.GetDuration("Processor.errReadLoopSleep", 30, time.Second)
}
return time.Duration(0)
}