Skip to content

Commit

Permalink
Task executable scheduler implementation (#2750)
Browse files Browse the repository at this point in the history
  • Loading branch information
yycptt committed Apr 22, 2022
1 parent 58456a4 commit 041eaa3
Show file tree
Hide file tree
Showing 4 changed files with 85 additions and 13 deletions.
4 changes: 1 addition & 3 deletions common/tasks/benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,8 @@ func BenchmarkInterleavedWeightedRoundRobinScheduler(b *testing.B) {

scheduler := NewInterleavedWeightedRoundRobinScheduler(
InterleavedWeightedRoundRobinSchedulerOptions{
QueueSize: 2,
WorkerCount: 1,
PriorityToWeight: priorityToWeight,
},
priorityToWeight,
&noopProcessor{},
metricsClient,
logger,
Expand Down
8 changes: 2 additions & 6 deletions common/tasks/interleaved_weighted_round_robin.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,15 +40,13 @@ type (
// InterleavedWeightedRoundRobinSchedulerOptions is the config for
// interleaved weighted round robin scheduler
InterleavedWeightedRoundRobinSchedulerOptions struct {
QueueSize int
WorkerCount int
PriorityToWeight map[int]int
}

// InterleavedWeightedRoundRobinScheduler is a round robin scheduler implementation
// ref: https://en.wikipedia.org/wiki/Weighted_round_robin#Interleaved_WRR
InterleavedWeightedRoundRobinScheduler struct {
status int32
option InterleavedWeightedRoundRobinSchedulerOptions

processor Processor
metricsScope metrics.Scope
Expand All @@ -75,14 +73,12 @@ type (

func NewInterleavedWeightedRoundRobinScheduler(
option InterleavedWeightedRoundRobinSchedulerOptions,
priorityToWeight map[int]int,
processor Processor,
metricsClient metrics.Client,
logger log.Logger,
) *InterleavedWeightedRoundRobinScheduler {
return &InterleavedWeightedRoundRobinScheduler{
status: common.DaemonStatusInitialized,
option: option,

processor: processor,
metricsScope: metricsClient.Scope(metrics.TaskSchedulerScope),
Expand All @@ -91,7 +87,7 @@ func NewInterleavedWeightedRoundRobinScheduler(
notifyChan: make(chan struct{}, 1),
shutdownChan: make(chan struct{}),

priorityToWeight: priorityToWeight,
priorityToWeight: option.PriorityToWeight,
weightToTaskChannels: make(map[int]*WeightedChannel),
iwrrChannels: []*WeightedChannel{},
}
Expand Down
4 changes: 1 addition & 3 deletions common/tasks/interleaved_weighted_round_robin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,10 +87,8 @@ func (s *interleavedWeightedRoundRobinSchedulerSuite) SetupTest() {

s.scheduler = NewInterleavedWeightedRoundRobinScheduler(
InterleavedWeightedRoundRobinSchedulerOptions{
QueueSize: 2,
WorkerCount: 1,
PriorityToWeight: priorityToWeight,
},
priorityToWeight,
s.mockProcessor,
metricsClient,
logger,
Expand Down
82 changes: 81 additions & 1 deletion service/history/queues/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,16 +28,96 @@ package queues

import (
"go.temporal.io/server/common"
"go.temporal.io/server/common/cluster"
"go.temporal.io/server/common/log"
"go.temporal.io/server/common/log/tag"
"go.temporal.io/server/common/metrics"
"go.temporal.io/server/common/namespace"
tasks "go.temporal.io/server/common/tasks"
)

type (
// Scheduler is the component for scheduling and processing
// task executables. Ack(), Nack() or Reschedule() will always
// be called on all executables that have been successfully submited
// be called on all executables that have been successfully submited.
// Reschedule() will only be called after the Scheduler has been stopped
Scheduler interface {
common.Daemon

Submit(Executable) error
TrySubmit(Executable) (bool, error)
}

SchedulerOptions struct {
tasks.ParallelProcessorOptions
tasks.InterleavedWeightedRoundRobinSchedulerOptions
PriorityAssignerOptions
}

schedulerImpl struct {
priorityAssigner PriorityAssigner
wRRScheduler tasks.Scheduler
}
)

func NewScheduler(
clusterMetadata cluster.Metadata,
namespaceRegistry namespace.Registry,
options SchedulerOptions,
metricsClient metrics.Client,
logger log.Logger,
) *schedulerImpl {
priorityAssigner := NewPriorityAssigner(
clusterMetadata.GetCurrentClusterName(),
namespaceRegistry,
options.PriorityAssignerOptions,
metricsClient,
)

processor := tasks.NewParallelProcessor(
&options.ParallelProcessorOptions,
metricsClient,
logger,
)

return &schedulerImpl{
priorityAssigner: priorityAssigner,
wRRScheduler: tasks.NewInterleavedWeightedRoundRobinScheduler(
options.InterleavedWeightedRoundRobinSchedulerOptions,
processor,
metricsClient,
logger,
),
}
}

func (s *schedulerImpl) Start() {
s.wRRScheduler.Stop()
}

func (s *schedulerImpl) Stop() {
s.wRRScheduler.Stop()
}

func (s *schedulerImpl) Submit(
executable Executable,
) error {
if err := s.priorityAssigner.Assign(executable); err != nil {
executable.Logger().Error("Failed to assign task executable priority", tag.Error(err))
return err
}

s.wRRScheduler.Submit(executable)
return nil
}

func (s *schedulerImpl) TrySubmit(
executable Executable,
) (bool, error) {
if err := s.priorityAssigner.Assign(executable); err != nil {
executable.Logger().Error("Failed to assign task executable priority", tag.Error(err))
return false, err
}

return s.wRRScheduler.TrySubmit(executable), nil
}

0 comments on commit 041eaa3

Please sign in to comment.