Run task rescheduler in background (#3038)
yycptt committed Aug 12, 2022
1 parent a39ba31 commit 6aaddf3
Showing 14 changed files with 409 additions and 352 deletions.
16 changes: 0 additions & 16 deletions common/dynamicconfig/constants.go
@@ -355,10 +355,6 @@ const (
TimerProcessorMaxPollInterval = "history.timerProcessorMaxPollInterval"
// TimerProcessorMaxPollIntervalJitterCoefficient is the max poll interval jitter coefficient
TimerProcessorMaxPollIntervalJitterCoefficient = "history.timerProcessorMaxPollIntervalJitterCoefficient"
// TimerProcessorRescheduleInterval is the redispatch interval for timer processor
TimerProcessorRescheduleInterval = "history.timerProcessorRescheduleInterval"
// TimerProcessorRescheduleIntervalJitterCoefficient is the redispatch interval jitter coefficient
TimerProcessorRescheduleIntervalJitterCoefficient = "history.timerProcessorRescheduleIntervalJitterCoefficient"
// TimerProcessorMaxReschedulerSize is the threshold of the number of tasks in the redispatch queue for timer processor
TimerProcessorMaxReschedulerSize = "history.timerProcessorMaxReschedulerSize"
// TimerProcessorPollBackoffInterval is the poll backoff interval if task redispatcher's size exceeds limit for timer processor
@@ -408,10 +404,6 @@ const (
TransferProcessorUpdateAckIntervalJitterCoefficient = "history.transferProcessorUpdateAckIntervalJitterCoefficient"
// TransferProcessorCompleteTransferInterval is complete timer interval for transferQueueProcessor
TransferProcessorCompleteTransferInterval = "history.transferProcessorCompleteTransferInterval"
// TransferProcessorRescheduleInterval is the redispatch interval for transferQueueProcessor
TransferProcessorRescheduleInterval = "history.transferProcessorRescheduleInterval"
// TransferProcessorRescheduleIntervalJitterCoefficient is the redispatch interval jitter coefficient
TransferProcessorRescheduleIntervalJitterCoefficient = "history.transferProcessorRescheduleIntervalJitterCoefficient"
// TransferProcessorMaxReschedulerSize is the threshold of the number of tasks in the redispatch queue for transferQueueProcessor
TransferProcessorMaxReschedulerSize = "history.transferProcessorMaxReschedulerSize"
// TransferProcessorPollBackoffInterval is the poll backoff interval if task redispatcher's size exceeds limit for transferQueueProcessor
@@ -453,10 +445,6 @@ const (
VisibilityProcessorUpdateAckIntervalJitterCoefficient = "history.visibilityProcessorUpdateAckIntervalJitterCoefficient"
// VisibilityProcessorCompleteTaskInterval is complete timer interval for visibilityQueueProcessor
VisibilityProcessorCompleteTaskInterval = "history.visibilityProcessorCompleteTaskInterval"
// VisibilityProcessorRescheduleInterval is the redispatch interval for visibilityQueueProcessor
VisibilityProcessorRescheduleInterval = "history.visibilityProcessorRescheduleInterval"
// VisibilityProcessorRescheduleIntervalJitterCoefficient is the redispatch interval jitter coefficient
VisibilityProcessorRescheduleIntervalJitterCoefficient = "history.visibilityProcessorRescheduleIntervalJitterCoefficient"
// VisibilityProcessorMaxReschedulerSize is the threshold of the number of tasks in the redispatch queue for visibilityQueueProcessor
VisibilityProcessorMaxReschedulerSize = "history.visibilityProcessorMaxReschedulerSize"
// VisibilityProcessorPollBackoffInterval is the poll backoff interval if task redispatcher's size exceeds limit for visibilityQueueProcessor
@@ -480,10 +468,6 @@ const (
ReplicatorProcessorUpdateAckInterval = "history.replicatorProcessorUpdateAckInterval"
// ReplicatorProcessorUpdateAckIntervalJitterCoefficient is the update interval jitter coefficient
ReplicatorProcessorUpdateAckIntervalJitterCoefficient = "history.replicatorProcessorUpdateAckIntervalJitterCoefficient"
// ReplicatorProcessorRescheduleInterval is the redispatch interval for ReplicatorProcessor
ReplicatorProcessorRescheduleInterval = "history.replicatorProcessorRescheduleInterval"
// ReplicatorProcessorRescheduleIntervalJitterCoefficient is the redispatch interval jitter coefficient
ReplicatorProcessorRescheduleIntervalJitterCoefficient = "history.replicatorProcessorRescheduleIntervalJitterCoefficient"
// ReplicatorProcessorMaxReschedulerSize is the threshold of the number of tasks in the redispatch queue for ReplicatorProcessor
ReplicatorProcessorMaxReschedulerSize = "history.replicatorProcessorMaxReschedulerSize"
// ReplicatorProcessorEnablePriorityTaskProcessor indicates whether priority task processor should be used for ReplicatorProcessor
324 changes: 154 additions & 170 deletions service/history/configs/config.go

Large diffs are not rendered by default.

43 changes: 13 additions & 30 deletions service/history/queueProcessor.go
@@ -46,16 +46,14 @@ import (
type (
// QueueProcessorOptions is options passed to queue processor implementation
QueueProcessorOptions struct {
BatchSize dynamicconfig.IntPropertyFn
MaxPollInterval dynamicconfig.DurationPropertyFn
MaxPollIntervalJitterCoefficient dynamicconfig.FloatPropertyFn
UpdateAckInterval dynamicconfig.DurationPropertyFn
UpdateAckIntervalJitterCoefficient dynamicconfig.FloatPropertyFn
RescheduleInterval dynamicconfig.DurationPropertyFn
RescheduleIntervalJitterCoefficient dynamicconfig.FloatPropertyFn
MaxReschdulerSize dynamicconfig.IntPropertyFn
PollBackoffInterval dynamicconfig.DurationPropertyFn
MetricScope int
BatchSize dynamicconfig.IntPropertyFn
MaxPollInterval dynamicconfig.DurationPropertyFn
MaxPollIntervalJitterCoefficient dynamicconfig.FloatPropertyFn
UpdateAckInterval dynamicconfig.DurationPropertyFn
UpdateAckIntervalJitterCoefficient dynamicconfig.FloatPropertyFn
MaxReschdulerSize dynamicconfig.IntPropertyFn
PollBackoffInterval dynamicconfig.DurationPropertyFn
MetricScope int
}

queueProcessorBase struct {
@@ -128,6 +126,8 @@ func (p *queueProcessorBase) Start() {
p.logger.Info("", tag.LifeCycleStarting, tag.ComponentTransferQueue)
defer p.logger.Info("", tag.LifeCycleStarted, tag.ComponentTransferQueue)

p.rescheduler.Start()

p.shutdownWG.Add(1)
p.notifyNewTask()
go p.processorPump()
@@ -141,6 +141,8 @@ func (p *queueProcessorBase) Stop() {
p.logger.Info("", tag.LifeCycleStopping, tag.ComponentTransferQueue)
defer p.logger.Info("", tag.LifeCycleStopped, tag.ComponentTransferQueue)

p.rescheduler.Stop()

close(p.shutdownCh)

if success := common.AwaitWaitGroup(&p.shutdownWG, time.Minute); !success {
@@ -171,12 +173,6 @@ func (p *queueProcessorBase) processorPump() {
))
defer updateAckTimer.Stop()

rescheduleTimer := time.NewTimer(backoff.JitDuration(
p.options.RescheduleInterval(),
p.options.RescheduleIntervalJitterCoefficient(),
))
defer rescheduleTimer.Stop()

processorPumpLoop:
for {
select {
@@ -205,12 +201,6 @@ processorPumpLoop:
go p.Stop()
break processorPumpLoop
}
case <-rescheduleTimer.C:
p.rescheduler.Reschedule(0) // reschedule all
rescheduleTimer.Reset(backoff.JitDuration(
p.options.RescheduleInterval(),
p.options.RescheduleIntervalJitterCoefficient(),
))
}
}

@@ -261,14 +251,7 @@ func (p *queueProcessorBase) processBatch() {
}

func (p *queueProcessorBase) verifyReschedulerSize() bool {
length := p.rescheduler.Len()
maxLength := p.options.MaxReschdulerSize()
buffer := p.options.BatchSize() * 2
if length+buffer > maxLength {
p.rescheduler.Reschedule(length + buffer - maxLength)
}

passed := p.rescheduler.Len() < maxLength
passed := p.rescheduler.Len() < p.options.MaxReschdulerSize()
if passed && p.backoffTimer != nil {
p.backoffTimer.Stop()
p.backoffTimer = nil
15 changes: 8 additions & 7 deletions service/history/queues/executable.go
@@ -279,21 +279,20 @@ func (e *executableImpl) Ack() {

func (e *executableImpl) Nack(err error) {
submitted := false
attempt := e.Attempt()
if e.shouldResubmitOnNack(attempt, err) {
if e.shouldResubmitOnNack(e.Attempt(), err) {
// we do not need to know if there is any error during submission
// as long as it's not submitted, the executable should be added
// to the rescheduler
submitted, _ = e.scheduler.TrySubmit(e)
}

if !submitted {
e.rescheduler.Add(e, e.rescheduleBackoff(attempt))
e.Reschedule()
}
}

func (e *executableImpl) Reschedule() {
e.rescheduler.Add(e, e.rescheduleBackoff(e.Attempt()))
e.rescheduler.Add(e, e.rescheduleTime(e.Attempt()))
}

func (e *executableImpl) State() ctasks.State {
@@ -347,11 +346,13 @@ func (e *executableImpl) shouldResubmitOnNack(attempt int, err error) bool {
return false
}

return err == consts.ErrWorkflowBusy || common.IsContextDeadlineExceededErr(err) || e.IsRetryableError(err)
return err == consts.ErrWorkflowBusy ||
common.IsContextDeadlineExceededErr(err) ||
e.IsRetryableError(err)
}

func (e *executableImpl) rescheduleBackoff(attempt int) time.Duration {
func (e *executableImpl) rescheduleTime(attempt int) time.Time {
// elapsedTime (the first parameter) is not relevant here since reschedule policy
// has no expiration interval.
return reschedulePolicy.ComputeNextDelay(0, attempt)
return e.timeSource.Now().Add(reschedulePolicy.ComputeNextDelay(0, attempt))
}
2 changes: 1 addition & 1 deletion service/history/queues/executable_test.go
@@ -195,7 +195,7 @@ func (s *executableSuite) TestTaskNack_Reschedule() {
return true
})

s.mockRescheduler.EXPECT().Add(executable, gomock.AssignableToTypeOf(time.Second))
s.mockRescheduler.EXPECT().Add(executable, gomock.AssignableToTypeOf(time.Now()))

executable.Nack(consts.ErrTaskRetry) // this error won't trigger re-submit
s.Equal(ctasks.TaskStatePending, executable.State())
135 changes: 103 additions & 32 deletions service/history/queues/rescheduler.go
@@ -28,30 +28,30 @@ package queues

import (
"sync"
"sync/atomic"
"time"

"go.temporal.io/server/common"
"go.temporal.io/server/common/clock"
"go.temporal.io/server/common/collection"
"go.temporal.io/server/common/log"
"go.temporal.io/server/common/log/tag"
"go.temporal.io/server/common/metrics"
"go.temporal.io/server/common/timer"
)

const (
rescheduleFailureBackoff = 3 * time.Second
)

type (
// Rescheduler buffers task executables that failed to process and
// resubmits them to the task scheduler when the Reschedule method is called.
// TODO: remove this component when implementing multi-cursor queue processor.
// Failed task executables can be tracked by task reader/queue range
Rescheduler interface {
// Add task executable to the rescheduler.
// The backoff duration is just a hint for how long the executable
// should be buffered before rescheduling.
Add(task Executable, backoff time.Duration)
common.Daemon

// Reschedule re-submits buffered executables to the scheduler and stops when
// targetRescheduleSize number of executables are successfully submitted.
// If targetRescheduleSize is 0, then there's no limit on the number of rescheduled
// executables.
Reschedule(targetRescheduleSize int)
// Add task executable to the rescheduler.
Add(task Executable, rescheduleTime time.Time)

// Len returns the total number of task executables waiting to be rescheduled.
Len() int
@@ -65,7 +65,14 @@ type (
reschedulerImpl struct {
scheduler Scheduler
timeSource clock.TimeSource
metricProvider metrics.MetricsHandler
logger log.Logger
metricsHandler metrics.MetricsHandler

status int32
shutdownCh chan struct{}
shutdownWG sync.WaitGroup

timerGate timer.Gate

sync.Mutex
pq collection.Queue[rescheduledExecuable]
@@ -75,71 +82,135 @@ type (
func NewRescheduler(
scheduler Scheduler,
timeSource clock.TimeSource,
metricProvider metrics.MetricsHandler,
logger log.Logger,
metricsHandler metrics.MetricsHandler,
) *reschedulerImpl {
return &reschedulerImpl{
scheduler: scheduler,
timeSource: timeSource,
metricProvider: metricProvider,
logger: logger,
metricsHandler: metricsHandler,

status: common.DaemonStatusInitialized,
shutdownCh: make(chan struct{}),

timerGate: timer.NewLocalGate(timeSource),

pq: collection.NewPriorityQueue((func(this rescheduledExecuable, that rescheduledExecuable) bool {
return this.rescheduleTime.Before(that.rescheduleTime)
})),
}
}

func (r *reschedulerImpl) Start() {
if !atomic.CompareAndSwapInt32(&r.status, common.DaemonStatusInitialized, common.DaemonStatusStarted) {
return
}

r.shutdownWG.Add(1)
go r.rescheduleLoop()

r.logger.Info("Task rescheduler started.", tag.LifeCycleStarted)
}

func (r *reschedulerImpl) Stop() {
if !atomic.CompareAndSwapInt32(&r.status, common.DaemonStatusStarted, common.DaemonStatusStopped) {
return
}

close(r.shutdownCh)
r.timerGate.Close()

if success := common.AwaitWaitGroup(&r.shutdownWG, time.Minute); !success {
r.logger.Warn("Task rescheduler timed out on shutdown.", tag.LifeCycleStopTimedout)
}

r.logger.Info("Task rescheduler stopped.", tag.LifeCycleStopped)
}

func (r *reschedulerImpl) Add(
executable Executable,
backoff time.Duration,
rescheduleTime time.Time,
) {
r.Lock()
defer r.Unlock()

r.pq.Add(rescheduledExecuable{
executable: executable,
rescheduleTime: r.timeSource.Now().Add(backoff),
rescheduleTime: rescheduleTime,
})
r.Unlock()

r.timerGate.Update(rescheduleTime)

if r.isStopped() {
r.drain()
}
}

func (r *reschedulerImpl) Reschedule(
targetRescheduleSize int,
) {
func (r *reschedulerImpl) Len() int {
r.Lock()
defer r.Unlock()

r.metricProvider.Histogram(TaskReschedulerPendingTasks, metrics.Dimensionless).Record(int64(r.pq.Len()))
return r.pq.Len()
}

if targetRescheduleSize == 0 {
targetRescheduleSize = r.pq.Len()
func (r *reschedulerImpl) rescheduleLoop() {
defer r.shutdownWG.Done()

for {
select {
case <-r.shutdownCh:
r.drain()
return
case <-r.timerGate.FireChan():
r.reschedule()
}
}

}

func (r *reschedulerImpl) reschedule() {
r.Lock()
defer r.Unlock()

r.metricsHandler.Histogram(TaskReschedulerPendingTasks, metrics.Dimensionless).Record(int64(r.pq.Len()))

var failToSubmit []rescheduledExecuable
numRescheduled := 0
for !r.pq.IsEmpty() && numRescheduled < targetRescheduleSize {
for !r.pq.IsEmpty() {
if r.timeSource.Now().Before(r.pq.Peek().rescheduleTime) {
break
}

rescheduled := r.pq.Remove()
submitted, err := r.scheduler.TrySubmit(rescheduled.executable)
executable := rescheduled.executable
submitted, err := r.scheduler.TrySubmit(executable)
if err != nil {
rescheduled.executable.Logger().Error("Failed to reschedule task", tag.Error(err))
executable.Logger().Error("Failed to reschedule task", tag.Error(err))
}

if !submitted {
rescheduled.rescheduleTime = rescheduled.rescheduleTime.Add(rescheduleFailureBackoff)
failToSubmit = append(failToSubmit, rescheduled)
} else {
numRescheduled++
}
}

for _, rescheduled := range failToSubmit {
r.pq.Add(rescheduled)
}

if !r.pq.IsEmpty() {
r.timerGate.Update(r.pq.Peek().rescheduleTime)
}
}

func (r *reschedulerImpl) Len() int {
func (r *reschedulerImpl) drain() {
r.Lock()
defer r.Unlock()

return r.pq.Len()
for !r.pq.IsEmpty() {
r.pq.Remove()
}
}

func (r *reschedulerImpl) isStopped() bool {
return atomic.LoadInt32(&r.status) == common.DaemonStatusStopped
}
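
To summarize the new flow in rescheduler.go: the rescheduler is now a daemon. Start launches rescheduleLoop, which sleeps on a timer gate armed with the earliest buffered reschedule time and, when it fires, resubmits every executable that is due; Add buffers an executable under its absolute reschedule time and re-arms the gate; Stop closes the gate and drains the buffer. Below is a minimal, self-contained sketch of that background-rescheduler pattern; the Task type, submit callback, slice-based queue, and channel wake-up are simplified stand-ins for the real Executable, Scheduler.TrySubmit, priority queue, and timer.Gate.

package main

import (
	"fmt"
	"sort"
	"sync"
	"time"
)

type Task struct {
	name           string
	rescheduleTime time.Time
}

type Rescheduler struct {
	mu       sync.Mutex
	buffered []Task          // kept sorted by rescheduleTime; stands in for the priority queue
	wakeCh   chan struct{}   // stands in for timer.Gate.Update / FireChan
	stopCh   chan struct{}
	wg       sync.WaitGroup
	submit   func(Task) bool // stands in for Scheduler.TrySubmit
}

func NewRescheduler(submit func(Task) bool) *Rescheduler {
	return &Rescheduler{
		wakeCh: make(chan struct{}, 1),
		stopCh: make(chan struct{}),
		submit: submit,
	}
}

func (r *Rescheduler) Start() { r.wg.Add(1); go r.loop() }
func (r *Rescheduler) Stop()  { close(r.stopCh); r.wg.Wait() }

// Add buffers a task until its absolute reschedule time and pokes the loop so
// it can re-arm its timer if this task is now the earliest one.
func (r *Rescheduler) Add(t Task) {
	r.mu.Lock()
	r.buffered = append(r.buffered, t)
	sort.Slice(r.buffered, func(i, j int) bool {
		return r.buffered[i].rescheduleTime.Before(r.buffered[j].rescheduleTime)
	})
	r.mu.Unlock()
	select {
	case r.wakeCh <- struct{}{}:
	default:
	}
}

func (r *Rescheduler) loop() {
	defer r.wg.Done()
	timer := time.NewTimer(time.Hour)
	defer timer.Stop()
	for {
		r.mu.Lock()
		wait := time.Hour // nothing buffered: sleep until poked by Add
		if len(r.buffered) > 0 {
			wait = time.Until(r.buffered[0].rescheduleTime)
		}
		r.mu.Unlock()

		if !timer.Stop() {
			select { // drain a fired-but-unread timer before Reset
			case <-timer.C:
			default:
			}
		}
		timer.Reset(wait)

		select {
		case <-r.stopCh:
			return
		case <-r.wakeCh: // a new earliest task arrived; recompute the wait
		case <-timer.C: // something is due: resubmit everything that is ready
			r.mu.Lock()
			now := time.Now()
			var remaining []Task
			for _, t := range r.buffered {
				if t.rescheduleTime.After(now) || !r.submit(t) {
					remaining = append(remaining, t) // not due yet, or scheduler rejected it
				}
			}
			r.buffered = remaining
			r.mu.Unlock()
		}
	}
}

func main() {
	r := NewRescheduler(func(t Task) bool { fmt.Println("resubmitted", t.name); return true })
	r.Start()
	r.Add(Task{name: "transfer-task-1", rescheduleTime: time.Now().Add(100 * time.Millisecond)})
	time.Sleep(300 * time.Millisecond)
	r.Stop()
}

Compared with the old design, where every queue processor periodically called Reschedule(0) on its own jittered timer to force-drain the buffer, the daemon only wakes when some task's reschedule time has actually arrived, or after the failure backoff when a resubmit is rejected.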