fix: increased postgres cpu usage after switching to a single get jobs query #3812

Merged · 2 commits · Sep 4, 2023
111 changes: 79 additions & 32 deletions jobsdb/jobsdb.go
@@ -1818,27 +1818,76 @@
}
}

+type moreTokenLegacy struct {
+	retryAfterJobID       *int64
+	waitingAfterJobID     *int64
+	unprocessedAfterJobID *int64
+}
+
type moreToken struct {
	afterJobID *int64
}

func (jd *Handle) GetToProcess(ctx context.Context, params GetQueryParams, more MoreToken) (*MoreJobsResult, error) { // skipcq: CRT-P0003
	if params.JobsLimit == 0 {
		return &MoreJobsResult{More: more}, nil
	}
+	mtoken := &moreTokenLegacy{}
+	if more != nil {
+		var ok bool
+		if mtoken, ok = more.(*moreTokenLegacy); !ok {
+			return nil, fmt.Errorf("invalid token: %+v", more)
+		}
+	}
-	params.stateFilters = []string{Failed.State, Waiting.State, Unprocessed.State}
-	slices.Sort(params.stateFilters)
-	tags := statTags{
-		StateFilters:     params.stateFilters,
-		CustomValFilters: params.CustomValFilters,
-		ParameterFilters: params.ParameterFilters,
-		WorkspaceID:      params.WorkspaceID,
-	}
+	updateParams := func(params *GetQueryParams, jobs JobsResult, nextAfterJobID *int64) {
+		params.JobsLimit -= len(jobs.Jobs)
+		if params.EventsLimit > 0 {
+			params.EventsLimit -= jobs.EventsCount
+		}
+		if params.PayloadSizeLimit > 0 {
+			params.PayloadSizeLimit -= jobs.PayloadSize
+		}
+		params.afterJobID = nextAfterJobID
+	}
-	command := func() moreQueryResult {
-		return moreQueryResultWrapper(jd.getJobs(ctx, params, more))
-	}
-	res := executeDbRequest(jd, newReadDbRequest("jobs", &tags, command))
-	return res.MoreJobsResult, res.err
+	var list []*JobT
+	params.afterJobID = mtoken.retryAfterJobID
+	toRetry, err := jd.GetFailed(ctx, params)
+	if err != nil {
+		return nil, err
+	}
+	if len(toRetry.Jobs) > 0 {
+		retryAfterJobID := toRetry.Jobs[len(toRetry.Jobs)-1].JobID
+		mtoken.retryAfterJobID = &retryAfterJobID
+	}
+
+	list = append(list, toRetry.Jobs...)
+	if toRetry.LimitsReached {
+		return &MoreJobsResult{JobsResult: JobsResult{Jobs: list, LimitsReached: true}, More: mtoken}, nil
+	}
+	updateParams(&params, toRetry, mtoken.waitingAfterJobID)
+
+	waiting, err := jd.GetWaiting(ctx, params)
+	if err != nil {
+		return nil, err
+	}
+	if len(waiting.Jobs) > 0 {
+		waitingAfterJobID := waiting.Jobs[len(waiting.Jobs)-1].JobID
+		mtoken.waitingAfterJobID = &waitingAfterJobID
+	}
+	list = append(list, waiting.Jobs...)
+	if waiting.LimitsReached {
+		return &MoreJobsResult{JobsResult: JobsResult{Jobs: list, LimitsReached: true}, More: mtoken}, nil
+	}
+	updateParams(&params, waiting, mtoken.unprocessedAfterJobID)
+
+	unprocessed, err := jd.GetUnprocessed(ctx, params)
+	if err != nil {
+		return nil, err
+	}
+	if len(unprocessed.Jobs) > 0 {
+		unprocessedAfterJobID := unprocessed.Jobs[len(unprocessed.Jobs)-1].JobID
+		mtoken.unprocessedAfterJobID = &unprocessedAfterJobID
+	}
+	list = append(list, unprocessed.Jobs...)
+	return &MoreJobsResult{JobsResult: JobsResult{Jobs: list, LimitsReached: unprocessed.LimitsReached}, More: mtoken}, nil
}

var cacheParameterFilters = []string{"source_id", "destination_id"}
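Note: with this change GetToProcess no longer issues a single combined query; it pages through failed, waiting and unprocessed jobs separately and carries the per-state cursors in a moreTokenLegacy. The sketch below is a hypothetical caller (not code from this PR; drain, process and the 1000-job limit are illustrative) showing how the returned token is meant to be fed back in:

```go
// Minimal caller sketch, assuming the Handle, GetQueryParams, MoreToken and
// MoreJobsResult types shown in this diff.
func drain(ctx context.Context, jd *Handle, process func([]*JobT)) error {
	params := GetQueryParams{JobsLimit: 1000} // illustrative limit
	var token MoreToken                       // nil on the first call
	for {
		res, err := jd.GetToProcess(ctx, params, token)
		if err != nil {
			return err
		}
		if len(res.JobsResult.Jobs) == 0 {
			return nil // nothing left to pick up
		}
		process(res.JobsResult.Jobs)
		// Reuse the token so the next call resumes after the last
		// failed/waiting/unprocessed job IDs seen so far.
		token = res.More
	}
}
```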
@@ -2092,7 +2141,7 @@
WorkspaceID: workspaceID,
}

defer jd.getTimerStat("get_ds_time", &tags).RecordDuration()()
defer jd.getTimerStat("jobsdb_get_jobs_ds_time", &tags).RecordDuration()()

containsUnprocessed := lo.Contains(params.stateFilters, Unprocessed.State)
skipCacheResult := params.afterJobID != nil
@@ -2138,6 +2187,16 @@
limitQuery = fmt.Sprintf(" LIMIT %d ", params.JobsLimit)
}

joinType := "LEFT"
joinTable := "v_last_" + ds.JobStatusTable

if !containsUnprocessed { // If we are not querying for unprocessed jobs, we can use an inner join
joinType = "INNER"
} else if slices.Equal(stateFilters, []string{Unprocessed.State}) {
// If we are querying only for unprocessed jobs, we should join with the status table instead of the view (performance reasons)
joinTable = ds.JobStatusTable
}

var rows *sql.Rows
sqlStatement := fmt.Sprintf(`SELECT
jobs.job_id, jobs.uuid, jobs.user_id, jobs.parameters, jobs.custom_val, jobs.event_payload, jobs.event_count,
@@ -2150,10 +2209,10 @@
job_latest_state.error_code, job_latest_state.error_response, job_latest_state.parameters
FROM
%[1]q AS jobs
LEFT JOIN "v_last_%[2]s" job_latest_state ON jobs.job_id=job_latest_state.job_id
%[3]s
ORDER BY jobs.job_id %[4]s`,
ds.JobTable, ds.JobStatusTable, filterQuery, limitQuery)
%[2]s JOIN %[3]q job_latest_state ON jobs.job_id=job_latest_state.job_id
%[4]s
ORDER BY jobs.job_id %[5]s`,
ds.JobTable, joinType, joinTable, filterQuery, limitQuery)

var args []interface{}
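Note: the join selection above is the heart of the CPU fix — the query only falls back to a LEFT JOIN on the v_last_* view when it has to. A standalone restatement of the same rule as a hypothetical helper (chooseJoin does not exist in the repo; state names are plain strings here for brevity):

```go
import "slices"

// chooseJoin picks the join keyword and the table/view to join against for a
// given set of requested job states, mirroring the inline logic above.
func chooseJoin(stateFilters []string, statusTable string) (joinType, joinTable string) {
	joinType, joinTable = "LEFT", "v_last_"+statusTable // default: view of each job's latest status
	switch {
	case !slices.Contains(stateFilters, "unprocessed"):
		// Every requested state has a status row, so an INNER JOIN lets the
		// planner discard status-less (unprocessed) jobs early.
		joinType = "INNER"
	case len(stateFilters) == 1:
		// Unprocessed-only queries join the raw status table instead of the
		// view, which the comment above flags as cheaper.
		joinTable = statusTable
	}
	return joinType, joinTable
}
```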

@@ -3097,7 +3156,7 @@
WorkspaceID: params.WorkspaceID,
}
defer jd.getTimerStat(
"get_jobs_time",
"jobsdb_get_jobs_time",
tags,
).RecordDuration()()

@@ -3209,7 +3268,7 @@
command := func() queryResult {
return queryResultWrapper(jd.getJobs(ctx, params, nil))
}
res := executeDbRequest(jd, newReadDbRequest("jobs", &tags, command))
res := executeDbRequest(jd, newReadDbRequest("get_jobs", &tags, command))
return res.JobsResult, res.err
}

@@ -3218,11 +3277,6 @@
err error
}

-type moreQueryResult struct {
-	*MoreJobsResult
-	err error
-}

func queryResultWrapper(res *MoreJobsResult, err error) queryResult {
if res == nil {
res = &MoreJobsResult{}
@@ -3233,13 +3287,6 @@
}
}

-func moreQueryResultWrapper(res *MoreJobsResult, err error) moreQueryResult {
-	return moreQueryResult{
-		MoreJobsResult: res,
-		err: err,
-	}
-}

func (jd *Handle) getMaxIDForDs(ds dataSetT) int64 {
var maxID sql.NullInt64
sqlStatement := fmt.Sprintf(`SELECT MAX(job_id) FROM %s`, ds.JobTable)
4 changes: 2 additions & 2 deletions jobsdb/queued_db_request.go
@@ -7,7 +7,7 @@ import (

func executeDbRequest[T any](jd *Handle, c *dbRequest[T]) T {
defer jd.getTimerStat(
fmt.Sprintf("%s_total_time", c.name),
fmt.Sprintf("jobsdb_%s_total_time", c.name),
c.tags,
).RecordDuration()()

@@ -28,7 +28,7 @@ func executeDbRequest[T any](jd *Handle, c *dbRequest[T]) T {

if queueEnabled {
queuedAt := time.Now()
-		waitTimeStat := jd.getTimerStat(fmt.Sprintf("%s_wait_time", c.name), c.tags)
+		waitTimeStat := jd.getTimerStat(fmt.Sprintf("jobsdb_%s_wait_time", c.name), c.tags)
queueCap <- struct{}{}
defer func() { <-queueCap }()
waitTimeStat.Since(queuedAt)
6 changes: 4 additions & 2 deletions processor/processor_test.go
@@ -2070,7 +2070,8 @@ var _ = Describe("Processor", Ordered, func() {

c.mockBackendConfig.EXPECT().WaitForConfig(gomock.Any()).Times(1)
c.mockReadProcErrorsDB.EXPECT().FailExecuting().Times(1)
-	c.mockReadProcErrorsDB.EXPECT().GetToProcess(gomock.Any(), gomock.Any(), gomock.Any()).Return(&jobsdb.MoreJobsResult{}, nil).AnyTimes()
+	c.mockReadProcErrorsDB.EXPECT().GetFailed(gomock.Any(), gomock.Any()).AnyTimes()
+	c.mockReadProcErrorsDB.EXPECT().GetUnprocessed(gomock.Any(), gomock.Any()).AnyTimes()
c.mockRouterJobsDB.EXPECT().GetPileUpCounts(gomock.Any()).AnyTimes()
c.mockBatchRouterJobsDB.EXPECT().GetPileUpCounts(gomock.Any()).AnyTimes()

@@ -2125,7 +2126,8 @@
processor.config.readLoopSleep = time.Millisecond

c.mockReadProcErrorsDB.EXPECT().FailExecuting()
-	c.mockReadProcErrorsDB.EXPECT().GetToProcess(gomock.Any(), gomock.Any(), gomock.Any()).Return(&jobsdb.MoreJobsResult{}, nil).AnyTimes()
+	c.mockReadProcErrorsDB.EXPECT().GetFailed(gomock.Any(), gomock.Any()).Return(jobsdb.JobsResult{}, nil).AnyTimes()
+	c.mockReadProcErrorsDB.EXPECT().GetUnprocessed(gomock.Any(), gomock.Any()).Return(jobsdb.JobsResult{}, nil).AnyTimes()
c.mockBackendConfig.EXPECT().WaitForConfig(gomock.Any()).Times(1)
c.mockRouterJobsDB.EXPECT().GetPileUpCounts(gomock.Any()).AnyTimes()
c.mockBatchRouterJobsDB.EXPECT().GetPileUpCounts(gomock.Any()).AnyTimes()
27 changes: 23 additions & 4 deletions processor/stash/stash.go
@@ -317,8 +317,8 @@
JobsLimit: errDBReadBatchSize,
PayloadSizeLimit: st.adaptiveLimit(payloadLimit),
}
-	toProcess, err := misc.QueryWithRetriesAndNotify(ctx, st.jobdDBQueryRequestTimeout, st.jobdDBMaxRetries, func(ctx context.Context) (*jobsdb.MoreJobsResult, error) {
-		return st.errorDB.GetToProcess(ctx, queryParams, nil)
+	toRetry, err := misc.QueryWithRetriesAndNotify(ctx, st.jobdDBQueryRequestTimeout, st.jobdDBMaxRetries, func(ctx context.Context) (jobsdb.JobsResult, error) {
+		return st.errorDB.GetFailed(ctx, queryParams)
}, sendQueryRetryStats)
if err != nil {
if ctx.Err() != nil { // we are shutting down
@@ -329,8 +329,27 @@
panic(err)
}

-	combinedList := toProcess.Jobs
-	limitReached = toProcess.LimitsReached
+	combinedList := toRetry.Jobs
+	limitReached = toRetry.LimitsReached
+	if !toRetry.LimitsReached {
+		queryParams.JobsLimit -= len(toRetry.Jobs)
+		if queryParams.PayloadSizeLimit > 0 {
+			queryParams.PayloadSizeLimit -= toRetry.PayloadSize
+		}
+		unprocessed, err := misc.QueryWithRetriesAndNotify(ctx, st.jobdDBQueryRequestTimeout, st.jobdDBMaxRetries, func(ctx context.Context) (jobsdb.JobsResult, error) {
+			return st.errorDB.GetUnprocessed(ctx, queryParams)
+		}, sendQueryRetryStats)
+		if err != nil {
+			if ctx.Err() != nil { // we are shutting down
+				close(st.errProcessQ)
+				return
+			}
+			st.logger.Errorf("Error occurred while reading proc error jobs. Err: %v", err)
+			panic(err)
+		}
+		combinedList = append(combinedList, unprocessed.Jobs...)
+		limitReached = unprocessed.LimitsReached
+	}
st.statErrDBR.Since(start)

if len(combinedList) == 0 {
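Note: the stash reader above and the batch router change further down (router/batchrouter/handle.go) both switch to the same two-step read: fetch failed jobs first, then top up with unprocessed jobs only if the job and payload limits still have headroom. A hedged sketch of that pattern pulled out into a reusable helper (neither the helper nor the interface exists in this PR; the jobsdb types and signatures are the ones used in this diff):

```go
import (
	"context"

	"github.com/rudderlabs/rudder-server/jobsdb" // assumed module path
)

// failedThenUnprocessedReader lists only the two calls this sketch needs.
type failedThenUnprocessedReader interface {
	GetFailed(ctx context.Context, params jobsdb.GetQueryParams) (jobsdb.JobsResult, error)
	GetUnprocessed(ctx context.Context, params jobsdb.GetQueryParams) (jobsdb.JobsResult, error)
}

// readFailedThenUnprocessed reads failed jobs, then spends whatever job/payload
// budget is left on unprocessed jobs, mirroring the inline logic above.
func readFailedThenUnprocessed(ctx context.Context, db failedThenUnprocessedReader, params jobsdb.GetQueryParams) (jobsdb.JobsResult, error) {
	failed, err := db.GetFailed(ctx, params)
	if err != nil || failed.LimitsReached {
		// Either the read failed or failed jobs alone filled the budget.
		return failed, err
	}
	params.JobsLimit -= len(failed.Jobs)
	if params.PayloadSizeLimit > 0 {
		params.PayloadSizeLimit -= failed.PayloadSize
	}
	unprocessed, err := db.GetUnprocessed(ctx, params)
	if err != nil {
		return jobsdb.JobsResult{}, err
	}
	return jobsdb.JobsResult{
		Jobs:          append(failed.Jobs, unprocessed.Jobs...),
		LimitsReached: unprocessed.LimitsReached,
	}, nil
}
```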
32 changes: 22 additions & 10 deletions router/batchrouter/batchrouter_test.go
@@ -235,17 +235,29 @@ var _ = Describe("BatchRouter", func() {
}

payloadLimit := batchrouter.payloadLimit
-	var getJobsListCalled bool
-	c.mockBatchRouterJobsDB.EXPECT().GetToProcess(gomock.Any(), jobsdb.GetQueryParams{CustomValFilters: []string{CustomVal["S3"]}, JobsLimit: c.jobQueryBatchSize, PayloadSizeLimit: payloadLimit}, nil).DoAndReturn(func(ctx context.Context, params jobsdb.GetQueryParams, _ jobsdb.MoreToken) (*jobsdb.MoreJobsResult, error) {
-		var res jobsdb.MoreJobsResult
-		if !getJobsListCalled {
-			getJobsListCalled = true
-			jobs := append([]*jobsdb.JobT{}, toRetryJobsList...)
-			jobs = append(jobs, unprocessedJobsList...)
-			res.Jobs = jobs
-			return &res, nil
+	var toRetryJobsListCalled bool
+	var unprocessedJobsListCalled bool
+	c.mockBatchRouterJobsDB.EXPECT().GetFailed(gomock.Any(), jobsdb.GetQueryParams{CustomValFilters: []string{CustomVal["S3"]}, JobsLimit: c.jobQueryBatchSize, PayloadSizeLimit: payloadLimit}).DoAndReturn(func(ctx context.Context, params jobsdb.GetQueryParams) (jobsdb.JobsResult, error) {
+		if !toRetryJobsListCalled {
+			toRetryJobsListCalled = true
+			return jobsdb.JobsResult{Jobs: toRetryJobsList}, nil
		}
-		return &res, nil
+		return jobsdb.JobsResult{}, nil
	}).AnyTimes()
+	c.mockBatchRouterJobsDB.EXPECT().GetUnprocessed(gomock.Any(), jobsdb.GetQueryParams{CustomValFilters: []string{CustomVal["S3"]}, JobsLimit: c.jobQueryBatchSize - len(toRetryJobsList), PayloadSizeLimit: payloadLimit}).DoAndReturn(func(ctx context.Context, params jobsdb.GetQueryParams) (jobsdb.JobsResult, error) {
+		if !unprocessedJobsListCalled {
+			unprocessedJobsListCalled = true
+			return jobsdb.JobsResult{Jobs: unprocessedJobsList}, nil
+		}
+		return jobsdb.JobsResult{}, nil
+	}).Times(1)
+
+	c.mockBatchRouterJobsDB.EXPECT().GetUnprocessed(gomock.Any(), jobsdb.GetQueryParams{CustomValFilters: []string{CustomVal["S3"]}, JobsLimit: c.jobQueryBatchSize, PayloadSizeLimit: payloadLimit}).DoAndReturn(func(ctx context.Context, params jobsdb.GetQueryParams) (jobsdb.JobsResult, error) {
+		if !unprocessedJobsListCalled {
+			unprocessedJobsListCalled = true
+			return jobsdb.JobsResult{Jobs: unprocessedJobsList}, nil
+		}
+		return jobsdb.JobsResult{}, nil
+	}).AnyTimes()

c.mockBatchRouterJobsDB.EXPECT().UpdateJobStatus(gomock.Any(), gomock.Any(), []string{CustomVal["S3"]}, gomock.Any()).Times(1).
23 changes: 19 additions & 4 deletions router/batchrouter/handle.go
@@ -191,15 +191,30 @@
}
brt.isolationStrategy.AugmentQueryParams(partition, &queryParams)
var limitsReached bool
-	toProcess, err := misc.QueryWithRetriesAndNotify(context.Background(), brt.jobdDBQueryRequestTimeout, brt.jobdDBMaxRetries, func(ctx context.Context) (*jobsdb.MoreJobsResult, error) {
-		return brt.jobsDB.GetToProcess(ctx, queryParams, nil)
+	toRetry, err := misc.QueryWithRetriesAndNotify(context.Background(), brt.jobdDBQueryRequestTimeout, brt.jobdDBMaxRetries, func(ctx context.Context) (jobsdb.JobsResult, error) {
+		return brt.jobsDB.GetFailed(ctx, queryParams)
}, brt.sendQueryRetryStats)
if err != nil {
brt.logger.Errorf("BRT: %s: Error while reading from DB: %v", brt.destType, err)
panic(err)
}
-	jobs = toProcess.Jobs
-	limitsReached = toProcess.LimitsReached
+	jobs = toRetry.Jobs
+	limitsReached = toRetry.LimitsReached
+	if !limitsReached {
+		queryParams.JobsLimit -= len(toRetry.Jobs)
+		if queryParams.PayloadSizeLimit > 0 {
+			queryParams.PayloadSizeLimit -= toRetry.PayloadSize
+		}
+		unprocessed, err := misc.QueryWithRetriesAndNotify(context.Background(), brt.jobdDBQueryRequestTimeout, brt.jobdDBMaxRetries, func(ctx context.Context) (jobsdb.JobsResult, error) {
+			return brt.jobsDB.GetUnprocessed(ctx, queryParams)
+		}, brt.sendQueryRetryStats)
+		if err != nil {
+			brt.logger.Errorf("BRT: %s: Error while reading from DB: %v", brt.destType, err)
+			panic(err)
+		}
+		jobs = append(jobs, unprocessed.Jobs...)
+		limitsReached = unprocessed.LimitsReached
+	}
brtQueryStat.Since(queryStart)
sort.Slice(jobs, func(i, j int) bool {
return jobs[i].JobID < jobs[j].JobID
12 changes: 8 additions & 4 deletions router/internal/jobiterator/jobiterator.go
@@ -32,12 +32,13 @@ type Iterator struct {
params jobsdb.GetQueryParams
maxQueries int
discardedPercentageTolerance int
+	orderGroupKeyFn func(*jobsdb.JobT) string
getJobsFn func(context.Context, jobsdb.GetQueryParams, jobsdb.MoreToken) (*jobsdb.MoreJobsResult, error)
state struct {
// running iterator state
jobs []*jobsdb.JobT
idx int
-		previousJob *jobsdb.JobT
+		previousJob map[string]*jobsdb.JobT

// closed indicates whether the iterator has reached the end or not
closed bool
@@ -73,8 +74,10 @@ func New(params jobsdb.GetQueryParams, getJobsFn func(context.Context, jobsdb.Ge
maxQueries: 100,
discardedPercentageTolerance: 0,
getJobsFn: getJobsFn,
+		orderGroupKeyFn: func(job *jobsdb.JobT) string { return job.LastJobStatus.JobState },
}
ji.state.jobsLimit = params.JobsLimit
+	ji.state.previousJob = map[string]*jobsdb.JobT{}
for _, opt := range opts {
opt(ji)
}
@@ -148,10 +151,11 @@ func (ji *Iterator) Next() *jobsdb.JobT {
idx := ji.state.idx
ji.state.idx++
nextJob := ji.state.jobs[idx]
-	if previousJob := ji.state.previousJob; previousJob != nil && previousJob.JobID > nextJob.JobID {
-		panic(fmt.Errorf("job iterator encountered out of order jobs: previousJobID: %d, nextJobID: %d", previousJob.JobID, nextJob.JobID))
+	orderGroupKey := ji.orderGroupKeyFn(nextJob)
+	if previousJob, ok := ji.state.previousJob[orderGroupKey]; ok && previousJob.JobID > nextJob.JobID {
+		panic(fmt.Errorf("job iterator encountered out of order jobs for group key %s: previousJobID: %d, nextJobID: %d", orderGroupKey, previousJob.JobID, nextJob.JobID))
}
-	ji.state.previousJob = nextJob
+	ji.state.previousJob[orderGroupKey] = nextJob
return nextJob
}

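Note: the iterator change is what keeps the ordering check meaningful now that jobs arrive from separate failed/waiting/unprocessed queries: the combined stream is only sorted within each state group, so previousJob is tracked per group key instead of globally. A tiny self-contained illustration with made-up job IDs and states:

```go
package main

import "fmt"

func main() {
	type job struct {
		id    int64
		state string // plays the role of LastJobStatus.JobState; "" stands in for unprocessed
	}
	// Failed jobs come back first, then unprocessed ones; each slice is ordered
	// by ID, but the concatenation is not (3 follows 9).
	stream := []job{{7, "failed"}, {9, "failed"}, {3, ""}, {5, ""}}
	previous := map[string]int64{} // mirrors the per-group previousJob map
	for _, j := range stream {
		if prev, ok := previous[j.state]; ok && prev > j.id {
			panic(fmt.Errorf("out of order in group %q: %d after %d", j.state, j.id, prev))
		}
		previous[j.state] = j.id
	}
	// The old single-previousJob check would have panicked at job 3; the
	// per-group map accepts this stream while still catching regressions
	// inside a group.
	fmt.Println("stream accepted")
}
```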