chore: change stash defaults #3136

Merged · 10 commits · Apr 24, 2023
5 changes: 4 additions & 1 deletion integration_test/docker_test/docker_test.go
@@ -405,8 +405,11 @@ func setupMainFlow(svcCtx context.Context, t *testing.T) <-chan struct{} {
 	})
 	containersGroup.Go(func() (err error) {
 		postgresContainer, err = resource.SetupPostgres(pool, t)
+		if err != nil {
+			return err
+		}
 		db = postgresContainer.DB
-		return err
+		return nil
 	})
 	containersGroup.Go(func() (err error) {
 		transformerContainer, err = destination.SetupTransformer(pool, t)
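The fix above matters because `resource.SetupPostgres` can fail and leave `postgresContainer` nil; dereferencing `.DB` before checking the error would crash the test with a nil-pointer panic instead of reporting the setup failure. A minimal, self-contained sketch of the pattern, with hypothetical stand-in names:

```go
package main

import (
	"database/sql"
	"errors"
	"fmt"
)

type container struct{ DB *sql.DB }

// setupPostgres stands in for resource.SetupPostgres (hypothetical): on
// failure it returns a nil container together with the error.
func setupPostgres() (*container, error) {
	return nil, errors.New("postgres failed to start")
}

// Before the fix, the error was only returned after dereferencing the
// container; on failure that is a nil-pointer panic, not a test error.
func setup() (*sql.DB, error) {
	c, err := setupPostgres()
	if err != nil {
		return nil, err // c is nil here; touching c.DB would panic
	}
	return c.DB, nil
}

func main() {
	if _, err := setup(); err != nil {
		fmt.Println("setup failed cleanly:", err)
	}
}
```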
2 changes: 2 additions & 0 deletions processor/processor_test.go
@@ -1571,6 +1571,8 @@ var _ = Describe("Processor", Ordered, func() {
 
 			c.mockBackendConfig.EXPECT().WaitForConfig(gomock.Any()).Times(1)
 			c.mockProcErrorsDB.EXPECT().FailExecuting().Times(1)
+			c.mockProcErrorsDB.EXPECT().GetToRetry(gomock.Any(), gomock.Any()).AnyTimes()
+			c.mockProcErrorsDB.EXPECT().GetUnprocessed(gomock.Any(), gomock.Any()).AnyTimes()
 
 			var wg sync.WaitGroup
 			wg.Add(1)
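These expectations change because the stash loop no longer waits a fixed 30 s before its first read (see the stash.go hunks below): `GetToRetry`/`GetUnprocessed` may now run zero or many times while a test is in flight, so their call counts cannot be pinned. A fragment (not standalone; the mock types are the suite's generated gomock mocks) contrasting the cardinalities:

```go
// Times(1) pins an exact call count; the test fails on zero or two calls.
c.mockProcErrorsDB.EXPECT().FailExecuting().Times(1)

// AnyTimes() allows zero or more calls: with a zero initial sleep, the
// read loop may already be polling — or never get to poll — before the
// test shuts the processor down.
c.mockProcErrorsDB.EXPECT().GetToRetry(gomock.Any(), gomock.Any()).AnyTimes()
c.mockProcErrorsDB.EXPECT().GetUnprocessed(gomock.Any(), gomock.Any()).AnyTimes()
```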
30 changes: 24 additions & 6 deletions processor/stash/stash.go
@@ -25,7 +25,6 @@ import (
 
 var (
 	errorStashEnabled       bool
-	errReadLoopSleep        time.Duration
 	errDBReadBatchSize      int
 	noOfErrStashWorkers     int
 	maxFailedCountForErrJob int
@@ -40,7 +39,6 @@ func Init() {
 
 func loadConfig() {
 	config.RegisterBoolConfigVariable(true, &errorStashEnabled, true, "Processor.errorStashEnabled")
-	config.RegisterDurationConfigVariable(30, &errReadLoopSleep, true, time.Second, []string{"Processor.errReadLoopSleep", "errReadLoopSleepInS"}...)
 	config.RegisterIntConfigVariable(1000, &errDBReadBatchSize, true, 1, "Processor.errDBReadBatchSize")
 	config.RegisterIntConfigVariable(2, &noOfErrStashWorkers, true, 1, "Processor.noOfErrStashWorkers")
 	config.RegisterIntConfigVariable(3, &maxFailedCountForErrJob, true, 1, "Processor.maxFailedCountForErrJob")
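The registered, package-level `errReadLoopSleep` goes away because the value is now read on demand: `calculateSleepTime` (last hunk below) calls `config.GetDuration("Processor.errReadLoopSleep", 30, time.Second)` each time it needs the interval. A self-contained sketch of that accessor's shape, with a hypothetical stand-in for the config store:

```go
package main

import (
	"fmt"
	"time"
)

// loaded stands in for the process's configuration source (hypothetical).
var loaded = map[string]time.Duration{}

// getDuration mirrors the shape of config.GetDuration(key, default, unit)
// as used in this PR: the key is resolved on every call, so the caller
// always sees the current value without registering a bound variable.
func getDuration(key string, defaultInUnits int64, unit time.Duration) time.Duration {
	if v, ok := loaded[key]; ok {
		return v
	}
	return time.Duration(defaultInUnits) * unit
}

func main() {
	fmt.Println(getDuration("Processor.errReadLoopSleep", 30, time.Second)) // 30s
}
```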
@@ -303,15 +301,15 @@ func (st *HandleT) setErrJobStatus(jobs []*jobsdb.JobT, output StoreErrorOutputT
 
 func (st *HandleT) readErrJobsLoop(ctx context.Context) {
 	st.logger.Info("Processor errors stash loop started")
-
+	var sleepTime time.Duration
 	for {
 		select {
 		case <-ctx.Done():
 			close(st.errProcessQ)
 			return
-		case <-time.After(errReadLoopSleep):
+		case <-time.After(sleepTime):
 			start := time.Now()
-
+			var limitReached bool
 			// NOTE: sending custom val filters array of size 1 to take advantage of cache in jobsdb.
 			queryParams := jobsdb.GetQueryParamsT{
 				CustomValFilters: []string{""},
@@ -323,11 +321,16 @@ func (st *HandleT) readErrJobsLoop(ctx context.Context) {
 				return st.errorDB.GetToRetry(ctx, queryParams)
 			}, sendQueryRetryStats)
 			if err != nil {
+				if ctx.Err() != nil { // we are shutting down
+					close(st.errProcessQ)
+					return //nolint:nilerr
+				}
 				st.logger.Errorf("Error occurred while reading proc error jobs. Err: %v", err)
 				panic(err)
 			}
 
 			combinedList := toRetry.Jobs
+			limitReached = toRetry.LimitsReached
 			if !toRetry.LimitsReached {
 				queryParams.JobsLimit -= len(toRetry.Jobs)
 				if queryParams.PayloadSizeLimit > 0 {
@@ -337,12 +340,16 @@ func (st *HandleT) readErrJobsLoop(ctx context.Context) {
 					return st.errorDB.GetUnprocessed(ctx, queryParams)
 				}, sendQueryRetryStats)
 				if err != nil {
+					if ctx.Err() != nil { // we are shutting down
+						close(st.errProcessQ)
+						return
+					}
 					st.logger.Errorf("Error occurred while reading proc error jobs. Err: %v", err)
 					panic(err)
 				}
 				combinedList = append(combinedList, unprocessed.Jobs...)
+				limitReached = unprocessed.LimitsReached
 			}
-
 			st.statErrDBR.Since(start)
 
 			if len(combinedList) == 0 {
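Both read paths above gain the same guard: when a query fails and `ctx.Err()` is non-nil, the failure is a side effect of shutdown, so the loop closes its output channel and returns cleanly instead of panicking. A self-contained sketch of the pattern (names are hypothetical):

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// readOnce shows the guard added around each DB read: if the query fails
// while the context is already canceled, the failure is treated as part
// of shutdown rather than a real error worth panicking over.
func readOnce(ctx context.Context, query func(context.Context) error) error {
	if err := query(ctx); err != nil {
		if ctx.Err() != nil { // we are shutting down; swallow the query error
			return nil //nolint:nilerr
		}
		return fmt.Errorf("reading proc error jobs: %w", err) // caller panics on this
	}
	return nil
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	cancel() // simulate shutdown before the read completes
	err := readOnce(ctx, func(context.Context) error { return errors.New("canceled") })
	fmt.Println(err) // <nil>: cancellation is not a failure
}
```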
@@ -393,13 +400,24 @@ func (st *HandleT) readErrJobsLoop(ctx context.Context) {
 				return st.errorDB.UpdateJobStatus(ctx, statusList, nil, nil)
 			}, sendRetryUpdateStats)
 			if err != nil {
+				if ctx.Err() != nil { // we are shutting down
+					return //nolint:nilerr
+				}
 				pkgLogger.Errorf("Error occurred while marking proc error jobs statuses as %v. Panicking. Err: %v", jobState, err)
 				panic(err)
 			}
 
 			if canUpload && len(filteredJobList) > 0 {
 				st.errProcessQ <- filteredJobList
 			}
+			sleepTime = st.calculateSleepTime(limitReached)
 		}
 	}
 }
+
+func (*HandleT) calculateSleepTime(limitReached bool) time.Duration {
+	if limitReached {
+		return config.GetDuration("Processor.errReadLoopSleep", 30, time.Second)
+	}
+	return time.Duration(0)
+}
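With the fixed tick gone, pacing is recomputed every iteration: as merged here, `calculateSleepTime` returns the configured `Processor.errReadLoopSleep` interval after a read that hit the batch/payload limits and zero otherwise, and the very first poll fires immediately because `sleepTime` starts at its zero value. A self-contained sketch of this recomputed-sleep loop, with hypothetical names and the 30 s default hard-coded:

```go
package main

import (
	"context"
	"time"
)

// pollLoop mirrors the loop's new pacing: fetch reports whether the read
// hit its limits, and the next wait is derived from that instead of a
// fixed tick.
func pollLoop(ctx context.Context, fetch func() (limitReached bool)) {
	var sleepTime time.Duration // zero value, so the first poll is immediate
	for {
		select {
		case <-ctx.Done():
			return
		case <-time.After(sleepTime):
			if fetch() {
				sleepTime = 30 * time.Second // limits hit: wait the configured interval
			} else {
				sleepTime = 0 // limits not hit: poll again right away
			}
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
	defer cancel()
	pollLoop(ctx, func() bool { return false }) // exits when the context expires
}
```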