chore: cleanup JobsDB.useSingleGetJobsQuery config option #3893

Merged 1 commit on Sep 25, 2023
jobsdb/jobsdb.go (0 additions & 75 deletions)
@@ -1843,87 +1843,12 @@ func (jd *Handle) invalidateCacheForJobs(ds dataSetT, jobList []*JobT) {
}
}

type moreTokenLegacy struct {
retryAfterJobID *int64
waitingAfterJobID *int64
unprocessedAfterJobID *int64
}

type moreToken struct {
afterJobID *int64
}

// getToProcessLegacy returns jobs that are in failed, waiting and unprocessed states using three separate queries
//
// Deprecated: shall be removed after successful rollout
func (jd *Handle) getToProcessLegacy(ctx context.Context, params GetQueryParams, more MoreToken) (*MoreJobsResult, error) { // skipcq: CRT-P0003

mtoken := &moreTokenLegacy{}
if more != nil {
var ok bool
if mtoken, ok = more.(*moreTokenLegacy); !ok {
return nil, fmt.Errorf("invalid token: %+v", more)
}
}
updateParams := func(params *GetQueryParams, jobs JobsResult, nextAfterJobID *int64) {
params.JobsLimit -= len(jobs.Jobs)
if params.EventsLimit > 0 {
params.EventsLimit -= jobs.EventsCount
}
if params.PayloadSizeLimit > 0 {
params.PayloadSizeLimit -= jobs.PayloadSize
}
params.afterJobID = nextAfterJobID
}
var list []*JobT
params.afterJobID = mtoken.retryAfterJobID
toRetry, err := jd.GetFailed(ctx, params)
if err != nil {
return nil, err
}
if len(toRetry.Jobs) > 0 {
retryAfterJobID := toRetry.Jobs[len(toRetry.Jobs)-1].JobID
mtoken.retryAfterJobID = &retryAfterJobID
}

list = append(list, toRetry.Jobs...)
if toRetry.LimitsReached {
return &MoreJobsResult{JobsResult: JobsResult{Jobs: list, LimitsReached: true}, More: mtoken}, nil
}
updateParams(&params, toRetry, mtoken.waitingAfterJobID)

waiting, err := jd.GetWaiting(ctx, params)
if err != nil {
return nil, err
}
if len(waiting.Jobs) > 0 {
waitingAfterJobID := waiting.Jobs[len(waiting.Jobs)-1].JobID
mtoken.waitingAfterJobID = &waitingAfterJobID
}
list = append(list, waiting.Jobs...)
if waiting.LimitsReached {
return &MoreJobsResult{JobsResult: JobsResult{Jobs: list, LimitsReached: true}, More: mtoken}, nil
}
updateParams(&params, waiting, mtoken.unprocessedAfterJobID)

unprocessed, err := jd.GetUnprocessed(ctx, params)
if err != nil {
return nil, err
}
if len(unprocessed.Jobs) > 0 {
unprocessedAfterJobID := unprocessed.Jobs[len(unprocessed.Jobs)-1].JobID
mtoken.unprocessedAfterJobID = &unprocessedAfterJobID
}
list = append(list, unprocessed.Jobs...)
return &MoreJobsResult{JobsResult: JobsResult{Jobs: list, LimitsReached: unprocessed.LimitsReached}, More: mtoken}, nil
}

func (jd *Handle) GetToProcess(ctx context.Context, params GetQueryParams, more MoreToken) (*MoreJobsResult, error) { // skipcq: CRT-P0003

if !jd.config.GetBool("JobsDB.useSingleGetJobsQuery", true) { // TODO: remove condition after successful rollout of sinle query
return jd.getToProcessLegacy(ctx, params, more)
}

if params.JobsLimit == 0 {
return &MoreJobsResult{More: more}, nil
}
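For context, the deleted legacy path paginated with one cursor per job state and had to rebalance the jobs, events and payload-size limits between its three queries, whereas the retained single-query path only tracks one cursor. A minimal stand-alone sketch of the two token shapes, reusing the field names from the diff above but outside the jobsdb package:

```go
// Illustrative sketch, not part of the diff: it contrasts the pagination
// tokens removed and kept by this PR, using stand-in declarations.
package main

import "fmt"

// Legacy: three cursors, one per query (failed, waiting, unprocessed).
type moreTokenLegacy struct {
	retryAfterJobID       *int64
	waitingAfterJobID     *int64
	unprocessedAfterJobID *int64
}

// Retained: a single cursor, since one GetJobs query covers all states.
type moreToken struct {
	afterJobID *int64
}

func main() {
	last := int64(1843)

	// The legacy token advanced up to three cursors per pass, and the caller
	// had to shrink JobsLimit/EventsLimit/PayloadSizeLimit between queries.
	legacy := moreTokenLegacy{retryAfterJobID: &last}

	// The retained token simply resumes after the highest JobID seen so far,
	// regardless of job state.
	single := moreToken{afterJobID: &last}

	fmt.Println(*legacy.retryAfterJobID, *single.afterJobID)
}
```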
processor/stash/stash.go (14 additions & 51 deletions)
@@ -312,57 +312,21 @@
PayloadSizeLimit: st.adaptiveLimit(st.config.payloadLimit.Load()),
}

if config.GetBool("JobsDB.useSingleGetJobsQuery", true) { // TODO: remove condition after successful rollout of sinle query
toProcess, err := misc.QueryWithRetriesAndNotify(ctx, st.config.jobdDBQueryRequestTimeout.Load(), st.config.jobdDBMaxRetries.Load(), func(ctx context.Context) (jobsdb.JobsResult, error) {
return st.errorDB.GetJobs(ctx, []string{jobsdb.Failed.State, jobsdb.Unprocessed.State}, queryParams)
}, st.sendQueryRetryStats)
if err != nil {
if ctx.Err() != nil { // we are shutting down
close(st.errProcessQ)
return //nolint:nilerr
}
st.logger.Errorf("Error occurred while reading proc error jobs. Err: %v", err)
panic(err)
}

combinedList = toProcess.Jobs
limitReached = toProcess.LimitsReached
} else {
toRetry, err := misc.QueryWithRetriesAndNotify(ctx, st.config.jobdDBQueryRequestTimeout.Load(), st.config.jobdDBMaxRetries.Load(), func(ctx context.Context) (jobsdb.JobsResult, error) {
return st.errorDB.GetFailed(ctx, queryParams)
}, st.sendQueryRetryStats)
if err != nil {
if ctx.Err() != nil { // we are shutting down
close(st.errProcessQ)
return //nolint:nilerr
}
st.logger.Errorf("Error occurred while reading proc error jobs. Err: %v", err)
panic(err)
}

combinedList = toRetry.Jobs
limitReached = toRetry.LimitsReached
if !toRetry.LimitsReached {
queryParams.JobsLimit -= len(toRetry.Jobs)
if queryParams.PayloadSizeLimit > 0 {
queryParams.PayloadSizeLimit -= toRetry.PayloadSize
}
unprocessed, err := misc.QueryWithRetriesAndNotify(ctx, st.config.jobdDBQueryRequestTimeout.Load(), st.config.jobdDBMaxRetries.Load(), func(ctx context.Context) (jobsdb.JobsResult, error) {
return st.errorDB.GetUnprocessed(ctx, queryParams)
}, st.sendQueryRetryStats)
if err != nil {
if ctx.Err() != nil { // we are shutting down
close(st.errProcessQ)
return
}
st.logger.Errorf("Error occurred while reading proc error jobs. Err: %v", err)
panic(err)
}
combinedList = append(combinedList, unprocessed.Jobs...)
limitReached = unprocessed.LimitsReached
toProcess, err := misc.QueryWithRetriesAndNotify(ctx, st.config.jobdDBQueryRequestTimeout.Load(), st.config.jobdDBMaxRetries.Load(), func(ctx context.Context) (jobsdb.JobsResult, error) {
return st.errorDB.GetJobs(ctx, []string{jobsdb.Failed.State, jobsdb.Unprocessed.State}, queryParams)
}, st.sendQueryRetryStats)
if err != nil {
if ctx.Err() != nil { // we are shutting down
close(st.errProcessQ)
return //nolint:nilerr
}
st.logger.Errorf("Error occurred while reading proc error jobs. Err: %v", err)
panic(err)
}

combinedList = toProcess.Jobs
limitReached = toProcess.LimitsReached

st.statErrDBR.Since(start)

if len(combinedList) == 0 {
@@ -410,10 +374,9 @@
}
statusList = append(statusList, &status)
}
err := misc.RetryWithNotify(context.Background(), st.config.jobsDBCommandTimeout.Load(), st.config.jobdDBMaxRetries.Load(), func(ctx context.Context) error {
if err := misc.RetryWithNotify(context.Background(), st.config.jobsDBCommandTimeout.Load(), st.config.jobdDBMaxRetries.Load(), func(ctx context.Context) error {
return st.errorDB.UpdateJobStatus(ctx, statusList, nil, nil)
}, st.sendRetryUpdateStats)
if err != nil {
}, st.sendRetryUpdateStats); err != nil {
if ctx.Err() != nil { // we are shutting down
return //nolint:nilerr
}
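The second hunk above is an idiom cleanup rather than a behaviour change: the error returned by the retry wrapper is now scoped to the if statement. A small sketch of the before and after shapes, using a hypothetical retry helper in place of misc.RetryWithNotify:

```go
// Sketch of the error-scoping change; retry is a stand-in, not the misc API.
package main

import (
	"context"
	"errors"
	"fmt"
)

// retry stands in for misc.RetryWithNotify and simply runs fn once.
func retry(ctx context.Context, fn func(context.Context) error) error {
	return fn(ctx)
}

func main() {
	ctx := context.Background()

	// Before: err lives in the enclosing scope and is checked separately.
	err := retry(ctx, func(context.Context) error { return nil })
	if err != nil {
		fmt.Println("update job status failed:", err)
	}

	// After: err is scoped to the if statement, as in the updated stash.go.
	if err := retry(ctx, func(context.Context) error { return errors.New("boom") }); err != nil {
		fmt.Println("update job status failed:", err)
	}
}
```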
router/batchrouter/handle.go (8 additions & 35 deletions)
@@ -193,42 +193,15 @@
brt.isolationStrategy.AugmentQueryParams(partition, &queryParams)
var limitsReached bool

if config.GetBool("JobsDB.useSingleGetJobsQuery", true) { // TODO: remove condition after successful rollout of sinle query
toProcess, err := misc.QueryWithRetriesAndNotify(context.Background(), brt.jobdDBQueryRequestTimeout.Load(), brt.jobdDBMaxRetries.Load(), func(ctx context.Context) (jobsdb.JobsResult, error) {
return brt.jobsDB.GetJobs(ctx, []string{jobsdb.Failed.State, jobsdb.Unprocessed.State}, queryParams)
}, brt.sendQueryRetryStats)
if err != nil {
brt.logger.Errorf("BRT: %s: Error while reading from DB: %v", brt.destType, err)
panic(err)
}
jobs = toProcess.Jobs
limitsReached = toProcess.LimitsReached
} else {
toRetry, err := misc.QueryWithRetriesAndNotify(context.Background(), brt.jobdDBQueryRequestTimeout.Load(), brt.jobdDBMaxRetries.Load(), func(ctx context.Context) (jobsdb.JobsResult, error) {
return brt.jobsDB.GetFailed(ctx, queryParams)
}, brt.sendQueryRetryStats)
if err != nil {
brt.logger.Errorf("BRT: %s: Error while reading from DB: %v", brt.destType, err)
panic(err)
}
jobs = toRetry.Jobs
limitsReached = toRetry.LimitsReached
if !limitsReached {
queryParams.JobsLimit -= len(toRetry.Jobs)
if queryParams.PayloadSizeLimit > 0 {
queryParams.PayloadSizeLimit -= toRetry.PayloadSize
}
unprocessed, err := misc.QueryWithRetriesAndNotify(context.Background(), brt.jobdDBQueryRequestTimeout.Load(), brt.jobdDBMaxRetries.Load(), func(ctx context.Context) (jobsdb.JobsResult, error) {
return brt.jobsDB.GetUnprocessed(ctx, queryParams)
}, brt.sendQueryRetryStats)
if err != nil {
brt.logger.Errorf("BRT: %s: Error while reading from DB: %v", brt.destType, err)
panic(err)
}
jobs = append(jobs, unprocessed.Jobs...)
limitsReached = unprocessed.LimitsReached
}
toProcess, err := misc.QueryWithRetriesAndNotify(context.Background(), brt.jobdDBQueryRequestTimeout.Load(), brt.jobdDBMaxRetries.Load(), func(ctx context.Context) (jobsdb.JobsResult, error) {
return brt.jobsDB.GetJobs(ctx, []string{jobsdb.Failed.State, jobsdb.Unprocessed.State}, queryParams)
}, brt.sendQueryRetryStats)
if err != nil {
brt.logger.Errorf("BRT: %s: Error while reading from DB: %v", brt.destType, err)
panic(err)
}
jobs = toProcess.Jobs
limitsReached = toProcess.LimitsReached

brtQueryStat.Since(queryStart)
sort.Slice(jobs, func(i, j int) bool {
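This call site now matches the stash loop: a single retry-wrapped GetJobs call over the failed and unprocessed states, with no manual JobsLimit or PayloadSizeLimit bookkeeping between queries. A rough stand-alone sketch of that simplification, with stand-in types rather than the real jobsdb and misc APIs:

```go
// Stand-in sketch of the call-site simplification; types here are
// illustrative, not the jobsdb or misc APIs.
package main

import "fmt"

type job struct{ id int64 }

type jobsResult struct {
	jobs          []job
	payloadSize   int
	limitsReached bool
}

type queryParams struct{ jobsLimit, payloadSizeLimit int }

// getJobs stands in for a state-filtered jobsdb query.
func getJobs(states []string, p queryParams) jobsResult {
	_ = states // a real implementation filters by job state and honours limits
	_ = p
	return jobsResult{jobs: []job{{id: 1}, {id: 2}}}
}

func main() {
	p := queryParams{jobsLimit: 100, payloadSizeLimit: 1 << 20}

	// Legacy shape: two queries with limit bookkeeping in between.
	failed := getJobs([]string{"failed"}, p)
	jobs := failed.jobs
	if !failed.limitsReached {
		p.jobsLimit -= len(failed.jobs)
		if p.payloadSizeLimit > 0 {
			p.payloadSizeLimit -= failed.payloadSize
		}
		jobs = append(jobs, getJobs([]string{"unprocessed"}, p).jobs...)
	}

	// Retained shape: one query covering both states, no bookkeeping.
	toProcess := getJobs([]string{"failed", "unprocessed"}, p)

	fmt.Println(len(jobs), len(toProcess.jobs))
}
```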
router/handle.go (0 additions & 6 deletions)
@@ -18,7 +18,6 @@ import (
"github.com/tidwall/gjson"
"golang.org/x/sync/errgroup"

"github.com/rudderlabs/rudder-go-kit/config"
"github.com/rudderlabs/rudder-go-kit/logger"
"github.com/rudderlabs/rudder-go-kit/stats"
kitsync "github.com/rudderlabs/rudder-go-kit/sync"
@@ -152,16 +151,11 @@ func (rt *Handle) pickup(ctx context.Context, partition string, workers []*worke
var firstJob *jobsdb.JobT
var lastJob *jobsdb.JobT

orderGroupKeyFn := func(job *jobsdb.JobT) string { return job.LastJobStatus.JobState }
if config.GetBool("JobsDB.useSingleGetJobsQuery", true) { // TODO: remove condition and option after successful rollout of sinle query
orderGroupKeyFn = func(job *jobsdb.JobT) string { return "same" }
}
iterator := jobiterator.New(
rt.getQueryParams(partition, rt.reloadableConfig.jobQueryBatchSize.Load()),
rt.getJobsFn(ctx),
jobiterator.WithDiscardedPercentageTolerance(rt.reloadableConfig.jobIteratorDiscardedPercentageTolerance.Load()),
jobiterator.WithMaxQueries(rt.reloadableConfig.jobIteratorMaxQueries.Load()),
jobiterator.WithOrderGroupKeyFn(orderGroupKeyFn),
)

if !iterator.HasNext() {
router/internal/jobiterator/jobiterator.go (4 additions & 15 deletions)
@@ -26,26 +26,18 @@ func WithDiscardedPercentageTolerance(discardedPercentageTolerance int) Iterator
}
}

// WithOrderGroupKeyFn sets the orderGroupKeyFn
func WithOrderGroupKeyFn(orderGroupKeyFn func(*jobsdb.JobT) string) IteratorOptFn {
return func(ji *Iterator) {
ji.orderGroupKeyFn = orderGroupKeyFn
}
}

// Iterator is a job iterator with support for fetching more than the original set of jobs requested,
// in case some of these jobs get discarded, according to the configured discarded percentage tolerance.
type Iterator struct {
params jobsdb.GetQueryParams
maxQueries int
discardedPercentageTolerance int
orderGroupKeyFn func(*jobsdb.JobT) string
getJobsFn func(context.Context, jobsdb.GetQueryParams, jobsdb.MoreToken) (*jobsdb.MoreJobsResult, error)
state struct {
// running iterator state
jobs []*jobsdb.JobT
idx int
previousJob map[string]*jobsdb.JobT
previousJob *jobsdb.JobT

// closed indicates whether the iterator has reached the end or not
closed bool
@@ -81,10 +73,8 @@ func New(params jobsdb.GetQueryParams, getJobsFn func(context.Context, jobsdb.Ge
maxQueries: 100,
discardedPercentageTolerance: 0,
getJobsFn: getJobsFn,
orderGroupKeyFn: func(job *jobsdb.JobT) string { return job.LastJobStatus.JobState },
}
ji.state.jobsLimit = params.JobsLimit
ji.state.previousJob = map[string]*jobsdb.JobT{}
for _, opt := range opts {
opt(ji)
}
@@ -158,11 +148,10 @@ func (ji *Iterator) Next() *jobsdb.JobT {
idx := ji.state.idx
ji.state.idx++
nextJob := ji.state.jobs[idx]
orderGroupKey := ji.orderGroupKeyFn(nextJob)
if previousJob, ok := ji.state.previousJob[orderGroupKey]; ok && previousJob.JobID > nextJob.JobID {
panic(fmt.Errorf("job iterator encountered out of order jobs for group key %s: previousJobID: %d, nextJobID: %d", orderGroupKey, previousJob.JobID, nextJob.JobID))
if ji.state.previousJob != nil && ji.state.previousJob.JobID > nextJob.JobID {
panic(fmt.Errorf("job iterator encountered out of order jobs: previousJobID: %d, nextJobID: %d", ji.state.previousJob.JobID, nextJob.JobID))
}
ji.state.previousJob[orderGroupKey] = nextJob
ji.state.previousJob = nextJob
return nextJob
}

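With WithOrderGroupKeyFn gone, the iterator assumes a single JobID-ordered stream, which the single GetJobs query provides, and panics if that order is ever violated, instead of tracking one previous job per state. A sketch of how a caller such as router's pickup could drive it after this change, assuming the code lives inside the rudder-server module (jobiterator is an internal package) and that getJobsFn wraps a single-query lookup like Handle.GetToProcess:

```go
// Sketch of driving the simplified iterator; assumed to compile only inside
// the rudder-server module, since router/internal/jobiterator is internal.
package router

import (
	"context"

	"github.com/rudderlabs/rudder-server/jobsdb"
	"github.com/rudderlabs/rudder-server/router/internal/jobiterator"
)

func drainJobs(
	getJobsFn func(context.Context, jobsdb.GetQueryParams, jobsdb.MoreToken) (*jobsdb.MoreJobsResult, error),
) []*jobsdb.JobT {
	it := jobiterator.New(
		jobsdb.GetQueryParams{JobsLimit: 1000},
		getJobsFn,
		jobiterator.WithMaxQueries(10),
		jobiterator.WithDiscardedPercentageTolerance(5),
	)
	var jobs []*jobsdb.JobT
	for it.HasNext() {
		// Next panics if a JobID ever goes backwards across the whole stream,
		// now that there is no per-state (order-group) tracking.
		jobs = append(jobs, it.Next())
	}
	return jobs
}
```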