chore: cleanup JobsDB.useSingleGetJobsQuery config option
atzoum committed Sep 20, 2023
1 parent 1fa2f45 commit 9cf9c91
Showing 5 changed files with 26 additions and 111 deletions.
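In effect, every call site that previously issued two JobsDB reads — GetFailed, then GetUnprocessed with whatever budget was left — now issues one GetJobs call covering both job states, and the config flag that guarded the rollout is removed along with the legacy branches. A minimal sketch of the consolidated pattern as it appears in the diffs below (timeout, maxRetries, db, and sendQueryRetryStats are stand-ins for the call-site specifics):

toProcess, err := misc.QueryWithRetriesAndNotify(ctx, timeout, maxRetries,
	func(ctx context.Context) (jobsdb.JobsResult, error) {
		// one query returns failed and unprocessed jobs together,
		// already subject to a single jobs/payload-size budget
		return db.GetJobs(ctx, []string{jobsdb.Failed.State, jobsdb.Unprocessed.State}, queryParams)
	}, sendQueryRetryStats)
if err != nil {
	// the call sites below either panic or shut down, depending on ctx
}
jobs := toProcess.Jobs
limitsReached := toProcess.LimitsReached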
4 changes: 0 additions & 4 deletions jobsdb/jobsdb.go
@@ -1920,10 +1920,6 @@ func (jd *Handle) getToProcessLegacy(ctx context.Context, params GetQueryParams,

 func (jd *Handle) GetToProcess(ctx context.Context, params GetQueryParams, more MoreToken) (*MoreJobsResult, error) { // skipcq: CRT-P0003
 
-	if !jd.config.GetBool("JobsDB.useSingleGetJobsQuery", true) { // TODO: remove condition after successful rollout of single query
-		return jd.getToProcessLegacy(ctx, params, more)
-	}
-
 	if params.JobsLimit == 0 {
 		return &MoreJobsResult{More: more}, nil
 	}
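GetToProcess keeps its MoreToken-based pagination contract, as the JobsLimit == 0 early return above shows. A hypothetical consumer loop (process is a stand-in; it assumes MoreJobsResult carries the fetched Jobs alongside the refreshed More token):

var more jobsdb.MoreToken
for {
	res, err := jd.GetToProcess(ctx, params, more)
	if err != nil {
		return err
	}
	if len(res.Jobs) == 0 {
		break // nothing more to fetch right now
	}
	process(res.Jobs) // hypothetical handler
	more = res.More   // resume after the last fetched job on the next call
}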
65 changes: 14 additions & 51 deletions processor/stash/stash.go
@@ -312,57 +312,21 @@ func (st *HandleT) readErrJobsLoop(ctx context.Context) {
 			PayloadSizeLimit: st.adaptiveLimit(st.config.payloadLimit.Load()),
 		}
 
-		if config.GetBool("JobsDB.useSingleGetJobsQuery", true) { // TODO: remove condition after successful rollout of single query
-			toProcess, err := misc.QueryWithRetriesAndNotify(ctx, st.config.jobdDBQueryRequestTimeout.Load(), st.config.jobdDBMaxRetries.Load(), func(ctx context.Context) (jobsdb.JobsResult, error) {
-				return st.errorDB.GetJobs(ctx, []string{jobsdb.Failed.State, jobsdb.Unprocessed.State}, queryParams)
-			}, st.sendQueryRetryStats)
-			if err != nil {
-				if ctx.Err() != nil { // we are shutting down
-					close(st.errProcessQ)
-					return //nolint:nilerr
-				}
-				st.logger.Errorf("Error occurred while reading proc error jobs. Err: %v", err)
-				panic(err)
-			}
-
-			combinedList = toProcess.Jobs
-			limitReached = toProcess.LimitsReached
-		} else {
-			toRetry, err := misc.QueryWithRetriesAndNotify(ctx, st.config.jobdDBQueryRequestTimeout.Load(), st.config.jobdDBMaxRetries.Load(), func(ctx context.Context) (jobsdb.JobsResult, error) {
-				return st.errorDB.GetFailed(ctx, queryParams)
-			}, st.sendQueryRetryStats)
-			if err != nil {
-				if ctx.Err() != nil { // we are shutting down
-					close(st.errProcessQ)
-					return //nolint:nilerr
-				}
-				st.logger.Errorf("Error occurred while reading proc error jobs. Err: %v", err)
-				panic(err)
-			}
-
-			combinedList = toRetry.Jobs
-			limitReached = toRetry.LimitsReached
-			if !toRetry.LimitsReached {
-				queryParams.JobsLimit -= len(toRetry.Jobs)
-				if queryParams.PayloadSizeLimit > 0 {
-					queryParams.PayloadSizeLimit -= toRetry.PayloadSize
-				}
-				unprocessed, err := misc.QueryWithRetriesAndNotify(ctx, st.config.jobdDBQueryRequestTimeout.Load(), st.config.jobdDBMaxRetries.Load(), func(ctx context.Context) (jobsdb.JobsResult, error) {
-					return st.errorDB.GetUnprocessed(ctx, queryParams)
-				}, st.sendQueryRetryStats)
-				if err != nil {
-					if ctx.Err() != nil { // we are shutting down
-						close(st.errProcessQ)
-						return
-					}
-					st.logger.Errorf("Error occurred while reading proc error jobs. Err: %v", err)
-					panic(err)
-				}
-				combinedList = append(combinedList, unprocessed.Jobs...)
-				limitReached = unprocessed.LimitsReached
-			}
-		}
+		toProcess, err := misc.QueryWithRetriesAndNotify(ctx, st.config.jobdDBQueryRequestTimeout.Load(), st.config.jobdDBMaxRetries.Load(), func(ctx context.Context) (jobsdb.JobsResult, error) {
+			return st.errorDB.GetJobs(ctx, []string{jobsdb.Failed.State, jobsdb.Unprocessed.State}, queryParams)
+		}, st.sendQueryRetryStats)
+		if err != nil {
+			if ctx.Err() != nil { // we are shutting down
+				close(st.errProcessQ)
+				return //nolint:nilerr
+			}
+			st.logger.Errorf("Error occurred while reading proc error jobs. Err: %v", err)
+			panic(err)
+		}
+
+		combinedList = toProcess.Jobs
+		limitReached = toProcess.LimitsReached
 
 		st.statErrDBR.Since(start)
 
 		if len(combinedList) == 0 {
@@ -410,10 +374,9 @@ func (st *HandleT) readErrJobsLoop(ctx context.Context) {
 				}
 				statusList = append(statusList, &status)
 			}
-			err := misc.RetryWithNotify(context.Background(), st.config.jobsDBCommandTimeout.Load(), st.config.jobdDBMaxRetries.Load(), func(ctx context.Context) error {
+			if err := misc.RetryWithNotify(context.Background(), st.config.jobsDBCommandTimeout.Load(), st.config.jobdDBMaxRetries.Load(), func(ctx context.Context) error {
 				return st.errorDB.UpdateJobStatus(ctx, statusList, nil, nil)
-			}, st.sendRetryUpdateStats)
-			if err != nil {
+			}, st.sendRetryUpdateStats); err != nil {
 				if ctx.Err() != nil { // we are shutting down
 					return //nolint:nilerr
 				}
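Both hunks lean on the repository's misc retry helpers. Their implementation is not part of this diff, but the call sites suggest semantics roughly like the following hypothetical sketch — a per-attempt timeout, a bounded number of retries, and a notification hook (which sendQueryRetryStats / sendRetryUpdateStats plug into for stats); assumes imports of context and time:

// queryWithRetries is a simplified, hypothetical analogue of
// misc.QueryWithRetriesAndNotify, shown only to illustrate the pattern.
func queryWithRetries[T any](
	ctx context.Context,
	timeout time.Duration,
	maxRetries int,
	query func(context.Context) (T, error),
	notify func(attempt int),
) (T, error) {
	var res T
	var err error
	for attempt := 0; attempt <= maxRetries; attempt++ {
		attemptCtx, cancel := context.WithTimeout(ctx, timeout)
		res, err = query(attemptCtx)
		cancel()
		if err == nil || ctx.Err() != nil {
			return res, err // success, or the parent context is done
		}
		notify(attempt) // e.g. emit retry stats
	}
	return res, err
}

Note how the real call site above distinguishes shutdown from persistent failure: if ctx.Err() is non-nil it closes errProcessQ and returns cleanly; otherwise it panics.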
43 changes: 8 additions & 35 deletions router/batchrouter/handle.go
@@ -193,42 +193,15 @@ func (brt *Handle) getWorkerJobs(partition string) (workerJobs []*DestinationJob
 	brt.isolationStrategy.AugmentQueryParams(partition, &queryParams)
 	var limitsReached bool
 
-	if config.GetBool("JobsDB.useSingleGetJobsQuery", true) { // TODO: remove condition after successful rollout of single query
-		toProcess, err := misc.QueryWithRetriesAndNotify(context.Background(), brt.jobdDBQueryRequestTimeout.Load(), brt.jobdDBMaxRetries.Load(), func(ctx context.Context) (jobsdb.JobsResult, error) {
-			return brt.jobsDB.GetJobs(ctx, []string{jobsdb.Failed.State, jobsdb.Unprocessed.State}, queryParams)
-		}, brt.sendQueryRetryStats)
-		if err != nil {
-			brt.logger.Errorf("BRT: %s: Error while reading from DB: %v", brt.destType, err)
-			panic(err)
-		}
-		jobs = toProcess.Jobs
-		limitsReached = toProcess.LimitsReached
-	} else {
-		toRetry, err := misc.QueryWithRetriesAndNotify(context.Background(), brt.jobdDBQueryRequestTimeout.Load(), brt.jobdDBMaxRetries.Load(), func(ctx context.Context) (jobsdb.JobsResult, error) {
-			return brt.jobsDB.GetFailed(ctx, queryParams)
-		}, brt.sendQueryRetryStats)
-		if err != nil {
-			brt.logger.Errorf("BRT: %s: Error while reading from DB: %v", brt.destType, err)
-			panic(err)
-		}
-		jobs = toRetry.Jobs
-		limitsReached = toRetry.LimitsReached
-		if !limitsReached {
-			queryParams.JobsLimit -= len(toRetry.Jobs)
-			if queryParams.PayloadSizeLimit > 0 {
-				queryParams.PayloadSizeLimit -= toRetry.PayloadSize
-			}
-			unprocessed, err := misc.QueryWithRetriesAndNotify(context.Background(), brt.jobdDBQueryRequestTimeout.Load(), brt.jobdDBMaxRetries.Load(), func(ctx context.Context) (jobsdb.JobsResult, error) {
-				return brt.jobsDB.GetUnprocessed(ctx, queryParams)
-			}, brt.sendQueryRetryStats)
-			if err != nil {
-				brt.logger.Errorf("BRT: %s: Error while reading from DB: %v", brt.destType, err)
-				panic(err)
-			}
-			jobs = append(jobs, unprocessed.Jobs...)
-			limitsReached = unprocessed.LimitsReached
-		}
-	}
+	toProcess, err := misc.QueryWithRetriesAndNotify(context.Background(), brt.jobdDBQueryRequestTimeout.Load(), brt.jobdDBMaxRetries.Load(), func(ctx context.Context) (jobsdb.JobsResult, error) {
+		return brt.jobsDB.GetJobs(ctx, []string{jobsdb.Failed.State, jobsdb.Unprocessed.State}, queryParams)
+	}, brt.sendQueryRetryStats)
+	if err != nil {
+		brt.logger.Errorf("BRT: %s: Error while reading from DB: %v", brt.destType, err)
+		panic(err)
+	}
+	jobs = toProcess.Jobs
+	limitsReached = toProcess.LimitsReached
 
 	brtQueryStat.Since(queryStart)
 	sort.Slice(jobs, func(i, j int) bool {
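The deleted else branch shows why the consolidation pays off: with two queries sharing one jobs/payload budget, the code had to shrink the limits by hand between reads. Reduced to its core (identifiers from the deleted code above):

jobs = toRetry.Jobs
if !toRetry.LimitsReached {
	// spend only whatever budget the failed-jobs query left over
	queryParams.JobsLimit -= len(toRetry.Jobs)
	if queryParams.PayloadSizeLimit > 0 {
		queryParams.PayloadSizeLimit -= toRetry.PayloadSize
	}
	// ...then GetUnprocessed runs with the reduced limits
}

A single GetJobs call enforces the same limits in one place, so none of this bookkeeping survives.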
6 changes: 0 additions & 6 deletions router/handle.go
@@ -18,7 +18,6 @@ import (
"github.com/tidwall/gjson"
"golang.org/x/sync/errgroup"

"github.com/rudderlabs/rudder-go-kit/config"
"github.com/rudderlabs/rudder-go-kit/logger"
"github.com/rudderlabs/rudder-go-kit/stats"
kitsync "github.com/rudderlabs/rudder-go-kit/sync"
@@ -152,16 +151,11 @@ func (rt *Handle) pickup(ctx context.Context, partition string, workers []*worke
 	var firstJob *jobsdb.JobT
 	var lastJob *jobsdb.JobT
 
-	orderGroupKeyFn := func(job *jobsdb.JobT) string { return job.LastJobStatus.JobState }
-	if config.GetBool("JobsDB.useSingleGetJobsQuery", true) { // TODO: remove condition and option after successful rollout of single query
-		orderGroupKeyFn = func(job *jobsdb.JobT) string { return "same" }
-	}
 	iterator := jobiterator.New(
 		rt.getQueryParams(partition, rt.reloadableConfig.jobQueryBatchSize.Load()),
 		rt.getJobsFn(ctx),
 		jobiterator.WithDiscardedPercentageTolerance(rt.reloadableConfig.jobIteratorDiscardedPercentageTolerance.Load()),
 		jobiterator.WithMaxQueries(rt.reloadableConfig.jobIteratorMaxQueries.Load()),
-		jobiterator.WithOrderGroupKeyFn(orderGroupKeyFn),
 	)
 
 	if !iterator.HasNext() {
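The deleted orderGroupKeyFn is the key to the jobiterator changes below: with two queries, job IDs were only monotonic within each job-state group (all failed jobs arrived before any unprocessed ones), whereas the single query yields one globally ordered stream — which is why the flag's "on" branch collapsed every job into the one group "same". The two ordering invariants, as a hypothetical sketch:

// legacy: IDs must be monotonic per group (group = job state)
prevByGroup := map[string]int64{}
checkPerGroup := func(group string, id int64) {
	if prev, ok := prevByGroup[group]; ok && prev > id {
		panic("out of order within group")
	}
	prevByGroup[group] = id
}

// single query: IDs must be monotonic across the whole stream
var prevID int64
checkGlobal := func(id int64) {
	if prevID > id {
		panic("out of order")
	}
	prevID = id
}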
19 changes: 4 additions & 15 deletions router/internal/jobiterator/jobiterator.go
@@ -26,26 +26,18 @@ func WithDiscardedPercentageTolerance(discardedPercentageTolerance int) Iterator
 	}
 }
 
-// WithOrderGroupKeyFn sets the orderGroupKeyFn
-func WithOrderGroupKeyFn(orderGroupKeyFn func(*jobsdb.JobT) string) IteratorOptFn {
-	return func(ji *Iterator) {
-		ji.orderGroupKeyFn = orderGroupKeyFn
-	}
-}
-
 // Iterator is a job iterator with support for fetching more than the original set of jobs requested,
 // in case some of these jobs get discarded, according to the configured discarded percentage tolerance.
 type Iterator struct {
 	params                       jobsdb.GetQueryParams
 	maxQueries                   int
 	discardedPercentageTolerance int
-	orderGroupKeyFn              func(*jobsdb.JobT) string
 	getJobsFn                    func(context.Context, jobsdb.GetQueryParams, jobsdb.MoreToken) (*jobsdb.MoreJobsResult, error)
 	state                        struct {
 		// running iterator state
 		jobs        []*jobsdb.JobT
 		idx         int
-		previousJob map[string]*jobsdb.JobT
+		previousJob *jobsdb.JobT
 
 		// closed indicates whether the iterator has reached the end or not
 		closed bool
@@ -81,10 +73,8 @@ func New(params jobsdb.GetQueryParams, getJobsFn func(context.Context, jobsdb.Ge
 		maxQueries:                   100,
 		discardedPercentageTolerance: 0,
 		getJobsFn:                    getJobsFn,
-		orderGroupKeyFn:              func(job *jobsdb.JobT) string { return job.LastJobStatus.JobState },
 	}
 	ji.state.jobsLimit = params.JobsLimit
-	ji.state.previousJob = map[string]*jobsdb.JobT{}
 	for _, opt := range opts {
 		opt(ji)
 	}
@@ -158,11 +148,10 @@ func (ji *Iterator) Next() *jobsdb.JobT {
 	idx := ji.state.idx
 	ji.state.idx++
 	nextJob := ji.state.jobs[idx]
-	orderGroupKey := ji.orderGroupKeyFn(nextJob)
-	if previousJob, ok := ji.state.previousJob[orderGroupKey]; ok && previousJob.JobID > nextJob.JobID {
-		panic(fmt.Errorf("job iterator encountered out of order jobs for group key %s: previousJobID: %d, nextJobID: %d", orderGroupKey, previousJob.JobID, nextJob.JobID))
+	if ji.state.previousJob != nil && ji.state.previousJob.JobID > nextJob.JobID {
+		panic(fmt.Errorf("job iterator encountered out of order jobs: previousJobID: %d, nextJobID: %d", ji.state.previousJob.JobID, nextJob.JobID))
 	}
-	ji.state.previousJob[orderGroupKey] = nextJob
+	ji.state.previousJob = nextJob
 	return nextJob
 }
 
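For reference, a consumer drives the iterator with the usual loop (New, HasNext, Next, and the option constructors are as above; handle and the option values are hypothetical):

it := jobiterator.New(params, getJobsFn,
	jobiterator.WithMaxQueries(100),
	jobiterator.WithDiscardedPercentageTolerance(10),
)
for it.HasNext() {
	job := it.Next() // panics if job IDs ever run backwards
	handle(job)
}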
