fix: jobs not draining as fast as we'd like because of rate-limiting etc. (#4327)
Sidddddarth authored and atzoum committed Feb 12, 2024
1 parent 8c92520 commit ed301a3
Showing 3 changed files with 181 additions and 63 deletions.
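In short: findWorkerSlot now asks the drainer and the retry-limit check about a job before applying backoff and throttling, so a job that is going to be aborted anyway is handed to a worker right away and drained instead of waiting behind the destination's rate limit. A minimal, self-contained sketch of that ordering, with simplified stand-in types rather than the router's actual ones:

package main

import (
	"errors"
	"fmt"
)

// Hypothetical, pared-down stand-in for the router's job state, used only to
// illustrate the new pickup order: drain/retry-limit first, then backoff,
// then throttling. This is not the repository's actual API.
type job struct {
	mustDrain         bool // drainer says the job is configured to drain
	retryLimitReached bool // too many attempts for too long
	inBackoff         bool
	throttled         bool
}

var (
	errJobBackoff           = errors.New("job backoff")
	errDestinationThrottled = errors.New("destination throttled")
)

// pick mirrors the shape of the patched findWorkerSlot: a job that will be
// aborted anyway bypasses backoff and throttling and is sent straight to a
// worker, which then marks it aborted with the drain reason.
func pick(j job) (drainReason string, err error) {
	if j.mustDrain || j.retryLimitReached {
		return "retry limit reached", nil
	}
	if j.inBackoff {
		return "", errJobBackoff
	}
	if j.throttled {
		return "", errDestinationThrottled
	}
	return "", nil
}

func main() {
	// Before this change, a job that was already past its retry limit could
	// still be held back by throttling; now it is picked up and drained.
	reason, err := pick(job{retryLimitReached: true, throttled: true})
	fmt.Println(reason, err) // retry limit reached <nil>
}

In the actual patch this ordering lives inside findWorkerSlot for both the event-ordering and non-event-ordering paths, and the reserved slot is released whenever backoff or throttling rejects the job.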
83 changes: 69 additions & 14 deletions router/handle.go
@@ -186,8 +186,9 @@ func (rt *Handle) pickup(ctx context.Context, partition string, workers []*worke
}

type reservedJob struct {
slot *workerSlot
job *jobsdb.JobT
slot *workerSlot
job *jobsdb.JobT
drainReason string
}

var statusList []*jobsdb.JobStatusT
@@ -212,7 +213,7 @@ func (rt *Handle) pickup(ctx context.Context, partition string, workers []*worke
rt.logger.Debugf("[DRAIN DEBUG] counts %v final jobs length being processed %v", rt.destType, len(reservedJobs))
assignedTime := time.Now()
for _, reservedJob := range reservedJobs {
reservedJob.slot.Use(workerJob{job: reservedJob.job, assignedAt: assignedTime})
reservedJob.slot.Use(workerJob{job: reservedJob.job, assignedAt: assignedTime, drainReason: reservedJob.drainReason})
}
pickupCount += len(reservedJobs)
reservedJobs = nil
@@ -238,7 +239,7 @@ func (rt *Handle) pickup(ctx context.Context, partition string, workers []*worke
firstJob = job
}
lastJob = job
slot, err := rt.findWorkerSlot(ctx, workers, job, blockedOrderKeys)
workerJobSlot, err := rt.findWorkerSlot(ctx, workers, job, blockedOrderKeys)
if err == nil {
traceParent := gjson.GetBytes(job.Parameters, "traceparent").String()
if traceParent != "" {
@@ -269,7 +270,7 @@ func (rt *Handle) pickup(ctx context.Context, partition string, workers []*worke
WorkspaceId: job.WorkspaceId,
}
statusList = append(statusList, &status)
reservedJobs = append(reservedJobs, reservedJob{slot: slot, job: job})
reservedJobs = append(reservedJobs, reservedJob{slot: workerJobSlot.slot, job: job, drainReason: workerJobSlot.drainReason})
if shouldFlush() {
flush()
}
@@ -516,7 +517,12 @@ func (rt *Handle) getQueryParams(partition string, pickUpCount int) jobsdb.GetQu
return params
}

func (rt *Handle) findWorkerSlot(ctx context.Context, workers []*worker, job *jobsdb.JobT, blockedOrderKeys map[string]struct{}) (*workerSlot, error) {
type workerJobSlot struct {
slot *workerSlot
drainReason string
}

func (rt *Handle) findWorkerSlot(ctx context.Context, workers []*worker, job *jobsdb.JobT, blockedOrderKeys map[string]struct{}) (*workerJobSlot, error) {
if rt.backgroundCtx.Err() != nil {
return nil, types.ErrContextCancelled
}
@@ -537,23 +543,29 @@ func (rt *Handle) findWorkerSlot(ctx context.Context, workers []*worker, job *jo
"destinationId": parameters.DestinationID,
}).Increment()
}
abortedJob, abortReason := rt.drainOrRetryLimitReached(job) // if job's aborted, then send it to its worker right away
if eventOrderingDisabled {
availableWorkers := lo.Filter(workers, func(w *worker, _ int) bool { return w.AvailableSlots() > 0 })
if len(availableWorkers) == 0 {
return nil, types.ErrWorkerNoSlot
}
slot := availableWorkers[rand.Intn(len(availableWorkers))].ReserveSlot()
if slot == nil {
return nil, types.ErrWorkerNoSlot
}
if abortedJob {
return &workerJobSlot{slot: slot, drainReason: abortReason}, nil
}
if rt.shouldBackoff(job) {
slot.Release()
return nil, types.ErrJobBackoff
}
if rt.shouldThrottle(ctx, job, parameters) {
slot.Release()
return nil, types.ErrDestinationThrottled
}

if slot := availableWorkers[rand.Intn(len(availableWorkers))].ReserveSlot(); slot != nil { // skipcq: GSC-G404
return slot, nil
}
return nil, types.ErrWorkerNoSlot

return &workerJobSlot{slot: slot}, nil
}

// checking if the orderKey is in blockedOrderKeys. If yes, returning nil.
@@ -564,7 +576,7 @@ func (rt *Handle) findWorkerSlot(ctx context.Context, workers []*worker, job *jo
}

//#JobOrder (see other #JobOrder comment)
if rt.shouldBackoff(job) { // backoff
if !abortedJob && rt.shouldBackoff(job) { // backoff
blockedOrderKeys[orderKey] = struct{}{}
return nil, types.ErrJobBackoff
}
@@ -586,16 +598,59 @@ func (rt *Handle) findWorkerSlot(ctx context.Context, workers []*worker, job *jo
return nil, types.ErrBarrierExists
}
rt.logger.Debugf("EventOrder: job %d of orderKey %s is allowed to be processed", job.JobID, orderKey)
if rt.shouldThrottle(ctx, job, parameters) {
if !abortedJob && rt.shouldThrottle(ctx, job, parameters) {
blockedOrderKeys[orderKey] = struct{}{}
worker.barrier.Leave(orderKey, job.JobID)
slot.Release()
return nil, types.ErrDestinationThrottled
}
return slot, nil
return &workerJobSlot{slot: slot, drainReason: abortReason}, nil
//#EndJobOrder
}

// checks if job is configured to drain or if its retry limit is reached
func (rt *Handle) drainOrRetryLimitReached(job *jobsdb.JobT) (bool, string) {
drain, reason := rt.drainer.Drain(job)
if drain {
return true, reason
}
retryLimitReached := rt.retryLimitReached(&job.LastJobStatus)
if retryLimitReached {
return true, "retry limit reached"
}
return false, ""
}

func (rt *Handle) retryLimitReached(status *jobsdb.JobStatusT) bool {
respStatusCode, _ := strconv.Atoi(status.ErrorCode)
switch respStatusCode {
case types.RouterTimedOutStatusCode,
types.RouterUnMarshalErrorCode: // 5xx errors
return false
}

if respStatusCode < http.StatusInternalServerError {
return false
}

firstAttemptedAtTime := time.Now()
if firstAttemptedAt := gjson.GetBytes(status.ErrorResponse, "firstAttemptedAt").Str; firstAttemptedAt != "" {
if t, err := time.Parse(misc.RFC3339Milli, firstAttemptedAt); err == nil {
firstAttemptedAtTime = t
}
}

maxFailedCountForJob := rt.reloadableConfig.maxFailedCountForJob.Load()
retryTimeWindow := rt.reloadableConfig.retryTimeWindow.Load()
if gjson.GetBytes(status.JobParameters, "source_job_run_id").Str != "" {
maxFailedCountForJob = rt.reloadableConfig.maxFailedCountForSourcesJob.Load()
retryTimeWindow = rt.reloadableConfig.sourcesRetryTimeWindow.Load()
}

return time.Since(firstAttemptedAtTime) > retryTimeWindow &&
status.AttemptNum >= maxFailedCountForJob // retry time window exceeded
}

func (*Handle) shouldBackoff(job *jobsdb.JobT) bool {
return job.LastJobStatus.JobState == jobsdb.Failed.State && job.LastJobStatus.AttemptNum > 0 && time.Until(job.LastJobStatus.RetryTime) > 0
}
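For reference, the retryLimitReached helper added above only treats a job as exhausted when two conditions hold at once: the first attempt is older than the configured retry window and the attempt count has reached the configured maximum. Only error codes of 500 and above are considered (the router's own timed-out and unmarshal error codes are excluded), and the sources-specific window and count are used when source_job_run_id is set. A small self-contained sketch of the core predicate, using the same numbers the test below configures (3 attempts, 180 minutes):

package main

import (
	"fmt"
	"time"
)

// Values matching the test configuration further down (maxFailedCountForJob: 3,
// retryTimeWindow: 180 minutes); in the router these are reloadable settings.
const (
	maxFailedCountForJob = 3
	retryTimeWindow      = 180 * time.Minute
)

// retryLimitReached mirrors the predicate from the patch: both the time window
// and the attempt count must be exceeded before a job counts as exhausted.
func retryLimitReached(firstAttemptedAt time.Time, attemptNum int) bool {
	return time.Since(firstAttemptedAt) > retryTimeWindow &&
		attemptNum >= maxFailedCountForJob
}

func main() {
	fourHoursAgo := time.Now().Add(-4 * time.Hour)
	fmt.Println(retryLimitReached(fourHoursAgo, 2)) // false: attempt count not yet reached
	fmt.Println(retryLimitReached(fourHoursAgo, 3)) // true: window and count both exceeded
	fmt.Println(retryLimitReached(time.Now(), 5))   // false: still inside the retry window
}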
111 changes: 104 additions & 7 deletions router/router_test.go
@@ -5,6 +5,7 @@ import (
"encoding/json"
"fmt"
"math"
"sync/atomic"
"testing"
"time"

@@ -152,6 +153,26 @@ func initRouter() {
misc.Init()
}

type drainer struct {
drain bool
reason string
}

func (d *drainer) Drain(job *jobsdb.JobT) (bool, string) {
return d.drain, d.reason
}

type mockThrottlerFactory struct {
count *atomic.Int64
}

func (m *mockThrottlerFactory) Get(destName, destID string) throttler.Throttler {
m.count.Add(1)
return throttler.NewNoOpThrottlerFactory().Get(destName, destID)
}

func (m *mockThrottlerFactory) Shutdown() {}

func TestBackoff(t *testing.T) {
t.Run("nextAttemptAfter", func(t *testing.T) {
min := 10 * time.Second
@@ -218,6 +239,12 @@ func TestBackoff(t *testing.T) {
noOfWorkers: 1,
workerInputBufferSize: 3,
barrier: barrier,
reloadableConfig: &reloadableConfig{
maxFailedCountForJob: misc.SingleValueLoader(3),
retryTimeWindow: misc.SingleValueLoader(180 * time.Minute),
},
drainer: &drainer{},
throttlerFactory: &mockThrottlerFactory{count: new(atomic.Int64)},
}
workers := []*worker{{
logger: logger.NOP,
@@ -235,18 +262,25 @@
slot, err = r.findWorkerSlot(context.Background(), workers, noBackoffJob1, map[string]struct{}{})
require.NotNil(t, slot)
require.NoError(t, err)
require.Equal(t, int64(1), r.throttlerFactory.(*mockThrottlerFactory).count.Load())

slot, err = r.findWorkerSlot(context.Background(), workers, noBackoffJob2, map[string]struct{}{})
require.NotNil(t, slot)
require.NoError(t, err)
require.Equal(t, int64(2), r.throttlerFactory.(*mockThrottlerFactory).count.Load())

slot, err = r.findWorkerSlot(context.Background(), workers, noBackoffJob3, map[string]struct{}{})
require.NotNil(t, slot)
require.NoError(t, err)
require.NotNil(t, slot)
require.Equal(t, int64(3), r.throttlerFactory.(*mockThrottlerFactory).count.Load())

slot, err = r.findWorkerSlot(context.Background(), workers, noBackoffJob4, map[string]struct{}{})
require.Nil(t, slot)
require.ErrorIs(t, err, types.ErrWorkerNoSlot)
require.Equal(t, int64(3), r.throttlerFactory.(*mockThrottlerFactory).count.Load())

// reset throttler counter
r.throttlerFactory.(*mockThrottlerFactory).count.Store(0)
})

t.Run("eventorder enabled", func(t *testing.T) {
@@ -256,22 +290,76 @@
slot, err := r.findWorkerSlot(context.Background(), workers, backoffJob, map[string]struct{}{})
require.Nil(t, slot)
require.ErrorIs(t, err, types.ErrJobBackoff)
require.Equal(t, int64(0), r.throttlerFactory.(*mockThrottlerFactory).count.Load())

slot, err = r.findWorkerSlot(context.Background(), workers, noBackoffJob1, map[string]struct{}{})
require.NotNil(t, slot)
require.NoError(t, err)
require.Equal(t, int64(1), r.throttlerFactory.(*mockThrottlerFactory).count.Load())

slot, err = r.findWorkerSlot(context.Background(), workers, noBackoffJob2, map[string]struct{}{})
require.NotNil(t, slot)
require.NoError(t, err)
require.Equal(t, int64(2), r.throttlerFactory.(*mockThrottlerFactory).count.Load())

slot, err = r.findWorkerSlot(context.Background(), workers, noBackoffJob3, map[string]struct{}{})
require.NotNil(t, slot)
require.NoError(t, err)
require.Equal(t, int64(3), r.throttlerFactory.(*mockThrottlerFactory).count.Load())

slot, err = r.findWorkerSlot(context.Background(), workers, noBackoffJob4, map[string]struct{}{})
require.Nil(t, slot)
require.ErrorIs(t, err, types.ErrWorkerNoSlot)
require.Equal(t, int64(3), r.throttlerFactory.(*mockThrottlerFactory).count.Load())

// reset throttler counter
r.throttlerFactory.(*mockThrottlerFactory).count.Store(0)
})

t.Run("eventorder enabled with drain job", func(t *testing.T) {
r.drainer = &drainer{drain: true, reason: "drain job due to some reason"}
r.guaranteeUserEventOrder = true
workers[0].inputReservations = 0

slot, err := r.findWorkerSlot(context.Background(), workers, backoffJob, map[string]struct{}{})
require.NotNil(t, slot)
require.NoError(t, err, "drain job should be accepted even if it's to be backed off")
require.Equal(
t,
int64(0),
r.throttlerFactory.(*mockThrottlerFactory).count.Load(),
"throttle check shouldn't even happen for drain job",
)

slot, err = r.findWorkerSlot(context.Background(), workers, noBackoffJob1, map[string]struct{}{})
require.NotNil(t, slot)
require.NoError(t, err)
require.Equal(
t,
int64(0),
r.throttlerFactory.(*mockThrottlerFactory).count.Load(),
"throttle check shouldn't even happen for drain job",
)

slot, err = r.findWorkerSlot(context.Background(), workers, noBackoffJob1, map[string]struct{}{})
require.NotNil(t, slot)
require.NoError(t, err)
require.Equal(
t,
int64(0),
r.throttlerFactory.(*mockThrottlerFactory).count.Load(),
"throttle check shouldn't even happen for drain job",
)

slot, err = r.findWorkerSlot(context.Background(), workers, noBackoffJob1, map[string]struct{}{})
require.Nil(t, slot)
require.ErrorIs(t, err, types.ErrWorkerNoSlot)
require.Equal(
t,
int64(0),
r.throttlerFactory.(*mockThrottlerFactory).count.Load(),
"throttle check shouldn't even happen for drain job",
)
})

t.Run("context canceled", func(t *testing.T) {
@@ -763,11 +851,9 @@ var _ = Describe("router", func() {
jobsdb.Aborted.State,
routerutils.DRAIN_ERROR_CODE,
fmt.Sprintf(
`{"reason": %s}`,
fmt.Sprintf(
`{"firstAttemptedAt": %q}`,
firstAttemptedAt.Format(misc.RFC3339Milli),
),
`{"reason": "%[1]s", "firstAttemptedAt": %[2]q}`,
"retry limit reached",
firstAttemptedAt.Format(misc.RFC3339Milli),
),
jobs[0].LastJobStatus.AttemptNum,
)
@@ -867,7 +953,18 @@ var _ = Describe("router", func() {
c.mockRouterJobsDB.EXPECT().UpdateJobStatusInTx(gomock.Any(), gomock.Any(), gomock.Any(), []string{customVal["GA"]}, nil).Times(1).
Do(func(ctx context.Context, tx jobsdb.UpdateSafeTx, drainList []*jobsdb.JobStatusT, _, _ interface{}) {
Expect(drainList).To(HaveLen(1))
assertJobStatus(jobs[0], drainList[0], jobsdb.Aborted.State, routerutils.DRAIN_ERROR_CODE, fmt.Sprintf(`{"reason": %s}`, fmt.Sprintf(`{"firstAttemptedAt": %q}`, firstAttemptedAt.Format(misc.RFC3339Milli))), jobs[0].LastJobStatus.AttemptNum)
assertJobStatus(
jobs[0],
drainList[0],
jobsdb.Aborted.State,
routerutils.DRAIN_ERROR_CODE,
fmt.Sprintf(
`{"reason": "%[1]s", "firstAttemptedAt": %[2]q}`,
"retry limit reached",
firstAttemptedAt.Format(misc.RFC3339Milli),
),
jobs[0].LastJobStatus.AttemptNum,
)
routerAborted = true
})

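A note on the test changes above: rather than inspecting throttling directly, the test wraps the throttler factory in a call counter and reads a count of zero as proof that the throttle check never ran for a drain job. A self-contained sketch of that counting-mock pattern; the interface below is a hypothetical stand-in, not the repository's throttler package:

package main

import (
	"fmt"
	"sync/atomic"
)

// throttleChecker is a hypothetical, simplified stand-in for a throttler:
// it only answers "should this destination be throttled right now?".
type throttleChecker interface {
	Limited(destID string) bool
}

type noOpThrottler struct{}

func (noOpThrottler) Limited(string) bool { return false }

// countingFactory hands out no-op throttlers but records every Get call; a
// count of zero after a pickup round shows the throttle path was never
// reached, which is exactly what the drain-job subtest asserts.
type countingFactory struct{ count atomic.Int64 }

func (f *countingFactory) Get(destName, destID string) throttleChecker {
	f.count.Add(1)
	return noOpThrottler{}
}

func main() {
	f := &countingFactory{}
	_ = f.Get("GA", "dest-1")
	fmt.Println(f.count.Load()) // 1
}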
