diff --git a/jobsdb/jobsdb.go b/jobsdb/jobsdb.go
index 9c64af3ce1b..1fc81dde89e 100644
--- a/jobsdb/jobsdb.go
+++ b/jobsdb/jobsdb.go
@@ -1920,10 +1920,6 @@ func (jd *Handle) getToProcessLegacy(ctx context.Context, params GetQueryParams,
 
 func (jd *Handle) GetToProcess(ctx context.Context, params GetQueryParams, more MoreToken) (*MoreJobsResult, error) { // skipcq: CRT-P0003
-	if !jd.config.GetBool("JobsDB.useSingleGetJobsQuery", true) { // TODO: remove condition after successful rollout of sinle query
-		return jd.getToProcessLegacy(ctx, params, more)
-	}
-
 	if params.JobsLimit == 0 {
 		return &MoreJobsResult{More: more}, nil
 	}
 
diff --git a/processor/stash/stash.go b/processor/stash/stash.go
index f7807bfe29c..2356ad780e0 100644
--- a/processor/stash/stash.go
+++ b/processor/stash/stash.go
@@ -312,57 +312,21 @@ func (st *HandleT) readErrJobsLoop(ctx context.Context) {
 			PayloadSizeLimit: st.adaptiveLimit(st.config.payloadLimit.Load()),
 		}
 
-		if config.GetBool("JobsDB.useSingleGetJobsQuery", true) { // TODO: remove condition after successful rollout of sinle query
-			toProcess, err := misc.QueryWithRetriesAndNotify(ctx, st.config.jobdDBQueryRequestTimeout.Load(), st.config.jobdDBMaxRetries.Load(), func(ctx context.Context) (jobsdb.JobsResult, error) {
-				return st.errorDB.GetJobs(ctx, []string{jobsdb.Failed.State, jobsdb.Unprocessed.State}, queryParams)
-			}, st.sendQueryRetryStats)
-			if err != nil {
-				if ctx.Err() != nil { // we are shutting down
-					close(st.errProcessQ)
-					return //nolint:nilerr
-				}
-				st.logger.Errorf("Error occurred while reading proc error jobs. Err: %v", err)
-				panic(err)
-			}
-
-			combinedList = toProcess.Jobs
-			limitReached = toProcess.LimitsReached
-		} else {
-			toRetry, err := misc.QueryWithRetriesAndNotify(ctx, st.config.jobdDBQueryRequestTimeout.Load(), st.config.jobdDBMaxRetries.Load(), func(ctx context.Context) (jobsdb.JobsResult, error) {
-				return st.errorDB.GetFailed(ctx, queryParams)
-			}, st.sendQueryRetryStats)
-			if err != nil {
-				if ctx.Err() != nil { // we are shutting down
-					close(st.errProcessQ)
-					return //nolint:nilerr
-				}
-				st.logger.Errorf("Error occurred while reading proc error jobs. Err: %v", err)
-				panic(err)
-			}
-
-			combinedList = toRetry.Jobs
-			limitReached = toRetry.LimitsReached
-			if !toRetry.LimitsReached {
-				queryParams.JobsLimit -= len(toRetry.Jobs)
-				if queryParams.PayloadSizeLimit > 0 {
-					queryParams.PayloadSizeLimit -= toRetry.PayloadSize
-				}
-				unprocessed, err := misc.QueryWithRetriesAndNotify(ctx, st.config.jobdDBQueryRequestTimeout.Load(), st.config.jobdDBMaxRetries.Load(), func(ctx context.Context) (jobsdb.JobsResult, error) {
-					return st.errorDB.GetUnprocessed(ctx, queryParams)
-				}, st.sendQueryRetryStats)
-				if err != nil {
-					if ctx.Err() != nil { // we are shutting down
-						close(st.errProcessQ)
-						return
-					}
-					st.logger.Errorf("Error occurred while reading proc error jobs. Err: %v", err)
-					panic(err)
-				}
-				combinedList = append(combinedList, unprocessed.Jobs...)
-				limitReached = unprocessed.LimitsReached
+		toProcess, err := misc.QueryWithRetriesAndNotify(ctx, st.config.jobdDBQueryRequestTimeout.Load(), st.config.jobdDBMaxRetries.Load(), func(ctx context.Context) (jobsdb.JobsResult, error) {
+			return st.errorDB.GetJobs(ctx, []string{jobsdb.Failed.State, jobsdb.Unprocessed.State}, queryParams)
+		}, st.sendQueryRetryStats)
+		if err != nil {
+			if ctx.Err() != nil { // we are shutting down
+				close(st.errProcessQ)
+				return //nolint:nilerr
 			}
+			st.logger.Errorf("Error occurred while reading proc error jobs. Err: %v", err)
+			panic(err)
 		}
+		combinedList = toProcess.Jobs
+		limitReached = toProcess.LimitsReached
+
 		st.statErrDBR.Since(start)
 
 		if len(combinedList) == 0 {
@@ -410,10 +374,9 @@ func (st *HandleT) readErrJobsLoop(ctx context.Context) {
 			}
 			statusList = append(statusList, &status)
 		}
-		err := misc.RetryWithNotify(context.Background(), st.config.jobsDBCommandTimeout.Load(), st.config.jobdDBMaxRetries.Load(), func(ctx context.Context) error {
+		if err := misc.RetryWithNotify(context.Background(), st.config.jobsDBCommandTimeout.Load(), st.config.jobdDBMaxRetries.Load(), func(ctx context.Context) error {
 			return st.errorDB.UpdateJobStatus(ctx, statusList, nil, nil)
-		}, st.sendRetryUpdateStats)
-		if err != nil {
+		}, st.sendRetryUpdateStats); err != nil {
 			if ctx.Err() != nil { // we are shutting down
 				return //nolint:nilerr
 			}
diff --git a/router/batchrouter/handle.go b/router/batchrouter/handle.go
index fb78c69bf4e..41023b76ed4 100644
--- a/router/batchrouter/handle.go
+++ b/router/batchrouter/handle.go
@@ -193,42 +193,15 @@ func (brt *Handle) getWorkerJobs(partition string) (workerJobs []*DestinationJob
 	brt.isolationStrategy.AugmentQueryParams(partition, &queryParams)
 	var limitsReached bool
 
-	if config.GetBool("JobsDB.useSingleGetJobsQuery", true) { // TODO: remove condition after successful rollout of sinle query
-		toProcess, err := misc.QueryWithRetriesAndNotify(context.Background(), brt.jobdDBQueryRequestTimeout.Load(), brt.jobdDBMaxRetries.Load(), func(ctx context.Context) (jobsdb.JobsResult, error) {
-			return brt.jobsDB.GetJobs(ctx, []string{jobsdb.Failed.State, jobsdb.Unprocessed.State}, queryParams)
-		}, brt.sendQueryRetryStats)
-		if err != nil {
-			brt.logger.Errorf("BRT: %s: Error while reading from DB: %v", brt.destType, err)
-			panic(err)
-		}
-		jobs = toProcess.Jobs
-		limitsReached = toProcess.LimitsReached
-	} else {
-		toRetry, err := misc.QueryWithRetriesAndNotify(context.Background(), brt.jobdDBQueryRequestTimeout.Load(), brt.jobdDBMaxRetries.Load(), func(ctx context.Context) (jobsdb.JobsResult, error) {
-			return brt.jobsDB.GetFailed(ctx, queryParams)
-		}, brt.sendQueryRetryStats)
-		if err != nil {
-			brt.logger.Errorf("BRT: %s: Error while reading from DB: %v", brt.destType, err)
-			panic(err)
-		}
-		jobs = toRetry.Jobs
-		limitsReached = toRetry.LimitsReached
-		if !limitsReached {
-			queryParams.JobsLimit -= len(toRetry.Jobs)
-			if queryParams.PayloadSizeLimit > 0 {
-				queryParams.PayloadSizeLimit -= toRetry.PayloadSize
-			}
-			unprocessed, err := misc.QueryWithRetriesAndNotify(context.Background(), brt.jobdDBQueryRequestTimeout.Load(), brt.jobdDBMaxRetries.Load(), func(ctx context.Context) (jobsdb.JobsResult, error) {
-				return brt.jobsDB.GetUnprocessed(ctx, queryParams)
-			}, brt.sendQueryRetryStats)
-			if err != nil {
-				brt.logger.Errorf("BRT: %s: Error while reading from DB: %v", brt.destType, err)
-				panic(err)
-			}
-			jobs = append(jobs, unprocessed.Jobs...)
-			limitsReached = unprocessed.LimitsReached
-		}
+	toProcess, err := misc.QueryWithRetriesAndNotify(context.Background(), brt.jobdDBQueryRequestTimeout.Load(), brt.jobdDBMaxRetries.Load(), func(ctx context.Context) (jobsdb.JobsResult, error) {
+		return brt.jobsDB.GetJobs(ctx, []string{jobsdb.Failed.State, jobsdb.Unprocessed.State}, queryParams)
+	}, brt.sendQueryRetryStats)
+	if err != nil {
+		brt.logger.Errorf("BRT: %s: Error while reading from DB: %v", brt.destType, err)
+		panic(err)
 	}
+	jobs = toProcess.Jobs
+	limitsReached = toProcess.LimitsReached
 	brtQueryStat.Since(queryStart)
 
 	sort.Slice(jobs, func(i, j int) bool {
diff --git a/router/handle.go b/router/handle.go
index 0cac437e6d4..dc550e2006f 100644
--- a/router/handle.go
+++ b/router/handle.go
@@ -18,7 +18,6 @@ import (
 	"github.com/tidwall/gjson"
 	"golang.org/x/sync/errgroup"
 
-	"github.com/rudderlabs/rudder-go-kit/config"
 	"github.com/rudderlabs/rudder-go-kit/logger"
 	"github.com/rudderlabs/rudder-go-kit/stats"
 	kitsync "github.com/rudderlabs/rudder-go-kit/sync"
@@ -152,16 +151,11 @@ func (rt *Handle) pickup(ctx context.Context, partition string, workers []*worke
 	var firstJob *jobsdb.JobT
 	var lastJob *jobsdb.JobT
 
-	orderGroupKeyFn := func(job *jobsdb.JobT) string { return job.LastJobStatus.JobState }
-	if config.GetBool("JobsDB.useSingleGetJobsQuery", true) { // TODO: remove condition and option after successful rollout of sinle query
-		orderGroupKeyFn = func(job *jobsdb.JobT) string { return "same" }
-	}
 	iterator := jobiterator.New(
 		rt.getQueryParams(partition, rt.reloadableConfig.jobQueryBatchSize.Load()),
 		rt.getJobsFn(ctx),
 		jobiterator.WithDiscardedPercentageTolerance(rt.reloadableConfig.jobIteratorDiscardedPercentageTolerance.Load()),
 		jobiterator.WithMaxQueries(rt.reloadableConfig.jobIteratorMaxQueries.Load()),
-		jobiterator.WithOrderGroupKeyFn(orderGroupKeyFn),
 	)
 
 	if !iterator.HasNext() {
diff --git a/router/internal/jobiterator/jobiterator.go b/router/internal/jobiterator/jobiterator.go
index f1d68a69d8e..a213afca3bf 100644
--- a/router/internal/jobiterator/jobiterator.go
+++ b/router/internal/jobiterator/jobiterator.go
@@ -26,26 +26,18 @@ func WithDiscardedPercentageTolerance(discardedPercentageTolerance int) Iterator
 	}
 }
 
-// WithOrderGroupKeyFn sets the orderGroupKeyFn
-func WithOrderGroupKeyFn(orderGroupKeyFn func(*jobsdb.JobT) string) IteratorOptFn {
-	return func(ji *Iterator) {
-		ji.orderGroupKeyFn = orderGroupKeyFn
-	}
-}
-
 // Iterator is a job iterator with support for fetching more than the original set of jobs requested,
 // in case some of these jobs get discarded, according to the configured discarded percentage tolerance.
 type Iterator struct {
 	params                       jobsdb.GetQueryParams
 	maxQueries                   int
 	discardedPercentageTolerance int
-	orderGroupKeyFn              func(*jobsdb.JobT) string
 	getJobsFn                    func(context.Context, jobsdb.GetQueryParams, jobsdb.MoreToken) (*jobsdb.MoreJobsResult, error)
 
 	state struct { // running iterator state
 		jobs []*jobsdb.JobT
 		idx  int
 
-		previousJob map[string]*jobsdb.JobT
+		previousJob *jobsdb.JobT
 
 		// closed indicates whether the iterator has reached the end or not
 		closed bool
@@ -81,10 +73,8 @@ func New(params jobsdb.GetQueryParams, getJobsFn func(context.Context, jobsdb.Ge
 		maxQueries:                   100,
 		discardedPercentageTolerance: 0,
 		getJobsFn:                    getJobsFn,
-		orderGroupKeyFn:              func(job *jobsdb.JobT) string { return job.LastJobStatus.JobState },
 	}
 	ji.state.jobsLimit = params.JobsLimit
-	ji.state.previousJob = map[string]*jobsdb.JobT{}
 	for _, opt := range opts {
 		opt(ji)
 	}
@@ -158,11 +148,10 @@ func (ji *Iterator) Next() *jobsdb.JobT {
 	idx := ji.state.idx
 	ji.state.idx++
 	nextJob := ji.state.jobs[idx]
-	orderGroupKey := ji.orderGroupKeyFn(nextJob)
-	if previousJob, ok := ji.state.previousJob[orderGroupKey]; ok && previousJob.JobID > nextJob.JobID {
-		panic(fmt.Errorf("job iterator encountered out of order jobs for group key %s: previousJobID: %d, nextJobID: %d", orderGroupKey, previousJob.JobID, nextJob.JobID))
+	if ji.state.previousJob != nil && ji.state.previousJob.JobID > nextJob.JobID {
+		panic(fmt.Errorf("job iterator encountered out of order jobs: previousJobID: %d, nextJobID: %d", ji.state.previousJob.JobID, nextJob.JobID))
 	}
-	ji.state.previousJob[orderGroupKey] = nextJob
+	ji.state.previousJob = nextJob
 	return nextJob
 }