Backoff when no successful schedules #2102

Merged
merged 7 commits into from
May 13, 2024
12 changes: 8 additions & 4 deletions pkg/scheduler/scheduler.go
@@ -30,7 +30,6 @@ import (
 	"k8s.io/apimachinery/pkg/labels"
 	"k8s.io/apimachinery/pkg/types"
 	"k8s.io/apimachinery/pkg/util/sets"
-	"k8s.io/apimachinery/pkg/util/wait"
 	"k8s.io/client-go/tools/record"
 	"k8s.io/klog/v2"
 	"k8s.io/utils/field"
@@ -51,6 +50,7 @@ import (
 	"sigs.k8s.io/kueue/pkg/util/priority"
 	"sigs.k8s.io/kueue/pkg/util/resource"
 	"sigs.k8s.io/kueue/pkg/util/routine"
+	"sigs.k8s.io/kueue/pkg/util/wait"
 	"sigs.k8s.io/kueue/pkg/workload"
 )

@@ -125,7 +125,7 @@ func New(queues *queue.Manager, cache *cache.Cache, cl client.Client, recorder r
 func (s *Scheduler) Start(ctx context.Context) error {
 	log := ctrl.LoggerFrom(ctx).WithName("scheduler")
 	ctx = ctrl.LoggerInto(ctx, log)
-	go wait.UntilWithContext(ctx, s.schedule, 0)
+	go wait.UntilWithBackoff(ctx, s.schedule)
 	return nil
 }

@@ -180,15 +180,15 @@ func (cu *cohortsUsage) hasCommonFlavorResources(cohort string, assignment cache
 	return false
 }

-func (s *Scheduler) schedule(ctx context.Context) {
+func (s *Scheduler) schedule(ctx context.Context) wait.SpeedSignal {
 	log := ctrl.LoggerFrom(ctx)

 	// 1. Get the heads from the queues, including their desired clusterQueue.
 	// This operation blocks while the queues are empty.
 	headWorkloads := s.queues.Heads(ctx)
 	// If there are no elements, it means that the program is finishing.
 	if len(headWorkloads) == 0 {
-		return
+		return wait.KeepGoing
 	}
 	startTime := time.Now()

@@ -295,6 +295,10 @@ func (s *Scheduler) schedule(ctx context.Context) {
 		}
 	}
 	metrics.AdmissionAttempt(result, time.Since(startTime))
+	if result != metrics.AdmissionResultSuccess {
+		return wait.SlowDown
+	}
+	return wait.KeepGoing
 }

 type entryStatus string
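
The return-type change in `schedule` can be read on its own as a small pattern: a named boolean type with two named constants. Below is a minimal, self-contained sketch of that pattern. The names `SpeedSignal`, `KeepGoing`, and `SlowDown` are copied from this PR; `scheduleOnce` and `describe` are hypothetical helpers invented for illustration and are not part of Kueue.

```go
package main

import "fmt"

// SpeedSignal mirrors the named type added in pkg/util/wait.
type SpeedSignal bool

const (
	KeepGoing SpeedSignal = true
	SlowDown  SpeedSignal = false
)

// scheduleOnce is a hypothetical stand-in for Scheduler.schedule: it asks
// for backoff whenever a scheduling cycle admitted nothing.
func scheduleOnce(admitted int) SpeedSignal {
	if admitted == 0 {
		return SlowDown
	}
	return KeepGoing
}

// describe (also hypothetical) shows that call sites can branch on the
// named constants instead of on a bare true/false.
func describe(s SpeedSignal) string {
	switch s {
	case KeepGoing:
		return "run again immediately"
	default:
		return "back off before the next run"
	}
}

func main() {
	fmt.Println(describe(scheduleOnce(0)))
	fmt.Println(describe(scheduleOnce(3)))
}
```

One caveat on the design choice: Go still accepts the untyped constants `true` and `false` wherever a `SpeedSignal` is expected, so the named type mainly prevents returning a plain `bool` variable without an explicit conversion, and otherwise improves readability at the call site.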
87 changes: 87 additions & 0 deletions pkg/util/wait/backoff.go
@@ -0,0 +1,87 @@
/*
Copyright 2024 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package wait

import (
	"context"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/utils/clock"
)

// UntilWithBackoff runs f in a loop until context indicates finished. It
// applies backoff depending on the SpeedSignal f returns. Backoff increases
// exponentially, ranging from 1ms to 100ms.
func UntilWithBackoff(ctx context.Context, f func(context.Context) SpeedSignal) {
Contributor

Suggested change
-func UntilWithBackoff(ctx context.Context, f func(context.Context) SpeedSignal) {
+func UntilWithBackoff(ctx context.Context, f func(context.Context) bool) {

The function can simply return whether to back off or not. Or, from the scheduler's perspective, whether it successfully scheduled anything.

Contributor Author

Is a bool better than a specific type? I defined this type so that the API would be self-explanatory (and so that it would be harder to mistakenly flip the return value).

Contributor

We can use bools when there is no ambiguity about what the return value means. But in this case, it is a bit ambiguous.

Member

@gabesaba Does the specific type have other advantages over using a bool?

Contributor Author

For type safety: it makes UntilWithBackoff harder to use incorrectly. When users read the code, it will be clear that this return value says whether to back off, rather than having to look up in the library definition of UntilWithBackoff when true or false should be returned.

This matters especially because there is some indirection in the usage: Scheduler.schedule implements the API, and this function is provided to UntilWithBackoff by Scheduler.Start.

	// create and drain timer, allowing reuse of same timer via timer.Reset
	timer := clock.RealClock{}.NewTimer(0)
	<-timer.C()
	untilWithBackoff(ctx, f, timer)
}

func untilWithBackoff(ctx context.Context, f func(context.Context) SpeedSignal, timer clock.Timer) {
	mgr := speedyBackoffManager{
		backoff: noBackoff,
		timer:   timer,
	}
	wait.BackoffUntil(func() {
		mgr.toggleBackoff(f(ctx))
	}, &mgr, false, ctx.Done())
}

// SpeedSignal indicates whether we should run the function again immediately,
// or apply backoff.
type SpeedSignal bool

const (
	// KeepGoing signals to continue immediately.
	KeepGoing SpeedSignal = true
	// SlowDown signals to backoff.
	SlowDown SpeedSignal = false

	noBackoff      = time.Millisecond * 0
	initialBackoff = time.Millisecond * 1
	maxBackoff     = time.Millisecond * 100
)

func (s *speedyBackoffManager) toggleBackoff(speedSignal SpeedSignal) {
	switch speedSignal {
	case KeepGoing:
		s.backoff = noBackoff
	case SlowDown:
		if s.backoff == noBackoff {
			s.backoff = initialBackoff
		}
	}
}

type speedyBackoffManager struct {
	backoff time.Duration
	timer   clock.Timer
}

var _ wait.BackoffManager = (*speedyBackoffManager)(nil)

func (s *speedyBackoffManager) Backoff() clock.Timer {
	s.timer.Reset(s.backoff)
	s.backoff *= 2
	if s.backoff > maxBackoff {
		s.backoff = maxBackoff
	}
	return s.timer
}
110 changes: 110 additions & 0 deletions pkg/util/wait/backoff_test.go
@@ -0,0 +1,110 @@
/*
Copyright 2024 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package wait

import (
	"context"
	"testing"
	"time"

	"github.com/google/go-cmp/cmp"
	"k8s.io/utils/clock"
	"k8s.io/utils/ptr"
)

type SpyTimer struct {
	history *[]time.Duration
	clock.Timer
}

func (s SpyTimer) Reset(d time.Duration) bool {
	*s.history = append(*s.history, d)
	return s.Timer.Reset(0)
}

func makeSpyTimer() SpyTimer {
Contributor

This is interesting, I haven't seen this used before. Usually we use fake clocks. I guess one advantage of a fake clock is that you don't need to wait at all, but given these times are short I think it is OK.

Member

But if we can use the fake clock instead of this mock function, I would prefer to use it.

Contributor Author

Note that I call Timer.Reset(0) on the real timer, so there is no waiting. I am open to using a fake clock, but I suppose I will still need to implement some functionality to capture the history (as I did with SpyTimer)?

Contributor

> Note that I call Timer.Reset(0) on the real timer, so there is no waiting.

Cool.

> I will still need to implement some functionality to capture the history (as I did with SpyTimer)?

In most of the tests I have seen, we just use sleep to let an arbitrary amount of time pass, like here.

Maybe fakeClock is more appropriate for testing at the Scheduler level, where the internal details are more encapsulated. By analogy with the Job controller, we could keep the clock at the Scheduler level and use a real clock in production but a fake one in tests.

Contributor

Yeah, not quite the same purpose as the fake clock.

But why do we even keep a timer inside the SpyTimer? It seems rather unused.

Contributor Author

> But why do we even keep a timer inside the SpyTimer? It seems rather unused.

To avoid implementing the rest of the clock.Timer interface

	timer := clock.RealClock{}.NewTimer(0)
	<-timer.C()
	return SpyTimer{history: ptr.To([]time.Duration{}), Timer: timer}
}

func ms(m time.Duration) time.Duration {
	return time.Millisecond * m
}

func TestUntilWithBackoff(t *testing.T) {
	type TestCase struct {
		name     string
		signals  []SpeedSignal
		expected []time.Duration
	}

	testCases := []TestCase{
		{
			name:     "base case",
			signals:  []SpeedSignal{},
			expected: []time.Duration{ms(0)},
		},
		{
			name:     "base SlowDown",
			signals:  []SpeedSignal{SlowDown},
			expected: []time.Duration{ms(0), ms(1)},
		},
		{
			name:     "base KeepGoing",
			signals:  []SpeedSignal{KeepGoing},
			expected: []time.Duration{ms(0), ms(0)},
		},
		{
			name:     "KeepGoing always returns 0",
			signals:  []SpeedSignal{KeepGoing, KeepGoing, KeepGoing, KeepGoing},
			expected: []time.Duration{ms(0), ms(0), ms(0), ms(0), ms(0)},
		},
		{
			name:     "reset before reaching max backoff",
			signals:  []SpeedSignal{SlowDown, SlowDown, SlowDown, KeepGoing, SlowDown, SlowDown, SlowDown, SlowDown, SlowDown, SlowDown, SlowDown, KeepGoing},
			expected: []time.Duration{ms(0), ms(1), ms(2), ms(4), ms(0), ms(1), ms(2), ms(4), ms(8), ms(16), ms(32), ms(64), ms(0)},
		},
		{
			name:     "double until max then reset",
			signals:  []SpeedSignal{SlowDown, SlowDown, SlowDown, SlowDown, SlowDown, SlowDown, SlowDown, SlowDown, SlowDown, KeepGoing},
			expected: []time.Duration{ms(0), ms(1), ms(2), ms(4), ms(8), ms(16), ms(32), ms(64), ms(100), ms(100), ms(0)},
		},
	}
	for _, testCase := range testCases {
		t.Run(testCase.name, func(t *testing.T) {
			timer := makeSpyTimer()
			ctx, cancel := context.WithCancel(context.Background())

			i := 0
			f := func(ctx context.Context) SpeedSignal {
				if i >= len(testCase.signals) {
					cancel()
					return KeepGoing
				}
				signal := testCase.signals[i]
				i++
				return signal
			}
			untilWithBackoff(ctx, f, timer)

			if diff := cmp.Diff(testCase.expected, *timer.history); diff != "" {
				t.Errorf("Unexpected backoff time (-want,+got):\n%s", diff)
			}
		})
	}
}