chore: cleanup JobsDB.useSingleGetJobsQuery config option
atzoum committed Sep 21, 2023
1 parent 3c3e407 commit 557998b
Showing 5 changed files with 26 additions and 182 deletions.
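For orientation, here is a minimal sketch of the single-query read path that this cleanup standardizes on, assuming the jobsdb.JobsDB interface exposes the GetJobs method used by the call sites below (the wrapper function and parameter names are illustrative, not part of this commit):

package example

import (
	"context"

	"github.com/rudderlabs/rudder-server/jobsdb"
)

// readToProcess is a sketch only: one GetJobs call covering both the failed
// and unprocessed states replaces the legacy GetFailed/GetUnprocessed pair.
func readToProcess(ctx context.Context, db jobsdb.JobsDB, params jobsdb.GetQueryParams) ([]*jobsdb.JobT, bool, error) {
	result, err := db.GetJobs(ctx, []string{jobsdb.Failed.State, jobsdb.Unprocessed.State}, params)
	if err != nil {
		return nil, false, err
	}
	return result.Jobs, result.LimitsReached, nil
}
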
75 changes: 0 additions & 75 deletions jobsdb/jobsdb.go
@@ -1843,87 +1843,12 @@ func (jd *Handle) invalidateCacheForJobs(ds dataSetT, jobList []*JobT) {
}
}

type moreTokenLegacy struct {
retryAfterJobID *int64
waitingAfterJobID *int64
unprocessedAfterJobID *int64
}

type moreToken struct {
afterJobID *int64
}

// getToProcessLegacy returns jobs that are in failed, waiting and unprocessed states using three separate queries
//
// Deprecated: shall be removed after successful rollout
func (jd *Handle) getToProcessLegacy(ctx context.Context, params GetQueryParams, more MoreToken) (*MoreJobsResult, error) { // skipcq: CRT-P0003

mtoken := &moreTokenLegacy{}
if more != nil {
var ok bool
if mtoken, ok = more.(*moreTokenLegacy); !ok {
return nil, fmt.Errorf("invalid token: %+v", more)
}
}
updateParams := func(params *GetQueryParams, jobs JobsResult, nextAfterJobID *int64) {
params.JobsLimit -= len(jobs.Jobs)
if params.EventsLimit > 0 {
params.EventsLimit -= jobs.EventsCount
}
if params.PayloadSizeLimit > 0 {
params.PayloadSizeLimit -= jobs.PayloadSize
}
params.afterJobID = nextAfterJobID
}
var list []*JobT
params.afterJobID = mtoken.retryAfterJobID
toRetry, err := jd.GetFailed(ctx, params)
if err != nil {
return nil, err
}
if len(toRetry.Jobs) > 0 {
retryAfterJobID := toRetry.Jobs[len(toRetry.Jobs)-1].JobID
mtoken.retryAfterJobID = &retryAfterJobID
}

list = append(list, toRetry.Jobs...)
if toRetry.LimitsReached {
return &MoreJobsResult{JobsResult: JobsResult{Jobs: list, LimitsReached: true}, More: mtoken}, nil
}
updateParams(&params, toRetry, mtoken.waitingAfterJobID)

waiting, err := jd.GetWaiting(ctx, params)
if err != nil {
return nil, err
}
if len(waiting.Jobs) > 0 {
waitingAfterJobID := waiting.Jobs[len(waiting.Jobs)-1].JobID
mtoken.waitingAfterJobID = &waitingAfterJobID
}
list = append(list, waiting.Jobs...)
if waiting.LimitsReached {
return &MoreJobsResult{JobsResult: JobsResult{Jobs: list, LimitsReached: true}, More: mtoken}, nil
}
updateParams(&params, waiting, mtoken.unprocessedAfterJobID)

unprocessed, err := jd.GetUnprocessed(ctx, params)
if err != nil {
return nil, err
}
if len(unprocessed.Jobs) > 0 {
unprocessedAfterJobID := unprocessed.Jobs[len(unprocessed.Jobs)-1].JobID
mtoken.unprocessedAfterJobID = &unprocessedAfterJobID
}
list = append(list, unprocessed.Jobs...)
return &MoreJobsResult{JobsResult: JobsResult{Jobs: list, LimitsReached: unprocessed.LimitsReached}, More: mtoken}, nil
}

func (jd *Handle) GetToProcess(ctx context.Context, params GetQueryParams, more MoreToken) (*MoreJobsResult, error) { // skipcq: CRT-P0003

if !jd.config.GetBool("JobsDB.useSingleGetJobsQuery", true) { // TODO: remove condition after successful rollout of single query
return jd.getToProcessLegacy(ctx, params, more)
}

if params.JobsLimit == 0 {
return &MoreJobsResult{More: more}, nil
}
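With the legacy three-cursor token gone, pagination through GetToProcess relies on a single afterJobID cursor carried in the opaque MoreToken. A sketch of how a caller could page through results, grounded in the GetToProcess signature above; the loop and helper name are illustrative:

package example

import (
	"context"

	"github.com/rudderlabs/rudder-server/jobsdb"
)

// drainToProcess pages through GetToProcess: pass a nil token on the first
// call, then feed back res.More on each subsequent call. params are reused
// as-is here; a production caller may adjust limits between pages.
func drainToProcess(ctx context.Context, jd *jobsdb.Handle, params jobsdb.GetQueryParams) ([]*jobsdb.JobT, error) {
	var (
		all  []*jobsdb.JobT
		more jobsdb.MoreToken
	)
	for {
		res, err := jd.GetToProcess(ctx, params, more)
		if err != nil {
			return nil, err
		}
		all = append(all, res.Jobs...)
		if !res.LimitsReached { // nothing more to fetch within the configured limits
			return all, nil
		}
		more = res.More
	}
}
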
65 changes: 14 additions & 51 deletions processor/stash/stash.go
@@ -312,57 +312,21 @@ func (st *HandleT) readErrJobsLoop(ctx context.Context) {
PayloadSizeLimit: st.adaptiveLimit(st.config.payloadLimit.Load()),
}

if config.GetBool("JobsDB.useSingleGetJobsQuery", true) { // TODO: remove condition after successful rollout of single query
toProcess, err := misc.QueryWithRetriesAndNotify(ctx, st.config.jobdDBQueryRequestTimeout.Load(), st.config.jobdDBMaxRetries.Load(), func(ctx context.Context) (jobsdb.JobsResult, error) {
return st.errorDB.GetJobs(ctx, []string{jobsdb.Failed.State, jobsdb.Unprocessed.State}, queryParams)
}, st.sendQueryRetryStats)
if err != nil {
if ctx.Err() != nil { // we are shutting down
close(st.errProcessQ)
return //nolint:nilerr
}
st.logger.Errorf("Error occurred while reading proc error jobs. Err: %v", err)
panic(err)
}

combinedList = toProcess.Jobs
limitReached = toProcess.LimitsReached
} else {
toRetry, err := misc.QueryWithRetriesAndNotify(ctx, st.config.jobdDBQueryRequestTimeout.Load(), st.config.jobdDBMaxRetries.Load(), func(ctx context.Context) (jobsdb.JobsResult, error) {
return st.errorDB.GetFailed(ctx, queryParams)
}, st.sendQueryRetryStats)
if err != nil {
if ctx.Err() != nil { // we are shutting down
close(st.errProcessQ)
return //nolint:nilerr
}
st.logger.Errorf("Error occurred while reading proc error jobs. Err: %v", err)
panic(err)
}

combinedList = toRetry.Jobs
limitReached = toRetry.LimitsReached
if !toRetry.LimitsReached {
queryParams.JobsLimit -= len(toRetry.Jobs)
if queryParams.PayloadSizeLimit > 0 {
queryParams.PayloadSizeLimit -= toRetry.PayloadSize
}
unprocessed, err := misc.QueryWithRetriesAndNotify(ctx, st.config.jobdDBQueryRequestTimeout.Load(), st.config.jobdDBMaxRetries.Load(), func(ctx context.Context) (jobsdb.JobsResult, error) {
return st.errorDB.GetUnprocessed(ctx, queryParams)
}, st.sendQueryRetryStats)
if err != nil {
if ctx.Err() != nil { // we are shutting down
close(st.errProcessQ)
return
}
st.logger.Errorf("Error occurred while reading proc error jobs. Err: %v", err)
panic(err)
}
combinedList = append(combinedList, unprocessed.Jobs...)
limitReached = unprocessed.LimitsReached
toProcess, err := misc.QueryWithRetriesAndNotify(ctx, st.config.jobdDBQueryRequestTimeout.Load(), st.config.jobdDBMaxRetries.Load(), func(ctx context.Context) (jobsdb.JobsResult, error) {
return st.errorDB.GetJobs(ctx, []string{jobsdb.Failed.State, jobsdb.Unprocessed.State}, queryParams)
}, st.sendQueryRetryStats)
if err != nil {
if ctx.Err() != nil { // we are shutting down
close(st.errProcessQ)
return //nolint:nilerr

}
st.logger.Errorf("Error occurred while reading proc error jobs. Err: %v", err)
panic(err)

}

combinedList = toProcess.Jobs
limitReached = toProcess.LimitsReached

st.statErrDBR.Since(start)

if len(combinedList) == 0 {
@@ -410,10 +374,9 @@ func (st *HandleT) readErrJobsLoop(ctx context.Context) {
}
statusList = append(statusList, &status)
}
err := misc.RetryWithNotify(context.Background(), st.config.jobsDBCommandTimeout.Load(), st.config.jobdDBMaxRetries.Load(), func(ctx context.Context) error {
if err := misc.RetryWithNotify(context.Background(), st.config.jobsDBCommandTimeout.Load(), st.config.jobdDBMaxRetries.Load(), func(ctx context.Context) error {

return st.errorDB.UpdateJobStatus(ctx, statusList, nil, nil)
}, st.sendRetryUpdateStats)
if err != nil {
}, st.sendRetryUpdateStats); err != nil {

if ctx.Err() != nil { // we are shutting down
return //nolint:nilerr
}
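The status-update hunk above also tightens error scoping: the retry call moves into an if statement so err no longer leaks into the surrounding scope. A self-contained illustration of that Go idiom, using a placeholder function in place of the real misc.RetryWithNotify/UpdateJobStatus calls:

package example

import (
	"context"
	"errors"
	"log"
)

// updateStatus stands in for the retried errorDB.UpdateJobStatus call in
// stash.go; it is a placeholder, not the real API.
func updateStatus(ctx context.Context) error {
	return errors.New("example failure")
}

func process(ctx context.Context) {
	// Before the change: err escapes into the surrounding scope.
	err := updateStatus(ctx)
	if err != nil {
		log.Printf("update failed: %v", err)
	}

	// After the change: err is scoped to the if statement, which is what the
	// `if err := misc.RetryWithNotify(...); err != nil` rewrite achieves.
	if err := updateStatus(ctx); err != nil {
		log.Printf("update failed: %v", err)
	}
}
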
43 changes: 8 additions & 35 deletions router/batchrouter/handle.go
@@ -193,42 +193,15 @@ func (brt *Handle) getWorkerJobs(partition string) (workerJobs []*DestinationJob
brt.isolationStrategy.AugmentQueryParams(partition, &queryParams)
var limitsReached bool

if config.GetBool("JobsDB.useSingleGetJobsQuery", true) { // TODO: remove condition after successful rollout of single query
toProcess, err := misc.QueryWithRetriesAndNotify(context.Background(), brt.jobdDBQueryRequestTimeout.Load(), brt.jobdDBMaxRetries.Load(), func(ctx context.Context) (jobsdb.JobsResult, error) {
return brt.jobsDB.GetJobs(ctx, []string{jobsdb.Failed.State, jobsdb.Unprocessed.State}, queryParams)
}, brt.sendQueryRetryStats)
if err != nil {
brt.logger.Errorf("BRT: %s: Error while reading from DB: %v", brt.destType, err)
panic(err)
}
jobs = toProcess.Jobs
limitsReached = toProcess.LimitsReached
} else {
toRetry, err := misc.QueryWithRetriesAndNotify(context.Background(), brt.jobdDBQueryRequestTimeout.Load(), brt.jobdDBMaxRetries.Load(), func(ctx context.Context) (jobsdb.JobsResult, error) {
return brt.jobsDB.GetFailed(ctx, queryParams)
}, brt.sendQueryRetryStats)
if err != nil {
brt.logger.Errorf("BRT: %s: Error while reading from DB: %v", brt.destType, err)
panic(err)
}
jobs = toRetry.Jobs
limitsReached = toRetry.LimitsReached
if !limitsReached {
queryParams.JobsLimit -= len(toRetry.Jobs)
if queryParams.PayloadSizeLimit > 0 {
queryParams.PayloadSizeLimit -= toRetry.PayloadSize
}
unprocessed, err := misc.QueryWithRetriesAndNotify(context.Background(), brt.jobdDBQueryRequestTimeout.Load(), brt.jobdDBMaxRetries.Load(), func(ctx context.Context) (jobsdb.JobsResult, error) {
return brt.jobsDB.GetUnprocessed(ctx, queryParams)
}, brt.sendQueryRetryStats)
if err != nil {
brt.logger.Errorf("BRT: %s: Error while reading from DB: %v", brt.destType, err)
panic(err)
}
jobs = append(jobs, unprocessed.Jobs...)
limitsReached = unprocessed.LimitsReached
}
toProcess, err := misc.QueryWithRetriesAndNotify(context.Background(), brt.jobdDBQueryRequestTimeout.Load(), brt.jobdDBMaxRetries.Load(), func(ctx context.Context) (jobsdb.JobsResult, error) {
return brt.jobsDB.GetJobs(ctx, []string{jobsdb.Failed.State, jobsdb.Unprocessed.State}, queryParams)
}, brt.sendQueryRetryStats)
if err != nil {
brt.logger.Errorf("BRT: %s: Error while reading from DB: %v", brt.destType, err)
panic(err)

}
jobs = toProcess.Jobs
limitsReached = toProcess.LimitsReached

brtQueryStat.Since(queryStart)
sort.Slice(jobs, func(i, j int) bool {
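Both the stash and batch-router readers above wrap the consolidated GetJobs call in misc.QueryWithRetriesAndNotify. The sketch below is a hypothetical, stripped-down helper showing the shape of that retry-with-notify pattern; it is not the real misc implementation, whose signature and notification hooks may differ:

package example

import (
	"context"
	"time"
)

// queryWithRetries is a hypothetical stand-in for a retry wrapper such as
// misc.QueryWithRetriesAndNotify: run fn with a per-attempt timeout, retry on
// failure up to maxAttempts times (assumed >= 1), and call notify after each
// failed attempt. The real rudder-server helper may differ in detail.
func queryWithRetries[T any](
	ctx context.Context,
	timeout time.Duration,
	maxAttempts int,
	fn func(ctx context.Context) (T, error),
	notify func(attempt int),
) (T, error) {
	var (
		res T
		err error
	)
	for attempt := 1; attempt <= maxAttempts; attempt++ {
		attemptCtx, cancel := context.WithTimeout(ctx, timeout)
		res, err = fn(attemptCtx)
		cancel()
		if err == nil || ctx.Err() != nil {
			return res, err
		}
		notify(attempt)
	}
	return res, err
}
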
6 changes: 0 additions & 6 deletions router/handle.go
@@ -18,7 +18,6 @@ import (
"github.com/tidwall/gjson"
"golang.org/x/sync/errgroup"

"github.com/rudderlabs/rudder-go-kit/config"
"github.com/rudderlabs/rudder-go-kit/logger"
"github.com/rudderlabs/rudder-go-kit/stats"
kitsync "github.com/rudderlabs/rudder-go-kit/sync"
@@ -152,16 +151,11 @@ func (rt *Handle) pickup(ctx context.Context, partition string, workers []*worke
var firstJob *jobsdb.JobT
var lastJob *jobsdb.JobT

orderGroupKeyFn := func(job *jobsdb.JobT) string { return job.LastJobStatus.JobState }
if config.GetBool("JobsDB.useSingleGetJobsQuery", true) { // TODO: remove condition and option after successful rollout of single query
orderGroupKeyFn = func(job *jobsdb.JobT) string { return "same" }
}
iterator := jobiterator.New(
rt.getQueryParams(partition, rt.reloadableConfig.jobQueryBatchSize.Load()),
rt.getJobsFn(ctx),
jobiterator.WithDiscardedPercentageTolerance(rt.reloadableConfig.jobIteratorDiscardedPercentageTolerance.Load()),
jobiterator.WithMaxQueries(rt.reloadableConfig.jobIteratorMaxQueries.Load()),
jobiterator.WithOrderGroupKeyFn(orderGroupKeyFn),
)

if !iterator.HasNext() {
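With WithOrderGroupKeyFn removed, the router builds its job iterator with only the remaining options. A sketch of the simplified construction, assuming New returns *Iterator and the option values shown are illustrative; since jobiterator is an internal package, such a wrapper would have to live under the router tree:

package example

import (
	"context"

	"github.com/rudderlabs/rudder-server/jobsdb"
	"github.com/rudderlabs/rudder-server/router/internal/jobiterator"
)

// buildIterator mirrors the simplified pickup path: no order-group option,
// since jobs from the single query arrive as one globally ordered stream.
// params and getJobsFn are assumed to come from the router handle.
func buildIterator(
	params jobsdb.GetQueryParams,
	getJobsFn func(context.Context, jobsdb.GetQueryParams, jobsdb.MoreToken) (*jobsdb.MoreJobsResult, error),
) *jobiterator.Iterator {
	return jobiterator.New(
		params,
		getJobsFn,
		jobiterator.WithDiscardedPercentageTolerance(10), // illustrative value
		jobiterator.WithMaxQueries(50),                   // illustrative value
	)
}
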
19 changes: 4 additions & 15 deletions router/internal/jobiterator/jobiterator.go
@@ -26,26 +26,18 @@ func WithDiscardedPercentageTolerance(discardedPercentageTolerance int) Iterator
}
}

// WithOrderGroupKeyFn sets the orderGroupKeyFn
func WithOrderGroupKeyFn(orderGroupKeyFn func(*jobsdb.JobT) string) IteratorOptFn {
return func(ji *Iterator) {
ji.orderGroupKeyFn = orderGroupKeyFn
}
}

// Iterator is a job iterator with support for fetching more than the original set of jobs requested,
// in case some of these jobs get discarded, according to the configured discarded percentage tolerance.
type Iterator struct {
params jobsdb.GetQueryParams
maxQueries int
discardedPercentageTolerance int
orderGroupKeyFn func(*jobsdb.JobT) string
getJobsFn func(context.Context, jobsdb.GetQueryParams, jobsdb.MoreToken) (*jobsdb.MoreJobsResult, error)
state struct {
// running iterator state
jobs []*jobsdb.JobT
idx int
previousJob map[string]*jobsdb.JobT
previousJob *jobsdb.JobT

// closed indicates whether the iterator has reached the end or not
closed bool
@@ -81,10 +73,8 @@ func New(params jobsdb.GetQueryParams, getJobsFn func(context.Context, jobsdb.Ge
maxQueries: 100,
discardedPercentageTolerance: 0,
getJobsFn: getJobsFn,
orderGroupKeyFn: func(job *jobsdb.JobT) string { return job.LastJobStatus.JobState },
}
ji.state.jobsLimit = params.JobsLimit
ji.state.previousJob = map[string]*jobsdb.JobT{}
for _, opt := range opts {
opt(ji)
}
@@ -158,11 +148,10 @@ func (ji *Iterator) Next() *jobsdb.JobT {
idx := ji.state.idx
ji.state.idx++
nextJob := ji.state.jobs[idx]
orderGroupKey := ji.orderGroupKeyFn(nextJob)
if previousJob, ok := ji.state.previousJob[orderGroupKey]; ok && previousJob.JobID > nextJob.JobID {
panic(fmt.Errorf("job iterator encountered out of order jobs for group key %s: previousJobID: %d, nextJobID: %d", orderGroupKey, previousJob.JobID, nextJob.JobID))
if ji.state.previousJob != nil && ji.state.previousJob.JobID > nextJob.JobID {
panic(fmt.Errorf("job iterator encountered out of order jobs: previousJobID: %d, nextJobID: %d", ji.state.previousJob.JobID, nextJob.JobID))
}
ji.state.previousJob[orderGroupKey] = nextJob
ji.state.previousJob = nextJob
return nextJob
}

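With the per-group-key map replaced by a single previousJob pointer, the iterator now enforces one global JobID ordering across every job it yields. A standalone sketch of that invariant check, independent of the real iterator type:

package example

import "fmt"

// job mirrors only the field the ordering check needs; it is not the real
// jobsdb.JobT type.
type job struct{ JobID int64 }

// assertAscending panics on the first out-of-order pair, mirroring the check
// Next() performs against a single previousJob instead of one previous job
// per group key.
func assertAscending(jobs []*job) {
	var previous *job
	for _, next := range jobs {
		if previous != nil && previous.JobID > next.JobID {
			panic(fmt.Errorf("out of order jobs: previousJobID: %d, nextJobID: %d", previous.JobID, next.JobID))
		}
		previous = next
	}
}
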
