Fix expired task handing in fairTaskReader#9775
Conversation
| "backlog count should not be incremented") | ||
| } | ||
|
|
||
| func (s *BacklogManagerTestSuite) TestSkipExpiredTasks_AllExpiredThenValid() { |
There was a problem hiding this comment.
Suggest restructuring this test so its a bit easier to read.
type taskBlock struct {
count int
expired bool
}
func expiredBlock(n int) taskBlock { return taskBlock{count: n, expired: true} }
func validBlock(n int) taskBlock { return taskBlock{count: n, expired: false} }
func (s *BacklogManagerTestSuite) TestSkipExpiredTasks_AllExpiredThenValid() {
s.testSkipExpiredTasks(/batchSize/ 10, expiredBlock(33), validBlock(3))
}
func (s *BacklogManagerTestSuite) TestSkipExpiredTasks_ValidExpiredValid() {
s.testSkipExpiredTasks(/batchSize/ 10, validBlock(3), expiredBlock(33), validBlock(3))
}
func (s *BacklogManagerTestSuite) TestSkipExpiredTasks_AllExpired() {
s.testSkipExpiredTasks(/batchSize/ 10, expiredBlock(20))
}
The call sites now read like English: "batch size 10, 33 expired then 3 valid."
Inside the helper, replace the pattern expansion with a flat loop over the runs:
func (s *BacklogManagerTestSuite) testSkipExpiredTasks(batchSize int, runs ...taskRun) {
// ...setup...
var dbTasks []*persistencespb.AllocatedTaskInfo
var numValid int
id := int64(1)
for _, run := range runs {
for range run.count {
task := &persistencespb.AllocatedTaskInfo{
TaskId: id,
Data: &persistencespb.TaskInfo{
CreateTime: timestamp.TimeNowPtrUtcAddSeconds(-3600),
},
}
if run.expired {
task.Data.ExpiryTime = timestamp.TimeNowPtrUtcAddSeconds(-60)
} else {
task.Data.ExpiryTime = timestamp.TimeNowPtrUtcAddSeconds(3600)
numValid++
}
if s.fairness {
task.TaskPass = id * passMultiplier
}
dbTasks = append(dbTasks, task)
id++
}
}
// ...rest unchanged...
}
## What changed? Instead of dropping expired tasks immediately, pass them through mergeTasksLocked and treat them as already-acked, so that the read level and ack level can be updated appropriately. ## Why? As the previous comment suggests, reading a batch of entirely expired tasks would lead to incorrect behavior: since the acks were not placed in outstandingTasks, the read level wouldn't be updated, and we'd re-read the expired tasks in a loop, i.e. the reader would be stuck. (Unless a task was written within that range, that would unstick it.) Even if we read a batch that wasn't all expired, but ended in a long stretch of expired tasks, we wouldn't update the read level as much as we could. This adds expired tasks as acks, and then adds a call to advanceAckLevelLocked, so the read level can be pushed up, and we can advance the ack level over a stretch of expired tasks as well. ## How did you test it? - [ ] built - [ ] run locally and tested manually - [ ] covered by existing tests - [x] added new unit test(s) – and tested it fails without the fix - [ ] added new functional test(s)
## What changed? Instead of dropping expired tasks immediately, pass them through mergeTasksLocked and treat them as already-acked, so that the read level and ack level can be updated appropriately. ## Why? As the previous comment suggests, reading a batch of entirely expired tasks would lead to incorrect behavior: since the acks were not placed in outstandingTasks, the read level wouldn't be updated, and we'd re-read the expired tasks in a loop, i.e. the reader would be stuck. (Unless a task was written within that range, that would unstick it.) Even if we read a batch that wasn't all expired, but ended in a long stretch of expired tasks, we wouldn't update the read level as much as we could. This adds expired tasks as acks, and then adds a call to advanceAckLevelLocked, so the read level can be pushed up, and we can advance the ack level over a stretch of expired tasks as well. ## How did you test it? - [ ] built - [ ] run locally and tested manually - [ ] covered by existing tests - [x] added new unit test(s) – and tested it fails without the fix - [ ] added new functional test(s)
## What changed? Instead of dropping expired tasks immediately, pass them through mergeTasksLocked and treat them as already-acked, so that the read level and ack level can be updated appropriately. ## Why? As the previous comment suggests, reading a batch of entirely expired tasks would lead to incorrect behavior: since the acks were not placed in outstandingTasks, the read level wouldn't be updated, and we'd re-read the expired tasks in a loop, i.e. the reader would be stuck. (Unless a task was written within that range, that would unstick it.) Even if we read a batch that wasn't all expired, but ended in a long stretch of expired tasks, we wouldn't update the read level as much as we could. This adds expired tasks as acks, and then adds a call to advanceAckLevelLocked, so the read level can be pushed up, and we can advance the ack level over a stretch of expired tasks as well. ## How did you test it? - [ ] built - [ ] run locally and tested manually - [ ] covered by existing tests - [x] added new unit test(s) – and tested it fails without the fix - [ ] added new functional test(s)
## What changed? Instead of dropping expired tasks immediately, pass them through mergeTasksLocked and treat them as already-acked, so that the read level and ack level can be updated appropriately. ## Why? As the previous comment suggests, reading a batch of entirely expired tasks would lead to incorrect behavior: since the acks were not placed in outstandingTasks, the read level wouldn't be updated, and we'd re-read the expired tasks in a loop, i.e. the reader would be stuck. (Unless a task was written within that range, that would unstick it.) Even if we read a batch that wasn't all expired, but ended in a long stretch of expired tasks, we wouldn't update the read level as much as we could. This adds expired tasks as acks, and then adds a call to advanceAckLevelLocked, so the read level can be pushed up, and we can advance the ack level over a stretch of expired tasks as well. ## How did you test it? - [ ] built - [ ] run locally and tested manually - [ ] covered by existing tests - [x] added new unit test(s) – and tested it fails without the fix - [ ] added new functional test(s)
## What changed? Instead of dropping expired tasks immediately, pass them through mergeTasksLocked and treat them as already-acked, so that the read level and ack level can be updated appropriately. ## Why? As the previous comment suggests, reading a batch of entirely expired tasks would lead to incorrect behavior: since the acks were not placed in outstandingTasks, the read level wouldn't be updated, and we'd re-read the expired tasks in a loop, i.e. the reader would be stuck. (Unless a task was written within that range, that would unstick it.) Even if we read a batch that wasn't all expired, but ended in a long stretch of expired tasks, we wouldn't update the read level as much as we could. This adds expired tasks as acks, and then adds a call to advanceAckLevelLocked, so the read level can be pushed up, and we can advance the ack level over a stretch of expired tasks as well. ## How did you test it? - [ ] built - [ ] run locally and tested manually - [ ] covered by existing tests - [x] added new unit test(s) – and tested it fails without the fix - [ ] added new functional test(s)
## What changed? Instead of dropping expired tasks immediately, pass them through mergeTasksLocked and treat them as already-acked, so that the read level and ack level can be updated appropriately. ## Why? As the previous comment suggests, reading a batch of entirely expired tasks would lead to incorrect behavior: since the acks were not placed in outstandingTasks, the read level wouldn't be updated, and we'd re-read the expired tasks in a loop, i.e. the reader would be stuck. (Unless a task was written within that range, that would unstick it.) Even if we read a batch that wasn't all expired, but ended in a long stretch of expired tasks, we wouldn't update the read level as much as we could. This adds expired tasks as acks, and then adds a call to advanceAckLevelLocked, so the read level can be pushed up, and we can advance the ack level over a stretch of expired tasks as well. ## How did you test it? - [ ] built - [ ] run locally and tested manually - [ ] covered by existing tests - [x] added new unit test(s) – and tested it fails without the fix - [ ] added new functional test(s)
## What changed? Instead of dropping expired tasks immediately, pass them through mergeTasksLocked and treat them as already-acked, so that the read level and ack level can be updated appropriately. ## Why? As the previous comment suggests, reading a batch of entirely expired tasks would lead to incorrect behavior: since the acks were not placed in outstandingTasks, the read level wouldn't be updated, and we'd re-read the expired tasks in a loop, i.e. the reader would be stuck. (Unless a task was written within that range, that would unstick it.) Even if we read a batch that wasn't all expired, but ended in a long stretch of expired tasks, we wouldn't update the read level as much as we could. This adds expired tasks as acks, and then adds a call to advanceAckLevelLocked, so the read level can be pushed up, and we can advance the ack level over a stretch of expired tasks as well. ## How did you test it? - [ ] built - [ ] run locally and tested manually - [ ] covered by existing tests - [x] added new unit test(s) – and tested it fails without the fix - [ ] added new functional test(s)
## What changed? Instead of dropping expired tasks immediately, pass them through mergeTasksLocked and treat them as already-acked, so that the read level and ack level can be updated appropriately. ## Why? As the previous comment suggests, reading a batch of entirely expired tasks would lead to incorrect behavior: since the acks were not placed in outstandingTasks, the read level wouldn't be updated, and we'd re-read the expired tasks in a loop, i.e. the reader would be stuck. (Unless a task was written within that range, that would unstick it.) Even if we read a batch that wasn't all expired, but ended in a long stretch of expired tasks, we wouldn't update the read level as much as we could. This adds expired tasks as acks, and then adds a call to advanceAckLevelLocked, so the read level can be pushed up, and we can advance the ack level over a stretch of expired tasks as well. ## How did you test it? - [ ] built - [ ] run locally and tested manually - [ ] covered by existing tests - [x] added new unit test(s) – and tested it fails without the fix - [ ] added new functional test(s) (cherry picked from commit 0b372d5)
) ## What changed? Instead of dropping expired tasks immediately, pass them through mergeTasksLocked and treat them as already-acked, so that the read level and ack level can be updated appropriately. ## Why? As the previous comment suggests, reading a batch of entirely expired tasks would lead to incorrect behavior: since the acks were not placed in outstandingTasks, the read level wouldn't be updated, and we'd re-read the expired tasks in a loop, i.e. the reader would be stuck. (Unless a task was written within that range, that would unstick it.) Even if we read a batch that wasn't all expired, but ended in a long stretch of expired tasks, we wouldn't update the read level as much as we could. This adds expired tasks as acks, and then adds a call to advanceAckLevelLocked, so the read level can be pushed up, and we can advance the ack level over a stretch of expired tasks as well. ## How did you test it? - [ ] built - [ ] run locally and tested manually - [ ] covered by existing tests - [x] added new unit test(s) – and tested it fails without the fix - [ ] added new functional test(s) Co-authored-by: David Reiss <david@temporal.io>
What changed?
Instead of dropping expired tasks immediately, pass them through mergeTasksLocked and treat them as already-acked, so that the read level and ack level can be updated appropriately.
Why?
As the previous comment suggests, reading a batch of entirely expired tasks would lead to incorrect behavior: since the acks were not placed in outstandingTasks, the read level wouldn't be updated, and we'd re-read the expired tasks in a loop, i.e. the reader would be stuck. (Unless a task was written within that range, that would unstick it.) Even if we read a batch that wasn't all expired, but ended in a long stretch of expired tasks, we wouldn't update the read level as much as we could.
This adds expired tasks as acks, and then adds a call to advanceAckLevelLocked, so the read level can be pushed up, and we can advance the ack level over a stretch of expired tasks as well.
How did you test it?