Handle errors better in the disruption controller

mortent committed Nov 21, 2019
1 parent 9767125 commit 238500bd62aec1df7bf95436994c35633341a863
pkg/controller/disruption/BUILD
@@ -52,6 +52,7 @@ go_test(
"//pkg/controller:go_default_library",
"//staging/src/k8s.io/api/apps/v1:go_default_library",
"//staging/src/k8s.io/api/autoscaling/v1:go_default_library",
"//staging/src/k8s.io/api/batch/v1:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/api/policy/v1beta1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/equality:go_default_library",
pkg/controller/disruption/disruption.go
@@ -184,6 +184,31 @@ var (
controllerKindDep = v1beta1.SchemeGroupVersion.WithKind("Deployment")
)

type controllerNotFoundError struct {
controllerRef *metav1.OwnerReference
}

func (e *controllerNotFoundError) Error() string {
return fmt.Sprintf("resource of kind %s with name %s not found", e.controllerRef.Kind, e.controllerRef.Name)
}

func newControllerNotFoundError(controllerRef *metav1.OwnerReference) *controllerNotFoundError {
return &controllerNotFoundError{
controllerRef: controllerRef,
}
}

func isControllerNotFoundError(err error) bool {
if err == nil {
return false
}
switch err.(type) {
case *controllerNotFoundError:
return true
}
return false
}
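The helper above checks the concrete type with a switch. On Go 1.13+ the same check can be written with errors.As, which also matches wrapped errors; a minimal sketch (the name isControllerNotFoundErrorAs is hypothetical, and the patch itself keeps the explicit type switch):

package disruption

import "errors"

// Sketch only, not part of this patch: errors.As unwraps wrapped errors and
// returns false for a nil error, matching the explicit nil check above.
func isControllerNotFoundErrorAs(err error) bool {
	var cnf *controllerNotFoundError
	return errors.As(err, &cnf)
}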

// getPodReplicaSet finds a replicaset which has no matching deployments.
func (dc *DisruptionController) getPodReplicaSet(controllerRef *metav1.OwnerReference, namespace string) (*controllerAndScale, error) {
ok, err := verifyGroupKind(controllerRef, controllerKindRS.Kind, []string{"apps", "extensions"})
@@ -192,11 +217,11 @@ func (dc *DisruptionController) getPodReplicaSet(controllerRef *metav1.OwnerRefe
}
rs, err := dc.rsLister.ReplicaSets(namespace).Get(controllerRef.Name)
if err != nil {
// The only possible error is NotFound, which is ok here.
return nil, nil
// The only possible error is NotFound.
return nil, newControllerNotFoundError(controllerRef)
}
if rs.UID != controllerRef.UID {
return nil, nil
return nil, newControllerNotFoundError(controllerRef)
}
controllerRef = metav1.GetControllerOf(rs)
if controllerRef != nil && controllerRef.Kind == controllerKindDep.Kind {
@@ -214,11 +239,11 @@ func (dc *DisruptionController) getPodStatefulSet(controllerRef *metav1.OwnerRef
}
ss, err := dc.ssLister.StatefulSets(namespace).Get(controllerRef.Name)
if err != nil {
// The only possible error is NotFound, which is ok here.
return nil, nil
// The only possible error is NotFound.
return nil, newControllerNotFoundError(controllerRef)
}
if ss.UID != controllerRef.UID {
return nil, nil
return nil, newControllerNotFoundError(controllerRef)
}

return &controllerAndScale{ss.UID, *(ss.Spec.Replicas)}, nil
@@ -232,14 +257,15 @@ func (dc *DisruptionController) getPodDeployment(controllerRef *metav1.OwnerRefe
}
rs, err := dc.rsLister.ReplicaSets(namespace).Get(controllerRef.Name)
if err != nil {
// The only possible error is NotFound, which is ok here.
return nil, nil
// The only possible error is NotFound.
return nil, newControllerNotFoundError(controllerRef)
}
if rs.UID != controllerRef.UID {
return nil, nil
return nil, newControllerNotFoundError(controllerRef)
}
controllerRef = metav1.GetControllerOf(rs)
if controllerRef == nil {
// This case will be covered by the getPodReplicaSet finder.
return nil, nil
}

@@ -249,11 +275,11 @@ func (dc *DisruptionController) getPodDeployment(controllerRef *metav1.OwnerRefe
}
deployment, err := dc.dLister.Deployments(rs.Namespace).Get(controllerRef.Name)
if err != nil {
// The only possible error is NotFound, which is ok here.
return nil, nil
// The only possible error is NotFound.
return nil, newControllerNotFoundError(controllerRef)
}
if deployment.UID != controllerRef.UID {
return nil, nil
return nil, newControllerNotFoundError(controllerRef)
}
return &controllerAndScale{deployment.UID, *(deployment.Spec.Replicas)}, nil
}
@@ -265,11 +291,11 @@ func (dc *DisruptionController) getPodReplicationController(controllerRef *metav
}
rc, err := dc.rcLister.ReplicationControllers(namespace).Get(controllerRef.Name)
if err != nil {
// The only possible error is NotFound, which is ok here.
return nil, nil
// The only possible error is NotFound.
return nil, newControllerNotFoundError(controllerRef)
}
if rc.UID != controllerRef.UID {
return nil, nil
return nil, newControllerNotFoundError(controllerRef)
}
return &controllerAndScale{rc.UID, *(rc.Spec.Replicas)}, nil
}
@@ -294,12 +320,12 @@ func (dc *DisruptionController) getScaleController(controllerRef *metav1.OwnerRe
scale, err := dc.scaleNamespacer.Scales(namespace).Get(gr, controllerRef.Name)
if err != nil {
if errors.IsNotFound(err) {
return nil, nil
return nil, newControllerNotFoundError(controllerRef)
}
return nil, err
}
if scale.UID != controllerRef.UID {
return nil, nil
return nil, newControllerNotFoundError(controllerRef)
}
return &controllerAndScale{scale.UID, scale.Spec.Replicas}, nil
}
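All five finders above (ReplicaSet, StatefulSet, Deployment, ReplicationController, and the generic scale subresource) share one signature. A sketch of the shared types as inferred from the call sites; the exact declarations in disruption.go may differ:

package disruption

import (
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/types"
)

// controllerAndScale pairs a controller's UID with its desired replica count.
type controllerAndScale struct {
	types.UID
	scale int32
}

// podControllerFinder resolves a pod's controllerRef to its controller and scale.
// After this change, a finder returns *controllerNotFoundError when the GroupKind
// matches but the resource is gone (or its UID differs), letting getExpectedScale
// treat the pod as orphaned instead of silently dropping it from the count.
type podControllerFinder func(controllerRef *metav1.OwnerReference, namespace string) (*controllerAndScale, error)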
@@ -565,15 +591,15 @@ func (dc *DisruptionController) trySync(pdb *policy.PodDisruptionBudget) error {
dc.recorder.Eventf(pdb, v1.EventTypeNormal, "NoPods", "No matching pods found")
}

expectedCount, desiredHealthy, err := dc.getExpectedPodCount(pdb, pods)
expectedCount, desiredHealthy, coveredPods, err := dc.getExpectedPodCount(pdb, pods)
if err != nil {
dc.recorder.Eventf(pdb, v1.EventTypeWarning, "CalculateExpectedPodCountFailed", "Failed to calculate the number of expected pods: %v", err)
return err
}

currentTime := time.Now()
disruptedPods, recheckTime := dc.buildDisruptedPodMap(pods, pdb, currentTime)
currentHealthy := countHealthyPods(pods, disruptedPods, currentTime)
disruptedPods, recheckTime := dc.buildDisruptedPodMap(coveredPods, pdb, currentTime)
currentHealthy := countHealthyPods(coveredPods, disruptedPods, currentTime)
err = dc.updatePdbStatus(pdb, currentHealthy, desiredHealthy, expectedCount, disruptedPods)

if err == nil && recheckTime != nil {
@@ -585,15 +611,20 @@ func (dc *DisruptionController) trySync(pdb *policy.PodDisruptionBudget) error {
return err
}

func (dc *DisruptionController) getExpectedPodCount(pdb *policy.PodDisruptionBudget, pods []*v1.Pod) (expectedCount, desiredHealthy int32, err error) {
// getExpectedPodCount computes the expected number of pods (i.e. the total number of pods
// if all controllers covered by the pdb have the desired number of replicas), the desired number
// of healthy pods (the minimum number of healthy pods as prescribed by the pdb), and a list of
// the pods covered by the pdb (all pods that were part of the computation of the expected number
// of pods, so any pods without controllers or with unsupported controllers are left out).
func (dc *DisruptionController) getExpectedPodCount(pdb *policy.PodDisruptionBudget, pods []*v1.Pod) (expectedCount, desiredHealthy int32, coveredPods []*v1.Pod, err error) {
err = nil
// TODO(davidopp): consider whether the way expectedCount is computed and the rules
// about permitted controller configurations (specifically, considering it an error
// if a pod covered by a PDB has 0 controllers or > 1 controller) should be
// handled the same way for integer and percentage minAvailable

if pdb.Spec.MaxUnavailable != nil {
expectedCount, err = dc.getExpectedScale(pdb, pods)
expectedCount, coveredPods, err = dc.getExpectedScale(pdb, pods)
if err != nil {
return
}
@@ -610,8 +641,9 @@ func (dc *DisruptionController) getExpectedPodCount(pdb *policy.PodDisruptionBud
if pdb.Spec.MinAvailable.Type == intstr.Int {
desiredHealthy = pdb.Spec.MinAvailable.IntVal
expectedCount = int32(len(pods))
coveredPods = pods
} else if pdb.Spec.MinAvailable.Type == intstr.String {
expectedCount, err = dc.getExpectedScale(pdb, pods)
expectedCount, coveredPods, err = dc.getExpectedScale(pdb, pods)
if err != nil {
return
}
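The percentage-to-count arithmetic itself lives in unchanged lines outside these hunks. A worked sketch of the MaxUnavailable case, assuming the round-up behavior of intstr.GetValueFromIntOrPercent; the numbers match TestPodsWithUnknownController below:

package main

import (
	"fmt"
	"math"
)

// Sketch: maxUnavailable is resolved from the percentage (rounding up) and
// subtracted from expectedCount, floored at zero.
func desiredHealthyForMaxUnavailable(expectedCount int32, pct int) int32 {
	maxUnavailable := int32(math.Ceil(float64(expectedCount) * float64(pct) / 100))
	if maxUnavailable >= expectedCount {
		return 0
	}
	return expectedCount - maxUnavailable
}

func main() {
	// 25% of 4 pods rounds up to 1 allowed disruption, so 3 must stay healthy.
	fmt.Println(desiredHealthyForMaxUnavailable(4, 25)) // 3
}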
@@ -627,7 +659,7 @@ func (dc *DisruptionController) getExpectedPodCount(pdb *policy.PodDisruptionBud
return
}

func (dc *DisruptionController) getExpectedScale(pdb *policy.PodDisruptionBudget, pods []*v1.Pod) (expectedCount int32, err error) {
func (dc *DisruptionController) getExpectedScale(pdb *policy.PodDisruptionBudget, pods []*v1.Pod) (expectedCount int32, coveredPods []*v1.Pod, err error) {
// When the user specifies a fraction of pods that must be available, we
// use as the fraction's denominator
// SUM_{all c in C} scale(c)
@@ -647,37 +679,57 @@ func (dc *DisruptionController) getExpectedScale(pdb *policy.PodDisruptionBudget
for _, pod := range pods {
controllerRef := metav1.GetControllerOf(pod)
if controllerRef == nil {
err = fmt.Errorf("found no controller ref for pod %q", pod.Name)
dc.recorder.Event(pdb, v1.EventTypeWarning, "NoControllerRef", err.Error())
return
dc.recorder.Event(pdb, v1.EventTypeWarning, "NoControllerRef",
fmt.Sprintf("found no controller ref for pod %q", pod.Name))
continue
}

// If we already know the scale of the controller there is no need to do anything.
if _, found := controllerScale[controllerRef.UID]; found {
coveredPods = append(coveredPods, pod)
continue
}

// Check all the supported controllers to find the desired scale.
foundController := false
controllerMatch := false
for _, finder := range dc.finders() {
var controllerNScale *controllerAndScale
controllerNScale, err = finder(controllerRef, pod.Namespace)
if err != nil {
controllerNScale, finderErr := finder(controllerRef, pod.Namespace)
// If the gk of the controller is known, but the resource is missing, we are
// dealing with an orphaned pod. Create an event, but keep going.
if isControllerNotFoundError(finderErr) {
dc.recorder.Event(pdb, v1.EventTypeWarning, "ControllerResourceMissing",
fmt.Sprintf("controller resource for pod %q missing: %s", pod.Name, finderErr.Error()))
controllerMatch = true
break
}
if finderErr != nil {
err = finderErr
return
}
if controllerNScale != nil {
controllerScale[controllerNScale.UID] = controllerNScale.scale
foundController = true
coveredPods = append(coveredPods, pod)
controllerMatch = true
break
}
}
if !foundController {
err = fmt.Errorf("found no controllers for pod %q", pod.Name)
dc.recorder.Event(pdb, v1.EventTypeWarning, "NoControllers", err.Error())
return
// If the pod has a controllerRef but it is not supported by any of the finder functions,
// this means the controller does not support the scale subresource and can't be used with
// MaxUnavailable or MinAvailable percentage.
if !controllerMatch {
dc.recorder.Event(pdb, v1.EventTypeWarning, "NoScaleController",
fmt.Sprintf("controller for pod %q does not support scale", pod.Name))
}
}

// If the map does not have any entries, it means none of the pods has a controller that
// could give us the desired scale.
// TODO: Consider if this should return an error in addition to creating the event.
if len(controllerScale) == 0 {
dc.recorder.Event(pdb, v1.EventTypeWarning, "NoScaleControllers",
"No controllers found that support scale")
}

// 2. Add up all the controllers.
expectedCount = 0
for _, count := range controllerScale {
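Taken together, the loop above now sorts each pod into one of four outcomes instead of aborting the whole sync on the first problem. A runnable sketch of that classification (the function and its boolean inputs are illustrative, not code from the patch):

package main

import "fmt"

// classify models getExpectedScale's per-pod decision after this change.
func classify(hasControllerRef, kindSupported, resourceExists bool) string {
	switch {
	case !hasControllerRef:
		return "NoControllerRef: warning event, pod skipped"
	case !kindSupported:
		return "NoScaleController: warning event, pod skipped"
	case !resourceExists:
		return "ControllerResourceMissing: warning event, orphaned pod skipped"
	default:
		return "covered: scale recorded, pod appended to coveredPods"
	}
}

func main() {
	fmt.Println(classify(true, true, false)) // an orphaned pod no longer fails the sync
}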
pkg/controller/disruption/disruption_test.go
@@ -28,6 +28,7 @@ import (

apps "k8s.io/api/apps/v1"
autoscalingapi "k8s.io/api/autoscaling/v1"
batchv1 "k8s.io/api/batch/v1"
v1 "k8s.io/api/core/v1"
policy "k8s.io/api/policy/v1beta1"
apiequality "k8s.io/apimachinery/pkg/api/equality"
@@ -56,6 +57,8 @@ type pdbStates map[string]policy.PodDisruptionBudget

var alwaysReady = func() bool { return true }

var controllerKindJob = batchv1.SchemeGroupVersion.WithKind("Job")

func (ps *pdbStates) Set(pdb *policy.PodDisruptionBudget) error {
key, err := controller.KeyFunc(pdb)
if err != nil {
@@ -123,6 +126,7 @@ func newFakeDisruptionController() (*disruptionController, *pdbStates) {

scheme := runtime.NewScheme()
scheme.AddKnownTypeWithName(customGVK, &v1.Service{})
_ = batchv1.AddToScheme(scheme)
fakeScaleClient := &scalefake.FakeScaleClient{}

dc := NewDisruptionController(
@@ -241,6 +245,13 @@ func updatePodOwnerToSs(t *testing.T, pod *v1.Pod, ss *apps.StatefulSet) {
pod.OwnerReferences = append(pod.OwnerReferences, controllerReference)
}

func updatePodOwnerToJob(t *testing.T, pod *v1.Pod, job *batchv1.Job) {
var controllerReference metav1.OwnerReference
var trueVar = true
controllerReference = metav1.OwnerReference{UID: job.UID, APIVersion: controllerKindJob.GroupVersion().String(), Kind: controllerKindJob.Kind, Name: job.Name, Controller: &trueVar}
pod.OwnerReferences = append(pod.OwnerReferences, controllerReference)
}

func newPod(t *testing.T, name string) (*v1.Pod, string) {
pod := &v1.Pod{
TypeMeta: metav1.TypeMeta{APIVersion: "v1"},
@@ -364,6 +375,26 @@ func newStatefulSet(t *testing.T, size int32) (*apps.StatefulSet, string) {
return ss, ssName
}

func newJob(t *testing.T) (*batchv1.Job, string) {
job := &batchv1.Job{
TypeMeta: metav1.TypeMeta{APIVersion: "batch/v1"},
ObjectMeta: metav1.ObjectMeta{
UID: uuid.NewUUID(),
Name: "foobar",
Namespace: metav1.NamespaceDefault,
ResourceVersion: "18",
Labels: fooBar(),
},
Spec: batchv1.JobSpec{},
}

jobName, err := controller.KeyFunc(job)
if err != nil {
t.Fatalf("Unexpected error naming Job: %q: %v", job.Name, err)
}
return job, jobName
}

func update(t *testing.T, store cache.Store, obj interface{}) {
if err := store.Update(obj); err != nil {
t.Fatalf("Could not add %+v to %+v: %v", obj, store, err)
@@ -615,11 +646,12 @@ func TestMultipleControllers(t *testing.T) {
//ps.VerifyDisruptionAllowed(t, pdbName, 0)
}

func TestReplicationController(t *testing.T) {
func TestRoguePodWithMatchingLabels(t *testing.T) {
// The budget in this test matches foo=bar, but the RC and its pods match
// {foo=bar, baz=quux}. Later, when we add a rogue pod with only a foo=bar
// label, it will match the budget but have no controllers, which should
// trigger the controller to set PodDisruptionAllowed to false.
// label, it will match the budget but have no controllers. This should
// lead to the pod being ignored, so the pdb status will be based only on
// the pods that have a known controller.
labels := map[string]string{
"foo": "bar",
"baz": "quux",
@@ -658,7 +690,47 @@ func TestReplicationController(t *testing.T) {
rogue, _ := newPod(t, "rogue")
add(t, dc.podStore, rogue)
dc.sync(pdbName)
ps.VerifyDisruptionAllowed(t, pdbName, 0)
ps.VerifyDisruptionAllowed(t, pdbName, 1)
}

func TestPodsWithUnknownController(t *testing.T) {
labels := map[string]string{
"foo": "bar",
}

dc, ps := newFakeDisruptionController()

pdb, pdbName := newMaxUnavailablePodDisruptionBudget(t, intstr.FromString("25%"))
add(t, dc.pdbStore, pdb)
rc, _ := newReplicationController(t, 4)
rc.Spec.Selector = labels
add(t, dc.rcStore, rc)
dc.sync(pdbName)

ps.VerifyPdbStatus(t, pdbName, 0, 0, 0, 0, map[string]metav1.Time{})

var pods []*v1.Pod

for i := int32(0); i < 4; i++ {
pod, _ := newPod(t, fmt.Sprintf("foobar-%d", i))
updatePodOwnerToRc(t, pod, rc)
pods = append(pods, pod)
pod.Labels = labels
add(t, dc.podStore, pod)
dc.sync(pdbName)
if i < 3 {
ps.VerifyPdbStatus(t, pdbName, 0, i+1, 3, 4, map[string]metav1.Time{})
} else {
ps.VerifyPdbStatus(t, pdbName, 1, 4, 3, 4, map[string]metav1.Time{})
}
}

job, _ := newJob(t)
jobPod, _ := newPod(t, "foobarjob")
updatePodOwnerToJob(t, jobPod, job)
add(t, dc.podStore, jobPod)
dc.sync(pdbName)
ps.VerifyPdbStatus(t, pdbName, 1, 4, 3, 4, map[string]metav1.Time{})
}
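The final VerifyPdbStatus call asserts that adding the Job-owned pod changes nothing: the Job resolves no scale here, so its pod is excluded from the computation rather than failing the sync. Reading the assertions, the helper's arguments appear to be (disruptionsAllowed, currentHealthy, desiredHealthy, expectedPods, disruptedPodMap); a sketch of where the asserted numbers come from (the disruptionsAllowed formula is assumed):

package main

import "fmt"

func main() {
	expectedCount := int32(4)  // scale of the ReplicationController; the Job pod is not covered
	currentHealthy := int32(4) // the four RC-owned pods
	desiredHealthy := int32(3) // 4 - ceil(25% of 4)

	disruptionsAllowed := currentHealthy - desiredHealthy // assumed: floored at zero
	if disruptionsAllowed < 0 {
		disruptionsAllowed = 0
	}
	fmt.Println(disruptionsAllowed, currentHealthy, desiredHealthy, expectedCount) // 1 4 3 4
}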

func TestStatefulSetController(t *testing.T) {
