Fix retry logic in DisruptionController #82152

Merged 2 commits on Oct 23, 2019
pkg/controller/disruption/BUILD (5 changes: 4 additions & 1 deletion)
@@ -32,7 +32,6 @@ go_library(
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes/scheme:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes/typed/policy/v1beta1:go_default_library",
"//staging/src/k8s.io/client-go/listers/apps/v1:go_default_library",
"//staging/src/k8s.io/client-go/listers/core/v1:go_default_library",
"//staging/src/k8s.io/client-go/listers/policy/v1beta1:go_default_library",
@@ -56,18 +55,22 @@ go_test(
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/api/policy/v1beta1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/equality:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/meta/testrestmapper:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/intstr:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/uuid:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/client-go/informers:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library",
"//staging/src/k8s.io/client-go/scale/fake:go_default_library",
"//staging/src/k8s.io/client-go/testing:go_default_library",
"//staging/src/k8s.io/client-go/tools/cache:go_default_library",
"//staging/src/k8s.io/client-go/util/workqueue:go_default_library",
"//vendor/k8s.io/klog:go_default_library",
"//vendor/k8s.io/utils/pointer:go_default_library",
],
)
pkg/controller/disruption/disruption.go (39 changes: 11 additions & 28 deletions)
@@ -21,7 +21,7 @@ import (
"time"

apps "k8s.io/api/apps/v1beta1"
"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/api/extensions/v1beta1"
policy "k8s.io/api/policy/v1beta1"
apiequality "k8s.io/apimachinery/pkg/api/equality"
@@ -39,7 +39,6 @@ import (
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
policyclientset "k8s.io/client-go/kubernetes/typed/policy/v1beta1"
appsv1listers "k8s.io/client-go/listers/apps/v1"
corelisters "k8s.io/client-go/listers/core/v1"
policylisters "k8s.io/client-go/listers/policy/v1beta1"
@@ -53,8 +52,6 @@ import (
"k8s.io/klog"
)

const statusUpdateRetries = 2

// DeletionTimeout sets maximum time from the moment a pod is added to DisruptedPods in PDB.Status
// to the time when the pod is expected to be seen by PDB controller as having been marked for deletion.
// If the pod was not marked for deletion during that time it is assumed that it won't be deleted at
@@ -532,7 +529,13 @@ func (dc *DisruptionController) sync(key string) error {
return err
}

if err := dc.trySync(pdb); err != nil {
err = dc.trySync(pdb)
// If the reason for failure was a conflict, then allow this PDB update to be
// requeued without triggering the failSafe logic.
if errors.IsConflict(err) {
return err
}
if err != nil {
klog.Errorf("Failed to sync pdb %s/%s: %v", pdb.Namespace, pdb.Name, err)
return dc.failSafe(pdb)
}
@@ -773,29 +776,9 @@ func (dc *DisruptionController) updatePdbStatus(pdb *policy.PodDisruptionBudget,
return dc.getUpdater()(newPdb)
}

// refresh tries to re-GET the given PDB. If there are any errors, it just
// returns the old PDB. Intended to be used in a retry loop where it runs a
// bounded number of times.
func refresh(pdbClient policyclientset.PodDisruptionBudgetInterface, pdb *policy.PodDisruptionBudget) *policy.PodDisruptionBudget {
newPdb, err := pdbClient.Get(pdb.Name, metav1.GetOptions{})
if err == nil {
return newPdb
}
return pdb

}

func (dc *DisruptionController) writePdbStatus(pdb *policy.PodDisruptionBudget) error {
pdbClient := dc.kubeClient.PolicyV1beta1().PodDisruptionBudgets(pdb.Namespace)
st := pdb.Status

var err error
for i, pdb := 0, pdb; i < statusUpdateRetries; i, pdb = i+1, refresh(pdbClient, pdb) {
pdb.Status = st
if _, err = pdbClient.UpdateStatus(pdb); err == nil {
break
}
}

// If this update fails, don't retry it. Allow the failure to get handled &
// retried in `processNextWorkItem()`.
_, err := dc.kubeClient.PolicyV1beta1().PodDisruptionBudgets(pdb.Namespace).UpdateStatus(pdb)
return err
}
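
Context for the comment above: sync() is driven by the controller's rate-limited workqueue, so simply returning the conflict error is what triggers the retry. The controller's real processNextWorkItem is unchanged and not part of this diff; the following is only a minimal, self-contained sketch of that standard workqueue pattern (names and the log line are illustrative):

```go
package main

import (
	"fmt"

	"k8s.io/client-go/util/workqueue"
)

// processNextWorkItem sketches the usual controller retry loop: any error
// returned by the sync function (including the conflict errors this PR now
// propagates) requeues the key with backoff instead of tripping failSafe.
func processNextWorkItem(queue workqueue.RateLimitingInterface, sync func(key string) error) bool {
	key, quit := queue.Get()
	if quit {
		return false
	}
	defer queue.Done(key)

	if err := sync(key.(string)); err != nil {
		fmt.Printf("error syncing %v, requeuing: %v\n", key, err)
		// The retried sync reads a fresh PDB from the informer cache, so the
		// stale ResourceVersion that caused the conflict is discarded.
		queue.AddRateLimited(key)
		return true
	}
	queue.Forget(key) // success: reset the rate limiter for this key
	return true
}

func main() {
	q := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
	q.Add("default/my-pdb") // hypothetical PDB key
	_ = processNextWorkItem(q, func(key string) error { return nil })
}
```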
pkg/controller/disruption/disruption_test.go (161 changes: 158 additions & 3 deletions)
@@ -17,28 +17,36 @@ limitations under the License.
package disruption

import (
"context"
"flag"
"fmt"
"os"
"runtime/debug"
"sync"
"testing"
"time"

apps "k8s.io/api/apps/v1"
autoscalingapi "k8s.io/api/autoscaling/v1"
"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
policy "k8s.io/api/policy/v1beta1"
apiequality "k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta/testrestmapper"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/util/uuid"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes/fake"
scalefake "k8s.io/client-go/scale/fake"
core "k8s.io/client-go/testing"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog"
_ "k8s.io/kubernetes/pkg/apis/core/install"
"k8s.io/kubernetes/pkg/controller"
utilpointer "k8s.io/utils/pointer"
@@ -97,6 +105,7 @@ type disruptionController struct {
dStore cache.Store
ssStore cache.Store

coreClient *fake.Clientset
scaleClient *scalefake.FakeScaleClient
}

@@ -109,7 +118,8 @@ var customGVK = schema.GroupVersionKind{
func newFakeDisruptionController() (*disruptionController, *pdbStates) {
ps := &pdbStates{}

informerFactory := informers.NewSharedInformerFactory(nil, controller.NoResyncPeriodFunc())
coreClient := fake.NewSimpleClientset()
informerFactory := informers.NewSharedInformerFactory(coreClient, controller.NoResyncPeriodFunc())

scheme := runtime.NewScheme()
scheme.AddKnownTypeWithName(customGVK, &v1.Service{})
@@ -122,7 +132,7 @@ func newFakeDisruptionController() (*disruptionController, *pdbStates) {
informerFactory.Apps().V1().ReplicaSets(),
informerFactory.Apps().V1().Deployments(),
informerFactory.Apps().V1().StatefulSets(),
nil,
coreClient,
testrestmapper.TestOnlyStaticRESTMapper(scheme),
fakeScaleClient,
)
@@ -134,6 +144,9 @@ func newFakeDisruptionController() (*disruptionController, *pdbStates) {
dc.dListerSynced = alwaysReady
dc.ssListerSynced = alwaysReady

informerFactory.Start(context.TODO().Done())
informerFactory.WaitForCacheSync(nil)

return &disruptionController{
dc,
informerFactory.Core().V1().Pods().Informer().GetStore(),
@@ -142,6 +155,7 @@ func newFakeDisruptionController() (*disruptionController, *pdbStates) {
informerFactory.Apps().V1().ReplicaSets().Informer().GetStore(),
informerFactory.Apps().V1().Deployments().Informer().GetStore(),
informerFactory.Apps().V1().StatefulSets().Informer().GetStore(),
coreClient,
fakeScaleClient,
}, ps
}
@@ -1025,3 +1039,144 @@ func TestDeploymentFinderFunction(t *testing.T) {
})
}
}

// This test checks that the disruption controller does not write stale data to
// a PDB status during race conditions with the eviction handler. Specifically,
// failed updates due to ResourceVersion conflict should not cause a stale value
// of PodDisruptionsAllowed to be written.
//
// In this test, PodDisruptionsAllowed starts at 2.
// (A) We will delete 1 pod and trigger DisruptionController to set
// PodDisruptionsAllowed to 1.
// (B) As the DisruptionController attempts this write, we will evict the
// remaining 2 pods and update PodDisruptionsAllowed to 0. (The real eviction
// handler would allow this because it still sees PodDisruptionsAllowed=2.)
// (C) If the DisruptionController writes PodDisruptionsAllowed=1 despite the
// resource conflict error, then there is a bug.
func TestUpdatePDBStatusRetries(t *testing.T) {
[Review thread]
Member: This test verifies that it retries on a conflict error, but nowhere is it made clear in the code or comments why this is important. Can you mention the race here and that this is just the simplest way of avoiding it?
Author: I've drastically revamped the test so that it asserts on the underlying bad behavior rather than how we fix it. PTAL

dc, _ := newFakeDisruptionController()
// Inject the production code over our fake impl
dc.getUpdater = func() updater { return dc.writePdbStatus }

// Create a PDB and 3 pods that match it.
pdb, pdbKey := newMinAvailablePodDisruptionBudget(t, intstr.FromInt(1))
pdb, err := dc.coreClient.PolicyV1beta1().PodDisruptionBudgets(pdb.Namespace).Create(pdb)
if err != nil {
t.Fatalf("Failed to create PDB: %v", err)
}
podNames := []string{"moe", "larry", "curly"}
for _, name := range podNames {
pod, _ := newPod(t, name)
_, err := dc.coreClient.CoreV1().Pods(pod.Namespace).Create(pod)
if err != nil {
t.Fatalf("Failed to create pod: %v", err)
}
}

// Block until the fake clientset writes are observable in the informer caches.
// FUN FACT: This guarantees that the informer caches have updated, but it does
// not guarantee that informer event handlers have completed. Fortunately,
// DisruptionController does most of its logic by reading from informer
// listers, so this guarantee is sufficient.
if err := waitForCacheCount(dc.pdbStore, 1); err != nil {
[Review thread]
Contributor: Isn't this wait covered by the wait on line 1084 below?
Contributor: I ran the test without this wait and the test passed.
Author: The test is likely to be flaky without these waits.

t.Fatalf("Failed to verify PDB in informer cache: %v", err)
}
if err := waitForCacheCount(dc.podStore, len(podNames)); err != nil {
t.Fatalf("Failed to verify pods in informer cache: %v", err)
}

// Sync DisruptionController once to update PDB status.
if err := dc.sync(pdbKey); err != nil {
t.Fatalf("Failed initial sync: %v", err)
}

// Evict simulates the visible effects of eviction in our fake client.
evict := func(podNames ...string) {
// These GVRs are copied from the generated fake code because they are not exported.
[Review thread]
Contributor: Maybe extract the code up to line 1125 into a helper for future code reuse.
Author: Are you referring to the evict function, or just the GVR constants? I would prefer to extract these in the future when we have a second or third call site. The function could be extracted as a generic helper, but the current implementation is tightly coupled to this unit test. It both captures local variables from the surrounding scope and makes assumptions about the fake clientset that will be used.
Contributor: Refactoring in the future is fine.

var (
podsResource = schema.GroupVersionResource{Group: "", Version: "v1", Resource: "pods"}
poddisruptionbudgetsResource = schema.GroupVersionResource{Group: "policy", Version: "v1beta1", Resource: "poddisruptionbudgets"}
)

// Bypass the coreClient.Fake and write directly to the ObjectTracker, because
// this helper will be called while the Fake is holding a lock.
obj, err := dc.coreClient.Tracker().Get(poddisruptionbudgetsResource, pdb.Namespace, pdb.Name)
if err != nil {
t.Fatalf("Failed to get PDB: %v", err)
}
updatedPDB := obj.(*policy.PodDisruptionBudget)
// Each eviction,
// - decrements PodDisruptionsAllowed
// - adds the pod to DisruptedPods
// - deletes the pod
updatedPDB.Status.PodDisruptionsAllowed -= int32(len(podNames))
updatedPDB.Status.DisruptedPods = make(map[string]metav1.Time)
for _, name := range podNames {
updatedPDB.Status.DisruptedPods[name] = metav1.NewTime(time.Now())
}
if err := dc.coreClient.Tracker().Update(poddisruptionbudgetsResource, updatedPDB, updatedPDB.Namespace); err != nil {
t.Fatalf("Eviction (PDB update) failed: %v", err)
}
for _, name := range podNames {
if err := dc.coreClient.Tracker().Delete(podsResource, "default", name); err != nil {
t.Fatalf("Eviction (pod delete) failed: %v", err)
}
}
}

// The fake kube client does not update ResourceVersion or check for conflicts.
// Instead, we add a reactor that returns a conflict error on the first PDB
// update and success after that.
var failOnce sync.Once
dc.coreClient.Fake.PrependReactor("update", "poddisruptionbudgets", func(a core.Action) (handled bool, obj runtime.Object, err error) {
failOnce.Do(func() {
// (B) Evict two pods and fail this update.
evict(podNames[1], podNames[2])
handled = true
err = errors.NewConflict(a.GetResource().GroupResource(), pdb.Name, fmt.Errorf("conflict"))
})
return handled, obj, err
})

// (A) Delete one pod
if err := dc.coreClient.CoreV1().Pods("default").Delete(podNames[0], &metav1.DeleteOptions{}); err != nil {
t.Fatal(err)
}
if err := waitForCacheCount(dc.podStore, len(podNames)-1); err != nil {
t.Fatalf("Failed to verify pods in informer cache: %v", err)
}

// The sync() function should either write a correct status which takes the
// evictions into account, or re-queue the PDB for another sync (by returning
// an error).
if err := dc.sync(pdbKey); err != nil {
t.Logf("sync() returned with error: %v", err)
} else {
t.Logf("sync() returned with no error")
}

// (C) Whether or not sync() returned an error, the PDB status should reflect
// the evictions that took place.
finalPDB, err := dc.coreClient.PolicyV1beta1().PodDisruptionBudgets("default").Get(pdb.Name, metav1.GetOptions{})
if err != nil {
t.Fatalf("Failed to get PDB: %v", err)
}
if expected, actual := int32(0), finalPDB.Status.PodDisruptionsAllowed; expected != actual {
t.Errorf("PodDisruptionsAllowed should be %d, got %d", expected, actual)
}
}

// waitForCacheCount blocks until the given cache store has the desired number
// of items in it. This will return an error if the condition is not met after a
// 10 second timeout.
func waitForCacheCount(store cache.Store, n int) error {
return wait.Poll(10*time.Millisecond, 10*time.Second, func() (bool, error) {
return len(store.List()) == n, nil
})
}

// TestMain adds klog flags to make debugging tests easier.
func TestMain(m *testing.M) {
klog.InitFlags(flag.CommandLine)
os.Exit(m.Run())
}
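
A closing note on the mechanism shared by the sync() fix and the test's reactor: the apimachinery conflict helpers. The snippet below is a self-contained illustration, not code from the PR; the group/resource and object name are placeholders:

```go
package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/api/errors"
	"k8s.io/apimachinery/pkg/runtime/schema"
)

func main() {
	gr := schema.GroupResource{Group: "policy", Resource: "poddisruptionbudgets"}
	// Same constructor the test's reactor uses to simulate a ResourceVersion clash.
	err := errors.NewConflict(gr, "my-pdb", fmt.Errorf("object was modified"))

	fmt.Println(errors.IsConflict(err)) // true: sync() returns the error and the PDB is requeued
	fmt.Println(errors.IsNotFound(err)) // false: any other API error still triggers failSafe
}
```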