Skip to content

Commit

Permalink
chore: enable backoff when event ordering is disabled (#3121)
Browse files Browse the repository at this point in the history
  • Loading branch information
atzoum committed Mar 30, 2023
1 parent 0aab933 commit bbc14c1
Show file tree
Hide file tree
Showing 2 changed files with 83 additions and 11 deletions.
11 changes: 7 additions & 4 deletions router/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -1201,8 +1201,7 @@ func (rt *HandleT) findWorker(job *jobsdb.JobT, throttledOrderKeys map[string]st
}

if !rt.guaranteeUserEventOrder {
// if guaranteeUserEventOrder is false, assigning worker randomly and returning here.
if rt.shouldThrottle(job, parameters) {
if rt.shouldThrottle(job, parameters) || rt.shouldBackoff(job) {
return
}
toSendWorker = rt.workers[rand.Intn(rt.noOfWorkers)] // skipcq: GSC-G404
Expand All @@ -1212,7 +1211,7 @@ func (rt *HandleT) findWorker(job *jobsdb.JobT, throttledOrderKeys map[string]st
//#JobOrder (see other #JobOrder comment)
index := rt.getWorkerPartition(orderKey)
worker := rt.workers[index]
if job.LastJobStatus.JobState == jobsdb.Failed.State && job.LastJobStatus.AttemptNum > 0 && time.Until(job.LastJobStatus.RetryTime) > 0 { // backoff
if rt.shouldBackoff(job) { // backoff
throttledOrderKeys[orderKey] = struct{}{}
return
}
Expand Down Expand Up @@ -1273,6 +1272,10 @@ func (rt *HandleT) shouldThrottle(job *jobsdb.JobT, parameters JobParametersT) (
return limited
}

func (*HandleT) shouldBackoff(job *jobsdb.JobT) bool {
return job.LastJobStatus.JobState == jobsdb.Failed.State && job.LastJobStatus.AttemptNum > 0 && time.Until(job.LastJobStatus.RetryTime) > 0
}

func (rt *HandleT) commitStatusList(responseList *[]jobResponseT) {
reportMetrics := make([]*utilTypes.PUReportedMetric, 0)
connectionDetailsMap := make(map[string]*utilTypes.ConnectionDetails)
Expand Down Expand Up @@ -1810,7 +1813,7 @@ func (rt *HandleT) Setup(
maxDSQuerySizeKeys := []string{"Router." + rt.destName + "." + "maxDSQuery", "Router." + "maxDSQuery"}
config.RegisterIntConfigVariable(10, &rt.maxDSQuerySize, true, 1, maxDSQuerySizeKeys...)

config.RegisterIntConfigVariable(10, &rt.jobIteratorMaxQueries, true, 1, "Router.jobIterator.maxQueries")
config.RegisterIntConfigVariable(50, &rt.jobIteratorMaxQueries, true, 1, "Router.jobIterator.maxQueries")
config.RegisterIntConfigVariable(10, &rt.jobIteratorDiscardedPercentageTolerance, true, 1, "Router.jobIterator.discardedPercentageTolerance")

config.RegisterBoolConfigVariable(false, &rt.enableBatching, false, "Router."+rt.destName+"."+"enableBatching")
Expand Down
83 changes: 76 additions & 7 deletions router/router_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/tidwall/gjson"

"github.com/rudderlabs/rudder-server/enterprise/reporting"
Expand All @@ -32,6 +33,7 @@ import (
mocksRouter "github.com/rudderlabs/rudder-server/mocks/router"
mocksTransformer "github.com/rudderlabs/rudder-server/mocks/router/transformer"
mocksMultitenant "github.com/rudderlabs/rudder-server/mocks/services/multitenant"
"github.com/rudderlabs/rudder-server/router/internal/eventorder"
"github.com/rudderlabs/rudder-server/router/types"
routerUtils "github.com/rudderlabs/rudder-server/router/utils"
destinationdebugger "github.com/rudderlabs/rudder-server/services/debugger/destination"
Expand Down Expand Up @@ -162,13 +164,80 @@ func initRouter() {

func TestBackoff(t *testing.T) {
loadConfig()
assert.Equal(t, 10*time.Second, durationBeforeNextAttempt(0))
assert.Equal(t, 10*time.Second, durationBeforeNextAttempt(1))
assert.Equal(t, 20*time.Second, durationBeforeNextAttempt(2))
assert.Equal(t, 40*time.Second, durationBeforeNextAttempt(3))
assert.Equal(t, 80*time.Second, durationBeforeNextAttempt(4))
assert.Equal(t, 160*time.Second, durationBeforeNextAttempt(5))
assert.Equal(t, 300*time.Second, durationBeforeNextAttempt(6))

t.Run("durationBeforeNextAttempt", func(t *testing.T) {
require.Equal(t, 10*time.Second, durationBeforeNextAttempt(0))
require.Equal(t, 10*time.Second, durationBeforeNextAttempt(1))
require.Equal(t, 20*time.Second, durationBeforeNextAttempt(2))
require.Equal(t, 40*time.Second, durationBeforeNextAttempt(3))
require.Equal(t, 80*time.Second, durationBeforeNextAttempt(4))
require.Equal(t, 160*time.Second, durationBeforeNextAttempt(5))
require.Equal(t, 300*time.Second, durationBeforeNextAttempt(6))
})

t.Run("findWorker", func(t *testing.T) {
backoffJob := &jobsdb.JobT{
JobID: 1,
Parameters: []byte(`{"destination_id": "destination"}`),
LastJobStatus: jobsdb.JobStatusT{
JobState: jobsdb.Failed.State,
AttemptNum: 1,
RetryTime: time.Now().Add(1 * time.Hour),
},
}
noBackoffJob1 := &jobsdb.JobT{
JobID: 2,
Parameters: []byte(`{"destination_id": "destination"}`),
LastJobStatus: jobsdb.JobStatusT{
JobState: jobsdb.Waiting.State,
AttemptNum: 1,
RetryTime: time.Now().Add(1 * time.Hour),
},
}
noBackoffJob2 := &jobsdb.JobT{
JobID: 3,
Parameters: []byte(`{"destination_id": "destination"}`),
LastJobStatus: jobsdb.JobStatusT{
JobState: jobsdb.Failed.State,
AttemptNum: 0,
RetryTime: time.Now().Add(1 * time.Hour),
},
}
noBackoffJob3 := &jobsdb.JobT{
JobID: 4,
Parameters: []byte(`{"destination_id": "destination"}`),
LastJobStatus: jobsdb.JobStatusT{
JobState: jobsdb.Failed.State,
AttemptNum: 0,
RetryTime: time.Now().Add(-1 * time.Hour),
},
}

r := &HandleT{
logger: logger.NOP,
backgroundCtx: context.Background(),
noOfWorkers: 1,
workers: []*workerT{{
barrier: eventorder.NewBarrier(),
}},
}

t.Run("eventorder disabled", func(t *testing.T) {
r.guaranteeUserEventOrder = false
require.Nil(t, r.findWorker(backoffJob, map[string]struct{}{}))
require.NotNil(t, r.findWorker(noBackoffJob1, map[string]struct{}{}))
require.NotNil(t, r.findWorker(noBackoffJob2, map[string]struct{}{}))
require.NotNil(t, r.findWorker(noBackoffJob3, map[string]struct{}{}))
})

t.Run("eventorder enabled", func(t *testing.T) {
r.guaranteeUserEventOrder = true
require.Nil(t, r.findWorker(backoffJob, map[string]struct{}{}))
require.NotNil(t, r.findWorker(noBackoffJob1, map[string]struct{}{}))
require.NotNil(t, r.findWorker(noBackoffJob2, map[string]struct{}{}))
require.NotNil(t, r.findWorker(noBackoffJob3, map[string]struct{}{}))
})
})
}

var _ = Describe("Router", func() {
Expand Down

0 comments on commit bbc14c1

Please sign in to comment.