fix(router): preserve event order while draining a previously failed job #2546
Conversation
Force-pushed from 81d1172 to d7bb5d5
Codecov Report
Base: 43.90% // Head: 43.83% // Decreases project coverage by -0.07%
Additional details and impacted files:
@@ Coverage Diff @@
## master #2546 +/- ##
==========================================
- Coverage 43.90% 43.83% -0.07%
==========================================
Files 187 187
Lines 39161 39075 -86
==========================================
- Hits 17195 17130 -65
+ Misses 20877 20861 -16
+ Partials 1089 1084 -5
Force-pushed from d7bb5d5 to 3ffaf01
@@ -1271,7 +1311,7 @@ func (rt *HandleT) findWorker(job *jobsdb.JobT, throttledAtTime time.Time) (toSe
 	userID := job.UserID
 	// checking if the user is in throttledMap. If yes, returning nil.
 	// this check is done to maintain order.
-	if _, ok := rt.throttledUserMap[userID]; ok {
+	if _, ok := rt.throttledUserMap[userID]; ok && rt.guaranteeUserEventOrder {
Found that this check was missing.
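The condition added in the diff above can be read in isolation: a throttled user blocks later jobs for that user only when the router must guarantee per-user event order. The sketch below is a minimal, hypothetical reduction of that check (the `router` struct and `shouldSkip` helper are illustrative; only the field names `throttledUserMap` and `guaranteeUserEventOrder` come from the PR).

```go
package main

import "fmt"

// Illustrative reduction of the ordering check in findWorker.
// Only the field names mirror the PR; the rest is simplified.
type router struct {
	throttledUserMap        map[string]struct{}
	guaranteeUserEventOrder bool
}

// shouldSkip reports whether a job for userID must be skipped to
// preserve per-user event order: once one of a user's jobs is
// throttled, later jobs for the same user must wait behind it.
func (rt *router) shouldSkip(userID string) bool {
	_, throttled := rt.throttledUserMap[userID]
	return throttled && rt.guaranteeUserEventOrder
}

func main() {
	rt := &router{
		throttledUserMap:        map[string]struct{}{"u1": {}},
		guaranteeUserEventOrder: true,
	}
	fmt.Println(rt.shouldSkip("u1")) // true: u1 already has a throttled job
	fmt.Println(rt.shouldSkip("u2")) // false: no prior throttled job for u2
}
```

Without the `guaranteeUserEventOrder` guard, the map lookup alone would skip jobs even in deployments that do not require ordering, which is the bug the diff fixes.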
defer func() {
	if toSendWorker == nil {
		rt.throttler.Dec(parameters.DestinationID, userID, 1, throttledAtTime, throttler.ALL_LEVELS)
	}
}()
Added this since we need to decrement the throttler if no worker is found for the job.
As a general note, the current throttling logic (incrementing/decrementing) appears to be fragile.
cc @fracasula
Force-pushed from 4c3a44e to 1660322
Looks good.
My only comment: to avoid draining in two places (via the canDrainEarly check), we could drain all kinds of jobs in the worker itself. True?
@chandumlg the initial intention was to leave it there as an optimization, i.e. sparing the extra loop for expired jobs that won't be picked up because of another previously failed job. But you are correct, this minor optimization doesn't justify all this extra code. I will simplify it by using a single draining strategy. Thanks!
Force-pushed from 93836b2 to ff8e0d0
LGTM
Description
When a job needs to be drained from the router, we cannot drain it early; instead we push it to the worker's queue, so that all other (waiting) jobs for the same userID have time to flush from the pipeline's buffers before the job is actually drained.
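The ordering rationale in the description can be shown with a small queue sketch: even a job destined to be drained is pushed onto the per-worker queue, so later jobs for the same userID cannot overtake it. The `queuedJob` type, the `drain` flag, and the `worker` function are hypothetical illustrations, not the PR's code.

```go
package main

import "fmt"

// Hypothetical queued job: drain marks a job the worker will
// discard rather than deliver.
type queuedJob struct {
	userID string
	drain  bool
}

// worker consumes the queue in FIFO order, logging each action.
// Because drainable jobs travel through the same queue as live
// ones, per-user ordering is preserved.
func worker(queue <-chan queuedJob, done chan<- []string) {
	var log []string
	for j := range queue {
		if j.drain {
			log = append(log, "drain:"+j.userID)
		} else {
			log = append(log, "send:"+j.userID)
		}
	}
	done <- log
}

func main() {
	queue := make(chan queuedJob, 2)
	done := make(chan []string)
	go worker(queue, done)

	// The drainable job enters the same queue instead of being
	// dropped early, so u1's later job cannot jump ahead of it.
	queue <- queuedJob{"u1", true}
	queue <- queuedJob{"u1", false}
	close(queue)
	fmt.Println(<-done) // [drain:u1 send:u1]
}
```

Dropping the first job before it reached the queue would let the second `u1` job be delivered while the first was still in flight in the pipeline's buffers, which is exactly the reordering the fix prevents.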
Notion Ticket
Link
Security