fix: increased postgres cpu usage after switching to a single get jobs query (#3812)
atzoum committed Sep 4, 2023
1 parent dd4dd3d commit e4a65f3
Showing 7 changed files with 157 additions and 58 deletions.
111 changes: 79 additions & 32 deletions jobsdb/jobsdb.go
@@ -1818,27 +1818,76 @@ func (jd *Handle) invalidateCacheForJobs(ds dataSetT, jobList []*JobT) {
}
}

type moreTokenLegacy struct {
retryAfterJobID *int64
waitingAfterJobID *int64
unprocessedAfterJobID *int64
}

type moreToken struct {
afterJobID *int64
}

func (jd *Handle) GetToProcess(ctx context.Context, params GetQueryParams, more MoreToken) (*MoreJobsResult, error) { // skipcq: CRT-P0003
if params.JobsLimit == 0 {
return &MoreJobsResult{More: more}, nil

mtoken := &moreTokenLegacy{}
if more != nil {
var ok bool
if mtoken, ok = more.(*moreTokenLegacy); !ok {
return nil, fmt.Errorf("invalid token: %+v", more)
}
}
params.stateFilters = []string{Failed.State, Waiting.State, Unprocessed.State}
slices.Sort(params.stateFilters)
tags := statTags{
StateFilters: params.stateFilters,
CustomValFilters: params.CustomValFilters,
ParameterFilters: params.ParameterFilters,
WorkspaceID: params.WorkspaceID,
updateParams := func(params *GetQueryParams, jobs JobsResult, nextAfterJobID *int64) {
params.JobsLimit -= len(jobs.Jobs)
if params.EventsLimit > 0 {
params.EventsLimit -= jobs.EventsCount
}
if params.PayloadSizeLimit > 0 {
params.PayloadSizeLimit -= jobs.PayloadSize
}
params.afterJobID = nextAfterJobID
}
command := func() moreQueryResult {
return moreQueryResultWrapper(jd.getJobs(ctx, params, more))
var list []*JobT
params.afterJobID = mtoken.retryAfterJobID
toRetry, err := jd.GetFailed(ctx, params)
if err != nil {
return nil, err
}
res := executeDbRequest(jd, newReadDbRequest("jobs", &tags, command))
return res.MoreJobsResult, res.err
if len(toRetry.Jobs) > 0 {
retryAfterJobID := toRetry.Jobs[len(toRetry.Jobs)-1].JobID
mtoken.retryAfterJobID = &retryAfterJobID
}

list = append(list, toRetry.Jobs...)
if toRetry.LimitsReached {
return &MoreJobsResult{JobsResult: JobsResult{Jobs: list, LimitsReached: true}, More: mtoken}, nil
}
updateParams(&params, toRetry, mtoken.waitingAfterJobID)

waiting, err := jd.GetWaiting(ctx, params)
if err != nil {
return nil, err
}
if len(waiting.Jobs) > 0 {
waitingAfterJobID := waiting.Jobs[len(waiting.Jobs)-1].JobID
mtoken.waitingAfterJobID = &waitingAfterJobID
}
list = append(list, waiting.Jobs...)
if waiting.LimitsReached {
return &MoreJobsResult{JobsResult: JobsResult{Jobs: list, LimitsReached: true}, More: mtoken}, nil
}
updateParams(&params, waiting, mtoken.unprocessedAfterJobID)

unprocessed, err := jd.GetUnprocessed(ctx, params)
if err != nil {
return nil, err
}
if len(unprocessed.Jobs) > 0 {
unprocessedAfterJobID := unprocessed.Jobs[len(unprocessed.Jobs)-1].JobID
mtoken.unprocessedAfterJobID = &unprocessedAfterJobID
}
list = append(list, unprocessed.Jobs...)
return &MoreJobsResult{JobsResult: JobsResult{Jobs: list, LimitsReached: unprocessed.LimitsReached}, More: mtoken}, nil
}

var cacheParameterFilters = []string{"source_id", "destination_id"}
@@ -2092,7 +2141,7 @@ func (jd *Handle) getJobsDS(ctx context.Context, ds dataSetT, params GetQueryPar
WorkspaceID: workspaceID,
}

defer jd.getTimerStat("get_ds_time", &tags).RecordDuration()()
defer jd.getTimerStat("jobsdb_get_jobs_ds_time", &tags).RecordDuration()()

containsUnprocessed := lo.Contains(params.stateFilters, Unprocessed.State)
skipCacheResult := params.afterJobID != nil
@@ -2138,6 +2187,16 @@ func (jd *Handle) getJobsDS(ctx context.Context, ds dataSetT, params GetQueryPar
limitQuery = fmt.Sprintf(" LIMIT %d ", params.JobsLimit)
}

joinType := "LEFT"
joinTable := "v_last_" + ds.JobStatusTable

if !containsUnprocessed { // If we are not querying for unprocessed jobs, we can use an inner join
joinType = "INNER"
} else if slices.Equal(stateFilters, []string{Unprocessed.State}) {
// If we are querying only for unprocessed jobs, we should join with the status table instead of the view (performance reasons)
joinTable = ds.JobStatusTable
}

var rows *sql.Rows
sqlStatement := fmt.Sprintf(`SELECT
jobs.job_id, jobs.uuid, jobs.user_id, jobs.parameters, jobs.custom_val, jobs.event_payload, jobs.event_count,
@@ -2150,10 +2209,10 @@ func (jd *Handle) getJobsDS(ctx context.Context, ds dataSetT, params GetQueryPar
job_latest_state.error_code, job_latest_state.error_response, job_latest_state.parameters
FROM
%[1]q AS jobs
LEFT JOIN "v_last_%[2]s" job_latest_state ON jobs.job_id=job_latest_state.job_id
%[3]s
ORDER BY jobs.job_id %[4]s`,
ds.JobTable, ds.JobStatusTable, filterQuery, limitQuery)
%[2]s JOIN %[3]q job_latest_state ON jobs.job_id=job_latest_state.job_id
%[4]s
ORDER BY jobs.job_id %[5]s`,
ds.JobTable, joinType, joinTable, filterQuery, limitQuery)

var args []interface{}

@@ -3097,7 +3156,7 @@ func (jd *Handle) getJobs(ctx context.Context, params GetQueryParams, more MoreT
WorkspaceID: params.WorkspaceID,
}
defer jd.getTimerStat(
"get_jobs_time",
"jobsdb_get_jobs_time",
tags,
).RecordDuration()()

@@ -3209,7 +3268,7 @@ func (jd *Handle) GetJobs(ctx context.Context, states []string, params GetQueryP
command := func() queryResult {
return queryResultWrapper(jd.getJobs(ctx, params, nil))
}
res := executeDbRequest(jd, newReadDbRequest("jobs", &tags, command))
res := executeDbRequest(jd, newReadDbRequest("get_jobs", &tags, command))
return res.JobsResult, res.err
}

@@ -3218,11 +3277,6 @@ type queryResult struct {
err error
}

type moreQueryResult struct {
*MoreJobsResult
err error
}

func queryResultWrapper(res *MoreJobsResult, err error) queryResult {
if res == nil {
res = &MoreJobsResult{}
@@ -3233,13 +3287,6 @@ func queryResultWrapper(res *MoreJobsResult, err error) queryResult {
}
}

func moreQueryResultWrapper(res *MoreJobsResult, err error) moreQueryResult {
return moreQueryResult{
MoreJobsResult: res,
err: err,
}
}

func (jd *Handle) getMaxIDForDs(ds dataSetT) int64 {
var maxID sql.NullInt64
sqlStatement := fmt.Sprintf(`SELECT MAX(job_id) FROM %s`, ds.JobTable)
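The heart of the jobsdb.go change is the join selection in `getJobsDS`: the generated query keeps a LEFT JOIN against the `v_last_*` latest-status view only when unprocessed jobs are part of the request, switches to an INNER JOIN when every requested state has a status row, and joins the raw status table when unprocessed jobs are the only state requested. The sketch below is a self-contained illustration of that decision and of the SQL shape it produces; the helper name `buildJobsQuery` and the sample table names are inventions for this example, not part of the repository.

```go
package main

import (
	"fmt"
	"slices"
)

const unprocessedState = "unprocessed"

// buildJobsQuery mirrors the join selection added in this commit:
//   - LEFT JOIN on the latest-status view when unprocessed jobs may be needed,
//   - INNER JOIN when only states with status rows are requested,
//   - join the raw status table when *only* unprocessed jobs are requested.
func buildJobsQuery(jobTable, statusTable string, stateFilters []string) string {
	joinType := "LEFT"
	joinTable := "v_last_" + statusTable // latest-status view

	containsUnprocessed := slices.Contains(stateFilters, unprocessedState)
	if !containsUnprocessed {
		// Every requested state has a status row, so an inner join suffices.
		joinType = "INNER"
	} else if slices.Equal(stateFilters, []string{unprocessedState}) {
		// Only unprocessed jobs: join the status table directly instead of the view.
		joinTable = statusTable
	}

	return fmt.Sprintf(
		"SELECT jobs.job_id FROM %q AS jobs %s JOIN %q job_latest_state ON jobs.job_id = job_latest_state.job_id ORDER BY jobs.job_id",
		jobTable, joinType, joinTable)
}

func main() {
	fmt.Println(buildJobsQuery("jobs_1", "job_status_1", []string{unprocessedState}))
	fmt.Println(buildJobsQuery("jobs_1", "job_status_1", []string{"failed", "waiting"}))
}
```

Running it prints one query that left-joins `job_status_1` directly (unprocessed-only case) and one that inner-joins the `v_last_job_status_1` view (failed/waiting case).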
4 changes: 2 additions & 2 deletions jobsdb/queued_db_request.go
@@ -7,7 +7,7 @@ import (

func executeDbRequest[T any](jd *Handle, c *dbRequest[T]) T {
defer jd.getTimerStat(
fmt.Sprintf("%s_total_time", c.name),
fmt.Sprintf("jobsdb_%s_total_time", c.name),
c.tags,
).RecordDuration()()

@@ -28,7 +28,7 @@ func executeDbRequest[T any](jd *Handle, c *dbRequest[T]) T {

if queueEnabled {
queuedAt := time.Now()
waitTimeStat := jd.getTimerStat(fmt.Sprintf("%s_wait_time", c.name), c.tags)
waitTimeStat := jd.getTimerStat(fmt.Sprintf("jobsdb_%s_wait_time", c.name), c.tags)
queueCap <- struct{}{}
defer func() { <-queueCap }()
waitTimeStat.Since(queuedAt)
6 changes: 4 additions & 2 deletions processor/processor_test.go
@@ -2070,7 +2070,8 @@ var _ = Describe("Processor", Ordered, func() {

c.mockBackendConfig.EXPECT().WaitForConfig(gomock.Any()).Times(1)
c.mockReadProcErrorsDB.EXPECT().FailExecuting().Times(1)
c.mockReadProcErrorsDB.EXPECT().GetToProcess(gomock.Any(), gomock.Any(), gomock.Any()).Return(&jobsdb.MoreJobsResult{}, nil).AnyTimes()
c.mockReadProcErrorsDB.EXPECT().GetFailed(gomock.Any(), gomock.Any()).AnyTimes()
c.mockReadProcErrorsDB.EXPECT().GetUnprocessed(gomock.Any(), gomock.Any()).AnyTimes()
c.mockRouterJobsDB.EXPECT().GetPileUpCounts(gomock.Any()).AnyTimes()
c.mockBatchRouterJobsDB.EXPECT().GetPileUpCounts(gomock.Any()).AnyTimes()

@@ -2125,7 +2126,8 @@ var _ = Describe("Processor", Ordered, func() {
processor.config.readLoopSleep = time.Millisecond

c.mockReadProcErrorsDB.EXPECT().FailExecuting()
c.mockReadProcErrorsDB.EXPECT().GetToProcess(gomock.Any(), gomock.Any(), gomock.Any()).Return(&jobsdb.MoreJobsResult{}, nil).AnyTimes()
c.mockReadProcErrorsDB.EXPECT().GetFailed(gomock.Any(), gomock.Any()).Return(jobsdb.JobsResult{}, nil).AnyTimes()
c.mockReadProcErrorsDB.EXPECT().GetUnprocessed(gomock.Any(), gomock.Any()).Return(jobsdb.JobsResult{}, nil).AnyTimes()
c.mockBackendConfig.EXPECT().WaitForConfig(gomock.Any()).Times(1)
c.mockRouterJobsDB.EXPECT().GetPileUpCounts(gomock.Any()).AnyTimes()
c.mockBatchRouterJobsDB.EXPECT().GetPileUpCounts(gomock.Any()).AnyTimes()
27 changes: 23 additions & 4 deletions processor/stash/stash.go
@@ -317,8 +317,8 @@ func (st *HandleT) readErrJobsLoop(ctx context.Context) {
JobsLimit: errDBReadBatchSize,
PayloadSizeLimit: st.adaptiveLimit(payloadLimit),
}
toProcess, err := misc.QueryWithRetriesAndNotify(ctx, st.jobdDBQueryRequestTimeout, st.jobdDBMaxRetries, func(ctx context.Context) (*jobsdb.MoreJobsResult, error) {
return st.errorDB.GetToProcess(ctx, queryParams, nil)
toRetry, err := misc.QueryWithRetriesAndNotify(ctx, st.jobdDBQueryRequestTimeout, st.jobdDBMaxRetries, func(ctx context.Context) (jobsdb.JobsResult, error) {
return st.errorDB.GetFailed(ctx, queryParams)
}, sendQueryRetryStats)
if err != nil {
if ctx.Err() != nil { // we are shutting down
@@ -329,8 +329,27 @@ func (st *HandleT) readErrJobsLoop(ctx context.Context) {
panic(err)
}

combinedList := toProcess.Jobs
limitReached = toProcess.LimitsReached
combinedList := toRetry.Jobs
limitReached = toRetry.LimitsReached
if !toRetry.LimitsReached {
queryParams.JobsLimit -= len(toRetry.Jobs)
if queryParams.PayloadSizeLimit > 0 {
queryParams.PayloadSizeLimit -= toRetry.PayloadSize
}
unprocessed, err := misc.QueryWithRetriesAndNotify(ctx, st.jobdDBQueryRequestTimeout, st.jobdDBMaxRetries, func(ctx context.Context) (jobsdb.JobsResult, error) {
return st.errorDB.GetUnprocessed(ctx, queryParams)
}, sendQueryRetryStats)
if err != nil {
if ctx.Err() != nil { // we are shutting down
close(st.errProcessQ)
return
}
st.logger.Errorf("Error occurred while reading proc error jobs. Err: %v", err)
panic(err)
}
combinedList = append(combinedList, unprocessed.Jobs...)
limitReached = unprocessed.LimitsReached
}
st.statErrDBR.Since(start)

if len(combinedList) == 0 {
32 changes: 22 additions & 10 deletions router/batchrouter/batchrouter_test.go
@@ -235,17 +235,29 @@ var _ = Describe("BatchRouter", func() {
}

payloadLimit := batchrouter.payloadLimit
var getJobsListCalled bool
c.mockBatchRouterJobsDB.EXPECT().GetToProcess(gomock.Any(), jobsdb.GetQueryParams{CustomValFilters: []string{CustomVal["S3"]}, JobsLimit: c.jobQueryBatchSize, PayloadSizeLimit: payloadLimit}, nil).DoAndReturn(func(ctx context.Context, params jobsdb.GetQueryParams, _ jobsdb.MoreToken) (*jobsdb.MoreJobsResult, error) {
var res jobsdb.MoreJobsResult
if !getJobsListCalled {
getJobsListCalled = true
jobs := append([]*jobsdb.JobT{}, toRetryJobsList...)
jobs = append(jobs, unprocessedJobsList...)
res.Jobs = jobs
return &res, nil
var toRetryJobsListCalled bool
var unprocessedJobsListCalled bool
c.mockBatchRouterJobsDB.EXPECT().GetFailed(gomock.Any(), jobsdb.GetQueryParams{CustomValFilters: []string{CustomVal["S3"]}, JobsLimit: c.jobQueryBatchSize, PayloadSizeLimit: payloadLimit}).DoAndReturn(func(ctx context.Context, params jobsdb.GetQueryParams) (jobsdb.JobsResult, error) {
if !toRetryJobsListCalled {
toRetryJobsListCalled = true
return jobsdb.JobsResult{Jobs: toRetryJobsList}, nil
}
return &res, nil
return jobsdb.JobsResult{}, nil
}).AnyTimes()
c.mockBatchRouterJobsDB.EXPECT().GetUnprocessed(gomock.Any(), jobsdb.GetQueryParams{CustomValFilters: []string{CustomVal["S3"]}, JobsLimit: c.jobQueryBatchSize - len(toRetryJobsList), PayloadSizeLimit: payloadLimit}).DoAndReturn(func(ctx context.Context, params jobsdb.GetQueryParams) (jobsdb.JobsResult, error) {
if !unprocessedJobsListCalled {
unprocessedJobsListCalled = true
return jobsdb.JobsResult{Jobs: unprocessedJobsList}, nil
}
return jobsdb.JobsResult{}, nil
}).Times(1)

c.mockBatchRouterJobsDB.EXPECT().GetUnprocessed(gomock.Any(), jobsdb.GetQueryParams{CustomValFilters: []string{CustomVal["S3"]}, JobsLimit: c.jobQueryBatchSize, PayloadSizeLimit: payloadLimit}).DoAndReturn(func(ctx context.Context, params jobsdb.GetQueryParams) (jobsdb.JobsResult, error) {
if !unprocessedJobsListCalled {
unprocessedJobsListCalled = true
return jobsdb.JobsResult{Jobs: unprocessedJobsList}, nil
}
return jobsdb.JobsResult{}, nil
}).AnyTimes()

c.mockBatchRouterJobsDB.EXPECT().UpdateJobStatus(gomock.Any(), gomock.Any(), []string{CustomVal["S3"]}, gomock.Any()).Times(1).
23 changes: 19 additions & 4 deletions router/batchrouter/handle.go
@@ -191,15 +191,30 @@ func (brt *Handle) getWorkerJobs(partition string) (workerJobs []*DestinationJob
}
brt.isolationStrategy.AugmentQueryParams(partition, &queryParams)
var limitsReached bool
toProcess, err := misc.QueryWithRetriesAndNotify(context.Background(), brt.jobdDBQueryRequestTimeout, brt.jobdDBMaxRetries, func(ctx context.Context) (*jobsdb.MoreJobsResult, error) {
return brt.jobsDB.GetToProcess(ctx, queryParams, nil)
toRetry, err := misc.QueryWithRetriesAndNotify(context.Background(), brt.jobdDBQueryRequestTimeout, brt.jobdDBMaxRetries, func(ctx context.Context) (jobsdb.JobsResult, error) {
return brt.jobsDB.GetFailed(ctx, queryParams)
}, brt.sendQueryRetryStats)
if err != nil {
brt.logger.Errorf("BRT: %s: Error while reading from DB: %v", brt.destType, err)
panic(err)
}
jobs = toProcess.Jobs
limitsReached = toProcess.LimitsReached
jobs = toRetry.Jobs
limitsReached = toRetry.LimitsReached
if !limitsReached {
queryParams.JobsLimit -= len(toRetry.Jobs)
if queryParams.PayloadSizeLimit > 0 {
queryParams.PayloadSizeLimit -= toRetry.PayloadSize
}
unprocessed, err := misc.QueryWithRetriesAndNotify(context.Background(), brt.jobdDBQueryRequestTimeout, brt.jobdDBMaxRetries, func(ctx context.Context) (jobsdb.JobsResult, error) {
return brt.jobsDB.GetUnprocessed(ctx, queryParams)
}, brt.sendQueryRetryStats)
if err != nil {
brt.logger.Errorf("BRT: %s: Error while reading from DB: %v", brt.destType, err)
panic(err)
}
jobs = append(jobs, unprocessed.Jobs...)
limitsReached = unprocessed.LimitsReached
}
brtQueryStat.Since(queryStart)
sort.Slice(jobs, func(i, j int) bool {
return jobs[i].JobID < jobs[j].JobID
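Both the stash reader above and the batch router's `getWorkerJobs` now follow the same two-step read: fetch failed jobs first and, only if the limits were not hit, spend the remaining jobs/payload budget on unprocessed jobs. The following is a minimal sketch of that pattern; `JobsResult`, `QueryParams`, `getFailed` and `getUnprocessed` are stand-ins for this example, not the real jobsdb API.

```go
package main

import "fmt"

type Job struct{ ID int64 }

type JobsResult struct {
	Jobs          []*Job
	PayloadSize   int64
	LimitsReached bool
}

type QueryParams struct {
	JobsLimit        int
	PayloadSizeLimit int64
}

// readToProcess drains failed jobs first, then spends whatever budget is
// left on unprocessed jobs, mirroring the callers changed in this commit.
func readToProcess(params QueryParams,
	getFailed, getUnprocessed func(QueryParams) JobsResult,
) ([]*Job, bool) {
	failed := getFailed(params)
	jobs := failed.Jobs
	limitsReached := failed.LimitsReached
	if !limitsReached {
		params.JobsLimit -= len(failed.Jobs)
		if params.PayloadSizeLimit > 0 {
			params.PayloadSizeLimit -= failed.PayloadSize
		}
		unprocessed := getUnprocessed(params)
		jobs = append(jobs, unprocessed.Jobs...)
		limitsReached = unprocessed.LimitsReached
	}
	return jobs, limitsReached
}

func main() {
	failed := func(QueryParams) JobsResult { return JobsResult{Jobs: []*Job{{1}, {3}}} }
	unprocessed := func(p QueryParams) JobsResult {
		fmt.Println("remaining jobs limit:", p.JobsLimit) // 8 after the two failed jobs
		return JobsResult{Jobs: []*Job{{5}}}
	}
	jobs, limited := readToProcess(QueryParams{JobsLimit: 10}, failed, unprocessed)
	fmt.Println(len(jobs), limited) // 3 false
}
```

Keeping the two reads separate lets each query use the cheaper join path introduced in `getJobsDS`, which appears to be the point of moving away from the single `GetToProcess` query.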
12 changes: 8 additions & 4 deletions router/internal/jobiterator/jobiterator.go
@@ -32,12 +32,13 @@ type Iterator struct {
params jobsdb.GetQueryParams
maxQueries int
discardedPercentageTolerance int
orderGroupKeyFn func(*jobsdb.JobT) string
getJobsFn func(context.Context, jobsdb.GetQueryParams, jobsdb.MoreToken) (*jobsdb.MoreJobsResult, error)
state struct {
// running iterator state
jobs []*jobsdb.JobT
idx int
previousJob *jobsdb.JobT
previousJob map[string]*jobsdb.JobT

// closed indicates whether the iterator has reached the end or not
closed bool
@@ -73,8 +74,10 @@ func New(params jobsdb.GetQueryParams, getJobsFn func(context.Context, jobsdb.Ge
maxQueries: 100,
discardedPercentageTolerance: 0,
getJobsFn: getJobsFn,
orderGroupKeyFn: func(job *jobsdb.JobT) string { return job.LastJobStatus.JobState },
}
ji.state.jobsLimit = params.JobsLimit
ji.state.previousJob = map[string]*jobsdb.JobT{}
for _, opt := range opts {
opt(ji)
}
@@ -148,10 +151,11 @@ func (ji *Iterator) Next() *jobsdb.JobT {
idx := ji.state.idx
ji.state.idx++
nextJob := ji.state.jobs[idx]
if previousJob := ji.state.previousJob; previousJob != nil && previousJob.JobID > nextJob.JobID {
panic(fmt.Errorf("job iterator encountered out of order jobs: previousJobID: %d, nextJobID: %d", previousJob.JobID, nextJob.JobID))
orderGroupKey := ji.orderGroupKeyFn(nextJob)
if previousJob, ok := ji.state.previousJob[orderGroupKey]; ok && previousJob.JobID > nextJob.JobID {
panic(fmt.Errorf("job iterator encountered out of order jobs for group key %s: previousJobID: %d, nextJobID: %d", orderGroupKey, previousJob.JobID, nextJob.JobID))
}
ji.state.previousJob = nextJob
ji.state.previousJob[orderGroupKey] = nextJob
return nextJob
}

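Because failed and unprocessed jobs now come from separate queries, a caller may legitimately see job IDs that are not globally increasing, so the iterator's ordering check was relaxed to apply per order group (by default, the job's last state). Below is a rough, standalone illustration of that per-group check; the `Job` type and the checker are simplified stand-ins for the iterator's real state.

```go
package main

import "fmt"

type Job struct {
	ID    int64
	State string
}

// orderChecker tracks the last job seen per group key and panics only when
// ordering is violated within a single group, not across groups.
type orderChecker struct {
	previous map[string]*Job
	groupKey func(*Job) string
}

func newOrderChecker(groupKey func(*Job) string) *orderChecker {
	return &orderChecker{previous: map[string]*Job{}, groupKey: groupKey}
}

func (c *orderChecker) observe(job *Job) {
	key := c.groupKey(job)
	if prev, ok := c.previous[key]; ok && prev.ID > job.ID {
		panic(fmt.Errorf("out of order jobs for group %s: previous %d, next %d", key, prev.ID, job.ID))
	}
	c.previous[key] = job
}

func main() {
	c := newOrderChecker(func(j *Job) string { return j.State })
	// Failed jobs 2,7 interleaved with unprocessed jobs 5,9: fine, since each group is ordered.
	for _, j := range []*Job{{2, "failed"}, {5, "unprocessed"}, {7, "failed"}, {9, "unprocessed"}} {
		c.observe(j)
	}
	fmt.Println("all jobs in order within their groups")
}
```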
