From 73b9592ad1fb34e54e60b8b0f158c99a58b2ebe3 Mon Sep 17 00:00:00 2001 From: Jonathan Basseri Date: Thu, 29 Aug 2019 16:19:45 -0700 Subject: [PATCH 1/2] Fix retry logic in DisruptionController This changes the retry logic in DisruptionController so that it reconciles update conflicts. In the old behavior, any pdb status update failure was retried with the same status, regardless of error. Now there is no retry logic with the status update. The error is passed up the stack where the PDB can be requeued for processing. If the PDB status update error is a conflict error, there are some new special cases: - failSafe is not triggered, since this is considered a retryable error - the PDB is requeued immediately (ignoring the rate limiter) because we assume that conflict can be resolved by getting the latest version --- pkg/controller/disruption/BUILD | 1 - pkg/controller/disruption/disruption.go | 37 +++++++------------------ 2 files changed, 10 insertions(+), 28 deletions(-) diff --git a/pkg/controller/disruption/BUILD b/pkg/controller/disruption/BUILD index 016584793218..b7f0600881c7 100644 --- a/pkg/controller/disruption/BUILD +++ b/pkg/controller/disruption/BUILD @@ -32,7 +32,6 @@ go_library( "//staging/src/k8s.io/client-go/kubernetes:go_default_library", "//staging/src/k8s.io/client-go/kubernetes/scheme:go_default_library", "//staging/src/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library", - "//staging/src/k8s.io/client-go/kubernetes/typed/policy/v1beta1:go_default_library", "//staging/src/k8s.io/client-go/listers/apps/v1:go_default_library", "//staging/src/k8s.io/client-go/listers/core/v1:go_default_library", "//staging/src/k8s.io/client-go/listers/policy/v1beta1:go_default_library", diff --git a/pkg/controller/disruption/disruption.go b/pkg/controller/disruption/disruption.go index 8be258d10c7a..7e72dd1b03a4 100644 --- a/pkg/controller/disruption/disruption.go +++ b/pkg/controller/disruption/disruption.go @@ -39,7 +39,6 @@ import ( clientset "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/scheme" v1core "k8s.io/client-go/kubernetes/typed/core/v1" - policyclientset "k8s.io/client-go/kubernetes/typed/policy/v1beta1" appsv1listers "k8s.io/client-go/listers/apps/v1" corelisters "k8s.io/client-go/listers/core/v1" policylisters "k8s.io/client-go/listers/policy/v1beta1" @@ -53,8 +52,6 @@ import ( "k8s.io/klog" ) -const statusUpdateRetries = 2 - // DeletionTimeout sets maximum time from the moment a pod is added to DisruptedPods in PDB.Status // to the time when the pod is expected to be seen by PDB controller as having been marked for deletion. // If the pod was not marked for deletion during that time it is assumed that it won't be deleted at @@ -544,7 +541,13 @@ func (dc *DisruptionController) sync(key string) error { return err } - if err := dc.trySync(pdb); err != nil { + err = dc.trySync(pdb) + // If the reason for failure was a conflict, then allow this PDB update to be + // requeued without triggering the failSafe logic. + if errors.IsConflict(err) { + return err + } + if err != nil { klog.Errorf("Failed to sync pdb %s/%s: %v", pdb.Namespace, pdb.Name, err) return dc.failSafe(pdb) } @@ -785,29 +788,9 @@ func (dc *DisruptionController) updatePdbStatus(pdb *policy.PodDisruptionBudget, return dc.getUpdater()(newPdb) } -// refresh tries to re-GET the given PDB. If there are any errors, it just -// returns the old PDB. Intended to be used in a retry loop where it runs a -// bounded number of times. -func refresh(pdbClient policyclientset.PodDisruptionBudgetInterface, pdb *policy.PodDisruptionBudget) *policy.PodDisruptionBudget { - newPdb, err := pdbClient.Get(pdb.Name, metav1.GetOptions{}) - if err == nil { - return newPdb - } - return pdb - -} - func (dc *DisruptionController) writePdbStatus(pdb *policy.PodDisruptionBudget) error { - pdbClient := dc.kubeClient.PolicyV1beta1().PodDisruptionBudgets(pdb.Namespace) - st := pdb.Status - - var err error - for i, pdb := 0, pdb; i < statusUpdateRetries; i, pdb = i+1, refresh(pdbClient, pdb) { - pdb.Status = st - if _, err = pdbClient.UpdateStatus(pdb); err == nil { - break - } - } - + // If this update fails, don't retry it. Allow the failure to get handled & + // retried in `processNextWorkItem()`. + _, err := dc.kubeClient.PolicyV1beta1().PodDisruptionBudgets(pdb.Namespace).UpdateStatus(pdb) return err } From b8e93184fe9bdb7db15121555431186b3859f1c9 Mon Sep 17 00:00:00 2001 From: Jonathan Basseri Date: Fri, 30 Aug 2019 13:45:30 -0700 Subject: [PATCH 2/2] Add unit test for DisruptionController retry logic This tests the PDB status update path in DisruptionController and asserts that conflicting writes (with eviciton handler) are handled gracefully. This adds the client-go fake.Clientset into our tests, because that is the layer required for injecting update failures. This also adds a TestMain so that DisruptionController logs can be enabled during test. e.g., go test ./pkg/controller/disruption -v -args -v=4 --- pkg/controller/disruption/BUILD | 4 + pkg/controller/disruption/disruption_test.go | 161 ++++++++++++++++++- 2 files changed, 162 insertions(+), 3 deletions(-) diff --git a/pkg/controller/disruption/BUILD b/pkg/controller/disruption/BUILD index b7f0600881c7..2d1560f189e9 100644 --- a/pkg/controller/disruption/BUILD +++ b/pkg/controller/disruption/BUILD @@ -55,6 +55,7 @@ go_test( "//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/api/policy/v1beta1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/api/equality:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/api/meta/testrestmapper:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library", @@ -62,11 +63,14 @@ go_test( "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/intstr:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/uuid:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library", "//staging/src/k8s.io/client-go/informers:go_default_library", + "//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library", "//staging/src/k8s.io/client-go/scale/fake:go_default_library", "//staging/src/k8s.io/client-go/testing:go_default_library", "//staging/src/k8s.io/client-go/tools/cache:go_default_library", "//staging/src/k8s.io/client-go/util/workqueue:go_default_library", + "//vendor/k8s.io/klog:go_default_library", "//vendor/k8s.io/utils/pointer:go_default_library", ], ) diff --git a/pkg/controller/disruption/disruption_test.go b/pkg/controller/disruption/disruption_test.go index ea3d4570c4a3..fe3769b5062f 100644 --- a/pkg/controller/disruption/disruption_test.go +++ b/pkg/controller/disruption/disruption_test.go @@ -17,16 +17,21 @@ limitations under the License. package disruption import ( + "context" + "flag" "fmt" + "os" "runtime/debug" + "sync" "testing" "time" apps "k8s.io/api/apps/v1" autoscalingapi "k8s.io/api/autoscaling/v1" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" policy "k8s.io/api/policy/v1beta1" apiequality "k8s.io/apimachinery/pkg/api/equality" + "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/meta/testrestmapper" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" @@ -34,11 +39,14 @@ import ( "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/apimachinery/pkg/util/uuid" + "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/informers" + "k8s.io/client-go/kubernetes/fake" scalefake "k8s.io/client-go/scale/fake" core "k8s.io/client-go/testing" "k8s.io/client-go/tools/cache" "k8s.io/client-go/util/workqueue" + "k8s.io/klog" _ "k8s.io/kubernetes/pkg/apis/core/install" "k8s.io/kubernetes/pkg/controller" utilpointer "k8s.io/utils/pointer" @@ -97,6 +105,7 @@ type disruptionController struct { dStore cache.Store ssStore cache.Store + coreClient *fake.Clientset scaleClient *scalefake.FakeScaleClient } @@ -109,7 +118,8 @@ var customGVK = schema.GroupVersionKind{ func newFakeDisruptionController() (*disruptionController, *pdbStates) { ps := &pdbStates{} - informerFactory := informers.NewSharedInformerFactory(nil, controller.NoResyncPeriodFunc()) + coreClient := fake.NewSimpleClientset() + informerFactory := informers.NewSharedInformerFactory(coreClient, controller.NoResyncPeriodFunc()) scheme := runtime.NewScheme() scheme.AddKnownTypeWithName(customGVK, &v1.Service{}) @@ -122,7 +132,7 @@ func newFakeDisruptionController() (*disruptionController, *pdbStates) { informerFactory.Apps().V1().ReplicaSets(), informerFactory.Apps().V1().Deployments(), informerFactory.Apps().V1().StatefulSets(), - nil, + coreClient, testrestmapper.TestOnlyStaticRESTMapper(scheme), fakeScaleClient, ) @@ -134,6 +144,9 @@ func newFakeDisruptionController() (*disruptionController, *pdbStates) { dc.dListerSynced = alwaysReady dc.ssListerSynced = alwaysReady + informerFactory.Start(context.TODO().Done()) + informerFactory.WaitForCacheSync(nil) + return &disruptionController{ dc, informerFactory.Core().V1().Pods().Informer().GetStore(), @@ -142,6 +155,7 @@ func newFakeDisruptionController() (*disruptionController, *pdbStates) { informerFactory.Apps().V1().ReplicaSets().Informer().GetStore(), informerFactory.Apps().V1().Deployments().Informer().GetStore(), informerFactory.Apps().V1().StatefulSets().Informer().GetStore(), + coreClient, fakeScaleClient, }, ps } @@ -1025,3 +1039,144 @@ func TestDeploymentFinderFunction(t *testing.T) { }) } } + +// This test checks that the disruption controller does not write stale data to +// a PDB status during race conditions with the eviction handler. Specifically, +// failed updates due to ResourceVersion conflict should not cause a stale value +// of PodDisruptionsAllowed to be written. +// +// In this test, PodDisruptionsAllowed starts at 2. +// (A) We will delete 1 pod and trigger DisruptionController to set +// PodDisruptionsAllowed to 1. +// (B) As the DisruptionController attempts this write, we will evict the +// remaining 2 pods and update PodDisruptionsAllowed to 0. (The real eviction +// handler would allow this because it still sees PodDisruptionsAllowed=2.) +// (C) If the DisruptionController writes PodDisruptionsAllowed=1 despite the +// resource conflict error, then there is a bug. +func TestUpdatePDBStatusRetries(t *testing.T) { + dc, _ := newFakeDisruptionController() + // Inject the production code over our fake impl + dc.getUpdater = func() updater { return dc.writePdbStatus } + + // Create a PDB and 3 pods that match it. + pdb, pdbKey := newMinAvailablePodDisruptionBudget(t, intstr.FromInt(1)) + pdb, err := dc.coreClient.PolicyV1beta1().PodDisruptionBudgets(pdb.Namespace).Create(pdb) + if err != nil { + t.Fatalf("Failed to create PDB: %v", err) + } + podNames := []string{"moe", "larry", "curly"} + for _, name := range podNames { + pod, _ := newPod(t, name) + _, err := dc.coreClient.CoreV1().Pods(pod.Namespace).Create(pod) + if err != nil { + t.Fatalf("Failed to create pod: %v", err) + } + } + + // Block until the fake clientset writes are observable in the informer caches. + // FUN FACT: This guarantees that the informer caches have updated, but it does + // not guarantee that informer event handlers have completed. Fortunately, + // DisruptionController does most of its logic by reading from informer + // listers, so this guarantee is sufficient. + if err := waitForCacheCount(dc.pdbStore, 1); err != nil { + t.Fatalf("Failed to verify PDB in informer cache: %v", err) + } + if err := waitForCacheCount(dc.podStore, len(podNames)); err != nil { + t.Fatalf("Failed to verify pods in informer cache: %v", err) + } + + // Sync DisruptionController once to update PDB status. + if err := dc.sync(pdbKey); err != nil { + t.Fatalf("Failed initial sync: %v", err) + } + + // Evict simulates the visible effects of eviction in our fake client. + evict := func(podNames ...string) { + // These GVRs are copied from the generated fake code because they are not exported. + var ( + podsResource = schema.GroupVersionResource{Group: "", Version: "v1", Resource: "pods"} + poddisruptionbudgetsResource = schema.GroupVersionResource{Group: "policy", Version: "v1beta1", Resource: "poddisruptionbudgets"} + ) + + // Bypass the coreClient.Fake and write directly to the ObjectTracker, because + // this helper will be called while the Fake is holding a lock. + obj, err := dc.coreClient.Tracker().Get(poddisruptionbudgetsResource, pdb.Namespace, pdb.Name) + if err != nil { + t.Fatalf("Failed to get PDB: %v", err) + } + updatedPDB := obj.(*policy.PodDisruptionBudget) + // Each eviction, + // - decrements PodDisruptionsAllowed + // - adds the pod to DisruptedPods + // - deletes the pod + updatedPDB.Status.PodDisruptionsAllowed -= int32(len(podNames)) + updatedPDB.Status.DisruptedPods = make(map[string]metav1.Time) + for _, name := range podNames { + updatedPDB.Status.DisruptedPods[name] = metav1.NewTime(time.Now()) + } + if err := dc.coreClient.Tracker().Update(poddisruptionbudgetsResource, updatedPDB, updatedPDB.Namespace); err != nil { + t.Fatalf("Eviction (PDB update) failed: %v", err) + } + for _, name := range podNames { + if err := dc.coreClient.Tracker().Delete(podsResource, "default", name); err != nil { + t.Fatalf("Eviction (pod delete) failed: %v", err) + } + } + } + + // The fake kube client does not update ResourceVersion or check for conflicts. + // Instead, we add a reactor that returns a conflict error on the first PDB + // update and success after that. + var failOnce sync.Once + dc.coreClient.Fake.PrependReactor("update", "poddisruptionbudgets", func(a core.Action) (handled bool, obj runtime.Object, err error) { + failOnce.Do(func() { + // (B) Evict two pods and fail this update. + evict(podNames[1], podNames[2]) + handled = true + err = errors.NewConflict(a.GetResource().GroupResource(), pdb.Name, fmt.Errorf("conflict")) + }) + return handled, obj, err + }) + + // (A) Delete one pod + if err := dc.coreClient.CoreV1().Pods("default").Delete(podNames[0], &metav1.DeleteOptions{}); err != nil { + t.Fatal(err) + } + if err := waitForCacheCount(dc.podStore, len(podNames)-1); err != nil { + t.Fatalf("Failed to verify pods in informer cache: %v", err) + } + + // The sync() function should either write a correct status which takes the + // evictions into account, or re-queue the PDB for another sync (by returning + // an error) + if err := dc.sync(pdbKey); err != nil { + t.Logf("sync() returned with error: %v", err) + } else { + t.Logf("sync() returned with no error") + } + + // (C) Whether or not sync() returned an error, the PDB status should reflect + // the evictions that took place. + finalPDB, err := dc.coreClient.PolicyV1beta1().PodDisruptionBudgets("default").Get(pdb.Name, metav1.GetOptions{}) + if err != nil { + t.Fatalf("Failed to get PDB: %v", err) + } + if expected, actual := int32(0), finalPDB.Status.PodDisruptionsAllowed; expected != actual { + t.Errorf("PodDisruptionsAllowed should be %d, got %d", expected, actual) + } +} + +// waitForCacheCount blocks until the given cache store has the desired number +// of items in it. This will return an error if the condition is not met after a +// 10 second timeout. +func waitForCacheCount(store cache.Store, n int) error { + return wait.Poll(10*time.Millisecond, 10*time.Second, func() (bool, error) { + return len(store.List()) == n, nil + }) +} + +// TestMain adds klog flags to make debugging tests easier. +func TestMain(m *testing.M) { + klog.InitFlags(flag.CommandLine) + os.Exit(m.Run()) +}