diff --git a/jobsdb/jobsdb.go b/jobsdb/jobsdb.go
index 69b47ef99d2..76d5336db9a 100644
--- a/jobsdb/jobsdb.go
+++ b/jobsdb/jobsdb.go
@@ -1828,7 +1828,10 @@ type moreToken struct {
 	afterJobID *int64
 }
 
-func (jd *Handle) GetToProcess(ctx context.Context, params GetQueryParams, more MoreToken) (*MoreJobsResult, error) { // skipcq: CRT-P0003
+// getToProcessLegacy returns jobs that are in failed, waiting and unprocessed states using three separate queries
+//
+// Deprecated: shall be removed after successful rollout
+func (jd *Handle) getToProcessLegacy(ctx context.Context, params GetQueryParams, more MoreToken) (*MoreJobsResult, error) { // skipcq: CRT-P0003
 	mtoken := &moreTokenLegacy{}
 	if more != nil {
@@ -1890,6 +1893,30 @@ func (jd *Handle) GetToProcess(ctx context.Context, params GetQueryParams, more
 	return &MoreJobsResult{JobsResult: JobsResult{Jobs: list, LimitsReached: unprocessed.LimitsReached}, More: mtoken}, nil
 }
 
+func (jd *Handle) GetToProcess(ctx context.Context, params GetQueryParams, more MoreToken) (*MoreJobsResult, error) { // skipcq: CRT-P0003
+
+	if !jd.config.GetBool("JobsDB.useSingleGetJobsQuery", true) { // TODO: remove condition after successful rollout of single query
+		return jd.getToProcessLegacy(ctx, params, more)
+	}
+
+	if params.JobsLimit == 0 {
+		return &MoreJobsResult{More: more}, nil
+	}
+	params.stateFilters = []string{Failed.State, Waiting.State, Unprocessed.State}
+	slices.Sort(params.stateFilters)
+	tags := statTags{
+		StateFilters:     params.stateFilters,
+		CustomValFilters: params.CustomValFilters,
+		ParameterFilters: params.ParameterFilters,
+		WorkspaceID:      params.WorkspaceID,
+	}
+	command := func() moreQueryResult {
+		return moreQueryResultWrapper(jd.getJobs(ctx, params, more))
+	}
+	res := executeDbRequest(jd, newReadDbRequest("get_jobs", &tags, command))
+	return res.MoreJobsResult, res.err
+}
+
 var cacheParameterFilters = []string{"source_id", "destination_id"}
 
 func (jd *Handle) GetPileUpCounts(ctx context.Context) (map[string]map[string]int, error) {
@@ -2122,7 +2149,7 @@ stateFilters and customValFilters do a OR query on values passed in array
 parameterFilters do a AND query on values included in the map.
 A JobsLimit less than or equal to zero indicates no limit.
 */
-func (jd *Handle) getJobsDS(ctx context.Context, ds dataSetT, params GetQueryParams) (JobsResult, bool, error) { // skipcq: CRT-P0003
+func (jd *Handle) getJobsDS(ctx context.Context, ds dataSetT, lastDS bool, params GetQueryParams) (JobsResult, bool, error) { // skipcq: CRT-P0003
 	stateFilters := params.stateFilters
 	customValFilters := params.CustomValFilters
 	parameterFilters := params.ParameterFilters
@@ -2135,31 +2162,42 @@ func (jd *Handle) getJobsDS(ctx context.Context, ds dataSetT, params GetQueryPar
 	}
 
 	tags := statTags{
-		StateFilters:     params.stateFilters,
+		StateFilters:     stateFilters,
 		CustomValFilters: params.CustomValFilters,
 		ParameterFilters: params.ParameterFilters,
 		WorkspaceID:      workspaceID,
 	}
 
+	stateFilters = lo.Filter(stateFilters, func(state string, _ int) bool { // exclude states for which we already know that there are no jobs
+		return !jd.noResultsCache.Get(ds.Index, workspaceID, customValFilters, []string{state}, parameterFilters)
+	})
+
 	defer jd.getTimerStat("jobsdb_get_jobs_ds_time", &tags).RecordDuration()()
 
-	containsUnprocessed := lo.Contains(params.stateFilters, Unprocessed.State)
+	containsUnprocessed := lo.Contains(stateFilters, Unprocessed.State)
 	skipCacheResult := params.afterJobID != nil
-	var cacheTx *cache.NoResultTx[ParameterFilterT]
+	cacheTx := map[string]*cache.NoResultTx[ParameterFilterT]{}
 	if !skipCacheResult {
-		cacheTx = jd.noResultsCache.StartNoResultTx(ds.Index, workspaceID, customValFilters, stateFilters, parameterFilters)
+		for _, state := range stateFilters {
+			// avoid setting result as noJobs if
+			// (1) state is unprocessed and
+			// (2) jobsdb owner is a reader and
+			// (3) ds is the right most one
+			if state == Unprocessed.State && jd.ownerType == Read && lastDS {
+				continue
+			}
+			cacheTx[state] = jd.noResultsCache.StartNoResultTx(ds.Index, workspaceID, customValFilters, []string{state}, parameterFilters)
+		}
 	}
 
 	var filterConditions []string
-	if len(stateFilters) > 0 {
-		additionalPredicates := lo.FilterMap(stateFilters, func(s string, _ int) (string, bool) {
-			return "(job_latest_state.job_id IS NULL)", s == Unprocessed.State
-		})
-		stateQuery := constructQueryOR("job_latest_state.job_state", lo.Filter(stateFilters, func(s string, _ int) bool {
-			return s != Unprocessed.State
-		}), additionalPredicates...)
-		filterConditions = append(filterConditions, stateQuery)
-	}
+	additionalPredicates := lo.FilterMap(stateFilters, func(s string, _ int) (string, bool) {
+		return "(job_latest_state.job_id IS NULL)", s == Unprocessed.State
+	})
+	stateQuery := constructQueryOR("job_latest_state.job_state", lo.Filter(stateFilters, func(s string, _ int) bool {
+		return s != Unprocessed.State
+	}), additionalPredicates...)
+	filterConditions = append(filterConditions, stateQuery)
 
 	if params.afterJobID != nil {
 		filterConditions = append(filterConditions, fmt.Sprintf("jobs.job_id > %d", *params.afterJobID))
 	}
 
@@ -2254,7 +2292,7 @@ func (jd *Handle) getJobsDS(ctx context.Context, ds dataSetT, params GetQueryPar
 	var limitsReached bool
 	var eventCount int
 	var payloadSize int64
-
+	resultsetStates := map[string]struct{}{}
 	for rows.Next() {
 		var job JobT
 		var jsState sql.NullString
@@ -2273,6 +2311,7 @@ func (jd *Handle) getJobsDS(ctx context.Context, ds dataSetT, params GetQueryPar
 			return JobsResult{}, false, err
 		}
 		if jsState.Valid {
+			resultsetStates[jsState.String] = struct{}{}
 			job.LastJobStatus.JobState = jsState.String
 			job.LastJobStatus.AttemptNum = int(jsAttemptNum.Int64)
 			job.LastJobStatus.ExecTime = jsExecTime.Time
@@ -2280,6 +2319,8 @@ func (jd *Handle) getJobsDS(ctx context.Context, ds dataSetT, params GetQueryPar
 			job.LastJobStatus.ErrorCode = jsErrorCode.String
 			job.LastJobStatus.ErrorResponse = jsErrorResponse
 			job.LastJobStatus.Parameters = jsParameters
+		} else {
+			resultsetStates[Unprocessed.State] = struct{}{}
 		}
 		job.LastJobStatus.JobParameters = job.Parameters
 
@@ -2310,12 +2351,13 @@ func (jd *Handle) getJobsDS(ctx context.Context, ds dataSetT, params GetQueryPar
 	}
 
 	if !skipCacheResult {
-		dsList := jd.getDSList()
-
-		// if query contains unprocessed and jobsdb owner is a reader and if ds is the right most one, ignoring setting result as noJobs
-		skipCacheCommit := containsUnprocessed && jd.ownerType == Read && ds.Index == dsList[len(dsList)-1].Index
-		if len(jobList) == 0 && !skipCacheCommit {
-			cacheTx.Commit()
+		for state, cacheTx := range cacheTx {
+			// we are committing the cache Tx only if
+			// (a) no jobs are returned by the query or
+			// (b) the state is not present in the resultset and limits have not been reached
+			if _, ok := resultsetStates[state]; len(jobList) == 0 || (!ok && !limitsReached) {
+				cacheTx.Commit()
+			}
 		}
 	}
 
@@ -3205,7 +3247,7 @@ func (jd *Handle) getJobs(ctx context.Context, params GetQueryParams, more MoreT
 		if dsLimit > 0 && dsQueryCount >= dsLimit {
 			break
 		}
-		jobs, dsHit, err := jd.getJobsDS(ctx, ds, params)
+		jobs, dsHit, err := jd.getJobsDS(ctx, ds, len(dsList)-1 == idx, params)
 		if err != nil {
 			return nil, err
 		}
@@ -3287,6 +3329,18 @@ func queryResultWrapper(res *MoreJobsResult, err error) queryResult {
 	}
 }
 
+type moreQueryResult struct {
+	*MoreJobsResult
+	err error
+}
+
+func moreQueryResultWrapper(res *MoreJobsResult, err error) moreQueryResult {
+	return moreQueryResult{
+		MoreJobsResult: res,
+		err:            err,
+	}
+}
+
 func (jd *Handle) getMaxIDForDs(ds dataSetT) int64 {
 	var maxID sql.NullInt64
 	sqlStatement := fmt.Sprintf(`SELECT MAX(job_id) FROM %s`, ds.JobTable)
diff --git a/processor/processor_isolation_test.go b/processor/processor_isolation_test.go
index 33f79eb6d43..c3ba1fb8ce0 100644
--- a/processor/processor_isolation_test.go
+++ b/processor/processor_isolation_test.go
@@ -227,6 +227,7 @@ func ProcIsolationScenario(t testing.TB, spec *ProcIsolationScenarioSpec) (overa
 	config.Set("JobsDB.backup.enabled", false)
 	config.Set("JobsDB.migrateDSLoopSleepDuration", "60m")
 	config.Set("Router.toAbortDestinationIDs", destinationID)
+	config.Set("archival.Enabled", false)
 
 	config.Set("Processor.isolationMode", string(spec.isolationMode))
 
diff --git a/processor/processor_test.go b/processor/processor_test.go
index 28285d064f8..1ff7e42c6cc 100644
--- a/processor/processor_test.go
+++ b/processor/processor_test.go
@@ -2070,8 +2070,7 @@ var _ = Describe("Processor", Ordered, func() {
 			c.mockBackendConfig.EXPECT().WaitForConfig(gomock.Any()).Times(1)
 			c.mockReadProcErrorsDB.EXPECT().FailExecuting().Times(1)
-			c.mockReadProcErrorsDB.EXPECT().GetFailed(gomock.Any(), gomock.Any()).AnyTimes()
-			c.mockReadProcErrorsDB.EXPECT().GetUnprocessed(gomock.Any(), gomock.Any()).AnyTimes()
+			c.mockReadProcErrorsDB.EXPECT().GetJobs(gomock.Any(), []string{jobsdb.Failed.State, jobsdb.Unprocessed.State}, gomock.Any()).AnyTimes()
 			c.mockRouterJobsDB.EXPECT().GetPileUpCounts(gomock.Any()).AnyTimes()
 			c.mockBatchRouterJobsDB.EXPECT().GetPileUpCounts(gomock.Any()).AnyTimes()
@@ -2126,8 +2125,7 @@ var _ = Describe("Processor", Ordered, func() {
 			processor.config.readLoopSleep = time.Millisecond
 
 			c.mockReadProcErrorsDB.EXPECT().FailExecuting()
-			c.mockReadProcErrorsDB.EXPECT().GetFailed(gomock.Any(), gomock.Any()).Return(jobsdb.JobsResult{}, nil).AnyTimes()
-			c.mockReadProcErrorsDB.EXPECT().GetUnprocessed(gomock.Any(), gomock.Any()).Return(jobsdb.JobsResult{}, nil).AnyTimes()
+			c.mockReadProcErrorsDB.EXPECT().GetJobs(gomock.Any(), []string{jobsdb.Failed.State, jobsdb.Unprocessed.State}, gomock.Any()).AnyTimes()
 			c.mockBackendConfig.EXPECT().WaitForConfig(gomock.Any()).Times(1)
 			c.mockRouterJobsDB.EXPECT().GetPileUpCounts(gomock.Any()).AnyTimes()
 			c.mockBatchRouterJobsDB.EXPECT().GetPileUpCounts(gomock.Any()).AnyTimes()
diff --git a/processor/stash/stash.go b/processor/stash/stash.go
index 580c1c6c0d9..7d65298885b 100644
--- a/processor/stash/stash.go
+++ b/processor/stash/stash.go
@@ -309,6 +309,7 @@ func (st *HandleT) readErrJobsLoop(ctx context.Context) {
 			return
 		case <-time.After(sleepTime):
 			start := time.Now()
+			var combinedList []*jobsdb.JobT
 			var limitReached bool
 			// NOTE: sending custom val filters array of size 1 to take advantage of cache in jobsdb.
 			queryParams := jobsdb.GetQueryParams{
@@ -317,39 +318,58 @@ func (st *HandleT) readErrJobsLoop(ctx context.Context) {
 				JobsLimit:        errDBReadBatchSize,
 				PayloadSizeLimit: st.adaptiveLimit(payloadLimit),
 			}
-			toRetry, err := misc.QueryWithRetriesAndNotify(ctx, st.jobdDBQueryRequestTimeout, st.jobdDBMaxRetries, func(ctx context.Context) (jobsdb.JobsResult, error) {
-				return st.errorDB.GetFailed(ctx, queryParams)
-			}, sendQueryRetryStats)
-			if err != nil {
-				if ctx.Err() != nil { // we are shutting down
-					close(st.errProcessQ)
-					return //nolint:nilerr
-				}
-				st.logger.Errorf("Error occurred while reading proc error jobs. Err: %v", err)
-				panic(err)
-			}
-			combinedList := toRetry.Jobs
-			limitReached = toRetry.LimitsReached
-			if !toRetry.LimitsReached {
-				queryParams.JobsLimit -= len(toRetry.Jobs)
-				if queryParams.PayloadSizeLimit > 0 {
-					queryParams.PayloadSizeLimit -= toRetry.PayloadSize
+			if config.GetBool("JobsDB.useSingleGetJobsQuery", true) { // TODO: remove condition after successful rollout of single query
+				toProcess, err := misc.QueryWithRetriesAndNotify(ctx, st.jobdDBQueryRequestTimeout, st.jobdDBMaxRetries, func(ctx context.Context) (jobsdb.JobsResult, error) {
+					return st.errorDB.GetJobs(ctx, []string{jobsdb.Failed.State, jobsdb.Unprocessed.State}, queryParams)
+				}, sendQueryRetryStats)
+				if err != nil {
+					if ctx.Err() != nil { // we are shutting down
+						close(st.errProcessQ)
+						return //nolint:nilerr
+					}
+					st.logger.Errorf("Error occurred while reading proc error jobs. Err: %v", err)
Err: %v", err) + panic(err) } - unprocessed, err := misc.QueryWithRetriesAndNotify(ctx, st.jobdDBQueryRequestTimeout, st.jobdDBMaxRetries, func(ctx context.Context) (jobsdb.JobsResult, error) { - return st.errorDB.GetUnprocessed(ctx, queryParams) + + combinedList = toProcess.Jobs + limitReached = toProcess.LimitsReached + } else { + toRetry, err := misc.QueryWithRetriesAndNotify(ctx, st.jobdDBQueryRequestTimeout, st.jobdDBMaxRetries, func(ctx context.Context) (jobsdb.JobsResult, error) { + return st.errorDB.GetFailed(ctx, queryParams) }, sendQueryRetryStats) if err != nil { if ctx.Err() != nil { // we are shutting down close(st.errProcessQ) - return + return //nolint:nilerr } st.logger.Errorf("Error occurred while reading proc error jobs. Err: %v", err) panic(err) } - combinedList = append(combinedList, unprocessed.Jobs...) - limitReached = unprocessed.LimitsReached + + combinedList = toRetry.Jobs + limitReached = toRetry.LimitsReached + if !toRetry.LimitsReached { + queryParams.JobsLimit -= len(toRetry.Jobs) + if queryParams.PayloadSizeLimit > 0 { + queryParams.PayloadSizeLimit -= toRetry.PayloadSize + } + unprocessed, err := misc.QueryWithRetriesAndNotify(ctx, st.jobdDBQueryRequestTimeout, st.jobdDBMaxRetries, func(ctx context.Context) (jobsdb.JobsResult, error) { + return st.errorDB.GetUnprocessed(ctx, queryParams) + }, sendQueryRetryStats) + if err != nil { + if ctx.Err() != nil { // we are shutting down + close(st.errProcessQ) + return + } + st.logger.Errorf("Error occurred while reading proc error jobs. Err: %v", err) + panic(err) + } + combinedList = append(combinedList, unprocessed.Jobs...) + limitReached = unprocessed.LimitsReached + } } + st.statErrDBR.Since(start) if len(combinedList) == 0 { @@ -397,7 +417,7 @@ func (st *HandleT) readErrJobsLoop(ctx context.Context) { } statusList = append(statusList, &status) } - err = misc.RetryWithNotify(context.Background(), st.jobsDBCommandTimeout, st.jobdDBMaxRetries, func(ctx context.Context) error { + err := misc.RetryWithNotify(context.Background(), st.jobsDBCommandTimeout, st.jobdDBMaxRetries, func(ctx context.Context) error { return st.errorDB.UpdateJobStatus(ctx, statusList, nil, nil) }, sendRetryUpdateStats) if err != nil { diff --git a/router/batchrouter/batchrouter_test.go b/router/batchrouter/batchrouter_test.go index 68d7f3367f6..f4fd0bd0a99 100644 --- a/router/batchrouter/batchrouter_test.go +++ b/router/batchrouter/batchrouter_test.go @@ -235,29 +235,16 @@ var _ = Describe("BatchRouter", func() { } payloadLimit := batchrouter.payloadLimit - var toRetryJobsListCalled bool - var unprocessedJobsListCalled bool - c.mockBatchRouterJobsDB.EXPECT().GetFailed(gomock.Any(), jobsdb.GetQueryParams{CustomValFilters: []string{CustomVal["S3"]}, JobsLimit: c.jobQueryBatchSize, PayloadSizeLimit: payloadLimit}).DoAndReturn(func(ctx context.Context, params jobsdb.GetQueryParams) (jobsdb.JobsResult, error) { - if !toRetryJobsListCalled { - toRetryJobsListCalled = true - return jobsdb.JobsResult{Jobs: toRetryJobsList}, nil + var getJobsListCalled bool + c.mockBatchRouterJobsDB.EXPECT().GetJobs(gomock.Any(), []string{jobsdb.Failed.State, jobsdb.Unprocessed.State}, jobsdb.GetQueryParams{CustomValFilters: []string{CustomVal["S3"]}, JobsLimit: c.jobQueryBatchSize, PayloadSizeLimit: payloadLimit}).DoAndReturn(func(ctx context.Context, states []string, params jobsdb.GetQueryParams) (jobsdb.JobsResult, error) { + var res jobsdb.JobsResult + if !getJobsListCalled { + getJobsListCalled = true + jobs := append([]*jobsdb.JobT{}, toRetryJobsList...) 
+					jobs = append(jobs, unprocessedJobsList...)
+					res.Jobs = jobs
 				}
-				return jobsdb.JobsResult{}, nil
-			}).AnyTimes()
-			c.mockBatchRouterJobsDB.EXPECT().GetUnprocessed(gomock.Any(), jobsdb.GetQueryParams{CustomValFilters: []string{CustomVal["S3"]}, JobsLimit: c.jobQueryBatchSize - len(toRetryJobsList), PayloadSizeLimit: payloadLimit}).DoAndReturn(func(ctx context.Context, params jobsdb.GetQueryParams) (jobsdb.JobsResult, error) {
-				if !unprocessedJobsListCalled {
-					unprocessedJobsListCalled = true
-					return jobsdb.JobsResult{Jobs: unprocessedJobsList}, nil
-				}
-				return jobsdb.JobsResult{}, nil
-			}).Times(1)
-
-			c.mockBatchRouterJobsDB.EXPECT().GetUnprocessed(gomock.Any(), jobsdb.GetQueryParams{CustomValFilters: []string{CustomVal["S3"]}, JobsLimit: c.jobQueryBatchSize, PayloadSizeLimit: payloadLimit}).DoAndReturn(func(ctx context.Context, params jobsdb.GetQueryParams) (jobsdb.JobsResult, error) {
-				if !unprocessedJobsListCalled {
-					unprocessedJobsListCalled = true
-					return jobsdb.JobsResult{Jobs: unprocessedJobsList}, nil
-				}
-				return jobsdb.JobsResult{}, nil
+				return res, nil
 			}).AnyTimes()
 
 			c.mockBatchRouterJobsDB.EXPECT().UpdateJobStatus(gomock.Any(), gomock.Any(), []string{CustomVal["S3"]}, gomock.Any()).Times(1).
diff --git a/router/batchrouter/handle.go b/router/batchrouter/handle.go
index bfd0095ba61..a6e44634063 100644
--- a/router/batchrouter/handle.go
+++ b/router/batchrouter/handle.go
@@ -191,30 +191,44 @@ func (brt *Handle) getWorkerJobs(partition string) (workerJobs []*DestinationJob
 	}
 	brt.isolationStrategy.AugmentQueryParams(partition, &queryParams)
 	var limitsReached bool
-	toRetry, err := misc.QueryWithRetriesAndNotify(context.Background(), brt.jobdDBQueryRequestTimeout, brt.jobdDBMaxRetries, func(ctx context.Context) (jobsdb.JobsResult, error) {
-		return brt.jobsDB.GetFailed(ctx, queryParams)
-	}, brt.sendQueryRetryStats)
-	if err != nil {
-		brt.logger.Errorf("BRT: %s: Error while reading from DB: %v", brt.destType, err)
-		panic(err)
-	}
-	jobs = toRetry.Jobs
-	limitsReached = toRetry.LimitsReached
-	if !limitsReached {
-		queryParams.JobsLimit -= len(toRetry.Jobs)
-		if queryParams.PayloadSizeLimit > 0 {
-			queryParams.PayloadSizeLimit -= toRetry.PayloadSize
+
+	if config.GetBool("JobsDB.useSingleGetJobsQuery", true) { // TODO: remove condition after successful rollout of single query
+		toProcess, err := misc.QueryWithRetriesAndNotify(context.Background(), brt.jobdDBQueryRequestTimeout, brt.jobdDBMaxRetries, func(ctx context.Context) (jobsdb.JobsResult, error) {
+			return brt.jobsDB.GetJobs(ctx, []string{jobsdb.Failed.State, jobsdb.Unprocessed.State}, queryParams)
+		}, brt.sendQueryRetryStats)
+		if err != nil {
+			brt.logger.Errorf("BRT: %s: Error while reading from DB: %v", brt.destType, err)
+			panic(err)
 		}
-		unprocessed, err := misc.QueryWithRetriesAndNotify(context.Background(), brt.jobdDBQueryRequestTimeout, brt.jobdDBMaxRetries, func(ctx context.Context) (jobsdb.JobsResult, error) {
-			return brt.jobsDB.GetUnprocessed(ctx, queryParams)
+		jobs = toProcess.Jobs
+		limitsReached = toProcess.LimitsReached
+	} else {
+		toRetry, err := misc.QueryWithRetriesAndNotify(context.Background(), brt.jobdDBQueryRequestTimeout, brt.jobdDBMaxRetries, func(ctx context.Context) (jobsdb.JobsResult, error) {
+			return brt.jobsDB.GetFailed(ctx, queryParams)
 		}, brt.sendQueryRetryStats)
 		if err != nil {
 			brt.logger.Errorf("BRT: %s: Error while reading from DB: %v", brt.destType, err)
 			panic(err)
 		}
-		jobs = append(jobs, unprocessed.Jobs...)
-		limitsReached = unprocessed.LimitsReached
+		jobs = toRetry.Jobs
+		limitsReached = toRetry.LimitsReached
+		if !limitsReached {
+			queryParams.JobsLimit -= len(toRetry.Jobs)
+			if queryParams.PayloadSizeLimit > 0 {
+				queryParams.PayloadSizeLimit -= toRetry.PayloadSize
+			}
+			unprocessed, err := misc.QueryWithRetriesAndNotify(context.Background(), brt.jobdDBQueryRequestTimeout, brt.jobdDBMaxRetries, func(ctx context.Context) (jobsdb.JobsResult, error) {
+				return brt.jobsDB.GetUnprocessed(ctx, queryParams)
+			}, brt.sendQueryRetryStats)
+			if err != nil {
+				brt.logger.Errorf("BRT: %s: Error while reading from DB: %v", brt.destType, err)
+				panic(err)
+			}
+			jobs = append(jobs, unprocessed.Jobs...)
+			limitsReached = unprocessed.LimitsReached
+		}
 	}
+
 	brtQueryStat.Since(queryStart)
 	sort.Slice(jobs, func(i, j int) bool {
 		return jobs[i].JobID < jobs[j].JobID
diff --git a/router/handle.go b/router/handle.go
index 1f73c7ef288..fc37d3531d5 100644
--- a/router/handle.go
+++ b/router/handle.go
@@ -18,6 +18,7 @@ import (
 	"github.com/tidwall/gjson"
 	"golang.org/x/sync/errgroup"
 
+	"github.com/rudderlabs/rudder-go-kit/config"
 	"github.com/rudderlabs/rudder-go-kit/logger"
 	"github.com/rudderlabs/rudder-go-kit/stats"
 	kitsync "github.com/rudderlabs/rudder-go-kit/sync"
@@ -151,11 +152,16 @@ func (rt *Handle) pickup(ctx context.Context, partition string, workers []*worke
 	var firstJob *jobsdb.JobT
 	var lastJob *jobsdb.JobT
 
+	orderGroupKeyFn := func(job *jobsdb.JobT) string { return job.LastJobStatus.JobState }
+	if config.GetBool("JobsDB.useSingleGetJobsQuery", true) { // TODO: remove condition and option after successful rollout of single query
+		orderGroupKeyFn = func(job *jobsdb.JobT) string { return "same" }
+	}
 	iterator := jobiterator.New(
 		rt.getQueryParams(partition, rt.reloadableConfig.jobQueryBatchSize),
 		rt.getJobsFn(ctx),
 		jobiterator.WithDiscardedPercentageTolerance(rt.reloadableConfig.jobIteratorDiscardedPercentageTolerance),
 		jobiterator.WithMaxQueries(rt.reloadableConfig.jobIteratorMaxQueries),
+		jobiterator.WithOrderGroupKeyFn(orderGroupKeyFn),
 	)
 
 	if !iterator.HasNext() {
diff --git a/router/internal/jobiterator/jobiterator.go b/router/internal/jobiterator/jobiterator.go
index 72593180f0b..f1d68a69d8e 100644
--- a/router/internal/jobiterator/jobiterator.go
+++ b/router/internal/jobiterator/jobiterator.go
@@ -26,6 +26,13 @@ func WithDiscardedPercentageTolerance(discardedPercentageTolerance int) Iterator
 	}
 }
 
+// WithOrderGroupKeyFn sets the orderGroupKeyFn
+func WithOrderGroupKeyFn(orderGroupKeyFn func(*jobsdb.JobT) string) IteratorOptFn {
+	return func(ji *Iterator) {
+		ji.orderGroupKeyFn = orderGroupKeyFn
+	}
+}
+
 // Iterator is a job iterator with support for fetching more than the original set of jobs requested,
 // in case some of these jobs get discarded, according to the configured discarded percentage tolerance.
 type Iterator struct {
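The patch above replaces the GetFailed + GetUnprocessed call pair with a single GetJobs call that takes the combined state list. The following self-contained Go sketch contrasts the two call shapes; all types and names here (Job, JobsResult, QueryParams, reader, stubDB) are invented stand-ins, not the real jobsdb API, and the sketch only illustrates the limit arithmetic that the single query makes unnecessary.

package main

import (
	"context"
	"fmt"
)

// Hypothetical, trimmed-down stand-ins for jobsdb types; names are illustrative only.
type Job struct {
	ID    int64
	State string
}

type JobsResult struct {
	Jobs          []*Job
	LimitsReached bool
	PayloadSize   int64
}

type QueryParams struct {
	JobsLimit        int
	PayloadSizeLimit int64
}

// reader captures only the calls compared below.
type reader interface {
	GetFailed(ctx context.Context, p QueryParams) (JobsResult, error)
	GetUnprocessed(ctx context.Context, p QueryParams) (JobsResult, error)
	GetJobs(ctx context.Context, states []string, p QueryParams) (JobsResult, error)
}

// legacyFetch mirrors the pre-rollout call shape: two queries with limit
// arithmetic between them.
func legacyFetch(ctx context.Context, db reader, p QueryParams) ([]*Job, bool, error) {
	toRetry, err := db.GetFailed(ctx, p)
	if err != nil {
		return nil, false, err
	}
	jobs, limitsReached := toRetry.Jobs, toRetry.LimitsReached
	if !limitsReached {
		p.JobsLimit -= len(toRetry.Jobs)
		if p.PayloadSizeLimit > 0 {
			p.PayloadSizeLimit -= toRetry.PayloadSize
		}
		unprocessed, err := db.GetUnprocessed(ctx, p)
		if err != nil {
			return nil, false, err
		}
		jobs = append(jobs, unprocessed.Jobs...)
		limitsReached = unprocessed.LimitsReached
	}
	return jobs, limitsReached, nil
}

// singleFetch mirrors the post-rollout call shape: one query, one limit check.
func singleFetch(ctx context.Context, db reader, p QueryParams) ([]*Job, bool, error) {
	res, err := db.GetJobs(ctx, []string{"failed", "unprocessed"}, p)
	if err != nil {
		return nil, false, err
	}
	return res.Jobs, res.LimitsReached, nil
}

// stubDB returns canned results so the two fetch shapes can be compared side by side.
type stubDB struct{}

func (stubDB) GetFailed(context.Context, QueryParams) (JobsResult, error) {
	return JobsResult{Jobs: []*Job{{ID: 1, State: "failed"}}}, nil
}

func (stubDB) GetUnprocessed(context.Context, QueryParams) (JobsResult, error) {
	return JobsResult{Jobs: []*Job{{ID: 2, State: "unprocessed"}}}, nil
}

func (stubDB) GetJobs(context.Context, []string, QueryParams) (JobsResult, error) {
	return JobsResult{Jobs: []*Job{{ID: 1, State: "failed"}, {ID: 2, State: "unprocessed"}}}, nil
}

func main() {
	ctx := context.Background()
	legacy, _, _ := legacyFetch(ctx, stubDB{}, QueryParams{JobsLimit: 100})
	single, _, _ := singleFetch(ctx, stubDB{}, QueryParams{JobsLimit: 100})
	fmt.Printf("legacy fetched %d jobs, single query fetched %d jobs\n", len(legacy), len(single))
}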
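Because one combined query can now return jobs for only some of the requested states, getJobsDS in the patch opens one no-result cache transaction per state and commits each state individually. The sketch below is illustrative only (a plain map standing in for the jobsdb no-result cache) and encodes the commit rule visible in the diff: mark a state as empty when the query returned nothing at all, or when that state is absent from the resultset and the query was not cut short by limits.

package main

import "fmt"

// noResultCache is a hypothetical stand-in: state -> "known to have no jobs".
type noResultCache map[string]bool

// commitNoResults applies the per-state commit rule from the diff: commit when
// the query returned no jobs at all, or when the state is missing from the
// resultset and limits were not reached (so its absence is conclusive).
func commitNoResults(cache noResultCache, queried []string, resultStates map[string]struct{}, totalJobs int, limitsReached bool) {
	for _, state := range queried {
		_, present := resultStates[state]
		if totalJobs == 0 || (!present && !limitsReached) {
			cache[state] = true
		}
	}
}

func main() {
	cache := noResultCache{}
	// Query asked for failed+unprocessed, got only failed jobs back, and did not hit limits:
	commitNoResults(cache, []string{"failed", "unprocessed"},
		map[string]struct{}{"failed": {}}, 3, false)
	fmt.Println(cache) // prints map[unprocessed:true]: only the absent state is marked empty
}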
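The getJobsDS change always builds a single OR predicate over the requested states, translating the "unprocessed" pseudo-state into a NULL check on the latest-status join. The helper below is a hypothetical illustration of that shape, not the real constructQueryOR, whose exact output format is not shown in the patch.

package main

import (
	"fmt"
	"strings"
)

// stateFilterSQL is an invented helper sketching how a combined state filter can
// be expressed as one OR predicate, with "unprocessed" mapped to a NULL check on
// the job_latest_state join instead of a job_state comparison.
func stateFilterSQL(states []string) string {
	var preds []string
	for _, s := range states {
		if s == "unprocessed" {
			preds = append(preds, "(job_latest_state.job_id IS NULL)")
			continue
		}
		preds = append(preds, fmt.Sprintf("(job_latest_state.job_state = '%s')", s))
	}
	return "(" + strings.Join(preds, " OR ") + ")"
}

func main() {
	fmt.Println(stateFilterSQL([]string{"failed", "waiting", "unprocessed"}))
	// ((job_latest_state.job_state = 'failed') OR (job_latest_state.job_state = 'waiting') OR (job_latest_state.job_id IS NULL))
}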
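The router change threads an order-group key function into the job iterator via a new functional option: with the single combined query every job shares one group key ("same"), so job_id ordering is enforced across the whole batch, while the per-state key only makes sense for the legacy multi-query path. The sketch below reproduces just the functional-option pattern with invented types; it is not the jobiterator package itself.

package main

import "fmt"

// Invented stand-ins for the iterator and job types.
type job struct {
	ID    int64
	State string
}

type iterator struct {
	orderGroupKeyFn func(*job) string
}

type iteratorOpt func(*iterator)

// withOrderGroupKeyFn mirrors the shape of an option that overrides the grouping key.
func withOrderGroupKeyFn(fn func(*job) string) iteratorOpt {
	return func(it *iterator) { it.orderGroupKeyFn = fn }
}

// newIterator defaults to grouping by job state (the legacy behaviour in this sketch)
// and lets options override it.
func newIterator(opts ...iteratorOpt) *iterator {
	it := &iterator{orderGroupKeyFn: func(j *job) string { return j.State }}
	for _, opt := range opts {
		opt(it)
	}
	return it
}

func main() {
	single := newIterator(withOrderGroupKeyFn(func(*job) string { return "same" }))
	legacy := newIterator()
	j := &job{ID: 42, State: "failed"}
	fmt.Println(single.orderGroupKeyFn(j), legacy.orderGroupKeyFn(j)) // prints: same failed
}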