chore: use single query for getting jobs #3820

Merged
merged 1 commit on Sep 11, 2023
100 changes: 77 additions & 23 deletions jobsdb/jobsdb.go
@@ -1828,7 +1828,10 @@
afterJobID *int64
}

func (jd *Handle) GetToProcess(ctx context.Context, params GetQueryParams, more MoreToken) (*MoreJobsResult, error) { // skipcq: CRT-P0003
// getToProcessLegacy returns jobs that are in failed, waiting and unprocessed states using three separate queries
//
// Deprecated: shall be removed after successful rollout
func (jd *Handle) getToProcessLegacy(ctx context.Context, params GetQueryParams, more MoreToken) (*MoreJobsResult, error) { // skipcq: CRT-P0003

mtoken := &moreTokenLegacy{}
if more != nil {
@@ -1890,6 +1893,30 @@
return &MoreJobsResult{JobsResult: JobsResult{Jobs: list, LimitsReached: unprocessed.LimitsReached}, More: mtoken}, nil
}

func (jd *Handle) GetToProcess(ctx context.Context, params GetQueryParams, more MoreToken) (*MoreJobsResult, error) { // skipcq: CRT-P0003

if !jd.config.GetBool("JobsDB.useSingleGetJobsQuery", true) { // TODO: remove condition after successful rollout of single query
return jd.getToProcessLegacy(ctx, params, more)
}

if params.JobsLimit == 0 {
return &MoreJobsResult{More: more}, nil
}

params.stateFilters = []string{Failed.State, Waiting.State, Unprocessed.State}
slices.Sort(params.stateFilters)
tags := statTags{
StateFilters: params.stateFilters,
CustomValFilters: params.CustomValFilters,
ParameterFilters: params.ParameterFilters,
WorkspaceID: params.WorkspaceID,
}
command := func() moreQueryResult {
return moreQueryResultWrapper(jd.getJobs(ctx, params, more))
}
res := executeDbRequest(jd, newReadDbRequest("get_jobs", &tags, command))
return res.MoreJobsResult, res.err
}

var cacheParameterFilters = []string{"source_id", "destination_id"}

func (jd *Handle) GetPileUpCounts(ctx context.Context) (map[string]map[string]int, error) {
Expand Down Expand Up @@ -2122,7 +2149,7 @@
parameterFilters do an AND query on values included in the map.
A JobsLimit less than or equal to zero indicates no limit.
*/
func (jd *Handle) getJobsDS(ctx context.Context, ds dataSetT, params GetQueryParams) (JobsResult, bool, error) { // skipcq: CRT-P0003
func (jd *Handle) getJobsDS(ctx context.Context, ds dataSetT, lastDS bool, params GetQueryParams) (JobsResult, bool, error) { // skipcq: CRT-P0003
stateFilters := params.stateFilters
customValFilters := params.CustomValFilters
parameterFilters := params.ParameterFilters
@@ -2135,31 +2162,42 @@
}

tags := statTags{
StateFilters: params.stateFilters,
StateFilters: stateFilters,
CustomValFilters: params.CustomValFilters,
ParameterFilters: params.ParameterFilters,
WorkspaceID: workspaceID,
}

stateFilters = lo.Filter(stateFilters, func(state string, _ int) bool { // exclude states for which we already know that there are no jobs
return !jd.noResultsCache.Get(ds.Index, workspaceID, customValFilters, []string{state}, parameterFilters)
})

defer jd.getTimerStat("jobsdb_get_jobs_ds_time", &tags).RecordDuration()()

containsUnprocessed := lo.Contains(params.stateFilters, Unprocessed.State)
containsUnprocessed := lo.Contains(stateFilters, Unprocessed.State)
skipCacheResult := params.afterJobID != nil
var cacheTx *cache.NoResultTx[ParameterFilterT]
cacheTx := map[string]*cache.NoResultTx[ParameterFilterT]{}
if !skipCacheResult {
cacheTx = jd.noResultsCache.StartNoResultTx(ds.Index, workspaceID, customValFilters, stateFilters, parameterFilters)
for _, state := range stateFilters {
// avoid setting result as noJobs if
// (1) state is unprocessed and
// (2) jobsdb owner is a reader and
// (3) ds is the right most one
if state == Unprocessed.State && jd.ownerType == Read && lastDS {
continue
}
cacheTx[state] = jd.noResultsCache.StartNoResultTx(ds.Index, workspaceID, customValFilters, []string{state}, parameterFilters)
}
}

var filterConditions []string
if len(stateFilters) > 0 {
additionalPredicates := lo.FilterMap(stateFilters, func(s string, _ int) (string, bool) {
return "(job_latest_state.job_id IS NULL)", s == Unprocessed.State
})
stateQuery := constructQueryOR("job_latest_state.job_state", lo.Filter(stateFilters, func(s string, _ int) bool {
return s != Unprocessed.State
}), additionalPredicates...)
filterConditions = append(filterConditions, stateQuery)
}
additionalPredicates := lo.FilterMap(stateFilters, func(s string, _ int) (string, bool) {
return "(job_latest_state.job_id IS NULL)", s == Unprocessed.State
})
stateQuery := constructQueryOR("job_latest_state.job_state", lo.Filter(stateFilters, func(s string, _ int) bool {
return s != Unprocessed.State
}), additionalPredicates...)
filterConditions = append(filterConditions, stateQuery)

if params.afterJobID != nil {
filterConditions = append(filterConditions, fmt.Sprintf("jobs.job_id > %d", *params.afterJobID))
@@ -2254,7 +2292,7 @@
var limitsReached bool
var eventCount int
var payloadSize int64

resultsetStates := map[string]struct{}{}
for rows.Next() {
var job JobT
var jsState sql.NullString
@@ -2273,13 +2311,16 @@
return JobsResult{}, false, err
}
if jsState.Valid {
resultsetStates[jsState.String] = struct{}{}
job.LastJobStatus.JobState = jsState.String
job.LastJobStatus.AttemptNum = int(jsAttemptNum.Int64)
job.LastJobStatus.ExecTime = jsExecTime.Time
job.LastJobStatus.RetryTime = jsRetryTime.Time
job.LastJobStatus.ErrorCode = jsErrorCode.String
job.LastJobStatus.ErrorResponse = jsErrorResponse
job.LastJobStatus.Parameters = jsParameters
} else {
resultsetStates[Unprocessed.State] = struct{}{}
}
job.LastJobStatus.JobParameters = job.Parameters

@@ -2310,12 +2351,13 @@
}

if !skipCacheResult {
dsList := jd.getDSList()

// if query contains unprocessed and jobsdb owner is a reader and if ds is the right most one, ignoring setting result as noJobs
skipCacheCommit := containsUnprocessed && jd.ownerType == Read && ds.Index == dsList[len(dsList)-1].Index
if len(jobList) == 0 && !skipCacheCommit {
cacheTx.Commit()
for state, cacheTx := range cacheTx {
// we are committing the cache Tx only if
// (a) no jobs are returned by the query or
// (b) the state is not present in the resultset and limits have not been reached
if _, ok := resultsetStates[state]; len(jobList) == 0 || (!ok && !limitsReached) {
cacheTx.Commit()
}
}
}

@@ -3205,7 +3247,7 @@
if dsLimit > 0 && dsQueryCount >= dsLimit {
break
}
jobs, dsHit, err := jd.getJobsDS(ctx, ds, params)
jobs, dsHit, err := jd.getJobsDS(ctx, ds, len(dsList)-1 == idx, params)
if err != nil {
return nil, err
}
@@ -3287,6 +3329,18 @@
}
}

type moreQueryResult struct {
*MoreJobsResult
err error
}

func moreQueryResultWrapper(res *MoreJobsResult, err error) moreQueryResult {
return moreQueryResult{
MoreJobsResult: res,
err: err,
}
}

func (jd *Handle) getMaxIDForDs(ds dataSetT) int64 {
var maxID sql.NullInt64
sqlStatement := fmt.Sprintf(`SELECT MAX(job_id) FROM %s`, ds.JobTable)
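The heart of the jobsdb.go change is that getJobsDS now asks for failed, waiting and unprocessed jobs with one SQL predicate, treating "unprocessed" as the absence of a row in the status table, and keeps one no-result cache transaction per state so an empty result for one state no longer suppresses caching for the others. The snippet below is a minimal, self-contained sketch of that predicate construction; buildStateCondition, the hard-coded state strings and the simplified SQL are illustrative stand-ins for the repository's constructQueryOR helper and state constants, not its actual API.

```go
package main

import (
	"fmt"
	"strings"
)

// buildStateCondition is a simplified stand-in for the constructQueryOR call in the
// diff above: each "real" state becomes an equality check on job_latest_state.job_state,
// while the pseudo-state "unprocessed" (a job with no status row yet) becomes an
// IS NULL check on the LEFT JOINed status table.
func buildStateCondition(states []string) string {
	var parts []string
	for _, s := range states {
		if s == "unprocessed" {
			parts = append(parts, "(job_latest_state.job_id IS NULL)")
			continue
		}
		parts = append(parts, fmt.Sprintf("(job_latest_state.job_state = '%s')", s))
	}
	return "(" + strings.Join(parts, " OR ") + ")"
}

func main() {
	// One query can now serve failed, waiting and unprocessed jobs at once.
	fmt.Println(buildStateCondition([]string{"failed", "waiting", "unprocessed"}))
	// Output:
	// ((job_latest_state.job_state = 'failed') OR (job_latest_state.job_state = 'waiting') OR (job_latest_state.job_id IS NULL))
}
```

This is why GetToProcess above can sort the three state filters and issue a single getJobs call instead of three separate queries.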
1 change: 1 addition & 0 deletions processor/processor_isolation_test.go
@@ -227,6 +227,7 @@ func ProcIsolationScenario(t testing.TB, spec *ProcIsolationScenarioSpec) (overa
config.Set("JobsDB.backup.enabled", false)
config.Set("JobsDB.migrateDSLoopSleepDuration", "60m")
config.Set("Router.toAbortDestinationIDs", destinationID)
config.Set("archival.Enabled", false)

config.Set("Processor.isolationMode", string(spec.isolationMode))

6 changes: 2 additions & 4 deletions processor/processor_test.go
@@ -2070,8 +2070,7 @@ var _ = Describe("Processor", Ordered, func() {

c.mockBackendConfig.EXPECT().WaitForConfig(gomock.Any()).Times(1)
c.mockReadProcErrorsDB.EXPECT().FailExecuting().Times(1)
c.mockReadProcErrorsDB.EXPECT().GetFailed(gomock.Any(), gomock.Any()).AnyTimes()
c.mockReadProcErrorsDB.EXPECT().GetUnprocessed(gomock.Any(), gomock.Any()).AnyTimes()
c.mockReadProcErrorsDB.EXPECT().GetJobs(gomock.Any(), []string{jobsdb.Failed.State, jobsdb.Unprocessed.State}, gomock.Any()).AnyTimes()
c.mockRouterJobsDB.EXPECT().GetPileUpCounts(gomock.Any()).AnyTimes()
c.mockBatchRouterJobsDB.EXPECT().GetPileUpCounts(gomock.Any()).AnyTimes()

@@ -2126,8 +2125,7 @@
processor.config.readLoopSleep = time.Millisecond

c.mockReadProcErrorsDB.EXPECT().FailExecuting()
c.mockReadProcErrorsDB.EXPECT().GetFailed(gomock.Any(), gomock.Any()).Return(jobsdb.JobsResult{}, nil).AnyTimes()
c.mockReadProcErrorsDB.EXPECT().GetUnprocessed(gomock.Any(), gomock.Any()).Return(jobsdb.JobsResult{}, nil).AnyTimes()
c.mockReadProcErrorsDB.EXPECT().GetJobs(gomock.Any(), []string{jobsdb.Failed.State, jobsdb.Unprocessed.State}, gomock.Any()).AnyTimes()
c.mockBackendConfig.EXPECT().WaitForConfig(gomock.Any()).Times(1)
c.mockRouterJobsDB.EXPECT().GetPileUpCounts(gomock.Any()).AnyTimes()
c.mockBatchRouterJobsDB.EXPECT().GetPileUpCounts(gomock.Any()).AnyTimes()
66 changes: 43 additions & 23 deletions processor/stash/stash.go
@@ -309,6 +309,7 @@
return
case <-time.After(sleepTime):
start := time.Now()
var combinedList []*jobsdb.JobT
var limitReached bool
// NOTE: sending custom val filters array of size 1 to take advantage of cache in jobsdb.
queryParams := jobsdb.GetQueryParams{
@@ -317,39 +318,58 @@
JobsLimit: errDBReadBatchSize,
PayloadSizeLimit: st.adaptiveLimit(payloadLimit),
}
if config.GetBool("JobsDB.useSingleGetJobsQuery", true) { // TODO: remove condition after successful rollout of single query
	toProcess, err := misc.QueryWithRetriesAndNotify(ctx, st.jobdDBQueryRequestTimeout, st.jobdDBMaxRetries, func(ctx context.Context) (jobsdb.JobsResult, error) {
		return st.errorDB.GetJobs(ctx, []string{jobsdb.Failed.State, jobsdb.Unprocessed.State}, queryParams)
	}, sendQueryRetryStats)
	if err != nil {
		if ctx.Err() != nil { // we are shutting down
			close(st.errProcessQ)
			return //nolint:nilerr
		}
		st.logger.Errorf("Error occurred while reading proc error jobs. Err: %v", err)
		panic(err)
	}
	combinedList = toProcess.Jobs
	limitReached = toProcess.LimitsReached
} else {
	toRetry, err := misc.QueryWithRetriesAndNotify(ctx, st.jobdDBQueryRequestTimeout, st.jobdDBMaxRetries, func(ctx context.Context) (jobsdb.JobsResult, error) {
		return st.errorDB.GetFailed(ctx, queryParams)
	}, sendQueryRetryStats)
	if err != nil {
		if ctx.Err() != nil { // we are shutting down
			close(st.errProcessQ)
			return //nolint:nilerr
		}
		st.logger.Errorf("Error occurred while reading proc error jobs. Err: %v", err)
		panic(err)
	}
	combinedList = toRetry.Jobs
	limitReached = toRetry.LimitsReached
	if !toRetry.LimitsReached {
		queryParams.JobsLimit -= len(toRetry.Jobs)
		if queryParams.PayloadSizeLimit > 0 {
			queryParams.PayloadSizeLimit -= toRetry.PayloadSize
		}
		unprocessed, err := misc.QueryWithRetriesAndNotify(ctx, st.jobdDBQueryRequestTimeout, st.jobdDBMaxRetries, func(ctx context.Context) (jobsdb.JobsResult, error) {
			return st.errorDB.GetUnprocessed(ctx, queryParams)
		}, sendQueryRetryStats)
		if err != nil {
			if ctx.Err() != nil { // we are shutting down
				close(st.errProcessQ)
				return
			}
			st.logger.Errorf("Error occurred while reading proc error jobs. Err: %v", err)
			panic(err)
		}
		combinedList = append(combinedList, unprocessed.Jobs...)
		limitReached = unprocessed.LimitsReached
	}
}

st.statErrDBR.Since(start)

if len(combinedList) == 0 {
@@ -397,7 +417,7 @@
}
statusList = append(statusList, &status)
}
err = misc.RetryWithNotify(context.Background(), st.jobsDBCommandTimeout, st.jobdDBMaxRetries, func(ctx context.Context) error {
err := misc.RetryWithNotify(context.Background(), st.jobsDBCommandTimeout, st.jobdDBMaxRetries, func(ctx context.Context) error {

return st.errorDB.UpdateJobStatus(ctx, statusList, nil, nil)
}, sendRetryUpdateStats)
if err != nil {
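The legacy branch kept above has to stitch two reads together and carry the remaining job and payload budget from the first query into the second; the single GetJobs call makes that bookkeeping unnecessary. Below is a small, self-contained sketch of that carry-over logic under simplified types: JobsResult here is a stand-in for jobsdb.JobsResult, and the callback replaces the real GetUnprocessed call. It illustrates the pattern, not code from the repository.

```go
package main

import "fmt"

// JobsResult mirrors, in spirit, the fields the legacy path consumes; the real
// jobsdb.JobsResult carries full job records rather than strings.
type JobsResult struct {
	Jobs          []string // stand-in for []*jobsdb.JobT
	LimitsReached bool
	PayloadSize   int64
}

// combineLegacy shows the budget carry-over the else-branch above performs:
// whatever the failed-jobs query consumed is subtracted from the limits before
// the unprocessed-jobs query runs, so the pair behaves like one bounded read.
func combineLegacy(failed JobsResult, jobsLimit int, payloadLimit int64,
	getUnprocessed func(jobsLimit int, payloadLimit int64) JobsResult,
) ([]string, bool) {
	combined := failed.Jobs
	limitReached := failed.LimitsReached
	if !failed.LimitsReached {
		jobsLimit -= len(failed.Jobs)
		if payloadLimit > 0 {
			payloadLimit -= failed.PayloadSize
		}
		unprocessed := getUnprocessed(jobsLimit, payloadLimit)
		combined = append(combined, unprocessed.Jobs...)
		limitReached = unprocessed.LimitsReached
	}
	return combined, limitReached
}

func main() {
	failed := JobsResult{Jobs: []string{"job-1", "job-2"}, PayloadSize: 2048}
	jobs, limited := combineLegacy(failed, 100, 1<<20, func(jobsLimit int, payloadLimit int64) JobsResult {
		fmt.Printf("second query budget: %d jobs, %d bytes\n", jobsLimit, payloadLimit)
		return JobsResult{Jobs: []string{"job-3"}}
	})
	fmt.Println(jobs, limited) // [job-1 job-2 job-3] false
}
```

Either way the loop downstream only sees combinedList and limitReached, which is why the test updates below can replace the two mocked calls with a single GetJobs expectation.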
31 changes: 9 additions & 22 deletions router/batchrouter/batchrouter_test.go
@@ -235,29 +235,16 @@ var _ = Describe("BatchRouter", func() {
}

payloadLimit := batchrouter.payloadLimit
var toRetryJobsListCalled bool
var unprocessedJobsListCalled bool
c.mockBatchRouterJobsDB.EXPECT().GetFailed(gomock.Any(), jobsdb.GetQueryParams{CustomValFilters: []string{CustomVal["S3"]}, JobsLimit: c.jobQueryBatchSize, PayloadSizeLimit: payloadLimit}).DoAndReturn(func(ctx context.Context, params jobsdb.GetQueryParams) (jobsdb.JobsResult, error) {
	if !toRetryJobsListCalled {
		toRetryJobsListCalled = true
		return jobsdb.JobsResult{Jobs: toRetryJobsList}, nil
	}
	return jobsdb.JobsResult{}, nil
}).AnyTimes()
c.mockBatchRouterJobsDB.EXPECT().GetUnprocessed(gomock.Any(), jobsdb.GetQueryParams{CustomValFilters: []string{CustomVal["S3"]}, JobsLimit: c.jobQueryBatchSize - len(toRetryJobsList), PayloadSizeLimit: payloadLimit}).DoAndReturn(func(ctx context.Context, params jobsdb.GetQueryParams) (jobsdb.JobsResult, error) {
	if !unprocessedJobsListCalled {
		unprocessedJobsListCalled = true
		return jobsdb.JobsResult{Jobs: unprocessedJobsList}, nil
	}
	return jobsdb.JobsResult{}, nil
}).Times(1)

c.mockBatchRouterJobsDB.EXPECT().GetUnprocessed(gomock.Any(), jobsdb.GetQueryParams{CustomValFilters: []string{CustomVal["S3"]}, JobsLimit: c.jobQueryBatchSize, PayloadSizeLimit: payloadLimit}).DoAndReturn(func(ctx context.Context, params jobsdb.GetQueryParams) (jobsdb.JobsResult, error) {
	if !unprocessedJobsListCalled {
		unprocessedJobsListCalled = true
		return jobsdb.JobsResult{Jobs: unprocessedJobsList}, nil
	}
	return jobsdb.JobsResult{}, nil
}).AnyTimes()

var getJobsListCalled bool
c.mockBatchRouterJobsDB.EXPECT().GetJobs(gomock.Any(), []string{jobsdb.Failed.State, jobsdb.Unprocessed.State}, jobsdb.GetQueryParams{CustomValFilters: []string{CustomVal["S3"]}, JobsLimit: c.jobQueryBatchSize, PayloadSizeLimit: payloadLimit}).DoAndReturn(func(ctx context.Context, states []string, params jobsdb.GetQueryParams) (jobsdb.JobsResult, error) {
	var res jobsdb.JobsResult
	if !getJobsListCalled {
		getJobsListCalled = true
		jobs := append([]*jobsdb.JobT{}, toRetryJobsList...)
		jobs = append(jobs, unprocessedJobsList...)
		res.Jobs = jobs
	}
	return res, nil
}).AnyTimes()

c.mockBatchRouterJobsDB.EXPECT().UpdateJobStatus(gomock.Any(), gomock.Any(), []string{CustomVal["S3"]}, gomock.Any()).Times(1).
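The rewritten expectation keeps the test deterministic by handing back the combined job list exactly once and empty results on every later poll. A stripped-down, dependency-free sketch of that one-shot pattern (a plain closure rather than a gomock DoAndReturn; all names here are illustrative):

```go
package main

import "fmt"

// oneShot returns a function that yields jobs exactly once and an empty result on
// every later call — the same trick the AnyTimes() mock above uses so a polling
// worker drains the "queue" once and then sees it empty.
func oneShot(jobs []string) func() []string {
	called := false
	return func() []string {
		if !called {
			called = true
			return jobs
		}
		return nil
	}
}

func main() {
	getJobs := oneShot([]string{"failed-1", "unprocessed-1"})
	fmt.Println(getJobs()) // [failed-1 unprocessed-1]
	fmt.Println(getJobs()) // []
}
```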