chore: drain router jobs fast
Sidddddarth committed Jan 17, 2024
1 parent 5276978 commit a942ba6
Showing 3 changed files with 67 additions and 43 deletions.
59 changes: 52 additions & 7 deletions router/handle.go
@@ -530,23 +530,29 @@ func (rt *Handle) findWorkerSlot(ctx context.Context, workers []*worker, job *jo
"destinationId": parameters.DestinationID,
}).Increment()
}
abortedJob := rt.drainOrRetryLimitReached(job) // if the job is to be aborted, send it to its worker right away
if eventOrderingDisabled {
availableWorkers := lo.Filter(workers, func(w *worker, _ int) bool { return w.AvailableSlots() > 0 })
if len(availableWorkers) == 0 {
return nil, types.ErrWorkerNoSlot
}
slot := availableWorkers[rand.Intn(len(availableWorkers))].ReserveSlot()
if slot == nil {
return nil, types.ErrWorkerNoSlot
}
if abortedJob {
return slot, nil
}
if rt.shouldBackoff(job) {
slot.Release()
return nil, types.ErrJobBackoff
}
if rt.shouldThrottle(ctx, job, parameters) {
slot.Release()
return nil, types.ErrDestinationThrottled
}

if slot := availableWorkers[rand.Intn(len(availableWorkers))].ReserveSlot(); slot != nil { // skipcq: GSC-G404
return slot, nil
}
return nil, types.ErrWorkerNoSlot

return slot, nil
}

// checking if the orderKey is in blockedOrderKeys. If yes, returning nil.
@@ -557,7 +563,7 @@ func (rt *Handle) findWorkerSlot(ctx context.Context, workers []*worker, job *jo
}

//#JobOrder (see other #JobOrder comment)
if rt.shouldBackoff(job) { // backoff
if rt.shouldBackoff(job) && !abortedJob { // backoff
blockedOrderKeys[orderKey] = struct{}{}
return nil, types.ErrJobBackoff
}
@@ -579,7 +585,7 @@ func (rt *Handle) findWorkerSlot(ctx context.Context, workers []*worker, job *jo
return nil, types.ErrBarrierExists
}
rt.logger.Debugf("EventOrder: job %d of orderKey %s is allowed to be processed", job.JobID, orderKey)
if rt.shouldThrottle(ctx, job, parameters) {
if rt.shouldThrottle(ctx, job, parameters) && !abortedJob {
blockedOrderKeys[orderKey] = struct{}{}
worker.barrier.Leave(orderKey, job.JobID)
slot.Release()
@@ -589,6 +595,45 @@ func (rt *Handle) findWorkerSlot(ctx context.Context, workers []*worker, job *jo
//#EndJobOrder
}

// checks whether the job is configured to drain or its retry limit has been reached
func (rt *Handle) drainOrRetryLimitReached(job *jobsdb.JobT) bool {
drain, _ := rt.drainer.Drain(job)
if drain {
return true
}
return rt.retryLimitReached(&job.LastJobStatus)
}

func (rt *Handle) retryLimitReached(status *jobsdb.JobStatusT) bool {
respStatusCode, _ := strconv.Atoi(status.ErrorCode)
switch respStatusCode {
case types.RouterTimedOutStatusCode,
types.RouterUnMarshalErrorCode: // 5xx errors
return false
}

if respStatusCode < 500 {
return false
}

firstAttemptedAtTime := time.Now()
if firstAttemptedAt := gjson.GetBytes(status.ErrorResponse, "firstAttemptedAt").Str; firstAttemptedAt != "" {
if t, err := time.Parse(misc.RFC3339Milli, firstAttemptedAt); err == nil {
firstAttemptedAtTime = t
}
}

maxFailedCountForJob := rt.reloadableConfig.maxFailedCountForJob.Load()
retryTimeWindow := rt.reloadableConfig.retryTimeWindow.Load()
if gjson.GetBytes(status.JobParameters, "source_job_run_id").Str != "" {
maxFailedCountForJob = rt.reloadableConfig.maxFailedCountForSourcesJob.Load()
retryTimeWindow = rt.reloadableConfig.sourcesRetryTimeWindow.Load()
}

return time.Since(firstAttemptedAtTime) > retryTimeWindow &&
status.AttemptNum >= maxFailedCountForJob // retry time window exceeded
}

func (*Handle) shouldBackoff(job *jobsdb.JobT) bool {
return job.LastJobStatus.JobState == jobsdb.Failed.State && job.LastJobStatus.AttemptNum > 0 && time.Until(job.LastJobStatus.RetryTime) > 0
}
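A minimal, standalone sketch of the control flow the handle.go change introduces (hypothetical types and names, not the project's own): a job that is slated for drain or has hit its retry limit is handed a worker slot immediately, and the backoff and throttle gates are only applied to jobs that will actually be delivered.

```go
package main

import (
	"errors"
	"fmt"
)

// Hypothetical, trimmed-down stand-ins for the router's types; this only
// illustrates the ordering introduced above, not the real implementation.
var (
	errJobBackoff           = errors.New("job backoff")
	errDestinationThrottled = errors.New("destination throttled")
)

type job struct {
	aborted   bool // result of drainOrRetryLimitReached(job)
	inBackoff bool // result of shouldBackoff(job)
	throttled bool // result of shouldThrottle(ctx, job, parameters)
}

type slot struct{ workerID int }

// findSlot mirrors the new fast path: reserve a slot first, then apply the
// backoff and throttle checks only when the job is NOT about to be aborted.
func findSlot(j job) (*slot, error) {
	s := &slot{workerID: 0} // assume a worker slot was reserved successfully
	if j.aborted {
		return s, nil // drain / retry limit reached: hand it to the worker right away
	}
	if j.inBackoff {
		return nil, errJobBackoff
	}
	if j.throttled {
		return nil, errDestinationThrottled
	}
	return s, nil
}

func main() {
	// A job that has exhausted its retries gets a slot even while backing off,
	// so the worker can abort it immediately instead of waiting out the backoff.
	s, err := findSlot(job{aborted: true, inBackoff: true})
	fmt.Println(s != nil, err) // true <nil>

	// A healthy job in backoff is still deferred, as before.
	s, err = findSlot(job{inBackoff: true})
	fmt.Println(s != nil, err) // false job backoff
}
```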
13 changes: 12 additions & 1 deletion router/router_test.go
@@ -152,6 +152,12 @@ func initRouter() {
misc.Init()
}

type drainer struct{}

func (d *drainer) Drain(job *jobsdb.JobT) (bool, string) {
return false, ""
}

func TestBackoff(t *testing.T) {
t.Run("nextAttemptAfter", func(t *testing.T) {
min := 10 * time.Second
@@ -218,6 +224,11 @@ func TestBackoff(t *testing.T) {
noOfWorkers: 1,
workerInputBufferSize: 3,
barrier: barrier,
reloadableConfig: &reloadableConfig{
maxFailedCountForJob: misc.SingleValueLoader(3),
retryTimeWindow: misc.SingleValueLoader(180 * time.Minute),
},
drainer: &drainer{},
}
workers := []*worker{{
logger: logger.NOP,
@@ -241,8 +252,8 @@ func TestBackoff(t *testing.T) {
require.NoError(t, err)

slot, err = r.findWorkerSlot(context.Background(), workers, noBackoffJob3, map[string]struct{}{})
require.NotNil(t, slot)
require.NoError(t, err)
require.NotNil(t, slot)

slot, err = r.findWorkerSlot(context.Background(), workers, noBackoffJob4, map[string]struct{}{})
require.Nil(t, slot)
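For context, the test above configures maxFailedCountForJob = 3 and retryTimeWindow = 180 minutes. A standalone sketch of the retry-limit rule under those settings (simplified hypothetical types, omitting the router-internal status-code special cases): a job counts as exhausted only once it has both reached the attempt limit and been failing for longer than the window.

```go
package main

import (
	"fmt"
	"time"
)

// Simplified stand-in for the fields retryLimitReached reads from a job status.
type jobStatus struct {
	responseCode     int       // last status code returned by the destination
	attemptNum       int       // delivery attempts so far
	firstAttemptedAt time.Time // when the job first failed
}

// retryLimitReached mirrors the rule in router/handle.go: only 5xx failures
// count, and both the attempt limit and the time window must be exceeded.
func retryLimitReached(s jobStatus, maxFailedCount int, retryWindow time.Duration) bool {
	if s.responseCode < 500 {
		return false
	}
	return time.Since(s.firstAttemptedAt) > retryWindow && s.attemptNum >= maxFailedCount
}

func main() {
	window := 180 * time.Minute

	// 3 attempts, failing for 4 hours: limit reached, the job is aborted fast.
	old := jobStatus{responseCode: 500, attemptNum: 3, firstAttemptedAt: time.Now().Add(-4 * time.Hour)}
	fmt.Println(retryLimitReached(old, 3, window)) // true

	// 3 attempts but only failing for 10 minutes: still inside the window, keep retrying.
	recent := jobStatus{responseCode: 500, attemptNum: 3, firstAttemptedAt: time.Now().Add(-10 * time.Minute)}
	fmt.Println(retryLimitReached(recent, 3, window)) // false
}
```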
38 changes: 3 additions & 35 deletions router/worker.go
@@ -89,12 +89,10 @@ func (w *worker) workLoop() {
if err := json.Unmarshal(job.Parameters, &parameters); err != nil {
panic(fmt.Errorf("unmarshalling of job parameters failed for job %d (%s): %w", job.JobID, string(job.Parameters), err))
}
abort, abortReason := w.rt.drainer.Drain(
job,
)
abort, abortReason := w.rt.drainer.Drain(job)
abortTag := abortReason
if !abort {
abort = w.retryLimitReached(&job.LastJobStatus)
abort = w.rt.retryLimitReached(&job.LastJobStatus)
abortReason = string(job.LastJobStatus.ErrorResponse)
abortTag = "retry limit reached"
}
@@ -977,7 +975,7 @@ func (w *worker) postStatusOnResponseQ(respStatusCode int, payload json.RawMessa
destinationJobMetadata.JobT.Parameters = misc.UpdateJSONWithNewKeyVal(destinationJobMetadata.JobT.Parameters, "reason", status.ErrorResponse) // NOTE: Old key used was "error_response"
} else {
status.JobState = jobsdb.Failed.State
if !w.retryLimitReached(status) { // don't delay retry time if retry limit is reached, so that the job can be aborted immediately on the next loop
if !w.rt.retryLimitReached(status) { // don't delay retry time if retry limit is reached, so that the job can be aborted immediately on the next loop
status.RetryTime = status.ExecTime.Add(nextAttemptAfter(status.AttemptNum, w.rt.reloadableConfig.minRetryBackoff.Load(), w.rt.reloadableConfig.maxRetryBackoff.Load()))
}
}
@@ -1069,36 +1067,6 @@ func (w *worker) sendDestinationResponseToConfigBackend(payload json.RawMessage,
}
}

func (w *worker) retryLimitReached(status *jobsdb.JobStatusT) bool {
respStatusCode, _ := strconv.Atoi(status.ErrorCode)
switch respStatusCode {
case types.RouterTimedOutStatusCode,
types.RouterUnMarshalErrorCode: // 5xx errors
return false
}

if respStatusCode < 500 {
return false
}

firstAttemptedAtTime := time.Now()
if firstAttemptedAt := gjson.GetBytes(status.ErrorResponse, "firstAttemptedAt").Str; firstAttemptedAt != "" {
if t, err := time.Parse(misc.RFC3339Milli, firstAttemptedAt); err == nil {
firstAttemptedAtTime = t
}
}

maxFailedCountForJob := w.rt.reloadableConfig.maxFailedCountForJob.Load()
retryTimeWindow := w.rt.reloadableConfig.retryTimeWindow.Load()
if gjson.GetBytes(status.JobParameters, "source_job_run_id").Str != "" {
maxFailedCountForJob = w.rt.reloadableConfig.maxFailedCountForSourcesJob.Load()
retryTimeWindow = w.rt.reloadableConfig.sourcesRetryTimeWindow.Load()
}

return time.Since(firstAttemptedAtTime) > retryTimeWindow &&
status.AttemptNum >= maxFailedCountForJob // retry time window exceeded
}

// AvailableSlots returns the number of available slots in the worker's input channel
func (w *worker) AvailableSlots() int {
return cap(w.input) - len(w.input) - w.inputReservations
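The comment in postStatusOnResponseQ above ("don't delay retry time if retry limit is reached") is the other half of draining fast. A rough sketch of that scheduling decision under assumed values; the backoff formula, bounds, and names here are illustrative and are not the router's actual nextAttemptAfter.

```go
package main

import (
	"fmt"
	"time"
)

// Illustrative exponential backoff: double the delay per attempt, capped at maxBackoff.
func nextAttemptAfter(attempt int, minBackoff, maxBackoff time.Duration) time.Duration {
	d := minBackoff << uint(attempt)
	if d <= 0 || d > maxBackoff {
		return maxBackoff
	}
	return d
}

// scheduleRetry mirrors the change in postStatusOnResponseQ: when the retry
// limit is already reached, leave RetryTime at the execution time so the job
// is picked up (and aborted) on the very next loop instead of after another backoff.
func scheduleRetry(execTime time.Time, attempt int, limitReached bool) time.Time {
	if limitReached {
		return execTime
	}
	return execTime.Add(nextAttemptAfter(attempt, 10*time.Second, 5*time.Minute))
}

func main() {
	now := time.Now()
	fmt.Println(scheduleRetry(now, 4, false).Sub(now)) // 2m40s of backoff before the next attempt
	fmt.Println(scheduleRetry(now, 4, true).Sub(now))  // 0s: eligible for abortion immediately
}
```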
