Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Retry when it fails to update pods status on scheduling loop #109832

Merged
merged 1 commit into from
Jul 19, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
10 changes: 8 additions & 2 deletions pkg/scheduler/util/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,10 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apimachinery/pkg/util/net"
"k8s.io/apimachinery/pkg/util/strategicpatch"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/util/retry"
corev1helpers "k8s.io/component-helpers/scheduling/corev1"
"k8s.io/klog/v2"
extenderv1 "k8s.io/kube-scheduler/extender/v1"
Expand Down Expand Up @@ -115,8 +117,12 @@ func PatchPodStatus(ctx context.Context, cs kubernetes.Interface, old *v1.Pod, n
return nil
}

_, err = cs.CoreV1().Pods(old.Namespace).Patch(ctx, old.Name, types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{}, "status")
return err
patchFn := func() error {
_, err := cs.CoreV1().Pods(old.Namespace).Patch(ctx, old.Name, types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{}, "status")
return err
}

return retry.OnError(retry.DefaultBackoff, net.IsConnectionRefused, patchFn)
}

// DeletePod deletes the given <pod> from API server
Expand Down
109 changes: 102 additions & 7 deletions pkg/scheduler/util/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ package util

import (
"context"
"errors"
"fmt"
"syscall"
"testing"
"time"

Expand All @@ -27,6 +29,7 @@ import (
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/net"
clientsetfake "k8s.io/client-go/kubernetes/fake"
clienttesting "k8s.io/client-go/testing"
extenderv1 "k8s.io/kube-scheduler/extender/v1"
Expand Down Expand Up @@ -181,12 +184,17 @@ func TestRemoveNominatedNodeName(t *testing.T) {

func TestPatchPodStatus(t *testing.T) {
tests := []struct {
name string
pod v1.Pod
name string
pod v1.Pod
client *clientsetfake.Clientset
// validateErr reports whether the error returned from PatchPodStatus is the expected one
// (true means the error is the expected one). When nil, no error is expected.
validateErr func(goterr error) bool
statusToUpdate v1.PodStatus
}{
{
name: "Should update pod conditions successfully",
name: "Should update pod conditions successfully",
client: clientsetfake.NewSimpleClientset(),
pod: v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Namespace: "ns",
Expand All @@ -209,11 +217,12 @@ func TestPatchPodStatus(t *testing.T) {
// ref: #101697, #94626 - ImagePullSecrets are allowed to have empty secret names
// which would fail the 2-way merge patch generation on Pod patches
// due to the mergeKey being the name field
name: "Should update pod conditions successfully on a pod Spec with secrets with empty name",
name: "Should update pod conditions successfully on a pod Spec with secrets with empty name",
client: clientsetfake.NewSimpleClientset(),
pod: v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Namespace: "ns",
Name: "pod2",
Name: "pod1",
},
Spec: v1.PodSpec{
// this will serialize to imagePullSecrets:[{}]
Expand All @@ -229,11 +238,90 @@ func TestPatchPodStatus(t *testing.T) {
},
},
},
{
name: "retry patch request when an 'connection refused' error is returned",
client: func() *clientsetfake.Clientset {
client := clientsetfake.NewSimpleClientset()

reqcount := 0
client.PrependReactor("patch", "pods", func(action clienttesting.Action) (bool, runtime.Object, error) {
defer func() { reqcount++ }()
if reqcount == 0 {
// return a connection refused error for the first patch request.
return true, &v1.Pod{}, fmt.Errorf("connection refused: %w", syscall.ECONNREFUSED)
}
if reqcount == 1 {
// do not return an error for the second patch request (this reactor reports it as not handled).
return false, &v1.Pod{}, nil
}

// return an error from the third request onward; only two requests are expected.
return true, nil, errors.New("requests comes in more than three times.")
})

return client
}(),
pod: v1.Pod{
ObjectMeta: metav1.ObjectMeta{
sanposhiho marked this conversation as resolved.
Show resolved Hide resolved
Namespace: "ns",
Name: "pod1",
},
Spec: v1.PodSpec{
ImagePullSecrets: []v1.LocalObjectReference{{Name: "foo"}},
},
},
statusToUpdate: v1.PodStatus{
Conditions: []v1.PodCondition{
{
Type: v1.PodScheduled,
Status: v1.ConditionFalse,
},
},
},
},
{
name: "only 4 retries at most",
client: func() *clientsetfake.Clientset {
client := clientsetfake.NewSimpleClientset()

reqcount := 0
client.PrependReactor("patch", "pods", func(action clienttesting.Action) (bool, runtime.Object, error) {
defer func() { reqcount++ }()
if reqcount >= 4 {
// return an error if more than four requests come in.
return true, nil, errors.New("requests comes in more than four times.")
}

// return a connection refused error for each of the first four patch requests.
return true, &v1.Pod{}, fmt.Errorf("connection refused: %w", syscall.ECONNREFUSED)
})

return client
}(),
pod: v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Namespace: "ns",
Name: "pod1",
},
Spec: v1.PodSpec{
ImagePullSecrets: []v1.LocalObjectReference{{Name: "foo"}},
},
},
validateErr: net.IsConnectionRefused,
statusToUpdate: v1.PodStatus{
Conditions: []v1.PodCondition{
{
Type: v1.PodScheduled,
Status: v1.ConditionFalse,
},
},
},
},
}

client := clientsetfake.NewSimpleClientset()
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
client := tc.client
_, err := client.CoreV1().Pods(tc.pod.Namespace).Create(context.TODO(), &tc.pod, metav1.CreateOptions{})
if err != nil {
t.Fatal(err)
Expand All @@ -242,9 +330,16 @@ func TestPatchPodStatus(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
err = PatchPodStatus(ctx, client, &tc.pod, &tc.statusToUpdate)
if err != nil {
if err != nil && tc.validateErr == nil {
// shouldn't be error
t.Fatal(err)
}
if tc.validateErr != nil {
if !tc.validateErr(err) {
t.Fatalf("Returned unexpected error: %v", err)
}
return
}

retrievedPod, err := client.CoreV1().Pods(tc.pod.Namespace).Get(ctx, tc.pod.Name, metav1.GetOptions{})
if err != nil {
Expand Down