Automated cherry pick of #87728: Fix back off when scheduling cycle is delayed #87817

Merged
1 change: 1 addition & 0 deletions pkg/scheduler/internal/queue/BUILD
@@ -40,6 +40,7 @@ go_test(
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/clock:go_default_library",
"//staging/src/k8s.io/client-go/tools/cache:go_default_library",
"//vendor/github.com/prometheus/client_model/go:go_default_library",
],
)
17 changes: 12 additions & 5 deletions pkg/scheduler/internal/queue/pod_backoff.go
@@ -21,12 +21,14 @@ import (
"time"

ktypes "k8s.io/apimachinery/pkg/types"
"k8s.io/kubernetes/pkg/scheduler/util"
)

// PodBackoffMap is a structure that stores backoff related information for pods
type PodBackoffMap struct {
// lock for performing actions on this PodBackoffMap
lock sync.RWMutex
lock sync.RWMutex
clock util.Clock
// initial backoff duration
initialDuration time.Duration
// maximal backoff duration
@@ -38,8 +40,9 @@ type PodBackoffMap struct {
}

// NewPodBackoffMap creates a PodBackoffMap with initial duration and max duration.
func NewPodBackoffMap(initialDuration, maxDuration time.Duration) *PodBackoffMap {
func NewPodBackoffMap(initialDuration, maxDuration time.Duration, clock util.Clock) *PodBackoffMap {
return &PodBackoffMap{
clock: clock,
initialDuration: initialDuration,
maxDuration: maxDuration,
podAttempts: make(map[ktypes.NamespacedName]int),
@@ -91,12 +94,16 @@ func (pbm *PodBackoffMap) ClearPodBackoff(nsPod ktypes.NamespacedName) {

// CleanupPodsCompletesBackingoff execute garbage collection on the pod backoff,
// i.e, it will remove a pod from the PodBackoffMap if
// lastUpdateTime + maxBackoffDuration is before the current timestamp
// lastUpdateTime + maxDuration >> timestamp
// We should wait longer than the maxDuration so that the pod gets a chance to
// (1) move to the active queue and (2) get an schedule attempt.
func (pbm *PodBackoffMap) CleanupPodsCompletesBackingoff() {
pbm.lock.Lock()
defer pbm.lock.Unlock()
for pod, value := range pbm.podLastUpdateTime {
if value.Add(pbm.maxDuration).Before(time.Now()) {
// Here we assume that maxDuration should be enough for a pod to move up the
// active queue and get an schedule attempt.
if value.Add(2 * pbm.maxDuration).Before(pbm.clock.Now()) {
pbm.clearPodBackoff(pod)
}
}
@@ -106,7 +113,7 @@ func (pbm *PodBackoffMap) CleanupPodsCompletesBackingoff() {
// and increases its numberOfAttempts by 1
func (pbm *PodBackoffMap) BackoffPod(nsPod ktypes.NamespacedName) {
pbm.lock.Lock()
pbm.podLastUpdateTime[nsPod] = time.Now()
pbm.podLastUpdateTime[nsPod] = pbm.clock.Now()
pbm.podAttempts[nsPod]++
pbm.lock.Unlock()
}
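
The substance of the change above is that PodBackoffMap no longer calls time.Now() directly: it stores an injected util.Clock and stamps each backoff with clock.Now(), which is what lets the tests further down drive time with a fake clock. The following is a minimal, self-contained sketch of that pattern, not the Kubernetes code itself; the names backoffMap, backoffPod, backoffTime, attempts and lastUpdate are invented for the example, and only the k8s.io/apimachinery/pkg/util/clock package is a real dependency.

package main

import (
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/util/clock"
)

// backoffMap is an illustrative stand-in for PodBackoffMap: it takes an
// injectable clock so tests can control "now" instead of relying on time.Now().
type backoffMap struct {
	clock           clock.Clock
	initialDuration time.Duration
	maxDuration     time.Duration
	attempts        map[string]int
	lastUpdate      map[string]time.Time
}

// backoffPod records one more failed attempt at the injected clock's current time.
func (b *backoffMap) backoffPod(key string) {
	b.lastUpdate[key] = b.clock.Now()
	b.attempts[key]++
}

// backoffTime returns when the backoff expires: the last update time plus a
// duration that doubles per attempt and is capped at maxDuration.
func (b *backoffMap) backoffTime(key string) time.Time {
	d := b.initialDuration
	for i := 1; i < b.attempts[key]; i++ {
		d *= 2
		if d >= b.maxDuration {
			d = b.maxDuration
			break
		}
	}
	return b.lastUpdate[key].Add(d)
}

func main() {
	fake := clock.NewFakeClock(time.Now())
	b := &backoffMap{
		clock:           fake,
		initialDuration: time.Second,
		maxDuration:     10 * time.Second,
		attempts:        map[string]int{},
		lastUpdate:      map[string]time.Time{},
	}
	for i := 0; i < 6; i++ {
		b.backoffPod("ns/pod")
		// Prints 1s, 2s, 4s, 8s, 10s, 10s: the same growth and cap that
		// TestBackoffPod and TestBackOffFlow below expect.
		fmt.Println(b.backoffTime("ns/pod").Sub(fake.Now()))
	}
}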
29 changes: 17 additions & 12 deletions pkg/scheduler/internal/queue/pod_backoff_test.go
@@ -17,15 +17,17 @@ limitations under the License.
package queue

import (
"fmt"
"testing"
"time"

ktypes "k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/clock"
)

func TestBackoffPod(t *testing.T) {
bpm := NewPodBackoffMap(1*time.Second, 10*time.Second)

timestamp := time.Now()
bpm := NewPodBackoffMap(1*time.Second, 10*time.Second, clock.NewFakeClock(timestamp))
tests := []struct {
podID ktypes.NamespacedName
expectedDuration time.Duration
@@ -61,20 +63,23 @@ func TestBackoffPod(t *testing.T) {
},
}

for _, test := range tests {
// Backoff the pod
bpm.BackoffPod(test.podID)
// Get backoff duration for the pod
duration := bpm.calculateBackoffDuration(test.podID)

if duration != test.expectedDuration {
t.Errorf("expected: %s, got %s for pod %s", test.expectedDuration.String(), duration.String(), test.podID)
}
for i, test := range tests {
t.Run(fmt.Sprintf("step %d", i), func(t *testing.T) {
bpm.BackoffPod(test.podID)
backoff, ok := bpm.GetBackoffTime(test.podID)
if !ok {
t.Errorf("%v should be backed off", test.podID)
}
duration := backoff.Sub(timestamp)
if duration != test.expectedDuration {
t.Errorf("expected: %s, got %s for pod %s", test.expectedDuration.String(), duration.String(), test.podID)
}
})
}
}

func TestClearPodBackoff(t *testing.T) {
bpm := NewPodBackoffMap(1*time.Second, 60*time.Second)
bpm := NewPodBackoffMap(1*time.Second, 60*time.Second, clock.NewFakeClock(time.Now()))
// Clear backoff on an not existed pod
bpm.clearPodBackoff(ktypes.NamespacedName{Namespace: "ns", Name: "not-existed"})
// Backoff twice for pod foo
2 changes: 1 addition & 1 deletion pkg/scheduler/internal/queue/scheduling_queue.go
@@ -181,7 +181,7 @@ func NewPriorityQueueWithClock(stop <-chan struct{}, clock util.Clock, fwk frame
pq := &PriorityQueue{
clock: clock,
stop: stop,
podBackoff: NewPodBackoffMap(1*time.Second, 10*time.Second),
podBackoff: NewPodBackoffMap(1*time.Second, 10*time.Second, clock),
activeQ: util.NewHeapWithRecorder(podInfoKeyFunc, comp, metrics.NewActivePodsRecorder()),
unschedulableQ: newUnschedulablePodsMap(metrics.NewUnschedulablePodsRecorder()),
nominatedPods: newNominatedPodMap(),
84 changes: 84 additions & 0 deletions pkg/scheduler/internal/queue/scheduling_queue_test.go
@@ -28,6 +28,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/clock"
"k8s.io/client-go/tools/cache"
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache"
@@ -1329,3 +1330,86 @@ func TestPendingPodsMetric(t *testing.T) {
})
}
}

func TestBackOffFlow(t *testing.T) {
cl := clock.NewFakeClock(time.Now())
q := NewPriorityQueueWithClock(nil, cl, nil)
steps := []struct {
wantBackoff time.Duration
}{
{wantBackoff: time.Second},
{wantBackoff: 2 * time.Second},
{wantBackoff: 4 * time.Second},
{wantBackoff: 8 * time.Second},
{wantBackoff: 10 * time.Second},
{wantBackoff: 10 * time.Second},
{wantBackoff: 10 * time.Second},
}
pod := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "test-pod",
Namespace: "test-ns",
UID: "test-uid",
},
}
podID := nsNameForPod(pod)
podKey, err := cache.MetaNamespaceKeyFunc(pod)
if err != nil {
t.Fatal(err)
}
if err := q.Add(pod); err != nil {
t.Fatal(err)
}

for i, step := range steps {
t.Run(fmt.Sprintf("step %d", i), func(t *testing.T) {
timestamp := cl.Now()
// Simulate schedule attempt.
pod, err := q.Pop()
if err != nil {
t.Fatal(err)
}
if err := q.AddUnschedulableIfNotPresent(pod, int64(i)); err != nil {
t.Fatal(err)
}

// An event happens.
q.MoveAllToActiveQueue()

if _, ok, _ := q.podBackoffQ.GetByKey(podKey); !ok {
t.Errorf("pod %v is not in the backoff queue", podID)
}

// Check backoff duration.
deadline, ok := q.podBackoff.GetBackoffTime(podID)
if !ok {
t.Errorf("didn't get backoff for pod %s", podID)
}
backoff := deadline.Sub(timestamp)
if backoff != step.wantBackoff {
t.Errorf("got backoff %s, want %s", backoff, step.wantBackoff)
}

// Simulate routine that continuously flushes the backoff queue.
cl.Step(time.Millisecond)
q.flushBackoffQCompleted()
// Still in backoff queue after an early flush.
if _, ok, _ := q.podBackoffQ.GetByKey(podKey); !ok {
t.Errorf("pod %v is not in the backoff queue", podID)
}
// Moved out of the backoff queue after timeout.
cl.Step(backoff)
q.flushBackoffQCompleted()
if _, ok, _ := q.podBackoffQ.GetByKey(podKey); ok {
t.Errorf("pod %v is still in the backoff queue", podID)
}
})
}
// After some time, backoff information is cleared.
cl.Step(time.Hour)
q.podBackoff.CleanupPodsCompletesBackingoff()
_, ok := q.podBackoff.GetBackoffTime(podID)
if ok {
t.Errorf("backoff information for pod %s was not cleared", podID)
}
}
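
One detail worth calling out from the cleanup at the end of this test: the rewritten CleanupPodsCompletesBackingoff only drops an entry once lastUpdateTime + 2*maxDuration is in the past on the injected clock, so a pod whose backoff just expired still gets a chance to move back to the active queue and be attempted before its attempt count is forgotten. The snippet below is an illustrative reworking of that check with made-up variable names; only the fake clock API is real.

package main

import (
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/util/clock"
)

func main() {
	fake := clock.NewFakeClock(time.Now())
	lastUpdate := fake.Now()        // when the pod was last backed off
	maxDuration := 10 * time.Second // the queue's maximum backoff

	// 15s later the backoff itself has expired, but the entry is still kept,
	// because less than 2*maxDuration has passed.
	fake.Step(15 * time.Second)
	fmt.Println(lastUpdate.Add(2*maxDuration).Before(fake.Now())) // false: keep

	// Once well past 2*maxDuration the entry would be cleared; this is why
	// TestBackOffFlow steps the clock by an hour before calling cleanup.
	fake.Step(time.Hour)
	fmt.Println(lastUpdate.Add(2*maxDuration).Before(fake.Now())) // true: clear
}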