Skip to content

Commit

Permalink
Fix scheduled task rescheduling on failover (#5377)
Browse files Browse the repository at this point in the history
  • Loading branch information
yycptt committed Feb 1, 2024
1 parent fb61704 commit 2647b36
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 6 deletions.
7 changes: 4 additions & 3 deletions service/history/queues/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -527,9 +527,10 @@ func (r *ReaderImpl) submit(
executable Executable,
) {
now := r.timeSource.Now()
// Persistence layer may lose precision when persisting the task, which essentially move
// task fire time forward. Need to account for that when submitting the task.
if fireTime := executable.GetKey().FireTime.Add(persistence.ScheduledTaskMinPrecision); now.Before(fireTime) {
// Persistence layer may lose precision when persisting the task, which essentially moves
// task fire time backward. Need to account for that when submitting the task.
fireTime := executable.GetKey().FireTime.Add(persistence.ScheduledTaskMinPrecision)
if now.Before(fireTime) {
r.rescheduler.Add(executable, fireTime)
return
}
Expand Down
9 changes: 8 additions & 1 deletion service/history/queues/rescheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,10 @@ import (
"go.temporal.io/server/common/log"
"go.temporal.io/server/common/log/tag"
"go.temporal.io/server/common/metrics"
"go.temporal.io/server/common/persistence"
ctasks "go.temporal.io/server/common/tasks"
"go.temporal.io/server/common/timer"
"go.temporal.io/server/common/util"
)

const (
Expand Down Expand Up @@ -176,7 +178,12 @@ func (r *reschedulerImpl) Reschedule(
items := make([]rescheduledExecuable, 0, pq.Len())
for !pq.IsEmpty() {
rescheduled := pq.Remove()
rescheduled.rescheduleTime = now
// scheduled queue pre-fetches tasks,
// so we need to make sure the reschedule time is not before the task scheduled time
rescheduled.rescheduleTime = util.MaxTime(
rescheduled.executable.GetKey().FireTime.Add(persistence.ScheduledTaskMinPrecision),
now,
)
items = append(items, rescheduled)
}
r.pqMap[key] = r.newPriorityQueue(items)
Expand Down
47 changes: 45 additions & 2 deletions service/history/queues/rescheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"go.temporal.io/server/common/log"
"go.temporal.io/server/common/metrics"
ctasks "go.temporal.io/server/common/tasks"
"go.temporal.io/server/service/history/tasks"
)

type (
Expand Down Expand Up @@ -221,7 +222,7 @@ func (s *rescheudulerSuite) TestReschedule_DropCancelled() {
s.Equal(0, s.rescheduler.Len())
}

func (s *rescheudulerSuite) TestImmdiateReschedule() {
func (s *rescheudulerSuite) TestForceReschedule_ImmediateTask() {
now := time.Now()
s.timeSource.Update(now)
namespaceID := s.mockScheduler.TaskChannelKeyFn()(nil).NamespaceID
Expand All @@ -234,8 +235,9 @@ func (s *rescheudulerSuite) TestImmdiateReschedule() {
taskWG.Add(numTask)
for i := 0; i != numTask; i++ {
mockTask := NewMockExecutable(s.controller)
mockTask.EXPECT().State().Return(ctasks.TaskStatePending).Times(1)
mockTask.EXPECT().State().Return(ctasks.TaskStatePending).AnyTimes()
mockTask.EXPECT().SetScheduledTime(gomock.Any()).AnyTimes()
mockTask.EXPECT().GetKey().Return(tasks.NewImmediateKey(int64(i))).AnyTimes()
s.rescheduler.Add(
mockTask,
now.Add(time.Minute+time.Duration(rand.Int63n(time.Minute.Nanoseconds()))),
Expand All @@ -251,3 +253,44 @@ func (s *rescheudulerSuite) TestImmdiateReschedule() {
taskWG.Wait()
s.Equal(0, s.rescheduler.Len())
}

// TestForceReschedule_ScheduledTask exercises Reschedule with two scheduled
// (time-keyed) tasks held by the rescheduler:
//
//   - a task whose key fire time is one minute in the past, added with a
//     reschedule time one minute in the future;
//   - a task whose key fire time is one second in the future, added with that
//     same timestamp as its reschedule time.
//
// The test expects exactly one TrySubmit call after Reschedule and exactly one
// task still held by the rescheduler afterwards.
func (s *rescheudulerSuite) TestForceReschedule_ScheduledTask() {
	now := time.Now()
	s.timeSource.Update(now)
	namespaceID := s.mockScheduler.TaskChannelKeyFn()(nil).NamespaceID

	s.rescheduler.Start()
	defer s.rescheduler.Stop()

	// Released by the single TrySubmit call expected below.
	var submitted sync.WaitGroup
	submitted.Add(1)

	// addPendingTask builds a pending mock executable keyed by (fireTime, taskID)
	// and hands it to the rescheduler with the given reschedule time.
	addPendingTask := func(fireTime time.Time, taskID int64, rescheduleAt time.Time) {
		executable := NewMockExecutable(s.controller)
		executable.EXPECT().State().Return(ctasks.TaskStatePending).AnyTimes()
		executable.EXPECT().SetScheduledTime(gomock.Any()).AnyTimes()
		executable.EXPECT().GetKey().Return(tasks.NewKey(fireTime, taskID)).AnyTimes()
		s.rescheduler.Add(executable, rescheduleAt)
	}

	// A retrying task: its fire time has already passed, and it is currently
	// backing off until one minute from now.
	addPendingTask(now.Add(-time.Minute), 1, now.Add(time.Minute))

	// The scheduled queue pre-fetches tasks, so the rescheduler may hold a task
	// whose fire time is still in the future.
	upcoming := now.Add(time.Second)
	addPendingTask(upcoming, 2, upcoming)

	// Only one of the two tasks is expected to reach the scheduler.
	s.mockScheduler.EXPECT().TrySubmit(gomock.Any()).DoAndReturn(func(_ Executable) bool {
		submitted.Done()
		return true
	}).Times(1)

	s.rescheduler.Reschedule(namespaceID)
	submitted.Wait()

	// The other task must still be waiting in the rescheduler.
	s.Equal(1, s.rescheduler.Len())
}

0 comments on commit 2647b36

Please sign in to comment.