chore: boost performance wip
atzoum committed Sep 8, 2023
1 parent 3a34e06 commit 9e43bff
Showing 8 changed files with 176 additions and 89 deletions.
100 changes: 77 additions & 23 deletions jobsdb/jobsdb.go
@@ -1828,7 +1828,10 @@ type moreToken struct {
afterJobID *int64
}

func (jd *Handle) GetToProcess(ctx context.Context, params GetQueryParams, more MoreToken) (*MoreJobsResult, error) { // skipcq: CRT-P0003
// getToProcessLegacy returns jobs that are in failed, waiting and unprocessed states using three separate queries
//
// Deprecated: shall be removed after successful rollout
func (jd *Handle) getToProcessLegacy(ctx context.Context, params GetQueryParams, more MoreToken) (*MoreJobsResult, error) { // skipcq: CRT-P0003
mtoken := &moreTokenLegacy{}
if more != nil {
@@ -1890,6 +1893,30 @@ func (jd *Handle) GetToProcess(ctx context.Context, params GetQueryParams, more
return &MoreJobsResult{JobsResult: JobsResult{Jobs: list, LimitsReached: unprocessed.LimitsReached}, More: mtoken}, nil
}

func (jd *Handle) GetToProcess(ctx context.Context, params GetQueryParams, more MoreToken) (*MoreJobsResult, error) { // skipcq: CRT-P0003

if !jd.config.GetBool("JobsDB.useSingleGetJobsQuery", true) { // TODO: remove condition after successful rollout of single query
return jd.getToProcessLegacy(ctx, params, more)
}

if params.JobsLimit == 0 {
return &MoreJobsResult{More: more}, nil
}

params.stateFilters = []string{Failed.State, Waiting.State, Unprocessed.State}
slices.Sort(params.stateFilters)
tags := statTags{
StateFilters: params.stateFilters,
CustomValFilters: params.CustomValFilters,
ParameterFilters: params.ParameterFilters,
WorkspaceID: params.WorkspaceID,
}
command := func() moreQueryResult {
return moreQueryResultWrapper(jd.getJobs(ctx, params, more))
}
res := executeDbRequest(jd, newReadDbRequest("get_jobs", &tags, command))
return res.MoreJobsResult, res.err
}
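For illustration only (not part of this commit): a minimal Go sketch, using hypothetical stub types rather than the jobsdb package, of what the single-query path does conceptually — the three state filters are sorted and resolved in one pass, where the legacy path above issued one query per state.

package main

import (
	"fmt"
	"slices"
)

// job is a stand-in for a jobsdb job; only its state matters here.
type job struct {
	ID    int64
	State string
}

// singleQuery mimics the combined getJobs call: one scan matching any of the
// requested states, instead of three per-state queries.
func singleQuery(jobs []job, states []string) []job {
	var out []job
	for _, j := range jobs {
		if slices.Contains(states, j.State) {
			out = append(out, j)
		}
	}
	return out
}

func main() {
	data := []job{{1, "failed"}, {2, "waiting"}, {3, "unprocessed"}, {4, "succeeded"}}
	states := []string{"failed", "waiting", "unprocessed"}
	slices.Sort(states) // mirrors slices.Sort(params.stateFilters) above
	fmt.Println(singleQuery(data, states)) // [{1 failed} {2 waiting} {3 unprocessed}]
}

The real call goes through getJobs and executeDbRequest as shown above; the sketch only captures the caller-visible effect of handing over all three states at once.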

var cacheParameterFilters = []string{"source_id", "destination_id"}

func (jd *Handle) GetPileUpCounts(ctx context.Context) (map[string]map[string]int, error) {
@@ -2122,7 +2149,7 @@ stateFilters and customValFilters do a OR query on values passed in array
parameterFilters do a AND query on values included in the map.
A JobsLimit less than or equal to zero indicates no limit.
*/
func (jd *Handle) getJobsDS(ctx context.Context, ds dataSetT, params GetQueryParams) (JobsResult, bool, error) { // skipcq: CRT-P0003
func (jd *Handle) getJobsDS(ctx context.Context, ds dataSetT, lastDS bool, params GetQueryParams) (JobsResult, bool, error) { // skipcq: CRT-P0003
stateFilters := params.stateFilters
customValFilters := params.CustomValFilters
parameterFilters := params.ParameterFilters
@@ -2135,31 +2162,42 @@ func (jd *Handle) getJobsDS(ctx context.Context, ds dataSetT, params GetQueryPar
}

tags := statTags{
StateFilters: params.stateFilters,
StateFilters: stateFilters,
CustomValFilters: params.CustomValFilters,
ParameterFilters: params.ParameterFilters,
WorkspaceID: workspaceID,
}

stateFilters = lo.Filter(stateFilters, func(state string, _ int) bool { // exclude states for which we already know that there are no jobs
return !jd.noResultsCache.Get(ds.Index, workspaceID, customValFilters, []string{state}, parameterFilters)
})

defer jd.getTimerStat("jobsdb_get_jobs_ds_time", &tags).RecordDuration()()

containsUnprocessed := lo.Contains(params.stateFilters, Unprocessed.State)
containsUnprocessed := lo.Contains(stateFilters, Unprocessed.State)
skipCacheResult := params.afterJobID != nil
var cacheTx *cache.NoResultTx[ParameterFilterT]
cacheTx := map[string]*cache.NoResultTx[ParameterFilterT]{}
if !skipCacheResult {
cacheTx = jd.noResultsCache.StartNoResultTx(ds.Index, workspaceID, customValFilters, stateFilters, parameterFilters)
for _, state := range stateFilters {
// avoid setting result as noJobs if
// (1) state is unprocessed and
// (2) jobsdb owner is a reader and
// (3) ds is the right most one
if state == Unprocessed.State && jd.ownerType == Read && lastDS {
continue
}
cacheTx[state] = jd.noResultsCache.StartNoResultTx(ds.Index, workspaceID, customValFilters, []string{state}, parameterFilters)
}
}

var filterConditions []string
if len(stateFilters) > 0 {
additionalPredicates := lo.FilterMap(stateFilters, func(s string, _ int) (string, bool) {
return "(job_latest_state.job_id IS NULL)", s == Unprocessed.State
})
stateQuery := constructQueryOR("job_latest_state.job_state", lo.Filter(stateFilters, func(s string, _ int) bool {
return s != Unprocessed.State
}), additionalPredicates...)
filterConditions = append(filterConditions, stateQuery)
}
additionalPredicates := lo.FilterMap(stateFilters, func(s string, _ int) (string, bool) {
return "(job_latest_state.job_id IS NULL)", s == Unprocessed.State
})
stateQuery := constructQueryOR("job_latest_state.job_state", lo.Filter(stateFilters, func(s string, _ int) bool {
return s != Unprocessed.State
}), additionalPredicates...)
filterConditions = append(filterConditions, stateQuery)

if params.afterJobID != nil {
filterConditions = append(filterConditions, fmt.Sprintf("jobs.job_id > %d", *params.afterJobID))
Expand Down Expand Up @@ -2254,7 +2292,7 @@ func (jd *Handle) getJobsDS(ctx context.Context, ds dataSetT, params GetQueryPar
var limitsReached bool
var eventCount int
var payloadSize int64

resultsetStates := map[string]struct{}{}
for rows.Next() {
var job JobT
var jsState sql.NullString
@@ -2273,13 +2311,16 @@ func (jd *Handle) getJobsDS(ctx context.Context, ds dataSetT, params GetQueryPar
return JobsResult{}, false, err
}
if jsState.Valid {
resultsetStates[jsState.String] = struct{}{}
job.LastJobStatus.JobState = jsState.String
job.LastJobStatus.AttemptNum = int(jsAttemptNum.Int64)
job.LastJobStatus.ExecTime = jsExecTime.Time
job.LastJobStatus.RetryTime = jsRetryTime.Time
job.LastJobStatus.ErrorCode = jsErrorCode.String
job.LastJobStatus.ErrorResponse = jsErrorResponse
job.LastJobStatus.Parameters = jsParameters
} else {
resultsetStates[Unprocessed.State] = struct{}{}
}
job.LastJobStatus.JobParameters = job.Parameters

Expand Down Expand Up @@ -2310,12 +2351,13 @@ func (jd *Handle) getJobsDS(ctx context.Context, ds dataSetT, params GetQueryPar
}

if !skipCacheResult {
dsList := jd.getDSList()

// if query contains unprocessed and jobsdb owner is a reader and if ds is the right most one, ignoring setting result as noJobs
skipCacheCommit := containsUnprocessed && jd.ownerType == Read && ds.Index == dsList[len(dsList)-1].Index
if len(jobList) == 0 && !skipCacheCommit {
cacheTx.Commit()
for state, cacheTx := range cacheTx {
// we are committing the cache Tx only if
// (a) no jobs are returned by the query or
// (b) the state is not present in the resultset and limits have not been reached
if _, ok := resultsetStates[state]; len(jobList) == 0 || (!ok && !limitsReached) {
cacheTx.Commit()
}
}
}
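A hedged restatement of the commit rule above, as a hypothetical stand-alone helper (not part of the diff): the per-state "no results" marker is committed only when the whole query returned nothing, or when that state never appeared in the resultset and the query was not cut short by limits — a truncated resultset proves nothing about the states it is missing.

// shouldCommitNoResult mirrors the condition applied above to each state's cache transaction.
func shouldCommitNoResult(state string, resultsetStates map[string]struct{}, totalJobs int, limitsReached bool) bool {
	if totalJobs == 0 {
		return true // the query returned no jobs at all
	}
	_, seen := resultsetStates[state]
	return !seen && !limitsReached // state absent and the resultset was complete
}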

Expand Down Expand Up @@ -3205,7 +3247,7 @@ func (jd *Handle) getJobs(ctx context.Context, params GetQueryParams, more MoreT
if dsLimit > 0 && dsQueryCount >= dsLimit {
break
}
jobs, dsHit, err := jd.getJobsDS(ctx, ds, params)
jobs, dsHit, err := jd.getJobsDS(ctx, ds, len(dsList)-1 == idx, params)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -3287,6 +3329,18 @@ func queryResultWrapper(res *MoreJobsResult, err error) queryResult {
}
}

type moreQueryResult struct {
*MoreJobsResult
err error
}

func moreQueryResultWrapper(res *MoreJobsResult, err error) moreQueryResult {
return moreQueryResult{
MoreJobsResult: res,
err: err,
}
}

func (jd *Handle) getMaxIDForDs(ds dataSetT) int64 {
var maxID sql.NullInt64
sqlStatement := fmt.Sprintf(`SELECT MAX(job_id) FROM %s`, ds.JobTable)
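As an aside on the MoreToken plumbing at the top of this file's diff (moreToken.afterJobID): a minimal, hypothetical cursor-pagination sketch — not the actual implementation — showing why the token only needs to remember the last job ID handed out, assuming IDs arrive in ascending order.

// page is a stand-in for one GetToProcess call: it returns jobs with an ID
// greater than the cursor and an updated cursor for the next call.
func page(jobIDs []int64, afterJobID *int64, limit int) ([]int64, *int64) {
	var out []int64
	for _, id := range jobIDs {
		if afterJobID != nil && id <= *afterJobID {
			continue // already returned on a previous page
		}
		out = append(out, id)
		if len(out) == limit {
			break
		}
	}
	if len(out) == 0 {
		return out, afterJobID // nothing new; keep the old cursor
	}
	last := out[len(out)-1]
	return out, &last
}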
1 change: 1 addition & 0 deletions processor/processor_isolation_test.go
@@ -227,6 +227,7 @@ func ProcIsolationScenario(t testing.TB, spec *ProcIsolationScenarioSpec) (overa
config.Set("JobsDB.backup.enabled", false)
config.Set("JobsDB.migrateDSLoopSleepDuration", "60m")
config.Set("Router.toAbortDestinationIDs", destinationID)
config.Set("archival.Enabled", false)

config.Set("Processor.isolationMode", string(spec.isolationMode))

6 changes: 2 additions & 4 deletions processor/processor_test.go
@@ -2070,8 +2070,7 @@ var _ = Describe("Processor", Ordered, func() {

c.mockBackendConfig.EXPECT().WaitForConfig(gomock.Any()).Times(1)
c.mockReadProcErrorsDB.EXPECT().FailExecuting().Times(1)
c.mockReadProcErrorsDB.EXPECT().GetFailed(gomock.Any(), gomock.Any()).AnyTimes()
c.mockReadProcErrorsDB.EXPECT().GetUnprocessed(gomock.Any(), gomock.Any()).AnyTimes()
c.mockReadProcErrorsDB.EXPECT().GetJobs(gomock.Any(), []string{jobsdb.Failed.State, jobsdb.Unprocessed.State}, gomock.Any()).AnyTimes()
c.mockRouterJobsDB.EXPECT().GetPileUpCounts(gomock.Any()).AnyTimes()
c.mockBatchRouterJobsDB.EXPECT().GetPileUpCounts(gomock.Any()).AnyTimes()

@@ -2126,8 +2125,7 @@ var _ = Describe("Processor", Ordered, func() {
processor.config.readLoopSleep = time.Millisecond

c.mockReadProcErrorsDB.EXPECT().FailExecuting()
c.mockReadProcErrorsDB.EXPECT().GetFailed(gomock.Any(), gomock.Any()).Return(jobsdb.JobsResult{}, nil).AnyTimes()
c.mockReadProcErrorsDB.EXPECT().GetUnprocessed(gomock.Any(), gomock.Any()).Return(jobsdb.JobsResult{}, nil).AnyTimes()
c.mockReadProcErrorsDB.EXPECT().GetJobs(gomock.Any(), []string{jobsdb.Failed.State, jobsdb.Unprocessed.State}, gomock.Any()).AnyTimes()
c.mockBackendConfig.EXPECT().WaitForConfig(gomock.Any()).Times(1)
c.mockRouterJobsDB.EXPECT().GetPileUpCounts(gomock.Any()).AnyTimes()
c.mockBatchRouterJobsDB.EXPECT().GetPileUpCounts(gomock.Any()).AnyTimes()
66 changes: 43 additions & 23 deletions processor/stash/stash.go
@@ -309,6 +309,7 @@ func (st *HandleT) readErrJobsLoop(ctx context.Context) {
return
case <-time.After(sleepTime):
start := time.Now()
var combinedList []*jobsdb.JobT
var limitReached bool
// NOTE: sending custom val filters array of size 1 to take advantage of cache in jobsdb.
queryParams := jobsdb.GetQueryParams{
@@ -317,39 +318,58 @@ func (st *HandleT) readErrJobsLoop(ctx context.Context) {
JobsLimit: errDBReadBatchSize,
PayloadSizeLimit: st.adaptiveLimit(payloadLimit),
}
toRetry, err := misc.QueryWithRetriesAndNotify(ctx, st.jobdDBQueryRequestTimeout, st.jobdDBMaxRetries, func(ctx context.Context) (jobsdb.JobsResult, error) {
return st.errorDB.GetFailed(ctx, queryParams)
}, sendQueryRetryStats)
if err != nil {
if ctx.Err() != nil { // we are shutting down
close(st.errProcessQ)
return //nolint:nilerr
}
st.logger.Errorf("Error occurred while reading proc error jobs. Err: %v", err)
panic(err)
}

combinedList := toRetry.Jobs
limitReached = toRetry.LimitsReached
if !toRetry.LimitsReached {
queryParams.JobsLimit -= len(toRetry.Jobs)
if queryParams.PayloadSizeLimit > 0 {
queryParams.PayloadSizeLimit -= toRetry.PayloadSize
if config.GetBool("JobsDB.useSingleGetJobsQuery", true) { // TODO: remove condition after successful rollout of single query
toProcess, err := misc.QueryWithRetriesAndNotify(ctx, st.jobdDBQueryRequestTimeout, st.jobdDBMaxRetries, func(ctx context.Context) (jobsdb.JobsResult, error) {
return st.errorDB.GetJobs(ctx, []string{jobsdb.Failed.State, jobsdb.Unprocessed.State}, queryParams)
}, sendQueryRetryStats)
if err != nil {
if ctx.Err() != nil { // we are shutting down
close(st.errProcessQ)
return //nolint:nilerr
}
st.logger.Errorf("Error occurred while reading proc error jobs. Err: %v", err)
panic(err)
}
unprocessed, err := misc.QueryWithRetriesAndNotify(ctx, st.jobdDBQueryRequestTimeout, st.jobdDBMaxRetries, func(ctx context.Context) (jobsdb.JobsResult, error) {
return st.errorDB.GetUnprocessed(ctx, queryParams)

combinedList = toProcess.Jobs
limitReached = toProcess.LimitsReached
} else {
toRetry, err := misc.QueryWithRetriesAndNotify(ctx, st.jobdDBQueryRequestTimeout, st.jobdDBMaxRetries, func(ctx context.Context) (jobsdb.JobsResult, error) {
return st.errorDB.GetFailed(ctx, queryParams)
}, sendQueryRetryStats)
if err != nil {
if ctx.Err() != nil { // we are shutting down
close(st.errProcessQ)
return
return //nolint:nilerr
}
st.logger.Errorf("Error occurred while reading proc error jobs. Err: %v", err)
panic(err)
}
combinedList = append(combinedList, unprocessed.Jobs...)
limitReached = unprocessed.LimitsReached

combinedList = toRetry.Jobs
limitReached = toRetry.LimitsReached
if !toRetry.LimitsReached {
queryParams.JobsLimit -= len(toRetry.Jobs)
if queryParams.PayloadSizeLimit > 0 {
queryParams.PayloadSizeLimit -= toRetry.PayloadSize
}
unprocessed, err := misc.QueryWithRetriesAndNotify(ctx, st.jobdDBQueryRequestTimeout, st.jobdDBMaxRetries, func(ctx context.Context) (jobsdb.JobsResult, error) {
return st.errorDB.GetUnprocessed(ctx, queryParams)
}, sendQueryRetryStats)
if err != nil {
if ctx.Err() != nil { // we are shutting down
close(st.errProcessQ)
return
}
st.logger.Errorf("Error occurred while reading proc error jobs. Err: %v", err)
panic(err)
}
combinedList = append(combinedList, unprocessed.Jobs...)
limitReached = unprocessed.LimitsReached
}
}

st.statErrDBR.Since(start)

if len(combinedList) == 0 {
Expand Down Expand Up @@ -397,7 +417,7 @@ func (st *HandleT) readErrJobsLoop(ctx context.Context) {
}
statusList = append(statusList, &status)
}
err = misc.RetryWithNotify(context.Background(), st.jobsDBCommandTimeout, st.jobdDBMaxRetries, func(ctx context.Context) error {
err := misc.RetryWithNotify(context.Background(), st.jobsDBCommandTimeout, st.jobdDBMaxRetries, func(ctx context.Context) error {
return st.errorDB.UpdateJobStatus(ctx, statusList, nil, nil)
}, sendRetryUpdateStats)
if err != nil {
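For context on the two branches above (an illustrative sketch with stub types, not the jobsdb API): the legacy else-branch has to re-budget its jobs and payload limits after the GetFailed call before it may issue GetUnprocessed, whereas the single GetJobs call spends one budget in one query.

package main

import "fmt"

// queryResult is a hypothetical stand-in for jobsdb.JobsResult.
type queryResult struct {
	jobs          int
	payloadSize   int64
	limitsReached bool
}

// legacyBudget mirrors the else-branch: after the "failed" query, the remaining
// budget must be recomputed before the "unprocessed" query is allowed to run.
func legacyBudget(failed queryResult, jobsLimit int, payloadLimit int64) (int, int64, bool) {
	if failed.limitsReached {
		return 0, 0, false // limits already hit; the second query is skipped
	}
	jobsLimit -= failed.jobs
	if payloadLimit > 0 {
		payloadLimit -= failed.payloadSize
	}
	return jobsLimit, payloadLimit, true
}

func main() {
	remainingJobs, remainingPayload, runSecond := legacyBudget(queryResult{jobs: 40, payloadSize: 1 << 20}, 100, 10<<20)
	fmt.Println(remainingJobs, remainingPayload, runSecond) // 60 9437184 true
}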
31 changes: 9 additions & 22 deletions router/batchrouter/batchrouter_test.go
@@ -235,29 +235,16 @@ var _ = Describe("BatchRouter", func() {
}

payloadLimit := batchrouter.payloadLimit
var toRetryJobsListCalled bool
var unprocessedJobsListCalled bool
c.mockBatchRouterJobsDB.EXPECT().GetFailed(gomock.Any(), jobsdb.GetQueryParams{CustomValFilters: []string{CustomVal["S3"]}, JobsLimit: c.jobQueryBatchSize, PayloadSizeLimit: payloadLimit}).DoAndReturn(func(ctx context.Context, params jobsdb.GetQueryParams) (jobsdb.JobsResult, error) {
if !toRetryJobsListCalled {
toRetryJobsListCalled = true
return jobsdb.JobsResult{Jobs: toRetryJobsList}, nil
var getJobsListCalled bool
c.mockBatchRouterJobsDB.EXPECT().GetJobs(gomock.Any(), []string{jobsdb.Failed.State, jobsdb.Unprocessed.State}, jobsdb.GetQueryParams{CustomValFilters: []string{CustomVal["S3"]}, JobsLimit: c.jobQueryBatchSize, PayloadSizeLimit: payloadLimit}).DoAndReturn(func(ctx context.Context, states []string, params jobsdb.GetQueryParams) (jobsdb.JobsResult, error) {
var res jobsdb.JobsResult
if !getJobsListCalled {
getJobsListCalled = true
jobs := append([]*jobsdb.JobT{}, toRetryJobsList...)
jobs = append(jobs, unprocessedJobsList...)
res.Jobs = jobs
}
return jobsdb.JobsResult{}, nil
}).AnyTimes()
c.mockBatchRouterJobsDB.EXPECT().GetUnprocessed(gomock.Any(), jobsdb.GetQueryParams{CustomValFilters: []string{CustomVal["S3"]}, JobsLimit: c.jobQueryBatchSize - len(toRetryJobsList), PayloadSizeLimit: payloadLimit}).DoAndReturn(func(ctx context.Context, params jobsdb.GetQueryParams) (jobsdb.JobsResult, error) {
if !unprocessedJobsListCalled {
unprocessedJobsListCalled = true
return jobsdb.JobsResult{Jobs: unprocessedJobsList}, nil
}
return jobsdb.JobsResult{}, nil
}).Times(1)

c.mockBatchRouterJobsDB.EXPECT().GetUnprocessed(gomock.Any(), jobsdb.GetQueryParams{CustomValFilters: []string{CustomVal["S3"]}, JobsLimit: c.jobQueryBatchSize, PayloadSizeLimit: payloadLimit}).DoAndReturn(func(ctx context.Context, params jobsdb.GetQueryParams) (jobsdb.JobsResult, error) {
if !unprocessedJobsListCalled {
unprocessedJobsListCalled = true
return jobsdb.JobsResult{Jobs: unprocessedJobsList}, nil
}
return jobsdb.JobsResult{}, nil
return res, nil
}).AnyTimes()

c.mockBatchRouterJobsDB.EXPECT().UpdateJobStatus(gomock.Any(), gomock.Any(), []string{CustomVal["S3"]}, gomock.Any()).Times(1).