Skip to content

Commit

Permalink
fixup! chore: avoid querying a dataset in case AfterJobID falls after…
Browse files Browse the repository at this point in the history
… said dataset
  • Loading branch information
Sidddddarth committed Jun 13, 2023
1 parent 01b8a6f commit 36e72ea
Showing 1 changed file with 14 additions and 10 deletions.
24 changes: 14 additions & 10 deletions jobsdb/jobsdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -3106,6 +3106,7 @@ func (jd *HandleT) getUnprocessed(ctx context.Context, params GetQueryParamsT) (
defer jd.dsListLock.RUnlock()

dsRangeList := jd.getDSRangeList()
dsList := jd.getDSList()

limitByEventCount := false
if params.EventsLimit > 0 {
Expand All @@ -3124,17 +3125,18 @@ func (jd *HandleT) getUnprocessed(ctx context.Context, params GetQueryParamsT) (
if jd.dsLimit != nil {
dsLimit = *jd.dsLimit
}
for _, dsRange := range dsRangeList {
for idx, ds := range dsList {
if params.AfterJobID != nil {
if dsRange.maxJobID < *params.AfterJobID {
// cacheHitCount++ ?
continue
if idx < len(dsRangeList) {
if *params.AfterJobID > dsRangeList[idx].maxJobID {
continue
}
}
}
if dsLimit > 0 && dsQueryCount >= dsLimit {
break
}
unprocessedJobs, dsHit, err := jd.getUnprocessedJobsDS(ctx, dsRange.ds, params)
unprocessedJobs, dsHit, err := jd.getUnprocessedJobsDS(ctx, ds, params)
if err != nil {
return JobsResult{}, err
}
Expand Down Expand Up @@ -3231,6 +3233,7 @@ func (jd *HandleT) GetProcessed(ctx context.Context, params GetQueryParamsT) (Jo
defer jd.dsListLock.RUnlock()

dsRangeList := jd.getDSRangeList()
dsList := jd.getDSList()

limitByEventCount := false
if params.EventsLimit > 0 {
Expand All @@ -3251,17 +3254,18 @@ func (jd *HandleT) GetProcessed(ctx context.Context, params GetQueryParamsT) (Jo
if jd.dsLimit != nil {
dsLimit = *jd.dsLimit
}
for _, dsRange := range dsRangeList {
for idx, ds := range dsList {
if params.AfterJobID != nil {
if dsRange.maxJobID < *params.AfterJobID {
// cacheHitCount++ ?
continue
if idx < len(dsRangeList) {
if *params.AfterJobID > dsRangeList[idx].maxJobID {
continue
}
}
}
if dsLimit > 0 && dsQueryCount >= dsLimit {
break
}
processedJobs, dsHit, err := jd.getProcessedJobsDS(ctx, dsRange.ds, params)
processedJobs, dsHit, err := jd.getProcessedJobsDS(ctx, ds, params)
if err != nil {
return JobsResult{}, err
}
Expand Down

0 comments on commit 36e72ea

Please sign in to comment.