Skip to content

Commit

Permalink
Perf optimize interleaved weighted round robin scheduler (#3034)
Browse files Browse the repository at this point in the history
* Perf optimize interleaved weighted round robin scheduler
  * Bypass scheduler if there is no other task
  * Remove unnecessary shutdown check

```bash
before
BenchmarkInterleavedWeightedRoundRobinScheduler_Sequential-16    	 6610477	       176.0 ns/op	       8 B/op	       1 allocs/op
BenchmarkInterleavedWeightedRoundRobinScheduler_Parallel-16    	 3230086	       346.7 ns/op	       8 B/op	       1 allocs/op

after
BenchmarkInterleavedWeightedRoundRobinScheduler_Sequential-16    	26722878	        46.48 ns/op	       8 B/op	       1 allocs/op
BenchmarkInterleavedWeightedRoundRobinScheduler_Parallel-16    	 4259040	       267.4 ns/op	       8 B/op	       1 allocs/op
```
  • Loading branch information
wxing1292 committed Jun 29, 2022
1 parent 53a9cce commit 0d4e39b
Show file tree
Hide file tree
Showing 3 changed files with 100 additions and 33 deletions.
37 changes: 34 additions & 3 deletions common/tasks/benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,18 +45,21 @@ var (
_ PriorityTask = (*noopTask)(nil)
)

func BenchmarkInterleavedWeightedRoundRobinScheduler(b *testing.B) {
priorityToWeight := map[Priority]int{
var (
benchmarkPriorityToWeight = map[Priority]int{
0: 5,
1: 3,
2: 2,
3: 1,
}
)

func BenchmarkInterleavedWeightedRoundRobinScheduler_Sequential(b *testing.B) {
logger := log.NewTestLogger()

scheduler := NewInterleavedWeightedRoundRobinScheduler(
InterleavedWeightedRoundRobinSchedulerOptions{
PriorityToWeight: priorityToWeight,
PriorityToWeight: benchmarkPriorityToWeight,
},
&noopProcessor{},
metrics.NoopMetricsHandler,
Expand All @@ -77,6 +80,34 @@ func BenchmarkInterleavedWeightedRoundRobinScheduler(b *testing.B) {
waitGroup.Wait()
}

func BenchmarkInterleavedWeightedRoundRobinScheduler_Parallel(b *testing.B) {
logger := log.NewTestLogger()

scheduler := NewInterleavedWeightedRoundRobinScheduler(
InterleavedWeightedRoundRobinSchedulerOptions{
PriorityToWeight: benchmarkPriorityToWeight,
},
&noopProcessor{},
metrics.NoopMetricsHandler,
logger,
)
scheduler.Start()
defer scheduler.Stop()

waitGroup := &sync.WaitGroup{}
waitGroup.Add(b.N)

b.ReportAllocs()
b.ResetTimer()

b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
scheduler.Submit(&noopTask{WaitGroup: waitGroup})
}
})
waitGroup.Wait()
}

func (n *noopProcessor) Start() {}
func (n *noopProcessor) Stop() {}
func (n *noopProcessor) Submit(task Task) { task.Ack() }
Expand Down
77 changes: 47 additions & 30 deletions common/tasks/interleaved_weighted_round_robin.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ type (
notifyChan chan struct{}
shutdownChan chan struct{}

numInflightTask int64
sync.RWMutex
priorityToWeight map[Priority]int
weightToTaskChannels map[int]*WeightedChannel
Expand All @@ -67,7 +68,7 @@ type (
// 3: 1,
// }
// then iwrrChannels will contain chan [0, 0, 0, 1, 0, 1, 2, 0, 1, 2, 3] (ID-ed by priority)
iwrrChannels []*WeightedChannel
iwrrChannels atomic.Value // []*WeightedChannel
}
)

Expand All @@ -77,6 +78,8 @@ func NewInterleavedWeightedRoundRobinScheduler(
metricsProvider metrics.MetricsHandler,
logger log.Logger,
) *InterleavedWeightedRoundRobinScheduler {
iwrrChannels := atomic.Value{}
iwrrChannels.Store([]*WeightedChannel{})
return &InterleavedWeightedRoundRobinScheduler{
status: common.DaemonStatusInitialized,

Expand All @@ -87,9 +90,10 @@ func NewInterleavedWeightedRoundRobinScheduler(
notifyChan: make(chan struct{}, 1),
shutdownChan: make(chan struct{}),

numInflightTask: 0,
priorityToWeight: option.PriorityToWeight,
weightToTaskChannels: make(map[int]*WeightedChannel),
iwrrChannels: []*WeightedChannel{},
iwrrChannels: iwrrChannels,
}
}

Expand Down Expand Up @@ -130,6 +134,13 @@ func (s *InterleavedWeightedRoundRobinScheduler) Stop() {
func (s *InterleavedWeightedRoundRobinScheduler) Submit(
task PriorityTask,
) {
numTasks := atomic.AddInt64(&s.numInflightTask, 1)
if numTasks == 1 {
s.doDispatchTasksDirectly(task)
return
}

// there are tasks pending dispatching, need to respect task priorities
channel := s.getOrCreateTaskChannel(s.priorityToWeight[task.GetPriority()])
channel.Chan() <- task
s.notifyDispatcher()
Expand All @@ -138,12 +149,20 @@ func (s *InterleavedWeightedRoundRobinScheduler) Submit(
func (s *InterleavedWeightedRoundRobinScheduler) TrySubmit(
task PriorityTask,
) bool {
numTasks := atomic.AddInt64(&s.numInflightTask, 1)
if numTasks == 1 {
s.doDispatchTasksDirectly(task)
return true
}

// there are tasks pending dispatching, need to respect task priorities
channel := s.getOrCreateTaskChannel(s.priorityToWeight[task.GetPriority()])
select {
case channel.Chan() <- task:
s.notifyDispatcher()
return true
default:
atomic.AddInt64(&s.numInflightTask, -1)
return false
}
}
Expand All @@ -152,7 +171,7 @@ func (s *InterleavedWeightedRoundRobinScheduler) eventLoop() {
for {
select {
case <-s.notifyChan:
s.dispatchTasks()
s.dispatchTasksWithWeight()
case <-s.shutdownChan:
return
}
Expand Down Expand Up @@ -194,23 +213,13 @@ func (s *InterleavedWeightedRoundRobinScheduler) getOrCreateTaskChannel(
iwrrChannels = append(iwrrChannels, weightedChannels[index])
}
}
s.iwrrChannels = iwrrChannels
s.iwrrChannels.Store(iwrrChannels)

return channel
}

func (s *InterleavedWeightedRoundRobinScheduler) dispatchTasks() {
for s.hasRemainingTasks() {
weightedChannels := s.channels()
s.doDispatchTasks(weightedChannels)
}
}

func (s *InterleavedWeightedRoundRobinScheduler) channels() []*WeightedChannel {
s.RLock()
defer s.RUnlock()

return s.iwrrChannels
return s.iwrrChannels.Load().([]*WeightedChannel)
}

func (s *InterleavedWeightedRoundRobinScheduler) notifyDispatcher() {
Expand All @@ -225,52 +234,60 @@ func (s *InterleavedWeightedRoundRobinScheduler) notifyDispatcher() {
}
}

func (s *InterleavedWeightedRoundRobinScheduler) doDispatchTasks(
func (s *InterleavedWeightedRoundRobinScheduler) dispatchTasksWithWeight() {
for s.hasRemainingTasks() {
weightedChannels := s.channels()
s.doDispatchTasksWithWeight(weightedChannels)
}
}

func (s *InterleavedWeightedRoundRobinScheduler) doDispatchTasksWithWeight(
channels []*WeightedChannel,
) {
numTasks := int64(0)
LoopDispatch:
for _, channel := range channels {
select {
case task := <-channel.Chan():
s.processor.Submit(task)

case <-s.shutdownChan:
return

numTasks++
default:
continue LoopDispatch
}
}
atomic.AddInt64(&s.numInflightTask, -numTasks)
}

func (s *InterleavedWeightedRoundRobinScheduler) hasRemainingTasks() bool {
s.RLock()
defer s.RUnlock()
func (s *InterleavedWeightedRoundRobinScheduler) doDispatchTasksDirectly(
task PriorityTask,
) {
s.processor.Submit(task)
atomic.AddInt64(&s.numInflightTask, -1)
}

for _, weightedChan := range s.weightToTaskChannels {
if weightedChan.Len() > 0 {
return true
}
}
return false
func (s *InterleavedWeightedRoundRobinScheduler) hasRemainingTasks() bool {
numTasks := atomic.LoadInt64(&s.numInflightTask)
return numTasks > 0
}

func (s *InterleavedWeightedRoundRobinScheduler) rescheduleTasks() {
s.RLock()
defer s.RUnlock()

numTasks := int64(0)
DrainLoop:
for _, channel := range s.weightToTaskChannels {
for {
select {
case task := <-channel.Chan():
task.Reschedule()

numTasks++
default:
continue DrainLoop
}
}
}
atomic.AddInt64(&s.numInflightTask, -numTasks)
}

func (s *InterleavedWeightedRoundRobinScheduler) isStopped() bool {
Expand Down
19 changes: 19 additions & 0 deletions common/tasks/interleaved_weighted_round_robin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ package tasks
import (
"math/rand"
"sync"
"sync/atomic"
"testing"

"github.com/golang/mock/gomock"
Expand Down Expand Up @@ -106,6 +107,7 @@ func (s *interleavedWeightedRoundRobinSchedulerSuite) TestSubmitSchedule_Success
s.scheduler.Submit(mockTask)

testWaitGroup.Wait()
s.Equal(int64(0), atomic.LoadInt64(&s.scheduler.numInflightTask))
}

func (s *interleavedWeightedRoundRobinSchedulerSuite) TestSubmitSchedule_Fail() {
Expand All @@ -131,15 +133,27 @@ func (s *interleavedWeightedRoundRobinSchedulerSuite) TestSubmitSchedule_Fail()
s.scheduler.Submit(mockTask)

testWaitGroup.Wait()
s.Equal(int64(0), atomic.LoadInt64(&s.scheduler.numInflightTask))
}

func (s *interleavedWeightedRoundRobinSchedulerSuite) TestChannels() {
// need to manually set the number of pending task to 1
// so schedule by task priority logic will execute
numTasks := atomic.AddInt64(&s.scheduler.numInflightTask, 1)
s.Equal(int64(1), numTasks)
numPendingTasks := 0
defer func() {
numTasks := atomic.AddInt64(&s.scheduler.numInflightTask, -1)
s.Equal(int64(numPendingTasks), numTasks)
}()

var channelWeights []int

channelWeights = nil
mockTask0 := NewMockPriorityTask(s.controller)
mockTask0.EXPECT().GetPriority().Return(Priority(0)).AnyTimes()
s.scheduler.Submit(mockTask0)
numPendingTasks++
for _, channel := range s.scheduler.channels() {
channelWeights = append(channelWeights, channel.Weight())
}
Expand All @@ -149,6 +163,7 @@ func (s *interleavedWeightedRoundRobinSchedulerSuite) TestChannels() {
mockTask1 := NewMockPriorityTask(s.controller)
mockTask1.EXPECT().GetPriority().Return(Priority(1)).AnyTimes()
s.scheduler.Submit(mockTask1)
numPendingTasks++
for _, channel := range s.scheduler.channels() {
channelWeights = append(channelWeights, channel.Weight())
}
Expand All @@ -158,6 +173,7 @@ func (s *interleavedWeightedRoundRobinSchedulerSuite) TestChannels() {
mockTask2 := NewMockPriorityTask(s.controller)
mockTask2.EXPECT().GetPriority().Return(Priority(2)).AnyTimes()
s.scheduler.Submit(mockTask2)
numPendingTasks++
for _, channel := range s.scheduler.channels() {
channelWeights = append(channelWeights, channel.Weight())
}
Expand All @@ -167,6 +183,7 @@ func (s *interleavedWeightedRoundRobinSchedulerSuite) TestChannels() {
mockTask3 := NewMockPriorityTask(s.controller)
mockTask3.EXPECT().GetPriority().Return(Priority(3)).AnyTimes()
s.scheduler.Submit(mockTask3)
numPendingTasks++
for _, channel := range s.scheduler.channels() {
channelWeights = append(channelWeights, channel.Weight())
}
Expand All @@ -177,6 +194,7 @@ func (s *interleavedWeightedRoundRobinSchedulerSuite) TestChannels() {
s.scheduler.Submit(mockTask1)
s.scheduler.Submit(mockTask2)
s.scheduler.Submit(mockTask3)
numPendingTasks += 4
for _, channel := range s.scheduler.channels() {
channelWeights = append(channelWeights, channel.Weight())
}
Expand Down Expand Up @@ -226,4 +244,5 @@ func (s *interleavedWeightedRoundRobinSchedulerSuite) TestParallelSubmitSchedule
endWaitGroup.Wait()

testWaitGroup.Wait()
s.Equal(int64(0), atomic.LoadInt64(&s.scheduler.numInflightTask))
}

0 comments on commit 0d4e39b

Please sign in to comment.