
Commit 25a309c
review addressal and some refactor
Sidddddarth committed Feb 1, 2024
1 parent f68102a commit 25a309c
Showing 3 changed files with 128 additions and 34 deletions.
42 changes: 26 additions & 16 deletions router/handle.go
@@ -184,8 +184,9 @@ func (rt *Handle) pickup(ctx context.Context, partition string, workers []*worke
}

type reservedJob struct {
- slot *workerSlot
- job  *jobsdb.JobT
+ slot        *workerSlot
+ job         *jobsdb.JobT
+ drainReason string
}

var statusList []*jobsdb.JobStatusT
@@ -210,7 +211,7 @@ func (rt *Handle) pickup(ctx context.Context, partition string, workers []*worke
rt.logger.Debugf("[DRAIN DEBUG] counts %v final jobs length being processed %v", rt.destType, len(reservedJobs))
assignedTime := time.Now()
for _, reservedJob := range reservedJobs {
- reservedJob.slot.Use(workerJob{job: reservedJob.job, assignedAt: assignedTime})
+ reservedJob.slot.Use(workerJob{job: reservedJob.job, assignedAt: assignedTime, drainReason: reservedJob.drainReason})
}
pickupCount += len(reservedJobs)
reservedJobs = nil
@@ -236,7 +237,7 @@ func (rt *Handle) pickup(ctx context.Context, partition string, workers []*worke
firstJob = job
}
lastJob = job
- slot, err := rt.findWorkerSlot(ctx, workers, job, blockedOrderKeys)
+ workerJobSlot, err := rt.findWorkerSlot(ctx, workers, job, blockedOrderKeys)
if err == nil {
traceParent := gjson.GetBytes(job.Parameters, "traceparent").String()
if traceParent != "" {
@@ -267,7 +268,7 @@ func (rt *Handle) pickup(ctx context.Context, partition string, workers []*worke
WorkspaceId: job.WorkspaceId,
}
statusList = append(statusList, &status)
- reservedJobs = append(reservedJobs, reservedJob{slot: slot, job: job})
+ reservedJobs = append(reservedJobs, reservedJob{slot: workerJobSlot.slot, job: job, drainReason: workerJobSlot.drainReason})
if shouldFlush() {
flush()
}
@@ -509,7 +510,12 @@ func (rt *Handle) getQueryParams(partition string, pickUpCount int) jobsdb.GetQu
return params
}

- func (rt *Handle) findWorkerSlot(ctx context.Context, workers []*worker, job *jobsdb.JobT, blockedOrderKeys map[string]struct{}) (*workerSlot, error) {
+ type workerJobSlot struct {
+ 	slot        *workerSlot
+ 	drainReason string
+ }
+
+ func (rt *Handle) findWorkerSlot(ctx context.Context, workers []*worker, job *jobsdb.JobT, blockedOrderKeys map[string]struct{}) (*workerJobSlot, error) {
if rt.backgroundCtx.Err() != nil {
return nil, types.ErrContextCancelled
}
@@ -530,7 +536,7 @@ func (rt *Handle) findWorkerSlot(ctx context.Context, workers []*worker, job *jo
"destinationId": parameters.DestinationID,
}).Increment()
}
- abortedJob := rt.drainOrRetryLimitReached(job) // if job's aborted, then send it to it's worker right away
+ abortedJob, abortReason := rt.drainOrRetryLimitReached(job) // if job's aborted, then send it to it's worker right away
if eventOrderingDisabled {
availableWorkers := lo.Filter(workers, func(w *worker, _ int) bool { return w.AvailableSlots() > 0 })
if len(availableWorkers) == 0 {
@@ -541,7 +547,7 @@ func (rt *Handle) findWorkerSlot(ctx context.Context, workers []*worker, job *jo
return nil, types.ErrWorkerNoSlot
}

[Codecov / codecov/patch warning: added lines router/handle.go#L547-L548 were not covered by tests]
if abortedJob {
- return slot, nil
+ return &workerJobSlot{slot: slot, drainReason: abortReason}, nil
}

[Codecov / codecov/patch warning: added lines router/handle.go#L550-L551 were not covered by tests]
if rt.shouldBackoff(job) {
slot.Release()
@@ -552,7 +558,7 @@ func (rt *Handle) findWorkerSlot(ctx context.Context, workers []*worker, job *jo
return nil, types.ErrDestinationThrottled
}

- return slot, nil
+ return &workerJobSlot{slot: slot}, nil
}

// checking if the orderKey is in blockedOrderKeys. If yes, returning nil.
@@ -563,7 +569,7 @@ func (rt *Handle) findWorkerSlot(ctx context.Context, workers []*worker, job *jo
}

//#JobOrder (see other #JobOrder comment)
- if rt.shouldBackoff(job) && !abortedJob { // backoff
+ if !abortedJob && rt.shouldBackoff(job) { // backoff
blockedOrderKeys[orderKey] = struct{}{}
return nil, types.ErrJobBackoff
}
@@ -585,23 +591,27 @@ func (rt *Handle) findWorkerSlot(ctx context.Context, workers []*worker, job *jo
return nil, types.ErrBarrierExists
}
rt.logger.Debugf("EventOrder: job %d of orderKey %s is allowed to be processed", job.JobID, orderKey)
- if rt.shouldThrottle(ctx, job, parameters) && !abortedJob {
+ if !abortedJob && rt.shouldThrottle(ctx, job, parameters) {
blockedOrderKeys[orderKey] = struct{}{}
worker.barrier.Leave(orderKey, job.JobID)
slot.Release()
return nil, types.ErrDestinationThrottled
}
- return slot, nil
+ return &workerJobSlot{slot: slot, drainReason: abortReason}, nil
//#EndJobOrder
}

// checks if job is configured to drain or if it's retry limit is reached
- func (rt *Handle) drainOrRetryLimitReached(job *jobsdb.JobT) bool {
- 	drain, _ := rt.drainer.Drain(job)
+ func (rt *Handle) drainOrRetryLimitReached(job *jobsdb.JobT) (bool, string) {
+ 	drain, reason := rt.drainer.Drain(job)
if drain {
- return true
+ return true, reason
}
- return rt.retryLimitReached(&job.LastJobStatus)
+ retryLimitReached := rt.retryLimitReached(&job.LastJobStatus)
+ if retryLimitReached {
+ 	return true, "retry limit reached"
+ }
+ return false, ""
}

func (rt *Handle) retryLimitReached(status *jobsdb.JobStatusT) bool {
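Taken together, the handle.go changes make the drain decision once in findWorkerSlot and carry its reason forward with the reserved slot, instead of recomputing it downstream. Below is a minimal, self-contained Go sketch of that pattern; the types (slot, job, jobSlot) and the shouldAbort predicate are simplified hypothetical stand-ins, not the actual router code.

```go
package main

import "fmt"

// Hypothetical, simplified stand-ins for the router's worker slot and job types.
type slot struct{ id int }
type job struct{ id int64 }

// jobSlot mirrors the idea behind the new workerJobSlot: the reserved slot plus
// the reason, if any, for which the job is being drained/aborted.
type jobSlot struct {
	slot        *slot
	drainReason string
}

// shouldAbort stands in for drainOrRetryLimitReached: it reports whether the
// job must be aborted and why. The predicate here is just a placeholder.
func shouldAbort(j *job) (bool, string) {
	if j.id%2 == 0 {
		return true, "retry limit reached"
	}
	return false, ""
}

// findSlot stands in for findWorkerSlot: the drain decision is made once and
// its reason travels with the slot instead of being recomputed downstream.
func findSlot(j *job, s *slot) *jobSlot {
	if abort, reason := shouldAbort(j); abort {
		return &jobSlot{slot: s, drainReason: reason}
	}
	return &jobSlot{slot: s}
}

func main() {
	s := &slot{id: 1}
	for _, j := range []*job{{id: 1}, {id: 2}} {
		js := findSlot(j, s)
		// A downstream worker reads js.drainReason instead of calling the
		// drainer again, which is what workerJob.drainReason enables in the diff.
		fmt.Printf("job %d -> drainReason=%q\n", j.id, js.drainReason)
	}
}
```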
104 changes: 95 additions & 9 deletions router/router_test.go
@@ -5,6 +5,7 @@ import (
"encoding/json"
"fmt"
"math"
"sync/atomic"
"testing"
"time"

@@ -152,12 +153,26 @@ func initRouter() {
misc.Init()
}

- type drainer struct{}
+ type drainer struct {
+ 	drain  bool
+ 	reason string
+ }

func (d *drainer) Drain(job *jobsdb.JobT) (bool, string) {
- return false, ""
+ return d.drain, d.reason
}

type mockThrottlerFactory struct {
count *atomic.Int64
}

func (m *mockThrottlerFactory) Get(destName, destID string) throttler.Throttler {
m.count.Add(1)
return throttler.NewNoOpThrottlerFactory().Get(destName, destID)
}

func (m *mockThrottlerFactory) Shutdown() {}

func TestBackoff(t *testing.T) {
t.Run("nextAttemptAfter", func(t *testing.T) {
min := 10 * time.Second
@@ -228,7 +243,8 @@ func TestBackoff(t *testing.T) {
maxFailedCountForJob: misc.SingleValueLoader(3),
retryTimeWindow: misc.SingleValueLoader(180 * time.Minute),
},
- drainer: &drainer{},
+ drainer:          &drainer{},
+ throttlerFactory: &mockThrottlerFactory{count: new(atomic.Int64)},
}
workers := []*worker{{
logger: logger.NOP,
@@ -246,18 +262,25 @@ func TestBackoff(t *testing.T) {
slot, err = r.findWorkerSlot(context.Background(), workers, noBackoffJob1, map[string]struct{}{})
require.NotNil(t, slot)
require.NoError(t, err)
require.Equal(t, int64(1), r.throttlerFactory.(*mockThrottlerFactory).count.Load())

slot, err = r.findWorkerSlot(context.Background(), workers, noBackoffJob2, map[string]struct{}{})
require.NotNil(t, slot)
require.NoError(t, err)
require.Equal(t, int64(2), r.throttlerFactory.(*mockThrottlerFactory).count.Load())

slot, err = r.findWorkerSlot(context.Background(), workers, noBackoffJob3, map[string]struct{}{})
require.NoError(t, err)
require.NotNil(t, slot)
require.Equal(t, int64(3), r.throttlerFactory.(*mockThrottlerFactory).count.Load())

slot, err = r.findWorkerSlot(context.Background(), workers, noBackoffJob4, map[string]struct{}{})
require.Nil(t, slot)
require.ErrorIs(t, err, types.ErrWorkerNoSlot)
require.Equal(t, int64(3), r.throttlerFactory.(*mockThrottlerFactory).count.Load())

// reset throttler counter
r.throttlerFactory.(*mockThrottlerFactory).count.Store(0)
})

t.Run("eventorder enabled", func(t *testing.T) {
@@ -267,22 +290,76 @@ func TestBackoff(t *testing.T) {
slot, err := r.findWorkerSlot(context.Background(), workers, backoffJob, map[string]struct{}{})
require.Nil(t, slot)
require.ErrorIs(t, err, types.ErrJobBackoff)
require.Equal(t, int64(0), r.throttlerFactory.(*mockThrottlerFactory).count.Load())

slot, err = r.findWorkerSlot(context.Background(), workers, noBackoffJob1, map[string]struct{}{})
require.NotNil(t, slot)
require.NoError(t, err)
require.Equal(t, int64(1), r.throttlerFactory.(*mockThrottlerFactory).count.Load())

slot, err = r.findWorkerSlot(context.Background(), workers, noBackoffJob2, map[string]struct{}{})
require.NotNil(t, slot)
require.NoError(t, err)
require.Equal(t, int64(2), r.throttlerFactory.(*mockThrottlerFactory).count.Load())

slot, err = r.findWorkerSlot(context.Background(), workers, noBackoffJob3, map[string]struct{}{})
require.NotNil(t, slot)
require.NoError(t, err)
require.Equal(t, int64(3), r.throttlerFactory.(*mockThrottlerFactory).count.Load())

slot, err = r.findWorkerSlot(context.Background(), workers, noBackoffJob4, map[string]struct{}{})
require.Nil(t, slot)
require.ErrorIs(t, err, types.ErrWorkerNoSlot)
require.Equal(t, int64(3), r.throttlerFactory.(*mockThrottlerFactory).count.Load())

// reset throttler counter
r.throttlerFactory.(*mockThrottlerFactory).count.Store(0)
})

t.Run("eventorder enabled with drain job", func(t *testing.T) {
r.drainer = &drainer{drain: true, reason: "drain job due to some reason"}
r.guaranteeUserEventOrder = true
workers[0].inputReservations = 0

slot, err := r.findWorkerSlot(context.Background(), workers, backoffJob, map[string]struct{}{})
require.NotNil(t, slot)
require.NoError(t, err, "drain job should be accepted even if it's to be backed off")
require.Equal(
t,
int64(0),
r.throttlerFactory.(*mockThrottlerFactory).count.Load(),
"throttle check shouldn't even happen for drain job",
)

slot, err = r.findWorkerSlot(context.Background(), workers, noBackoffJob1, map[string]struct{}{})
require.NotNil(t, slot)
require.NoError(t, err)
require.Equal(
t,
int64(0),
r.throttlerFactory.(*mockThrottlerFactory).count.Load(),
"throttle check shouldn't even happen for drain job",
)

slot, err = r.findWorkerSlot(context.Background(), workers, noBackoffJob1, map[string]struct{}{})
require.NotNil(t, slot)
require.NoError(t, err)
require.Equal(
t,
int64(0),
r.throttlerFactory.(*mockThrottlerFactory).count.Load(),
"throttle check shouldn't even happen for drain job",
)

slot, err = r.findWorkerSlot(context.Background(), workers, noBackoffJob1, map[string]struct{}{})
require.Nil(t, slot)
require.ErrorIs(t, err, types.ErrWorkerNoSlot)
require.Equal(
t,
int64(0),
r.throttlerFactory.(*mockThrottlerFactory).count.Load(),
"throttle check shouldn't even happen for drain job",
)
})

t.Run("context canceled", func(t *testing.T) {
@@ -774,11 +851,9 @@ var _ = Describe("router", func() {
jobsdb.Aborted.State,
routerutils.DRAIN_ERROR_CODE,
fmt.Sprintf(
- `{"reason": %s}`,
- fmt.Sprintf(
- 	`{"firstAttemptedAt": %q}`,
- 	firstAttemptedAt.Format(misc.RFC3339Milli),
- ),
+ `{"reason": "%[1]s", "firstAttemptedAt": %[2]q}`,
+ "retry limit reached",
+ firstAttemptedAt.Format(misc.RFC3339Milli),
),
jobs[0].LastJobStatus.AttemptNum,
)
@@ -878,7 +953,18 @@ var _ = Describe("router", func() {
c.mockRouterJobsDB.EXPECT().UpdateJobStatusInTx(gomock.Any(), gomock.Any(), gomock.Any(), []string{customVal["GA"]}, nil).Times(1).
Do(func(ctx context.Context, tx jobsdb.UpdateSafeTx, drainList []*jobsdb.JobStatusT, _, _ interface{}) {
Expect(drainList).To(HaveLen(1))
- assertJobStatus(jobs[0], drainList[0], jobsdb.Aborted.State, routerutils.DRAIN_ERROR_CODE, fmt.Sprintf(`{"reason": %s}`, fmt.Sprintf(`{"firstAttemptedAt": %q}`, firstAttemptedAt.Format(misc.RFC3339Milli))), jobs[0].LastJobStatus.AttemptNum)
+ assertJobStatus(
+ 	jobs[0],
+ 	drainList[0],
+ 	jobsdb.Aborted.State,
+ 	routerutils.DRAIN_ERROR_CODE,
+ 	fmt.Sprintf(
+ 		`{"reason": "%[1]s", "firstAttemptedAt": %[2]q}`,
+ 		"retry limit reached",
+ 		firstAttemptedAt.Format(misc.RFC3339Milli),
+ 	),
+ 	jobs[0].LastJobStatus.AttemptNum,
+ )
routerAborted = true
})

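The new mockThrottlerFactory in the tests wraps a no-op throttler with an atomic counter, so the assertions can check whether the throttle path was consulted at all. Below is a minimal sketch of the same counting-decorator idea; the limiter interface and factory here are hypothetical stand-ins, not the real throttler package.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// limiter is a hypothetical stand-in for the real throttler.Throttler interface.
type limiter interface{ Allow() bool }

type noOpLimiter struct{}

func (noOpLimiter) Allow() bool { return true }

// countingFactory mirrors mockThrottlerFactory: every Get call bumps an atomic
// counter before delegating to a no-op implementation.
type countingFactory struct {
	count atomic.Int64
}

func (f *countingFactory) Get(destName, destID string) limiter {
	f.count.Add(1)
	return noOpLimiter{}
}

func main() {
	f := &countingFactory{}
	_ = f.Get("GA", "dest-1")
	_ = f.Get("GA", "dest-2")
	// In the diff, the tests assert this counter stays at zero for drained
	// jobs, proving the throttle check never ran for them.
	fmt.Println("throttler consulted:", f.count.Load(), "times")
}
```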
16 changes: 7 additions & 9 deletions router/worker.go
@@ -55,8 +55,9 @@ type worker struct {
}

type workerJob struct {
- job        *jobsdb.JobT
- assignedAt time.Time
+ job         *jobsdb.JobT
+ assignedAt  time.Time
+ drainReason string
}

func (w *worker) workLoop() {
@@ -89,13 +90,10 @@ func (w *worker) workLoop() {
if err := json.Unmarshal(job.Parameters, &parameters); err != nil {
panic(fmt.Errorf("unmarshalling of job parameters failed for job %d (%s): %w", job.JobID, string(job.Parameters), err))
}
- abort, abortReason := w.rt.drainer.Drain(job)
+ abortReason := message.drainReason
+ abort := abortReason != ""
abortTag := abortReason
- if !abort {
- 	abort = w.rt.retryLimitReached(&job.LastJobStatus)
- 	abortReason = string(job.LastJobStatus.ErrorResponse)
- 	abortTag = "retry limit reached"
- }
+ errResponse := routerutils.EnhanceJSON(job.LastJobStatus.ErrorResponse, "reason", abortReason)
if abort {
status := jobsdb.JobStatusT{
JobID: job.JobID,
Expand All @@ -106,7 +104,7 @@ func (w *worker) workLoop() {
ErrorCode: routerutils.DRAIN_ERROR_CODE,
Parameters: routerutils.EmptyPayload,
JobParameters: job.Parameters,
- ErrorResponse: routerutils.EnhanceJSON(routerutils.EmptyPayload, "reason", abortReason),
+ ErrorResponse: errResponse,
WorkspaceId: job.WorkspaceId,
}
// Enhancing job parameter with the drain reason.
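In worker.go the abort reason now arrives on the message and is merged into the job's existing error response via routerutils.EnhanceJSON. The sketch below approximates that kind of JSON enrichment with tidwall/sjson; the enhanceJSON helper and its fallback behaviour are assumptions for illustration, not the actual routerutils implementation.

```go
package main

import (
	"fmt"

	"github.com/tidwall/sjson"
)

// enhanceJSON approximates what the diff uses routerutils.EnhanceJSON for:
// setting a "reason" key on the job's existing error response without
// discarding the fields already present (an assumption about the helper).
func enhanceJSON(payload []byte, key, value string) []byte {
	if len(payload) == 0 {
		payload = []byte(`{}`)
	}
	out, err := sjson.SetBytes(payload, key, value)
	if err != nil {
		return payload // keep the original payload if enrichment fails
	}
	return out
}

func main() {
	lastErrorResponse := []byte(`{"firstAttemptedAt": "2024-02-01T10:00:00.000Z"}`)
	// The worker now gets the drain reason with the message (workerJob.drainReason)
	// and folds it into the last error response instead of starting from an
	// empty payload.
	enriched := enhanceJSON(lastErrorResponse, "reason", "retry limit reached")
	fmt.Println(string(enriched))
}
```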
