From 2de63d1c68e8491feb3a424da27e4084347a8add Mon Sep 17 00:00:00 2001
From: "siddarth.msv"
Date: Wed, 17 Jan 2024 18:27:43 +0530
Subject: [PATCH] chore: drain router jobs fast

---
 router/handle.go      | 59 ++++++++++++++++++++++++++++++++++++++-----
 router/router_test.go | 13 +++++++++-
 router/worker.go      | 38 +++-------------------------
 3 files changed, 67 insertions(+), 43 deletions(-)

diff --git a/router/handle.go b/router/handle.go
index 5f4b193cefa..34484ee774a 100644
--- a/router/handle.go
+++ b/router/handle.go
@@ -530,23 +530,29 @@ func (rt *Handle) findWorkerSlot(ctx context.Context, workers []*worker, job *jo
 			"destinationId": parameters.DestinationID,
 		}).Increment()
 	}
+	abortedJob := rt.drainOrRetryLimitReached(job) // if the job is to be aborted, send it to its worker right away
 
 	if eventOrderingDisabled {
 		availableWorkers := lo.Filter(workers, func(w *worker, _ int) bool { return w.AvailableSlots() > 0 })
 		if len(availableWorkers) == 0 {
 			return nil, types.ErrWorkerNoSlot
 		}
+		slot := availableWorkers[rand.Intn(len(availableWorkers))].ReserveSlot()
+		if slot == nil {
+			return nil, types.ErrWorkerNoSlot
+		}
+		if abortedJob {
+			return slot, nil
+		}
 		if rt.shouldBackoff(job) {
+			slot.Release()
 			return nil, types.ErrJobBackoff
 		}
 		if rt.shouldThrottle(ctx, job, parameters) {
+			slot.Release()
 			return nil, types.ErrDestinationThrottled
 		}
-		if slot := availableWorkers[rand.Intn(len(availableWorkers))].ReserveSlot(); slot != nil { // skipcq: GSC-G404
-			return slot, nil
-		}
-		return nil, types.ErrWorkerNoSlot
-
+		return slot, nil
 	}
 
 	// checking if the orderKey is in blockedOrderKeys. If yes, returning nil.
@@ -557,7 +563,7 @@ func (rt *Handle) findWorkerSlot(ctx context.Context, workers []*worker, job *jo
 	}
 
 	//#JobOrder (see other #JobOrder comment)
-	if rt.shouldBackoff(job) { // backoff
+	if rt.shouldBackoff(job) && !abortedJob { // backoff
 		blockedOrderKeys[orderKey] = struct{}{}
 		return nil, types.ErrJobBackoff
 	}
@@ -579,7 +585,7 @@ func (rt *Handle) findWorkerSlot(ctx context.Context, workers []*worker, job *jo
 		return nil, types.ErrBarrierExists
 	}
 	rt.logger.Debugf("EventOrder: job %d of orderKey %s is allowed to be processed", job.JobID, orderKey)
-	if rt.shouldThrottle(ctx, job, parameters) {
+	if rt.shouldThrottle(ctx, job, parameters) && !abortedJob {
 		blockedOrderKeys[orderKey] = struct{}{}
 		worker.barrier.Leave(orderKey, job.JobID)
 		slot.Release()
@@ -589,6 +595,45 @@ func (rt *Handle) findWorkerSlot(ctx context.Context, workers []*worker, job *jo
 	//#EndJobOrder
 }
 
+// checks if the job is configured to drain or if its retry limit is reached
+func (rt *Handle) drainOrRetryLimitReached(job *jobsdb.JobT) bool {
+	drain, _ := rt.drainer.Drain(job)
+	if drain {
+		return true
+	}
+	return rt.retryLimitReached(&job.LastJobStatus)
+}
+
+func (rt *Handle) retryLimitReached(status *jobsdb.JobStatusT) bool {
+	respStatusCode, _ := strconv.Atoi(status.ErrorCode)
+	switch respStatusCode {
+	case types.RouterTimedOutStatusCode,
+		types.RouterUnMarshalErrorCode: // 5xx errors
+		return false
+	}
+
+	if respStatusCode < 500 {
+		return false
+	}
+
+	firstAttemptedAtTime := time.Now()
+	if firstAttemptedAt := gjson.GetBytes(status.ErrorResponse, "firstAttemptedAt").Str; firstAttemptedAt != "" {
+		if t, err := time.Parse(misc.RFC3339Milli, firstAttemptedAt); err == nil {
+			firstAttemptedAtTime = t
+		}
+	}
+
+	maxFailedCountForJob := rt.reloadableConfig.maxFailedCountForJob.Load()
+	retryTimeWindow := rt.reloadableConfig.retryTimeWindow.Load()
+	if gjson.GetBytes(status.JobParameters, "source_job_run_id").Str != "" {
+		maxFailedCountForJob = rt.reloadableConfig.maxFailedCountForSourcesJob.Load()
+		retryTimeWindow = rt.reloadableConfig.sourcesRetryTimeWindow.Load()
+	}
+
+	return time.Since(firstAttemptedAtTime) > retryTimeWindow &&
+		status.AttemptNum >= maxFailedCountForJob // retry time window exceeded
+}
+
 func (*Handle) shouldBackoff(job *jobsdb.JobT) bool {
 	return job.LastJobStatus.JobState == jobsdb.Failed.State && job.LastJobStatus.AttemptNum > 0 && time.Until(job.LastJobStatus.RetryTime) > 0
 }
diff --git a/router/router_test.go b/router/router_test.go
index 9bd78f4b239..b681221bb53 100644
--- a/router/router_test.go
+++ b/router/router_test.go
@@ -152,6 +152,12 @@ func initRouter() {
 	misc.Init()
 }
 
+type drainer struct{}
+
+func (d *drainer) Drain(job *jobsdb.JobT) (bool, string) {
+	return false, ""
+}
+
 func TestBackoff(t *testing.T) {
 	t.Run("nextAttemptAfter", func(t *testing.T) {
 		min := 10 * time.Second
@@ -218,6 +224,11 @@ func TestBackoff(t *testing.T) {
 			noOfWorkers:           1,
 			workerInputBufferSize: 3,
 			barrier:               barrier,
+			reloadableConfig: &reloadableConfig{
+				maxFailedCountForJob: misc.SingleValueLoader(3),
+				retryTimeWindow:      misc.SingleValueLoader(180 * time.Minute),
+			},
+			drainer: &drainer{},
 		}
 		workers := []*worker{{
 			logger: logger.NOP,
@@ -241,8 +252,8 @@ func TestBackoff(t *testing.T) {
 		require.NoError(t, err)
 
 		slot, err = r.findWorkerSlot(context.Background(), workers, noBackoffJob3, map[string]struct{}{})
-		require.NotNil(t, slot)
 		require.NoError(t, err)
+		require.NotNil(t, slot)
 
 		slot, err = r.findWorkerSlot(context.Background(), workers, noBackoffJob4, map[string]struct{}{})
 		require.Nil(t, slot)
diff --git a/router/worker.go b/router/worker.go
index 5396141f9b0..085ebf9c3b5 100644
--- a/router/worker.go
+++ b/router/worker.go
@@ -89,12 +89,10 @@ func (w *worker) workLoop() {
 			if err := json.Unmarshal(job.Parameters, &parameters); err != nil {
 				panic(fmt.Errorf("unmarshalling of job parameters failed for job %d (%s): %w", job.JobID, string(job.Parameters), err))
 			}
-			abort, abortReason := w.rt.drainer.Drain(
-				job,
-			)
+			abort, abortReason := w.rt.drainer.Drain(job)
 			abortTag := abortReason
 			if !abort {
-				abort = w.retryLimitReached(&job.LastJobStatus)
+				abort = w.rt.retryLimitReached(&job.LastJobStatus)
 				abortReason = string(job.LastJobStatus.ErrorResponse)
 				abortTag = "retry limit reached"
 			}
@@ -977,7 +975,7 @@ func (w *worker) postStatusOnResponseQ(respStatusCode int, payload json.RawMessa
 		destinationJobMetadata.JobT.Parameters = misc.UpdateJSONWithNewKeyVal(destinationJobMetadata.JobT.Parameters, "reason", status.ErrorResponse) // NOTE: Old key used was "error_response"
 	} else {
 		status.JobState = jobsdb.Failed.State
-		if !w.retryLimitReached(status) { // don't delay retry time if retry limit is reached, so that the job can be aborted immediately on the next loop
+		if !w.rt.retryLimitReached(status) { // don't delay retry time if retry limit is reached, so that the job can be aborted immediately on the next loop
 			status.RetryTime = status.ExecTime.Add(nextAttemptAfter(status.AttemptNum, w.rt.reloadableConfig.minRetryBackoff.Load(), w.rt.reloadableConfig.maxRetryBackoff.Load()))
 		}
 	}
@@ -1069,36 +1067,6 @@ func (w *worker) sendDestinationResponseToConfigBackend(payload json.RawMessage,
 	}
 }
 
-func (w *worker) retryLimitReached(status *jobsdb.JobStatusT) bool {
-	respStatusCode, _ := strconv.Atoi(status.ErrorCode)
-	switch respStatusCode {
-	case types.RouterTimedOutStatusCode,
-		types.RouterUnMarshalErrorCode: // 5xx errors
-		return false
-	}
-
-	if respStatusCode < 500 {
-		return false
-	}
-
-	firstAttemptedAtTime := time.Now()
-	if firstAttemptedAt := gjson.GetBytes(status.ErrorResponse, "firstAttemptedAt").Str; firstAttemptedAt != "" {
-		if t, err := time.Parse(misc.RFC3339Milli, firstAttemptedAt); err == nil {
-			firstAttemptedAtTime = t
-		}
-	}
-
-	maxFailedCountForJob := w.rt.reloadableConfig.maxFailedCountForJob.Load()
-	retryTimeWindow := w.rt.reloadableConfig.retryTimeWindow.Load()
-	if gjson.GetBytes(status.JobParameters, "source_job_run_id").Str != "" {
-		maxFailedCountForJob = w.rt.reloadableConfig.maxFailedCountForSourcesJob.Load()
-		retryTimeWindow = w.rt.reloadableConfig.sourcesRetryTimeWindow.Load()
-	}
-
-	return time.Since(firstAttemptedAtTime) > retryTimeWindow &&
-		status.AttemptNum >= maxFailedCountForJob // retry time window exceeded
-}
-
 // AvailableSlots returns the number of available slots in the worker's input channel
 func (w *worker) AvailableSlots() int {
 	return cap(w.input) - len(w.input) - w.inputReservations
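
Note (not part of the patch): below is a minimal, self-contained Go sketch of the fast-drain decision the diff introduces in findWorkerSlot. A job that is already doomed (configured to drain, or past its retry limit) takes a reserved worker slot immediately and skips the backoff and throttling checks, so it can be aborted on the next worker pass instead of waiting out its backoff. The types, helper names, and constants here (job, findSlot, the 180-minute window, the failure count of 3) are simplified stand-ins for illustration, not the rudder-server API.

package main

import (
	"errors"
	"fmt"
	"time"
)

var (
	errJobBackoff           = errors.New("job backoff")
	errDestinationThrottled = errors.New("destination throttled")
)

// job is a simplified stand-in for jobsdb.JobT plus its last status.
type job struct {
	attemptNum       int
	firstAttemptedAt time.Time
	retryTime        time.Time
	drain            bool // e.g. destination disabled or job expired
}

// retryLimitReached mirrors the shape of the patched helper: a job is only
// given up on after it has both exceeded the retry time window and the
// maximum failure count.
func retryLimitReached(j job, retryTimeWindow time.Duration, maxFailedCount int) bool {
	return time.Since(j.firstAttemptedAt) > retryTimeWindow && j.attemptNum >= maxFailedCount
}

// findSlot sketches the reordered checks: reserve a slot first, return it
// immediately for doomed jobs, and only then apply backoff and throttling.
func findSlot(j job, throttled bool) (string, error) {
	abortedJob := j.drain || retryLimitReached(j, 180*time.Minute, 3)
	slot := "slot-1" // assume a slot was reserved successfully
	if abortedJob {
		return slot, nil // fast path: backoff/throttling no longer delay the abort
	}
	if time.Until(j.retryTime) > 0 {
		return "", errJobBackoff
	}
	if throttled {
		return "", errDestinationThrottled
	}
	return slot, nil
}

func main() {
	doomed := job{
		attemptNum:       5,
		firstAttemptedAt: time.Now().Add(-4 * time.Hour),
		retryTime:        time.Now().Add(time.Minute), // still backing off
		drain:            false,
	}
	slot, err := findSlot(doomed, true)
	fmt.Println(slot, err) // "slot-1 <nil>": the doomed job bypasses backoff and throttling
}

Compared with the previous order of checks, reserving the slot before evaluating backoff lets a permanently failing job reach its worker (and be marked aborted there) without waiting for its next RetryTime, which is what "drain router jobs fast" refers to.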