diff --git a/router/router.go b/router/router.go index 1bf09d613b..abcf66a717 100644 --- a/router/router.go +++ b/router/router.go @@ -1201,8 +1201,7 @@ func (rt *HandleT) findWorker(job *jobsdb.JobT, throttledOrderKeys map[string]st } if !rt.guaranteeUserEventOrder { - // if guaranteeUserEventOrder is false, assigning worker randomly and returning here. - if rt.shouldThrottle(job, parameters) { + if rt.shouldThrottle(job, parameters) || rt.shouldBackoff(job) { return } toSendWorker = rt.workers[rand.Intn(rt.noOfWorkers)] // skipcq: GSC-G404 @@ -1212,7 +1211,7 @@ func (rt *HandleT) findWorker(job *jobsdb.JobT, throttledOrderKeys map[string]st //#JobOrder (see other #JobOrder comment) index := rt.getWorkerPartition(orderKey) worker := rt.workers[index] - if job.LastJobStatus.JobState == jobsdb.Failed.State && job.LastJobStatus.AttemptNum > 0 && time.Until(job.LastJobStatus.RetryTime) > 0 { // backoff + if rt.shouldBackoff(job) { // backoff throttledOrderKeys[orderKey] = struct{}{} return } @@ -1273,6 +1272,10 @@ func (rt *HandleT) shouldThrottle(job *jobsdb.JobT, parameters JobParametersT) ( return limited } +func (*HandleT) shouldBackoff(job *jobsdb.JobT) bool { + return job.LastJobStatus.JobState == jobsdb.Failed.State && job.LastJobStatus.AttemptNum > 0 && time.Until(job.LastJobStatus.RetryTime) > 0 +} + func (rt *HandleT) commitStatusList(responseList *[]jobResponseT) { reportMetrics := make([]*utilTypes.PUReportedMetric, 0) connectionDetailsMap := make(map[string]*utilTypes.ConnectionDetails) @@ -1810,7 +1813,7 @@ func (rt *HandleT) Setup( maxDSQuerySizeKeys := []string{"Router." + rt.destName + "." + "maxDSQuery", "Router." + "maxDSQuery"} config.RegisterIntConfigVariable(10, &rt.maxDSQuerySize, true, 1, maxDSQuerySizeKeys...) 
- config.RegisterIntConfigVariable(10, &rt.jobIteratorMaxQueries, true, 1, "Router.jobIterator.maxQueries") + config.RegisterIntConfigVariable(50, &rt.jobIteratorMaxQueries, true, 1, "Router.jobIterator.maxQueries") config.RegisterIntConfigVariable(10, &rt.jobIteratorDiscardedPercentageTolerance, true, 1, "Router.jobIterator.discardedPercentageTolerance") config.RegisterBoolConfigVariable(false, &rt.enableBatching, false, "Router."+rt.destName+"."+"enableBatching") diff --git a/router/router_test.go b/router/router_test.go index 53d504a4b5..88fe2c073e 100644 --- a/router/router_test.go +++ b/router/router_test.go @@ -9,6 +9,7 @@ import ( "time" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "github.com/tidwall/gjson" "github.com/rudderlabs/rudder-server/enterprise/reporting" @@ -32,6 +33,7 @@ import ( mocksRouter "github.com/rudderlabs/rudder-server/mocks/router" mocksTransformer "github.com/rudderlabs/rudder-server/mocks/router/transformer" mocksMultitenant "github.com/rudderlabs/rudder-server/mocks/services/multitenant" + "github.com/rudderlabs/rudder-server/router/internal/eventorder" "github.com/rudderlabs/rudder-server/router/types" routerUtils "github.com/rudderlabs/rudder-server/router/utils" destinationdebugger "github.com/rudderlabs/rudder-server/services/debugger/destination" @@ -162,13 +164,80 @@ func initRouter() { func TestBackoff(t *testing.T) { loadConfig() - assert.Equal(t, 10*time.Second, durationBeforeNextAttempt(0)) - assert.Equal(t, 10*time.Second, durationBeforeNextAttempt(1)) - assert.Equal(t, 20*time.Second, durationBeforeNextAttempt(2)) - assert.Equal(t, 40*time.Second, durationBeforeNextAttempt(3)) - assert.Equal(t, 80*time.Second, durationBeforeNextAttempt(4)) - assert.Equal(t, 160*time.Second, durationBeforeNextAttempt(5)) - assert.Equal(t, 300*time.Second, durationBeforeNextAttempt(6)) + + t.Run("durationBeforeNextAttempt", func(t *testing.T) { + require.Equal(t, 10*time.Second, durationBeforeNextAttempt(0)) 
+ require.Equal(t, 10*time.Second, durationBeforeNextAttempt(1)) + require.Equal(t, 20*time.Second, durationBeforeNextAttempt(2)) + require.Equal(t, 40*time.Second, durationBeforeNextAttempt(3)) + require.Equal(t, 80*time.Second, durationBeforeNextAttempt(4)) + require.Equal(t, 160*time.Second, durationBeforeNextAttempt(5)) + require.Equal(t, 300*time.Second, durationBeforeNextAttempt(6)) + }) + + t.Run("findWorker", func(t *testing.T) { + backoffJob := &jobsdb.JobT{ + JobID: 1, + Parameters: []byte(`{"destination_id": "destination"}`), + LastJobStatus: jobsdb.JobStatusT{ + JobState: jobsdb.Failed.State, + AttemptNum: 1, + RetryTime: time.Now().Add(1 * time.Hour), + }, + } + noBackoffJob1 := &jobsdb.JobT{ + JobID: 2, + Parameters: []byte(`{"destination_id": "destination"}`), + LastJobStatus: jobsdb.JobStatusT{ + JobState: jobsdb.Waiting.State, + AttemptNum: 1, + RetryTime: time.Now().Add(1 * time.Hour), + }, + } + noBackoffJob2 := &jobsdb.JobT{ + JobID: 3, + Parameters: []byte(`{"destination_id": "destination"}`), + LastJobStatus: jobsdb.JobStatusT{ + JobState: jobsdb.Failed.State, + AttemptNum: 0, + RetryTime: time.Now().Add(1 * time.Hour), + }, + } + noBackoffJob3 := &jobsdb.JobT{ + JobID: 4, + Parameters: []byte(`{"destination_id": "destination"}`), + LastJobStatus: jobsdb.JobStatusT{ + JobState: jobsdb.Failed.State, + AttemptNum: 1, + RetryTime: time.Now().Add(-1 * time.Hour), + }, + } + + r := &HandleT{ + logger: logger.NOP, + backgroundCtx: context.Background(), + noOfWorkers: 1, + workers: []*workerT{{ + barrier: eventorder.NewBarrier(), + }}, + } + + t.Run("eventorder disabled", func(t *testing.T) { + r.guaranteeUserEventOrder = false + require.Nil(t, r.findWorker(backoffJob, map[string]struct{}{})) + require.NotNil(t, r.findWorker(noBackoffJob1, map[string]struct{}{})) + require.NotNil(t, r.findWorker(noBackoffJob2, map[string]struct{}{})) + require.NotNil(t, r.findWorker(noBackoffJob3, map[string]struct{}{})) + }) + + t.Run("eventorder enabled", func(t 
*testing.T) { + r.guaranteeUserEventOrder = true + require.Nil(t, r.findWorker(backoffJob, map[string]struct{}{})) + require.NotNil(t, r.findWorker(noBackoffJob1, map[string]struct{}{})) + require.NotNil(t, r.findWorker(noBackoffJob2, map[string]struct{}{})) + require.NotNil(t, r.findWorker(noBackoffJob3, map[string]struct{}{})) + }) + }) } var _ = Describe("Router", func() {