Fix host level task scheduler start/stop (#2948)
yycptt committed Jun 4, 2022
1 parent bd5e722 commit 8444848
Showing 10 changed files with 235 additions and 116 deletions.
59 changes: 28 additions & 31 deletions service/history/queueProcessor.go
@@ -59,17 +59,17 @@ type (
}

queueProcessorBase struct {
clusterName string
shard shard.Context
timeSource clock.TimeSource
options *QueueProcessorOptions
processor processor
logger log.Logger
metricsScope metrics.Scope
rateLimiter quotas.RateLimiter // Read rate limiter
ackMgr queueAckMgr
executableScheduler queues.Scheduler
rescheduler queues.Rescheduler
clusterName string
shard shard.Context
timeSource clock.TimeSource
options *QueueProcessorOptions
processor processor
logger log.Logger
metricsScope metrics.Scope
rateLimiter quotas.RateLimiter // Read rate limiter
ackMgr queueAckMgr
scheduler queues.Scheduler
rescheduler queues.Rescheduler

lastPollTime time.Time
backoffTimer *time.Timer
@@ -92,29 +92,29 @@ func newQueueProcessorBase(
processor processor,
queueAckMgr queueAckMgr,
historyCache workflow.Cache,
executableScheduler queues.Scheduler,
scheduler queues.Scheduler,
rescheduler queues.Rescheduler,
rateLimiter quotas.RateLimiter,
logger log.Logger,
metricsScope metrics.Scope,
) *queueProcessorBase {

p := &queueProcessorBase{
clusterName: clusterName,
shard: shard,
timeSource: shard.GetTimeSource(),
options: options,
processor: processor,
rateLimiter: rateLimiter,
status: common.DaemonStatusInitialized,
notifyCh: make(chan struct{}, 1),
shutdownCh: make(chan struct{}),
logger: logger,
metricsScope: metricsScope,
ackMgr: queueAckMgr,
lastPollTime: time.Time{},
executableScheduler: executableScheduler,
rescheduler: rescheduler,
clusterName: clusterName,
shard: shard,
timeSource: shard.GetTimeSource(),
options: options,
processor: processor,
rateLimiter: rateLimiter,
status: common.DaemonStatusInitialized,
notifyCh: make(chan struct{}, 1),
shutdownCh: make(chan struct{}),
logger: logger,
metricsScope: metricsScope,
ackMgr: queueAckMgr,
lastPollTime: time.Time{},
scheduler: scheduler,
rescheduler: rescheduler,
}

return p
@@ -128,7 +128,6 @@ func (p *queueProcessorBase) Start() {
p.logger.Info("", tag.LifeCycleStarting, tag.ComponentTransferQueue)
defer p.logger.Info("", tag.LifeCycleStarted, tag.ComponentTransferQueue)

p.executableScheduler.Start()
p.shutdownWG.Add(1)
p.notifyNewTask()
go p.processorPump()
@@ -147,8 +146,6 @@ func (p *queueProcessorBase) Stop() {
if success := common.AwaitWaitGroup(&p.shutdownWG, time.Minute); !success {
p.logger.Warn("", tag.LifeCycleStopTimedout, tag.ComponentTransferQueue)
}

p.executableScheduler.Stop()
}

func (p *queueProcessorBase) notifyNewTask() {
@@ -289,7 +286,7 @@ func (p *queueProcessorBase) submitTask(
executable queues.Executable,
) {

submitted, err := p.executableScheduler.TrySubmit(executable)
submitted, err := p.scheduler.TrySubmit(executable)
if err != nil {
p.logger.Error("Failed to submit task", tag.Error(err))
executable.Reschedule()
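
With the changes above, the per-shard queueProcessorBase no longer starts or stops the scheduler it is handed: the scheduler is now a shared, host-level component whose lifecycle belongs to its creator, and the processor only submits work to it. Below is a minimal, self-contained sketch of that ownership split; the Scheduler interface and the retry handling are simplified stand-ins, not the actual server types.

package processorsketch

import "log"

// Scheduler stands in for queues.Scheduler. Its lifecycle belongs to
// whoever constructed it (after this commit, a host-level factory),
// not to the per-shard processor that submits tasks to it.
type Scheduler interface {
	Start()
	Stop()
	TrySubmit(task func()) (bool, error)
}

// processorBase mirrors the idea in queueProcessorBase: it holds a
// scheduler reference but never calls Start or Stop on it.
type processorBase struct {
	scheduler Scheduler
}

// submit hands a task to the shared scheduler; on failure it falls back to
// a retry path (the real code reschedules the executable instead).
func (p *processorBase) submit(task func()) {
	submitted, err := p.scheduler.TrySubmit(task)
	if err != nil {
		log.Printf("failed to submit task: %v", err)
	}
	if err != nil || !submitted {
		// A reschedule/retry of the task would go here.
	}
}
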
106 changes: 69 additions & 37 deletions service/history/queueProcessorFactory.go
@@ -25,6 +25,8 @@
package history

import (
"context"

"go.uber.org/fx"

"go.temporal.io/server/api/historyservice/v1"
@@ -33,15 +35,13 @@ import (
"go.temporal.io/server/common/dynamicconfig"
"go.temporal.io/server/common/metrics"
"go.temporal.io/server/common/namespace"
"go.temporal.io/server/common/persistence/serialization"
"go.temporal.io/server/common/persistence/visibility/manager"
"go.temporal.io/server/common/quotas"
"go.temporal.io/server/common/resource"
"go.temporal.io/server/common/sdk"
ctasks "go.temporal.io/server/common/tasks"
"go.temporal.io/server/service/history/configs"
"go.temporal.io/server/service/history/queues"
"go.temporal.io/server/service/history/replication"
"go.temporal.io/server/service/history/shard"
"go.temporal.io/server/service/history/workflow"
"go.temporal.io/server/service/worker/archiver"
@@ -62,6 +62,7 @@ var QueueProcessorModule = fx.Options(
Target: NewVisibilityQueueProcessorFactory,
},
),
fx.Invoke(QueueProcessorFactoryLifetimeHooks),
)

type (
@@ -105,42 +106,55 @@ type (
VisibilityMgr manager.VisibilityManager
}

replicationQueueProcessorFactoryParams struct {
fx.In

Config *configs.Config
ClientBean client.Bean
ArchivalClient archiver.Client
EventSerializer serialization.Serializer
TaskFetcherFactory replication.TaskFetcherFactory
queueProcessorFactoryBase struct {
scheduler queues.Scheduler
hostRateLimiter quotas.RateLimiter
}

transferQueueProcessorFactory struct {
transferQueueProcessorFactoryParams

scheduler queues.Scheduler
hostRateLimiter quotas.RateLimiter
queueProcessorFactoryBase
}

timerQueueProcessorFactory struct {
timerQueueProcessorFactoryParams

scheduler queues.Scheduler
hostRateLimiter quotas.RateLimiter
queueProcessorFactoryBase
}

visibilityQueueProcessorFactory struct {
visibilityQueueProcessorFactoryParams

scheduler queues.Scheduler
hostRateLimiter quotas.RateLimiter
queueProcessorFactoryBase
}

replicationQueueProcessorFactory struct {
replicationQueueProcessorFactoryParams
QueueProcessorFactoriesLifetimeHookParams struct {
fx.In

Lifecycle fx.Lifecycle
Factories []queues.ProcessorFactory `group:"queueProcessorFactory"`
}
)

func QueueProcessorFactoryLifetimeHooks(
params QueueProcessorFactoriesLifetimeHookParams,
) {
params.Lifecycle.Append(
fx.Hook{
OnStart: func(context.Context) error {
for _, factory := range params.Factories {
factory.Start()
}
return nil
},
OnStop: func(context.Context) error {
for _, factory := range params.Factories {
factory.Stop()
}
return nil
},
},
)
}

func NewTransferQueueProcessorFactory(
params transferQueueProcessorFactoryParams,
) queues.ProcessorFactory {
@@ -171,11 +185,13 @@ func NewTransferQueueProcessorFactory(
}
return &transferQueueProcessorFactory{
transferQueueProcessorFactoryParams: params,
scheduler: scheduler,
hostRateLimiter: newQueueProcessorHostRateLimiter(
params.Config.TransferProcessorMaxPollHostRPS,
params.Config.PersistenceMaxQPS,
),
queueProcessorFactoryBase: queueProcessorFactoryBase{
scheduler: scheduler,
hostRateLimiter: newQueueProcessorHostRateLimiter(
params.Config.TransferProcessorMaxPollHostRPS,
params.Config.PersistenceMaxQPS,
),
},
}
}

@@ -227,11 +243,13 @@ func NewTimerQueueProcessorFactory(
}
return &timerQueueProcessorFactory{
timerQueueProcessorFactoryParams: params,
scheduler: scheduler,
hostRateLimiter: newQueueProcessorHostRateLimiter(
params.Config.TimerProcessorMaxPollHostRPS,
params.Config.PersistenceMaxQPS,
),
queueProcessorFactoryBase: queueProcessorFactoryBase{
scheduler: scheduler,
hostRateLimiter: newQueueProcessorHostRateLimiter(
params.Config.TimerProcessorMaxPollHostRPS,
params.Config.PersistenceMaxQPS,
),
},
}
}

@@ -255,7 +273,7 @@ func NewVisibilityQueueProcessorFactory(
params visibilityQueueProcessorFactoryParams,
) queues.ProcessorFactory {
var scheduler queues.Scheduler
if params.Config.TimerProcessorEnablePriorityTaskScheduler() {
if params.Config.VisibilityProcessorEnablePriorityTaskScheduler() {
scheduler = queues.NewScheduler(
queues.NewPriorityAssigner(
params.ClusterMetadata.GetCurrentClusterName(),
@@ -281,11 +299,13 @@
}
return &visibilityQueueProcessorFactory{
visibilityQueueProcessorFactoryParams: params,
scheduler: scheduler,
hostRateLimiter: newQueueProcessorHostRateLimiter(
params.Config.VisibilityProcessorMaxPollHostRPS,
params.Config.PersistenceMaxQPS,
),
queueProcessorFactoryBase: queueProcessorFactoryBase{
scheduler: scheduler,
hostRateLimiter: newQueueProcessorHostRateLimiter(
params.Config.VisibilityProcessorMaxPollHostRPS,
params.Config.PersistenceMaxQPS,
),
},
}
}

@@ -303,6 +323,18 @@ func (f *visibilityQueueProcessorFactory) CreateProcessor(
)
}

func (f *queueProcessorFactoryBase) Start() {
if f.scheduler != nil {
f.scheduler.Start()
}
}

func (f *queueProcessorFactoryBase) Stop() {
if f.scheduler != nil {
f.scheduler.Stop()
}
}

func newQueueProcessorHostRateLimiter(
hostRPS dynamicconfig.IntPropertyFn,
fallBackRPS dynamicconfig.IntPropertyFn,
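
The wiring above combines two fx features: each factory constructor is provided into the "queueProcessorFactory" value group, and QueueProcessorFactoryLifetimeHooks appends an fx lifecycle hook that starts every collected factory when the application starts and stops them on shutdown, which is how the host-level schedulers created inside the factories get started exactly once. The sketch below reproduces that pattern in isolation; the factory types are stand-ins, and only the go.uber.org/fx calls are the real library API.

package main

import (
	"context"
	"fmt"

	"go.uber.org/fx"
)

// Daemon stands in for common.Daemon: anything with Start/Stop.
type Daemon interface {
	Start()
	Stop()
}

type fakeFactory struct{ name string }

func (f *fakeFactory) Start() { fmt.Println(f.name, "factory started") }
func (f *fakeFactory) Stop()  { fmt.Println(f.name, "factory stopped") }

func newTransferFactory() Daemon { return &fakeFactory{name: "transfer"} }
func newTimerFactory() Daemon    { return &fakeFactory{name: "timer"} }

// hookParams mirrors QueueProcessorFactoriesLifetimeHookParams: fx collects
// every provider tagged with the same group into the Factories slice.
type hookParams struct {
	fx.In

	Lifecycle fx.Lifecycle
	Factories []Daemon `group:"queueProcessorFactory"`
}

func lifetimeHooks(params hookParams) {
	params.Lifecycle.Append(fx.Hook{
		OnStart: func(context.Context) error {
			for _, f := range params.Factories {
				f.Start()
			}
			return nil
		},
		OnStop: func(context.Context) error {
			for _, f := range params.Factories {
				f.Stop()
			}
			return nil
		},
	})
}

func main() {
	app := fx.New(
		fx.Provide(
			fx.Annotated{Group: "queueProcessorFactory", Target: newTransferFactory},
			fx.Annotated{Group: "queueProcessorFactory", Target: newTimerFactory},
		),
		fx.Invoke(lifetimeHooks),
	)

	// Exercise the hooks explicitly instead of running until a signal.
	if err := app.Start(context.Background()); err != nil {
		panic(err)
	}
	if err := app.Stop(context.Background()); err != nil {
		panic(err)
	}
}

Because the group is collected by type, any additional factory provided into the same group is picked up by the hooks without further wiring.
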
2 changes: 2 additions & 0 deletions service/history/queues/queue.go
@@ -44,6 +44,8 @@ type (
}

ProcessorFactory interface {
common.Daemon

// TODO: remove the cache parameter after workflow cache becomes a host level component
// and it can be provided as a parameter when creating a ProcessorFactory instance.
// Currently, workflow cache is shard level, but we can't get it from shard or engine interface,
24 changes: 24 additions & 0 deletions service/history/queues/queue_mock.go

Some generated files are not rendered by default.

11 changes: 11 additions & 0 deletions service/history/timerQueueActiveProcessor.go
@@ -50,6 +50,9 @@ import (
type (
timerQueueActiveProcessorImpl struct {
timerQueueProcessorBase *timerQueueProcessorBase

// this is the scheduler owned by this active queue processor
ownedScheduler queues.Scheduler
}
)

@@ -89,6 +92,7 @@ func newTimerQueueActiveProcessor(

if scheduler == nil {
scheduler = newTimerTaskScheduler(shard, logger)
processor.ownedScheduler = scheduler
}

rescheduler := queues.NewRescheduler(
@@ -258,6 +262,7 @@ func newTimerQueueFailoverProcessor(

if scheduler == nil {
scheduler = newTimerTaskScheduler(shard, logger)
processor.ownedScheduler = scheduler
}

rescheduler := queues.NewRescheduler(
@@ -308,11 +313,17 @@ func newTimerQueueFailoverProcessor(
}

func (t *timerQueueActiveProcessorImpl) Start() {
if t.ownedScheduler != nil {
t.ownedScheduler.Start()
}
t.timerQueueProcessorBase.Start()
}

func (t *timerQueueActiveProcessorImpl) Stop() {
t.timerQueueProcessorBase.Stop()
if t.ownedScheduler != nil {
t.ownedScheduler.Stop()
}
}

func (t *timerQueueActiveProcessorImpl) getAckLevel() tasks.Key {
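
In the change above, the timer queue processor records a scheduler in ownedScheduler only when it had to create one itself, which happens when no host-level scheduler was supplied (for example for failover processors); an injected scheduler is left untouched. Ordering matters: the owned scheduler is started before the base processor so submitted tasks always find a running scheduler, and it is stopped after the base processor so in-flight submissions can drain first. A small illustrative sketch of that guard, using simplified stand-in types rather than the real ones:

package ownershipsketch

// Daemon is the usual Start/Stop pair (a stand-in for common.Daemon).
type Daemon interface {
	Start()
	Stop()
}

// processor owns its scheduler only if it created it; ownedScheduler stays
// nil when a shared, host-level scheduler was injected by the caller.
type processor struct {
	base           Daemon
	ownedScheduler Daemon
}

func (p *processor) Start() {
	if p.ownedScheduler != nil {
		p.ownedScheduler.Start() // bring the dependency up first
	}
	p.base.Start()
}

func (p *processor) Stop() {
	p.base.Stop() // stop the producer of work before its scheduler
	if p.ownedScheduler != nil {
		p.ownedScheduler.Stop()
	}
}
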
