Merge pull request kubernetes#24144 from tnozicka/4.2-fix-rs-expectations

[release-4.2] Bug 1772087: Fix RS expectations
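
Context for the fix: the controller records in-flight pod creations and deletions per ReplicaSet key in an expectations store, and skips syncing a ReplicaSet until those expectations are satisfied (SatisfiedExpectations in the diff below). The key is namespace/name, so deleting a ReplicaSet and creating a new one with the same name reuses the key, and stale expectations left by the old object could keep the new one from ever syncing. The change clears expectations in a dedicated delete handler (deleteRS) and, because a fast delete-plus-recreate can reach the informer as a single update with a changed UID, synthesizes the missed delete from updateRS.

A minimal, self-contained sketch of the failure mode, using a toy counter in place of the real ControllerExpectations type:

package main

import "fmt"

// toyExpectations mimics the per-key counters the controller keeps: a positive
// count means "still waiting to observe this many pod creations for this key".
type toyExpectations map[string]int

func (e toyExpectations) satisfied(key string) bool { return e[key] <= 0 }

func main() {
	exp := toyExpectations{}

	// The old ReplicaSet "ns/web" asked for 3 pods; only one creation was
	// observed before the ReplicaSet itself was deleted.
	exp["ns/web"] = 3
	exp["ns/web"]-- // CreationObserved

	// Without clearing on delete, a new ReplicaSet reusing the name "ns/web"
	// maps to the same key, so its sync would be skipped indefinitely.
	fmt.Println("new RS may sync:", exp.satisfied("ns/web")) // false

	// The fix: clear expectations in the delete handler (DeleteExpectations).
	delete(exp, "ns/web")
	fmt.Println("after clearing:", exp.satisfied("ns/web")) // true
}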

Origin-commit: 8fb3cf371d4b1985c3e2a27b634471f156da6c1a
k8s-publishing-bot committed Dec 7, 2019
2 parents 501d199 + eadcafe commit 38ff45a
Showing 3 changed files with 334 additions and 59 deletions.
25 changes: 25 additions & 0 deletions pkg/controller/replicaset/init_test.go
@@ -0,0 +1,25 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package replicaset

import (
"k8s.io/klog"
)

func init() {
klog.InitFlags(nil)
}
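
A note on the new init_test.go above: klog.InitFlags(nil) registers klog's flags (including the verbosity flag -v) on the default flag set, which the standard testing package parses, so a run along the lines of go test ./pkg/controller/replicaset -args -v=4 can surface the klog.V(4) traces touched by this change. The exact invocation is illustrative, not part of this commit.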
121 changes: 83 additions & 38 deletions pkg/controller/replicaset/replica_set.go
@@ -139,12 +139,9 @@ func NewBaseController(rsInformer appsinformers.ReplicaSetInformer, podInformer
}

rsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
- AddFunc: rsc.enqueueReplicaSet,
+ AddFunc: rsc.addRS,
UpdateFunc: rsc.updateRS,
- // This will enter the sync loop and no-op, because the replica set has been deleted from the store.
- // Note that deleting a replica set immediately after scaling it to 0 will not work. The recommended
- // way of achieving this is by performing a `stop` operation on the replica set.
- DeleteFunc: rsc.enqueueReplicaSet,
+ DeleteFunc: rsc.deleteRS,
})
rsc.rsLister = rsInformer.Lister()
rsc.rsListerSynced = rsInformer.Informer().HasSynced
@@ -228,11 +225,50 @@ func (rsc *ReplicaSetController) resolveControllerRef(namespace string, controll
return rs
}

+ func (rsc *ReplicaSetController) enqueueRS(rs *apps.ReplicaSet) {
+ key, err := controller.KeyFunc(rs)
+ if err != nil {
+ utilruntime.HandleError(fmt.Errorf("couldn't get key for object %#v: %v", rs, err))
+ return
+ }
+
+ rsc.queue.Add(key)
+ }
+
+ func (rsc *ReplicaSetController) enqueueRSAfter(rs *apps.ReplicaSet, duration time.Duration) {
+ key, err := controller.KeyFunc(rs)
+ if err != nil {
+ utilruntime.HandleError(fmt.Errorf("couldn't get key for object %#v: %v", rs, err))
+ return
+ }
+
+ rsc.queue.AddAfter(key, duration)
+ }
+
+ func (rsc *ReplicaSetController) addRS(obj interface{}) {
+ rs := obj.(*apps.ReplicaSet)
+ klog.V(4).Infof("Adding %s %s/%s", rsc.Kind, rs.Namespace, rs.Name)
+ rsc.enqueueRS(rs)
+ }
+
// callback when RS is updated
func (rsc *ReplicaSetController) updateRS(old, cur interface{}) {
oldRS := old.(*apps.ReplicaSet)
curRS := cur.(*apps.ReplicaSet)

+ // TODO: make a KEP and fix informers to always call the delete event handler on re-create
+ if curRS.UID != oldRS.UID {
+ key, err := controller.KeyFunc(oldRS)
+ if err != nil {
+ utilruntime.HandleError(fmt.Errorf("couldn't get key for object %#v: %v", oldRS, err))
+ return
+ }
+ rsc.deleteRS(cache.DeletedFinalStateUnknown{
+ Key: key,
+ Obj: oldRS,
+ })
+ }

// You might imagine that we only really need to enqueue the
// replica set when Spec changes, but it is safer to sync any
// time this function is triggered. That way a full informer
@@ -248,7 +284,36 @@ func (rsc *ReplicaSetController) updateRS(old, cur interface{}) {
if *(oldRS.Spec.Replicas) != *(curRS.Spec.Replicas) {
klog.V(4).Infof("%v %v updated. Desired pod count change: %d->%d", rsc.Kind, curRS.Name, *(oldRS.Spec.Replicas), *(curRS.Spec.Replicas))
}
- rsc.enqueueReplicaSet(cur)
+ rsc.enqueueRS(curRS)
}

+ func (rsc *ReplicaSetController) deleteRS(obj interface{}) {
+ rs, ok := obj.(*apps.ReplicaSet)
+ if !ok {
+ tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
+ if !ok {
+ utilruntime.HandleError(fmt.Errorf("couldn't get object from tombstone %#v", obj))
+ return
+ }
+ rs, ok = tombstone.Obj.(*apps.ReplicaSet)
+ if !ok {
+ utilruntime.HandleError(fmt.Errorf("tombstone contained object that is not a ReplicaSet %#v", obj))
+ return
+ }
+ }
+
+ key, err := controller.KeyFunc(rs)
+ if err != nil {
+ utilruntime.HandleError(fmt.Errorf("couldn't get key for object %#v: %v", rs, err))
+ return
+ }
+
+ klog.V(4).Infof("Deleting %s %q", rsc.Kind, key)
+
+ // Delete expectations for the ReplicaSet so if we create a new one with the same name it starts clean
+ rsc.expectations.DeleteExpectations(key)
+
+ rsc.queue.Add(key)
+ }

// When a pod is created, enqueue the replica set that manages it and update its expectations.
@@ -274,7 +339,7 @@ func (rsc *ReplicaSetController) addPod(obj interface{}) {
}
klog.V(4).Infof("Pod %s created: %#v.", pod.Name, pod)
rsc.expectations.CreationObserved(rsKey)
- rsc.enqueueReplicaSet(rs)
+ rsc.queue.Add(rsKey)
return
}

@@ -288,7 +353,7 @@ func (rsc *ReplicaSetController) addPod(obj interface{}) {
}
klog.V(4).Infof("Orphan Pod %s created: %#v.", pod.Name, pod)
for _, rs := range rss {
- rsc.enqueueReplicaSet(rs)
+ rsc.enqueueRS(rs)
}
}

@@ -325,7 +390,7 @@ func (rsc *ReplicaSetController) updatePod(old, cur interface{}) {
if controllerRefChanged && oldControllerRef != nil {
// The ControllerRef was changed. Sync the old controller, if any.
if rs := rsc.resolveControllerRef(oldPod.Namespace, oldControllerRef); rs != nil {
- rsc.enqueueReplicaSet(rs)
+ rsc.enqueueRS(rs)
}
}

@@ -336,7 +401,7 @@ func (rsc *ReplicaSetController) updatePod(old, cur interface{}) {
return
}
klog.V(4).Infof("Pod %s updated, objectMeta %+v -> %+v.", curPod.Name, oldPod.ObjectMeta, curPod.ObjectMeta)
- rsc.enqueueReplicaSet(rs)
+ rsc.enqueueRS(rs)
// TODO: MinReadySeconds in the Pod will generate an Available condition to be added in
// the Pod status which in turn will trigger a requeue of the owning replica set thus
// having its status updated with the newly available replica. For now, we can fake the
@@ -348,7 +413,7 @@ func (rsc *ReplicaSetController) updatePod(old, cur interface{}) {
klog.V(2).Infof("%v %q will be enqueued after %ds for availability check", rsc.Kind, rs.Name, rs.Spec.MinReadySeconds)
// Add a second to avoid milliseconds skew in AddAfter.
// See https://github.com/kubernetes/kubernetes/issues/39785#issuecomment-279959133 for more info.
- rsc.enqueueReplicaSetAfter(rs, (time.Duration(rs.Spec.MinReadySeconds)*time.Second)+time.Second)
+ rsc.enqueueRSAfter(rs, (time.Duration(rs.Spec.MinReadySeconds)*time.Second)+time.Second)
}
return
}
@@ -362,7 +427,7 @@ func (rsc *ReplicaSetController) updatePod(old, cur interface{}) {
}
klog.V(4).Infof("Orphan Pod %s updated, objectMeta %+v -> %+v.", curPod.Name, oldPod.ObjectMeta, curPod.ObjectMeta)
for _, rs := range rss {
- rsc.enqueueReplicaSet(rs)
+ rsc.enqueueRS(rs)
}
}
}
@@ -400,31 +465,12 @@ func (rsc *ReplicaSetController) deletePod(obj interface{}) {
}
rsKey, err := controller.KeyFunc(rs)
if err != nil {
utilruntime.HandleError(fmt.Errorf("couldn't get key for object %#v: %v", rs, err))
return
}
klog.V(4).Infof("Pod %s/%s deleted through %v, timestamp %+v: %#v.", pod.Namespace, pod.Name, utilruntime.GetCaller(), pod.DeletionTimestamp, pod)
rsc.expectations.DeletionObserved(rsKey, controller.PodKey(pod))
- rsc.enqueueReplicaSet(rs)
- }
-
- // obj could be an *apps.ReplicaSet, or a DeletionFinalStateUnknown marker item.
- func (rsc *ReplicaSetController) enqueueReplicaSet(obj interface{}) {
- key, err := controller.KeyFunc(obj)
- if err != nil {
- utilruntime.HandleError(fmt.Errorf("couldn't get key for object %+v: %v", obj, err))
- return
- }
- rsc.queue.Add(key)
- }
-
- // obj could be an *apps.ReplicaSet, or a DeletionFinalStateUnknown marker item.
- func (rsc *ReplicaSetController) enqueueReplicaSetAfter(obj interface{}, after time.Duration) {
- key, err := controller.KeyFunc(obj)
- if err != nil {
- utilruntime.HandleError(fmt.Errorf("couldn't get key for object %+v: %v", obj, err))
- return
- }
- rsc.queue.AddAfter(key, after)
+ rsc.queue.Add(rsKey)
}

// worker runs a worker thread that just dequeues items, processes them, and marks them done.
@@ -447,7 +493,7 @@ func (rsc *ReplicaSetController) processNextWorkItem() bool {
return true
}

utilruntime.HandleError(fmt.Errorf("Sync %q failed with %v", key, err))
utilruntime.HandleError(fmt.Errorf("sync %q failed with %v", key, err))
rsc.queue.AddRateLimited(key)

return true
@@ -460,7 +506,7 @@ func (rsc *ReplicaSetController) manageReplicas(filteredPods []*v1.Pod, rs *apps
diff := len(filteredPods) - int(*(rs.Spec.Replicas))
rsKey, err := controller.KeyFunc(rs)
if err != nil {
utilruntime.HandleError(fmt.Errorf("Couldn't get key for %v %#v: %v", rsc.Kind, rs, err))
utilruntime.HandleError(fmt.Errorf("couldn't get key for %v %#v: %v", rsc.Kind, rs, err))
return nil
}
if diff < 0 {
@@ -560,7 +606,6 @@ func (rsc *ReplicaSetController) manageReplicas(filteredPods []*v1.Pod, rs *apps
// meaning it did not expect to see any more of its pods created or deleted. This function is not meant to be
// invoked concurrently with the same key.
func (rsc *ReplicaSetController) syncReplicaSet(key string) error {
-
startTime := time.Now()
defer func() {
klog.V(4).Infof("Finished syncing %v %q (%v)", rsc.Kind, key, time.Since(startTime))
@@ -583,7 +628,7 @@ func (rsc *ReplicaSetController) syncReplicaSet(key string) error {
rsNeedsSync := rsc.expectations.SatisfiedExpectations(key)
selector, err := metav1.LabelSelectorAsSelector(rs.Spec.Selector)
if err != nil {
utilruntime.HandleError(fmt.Errorf("Error converting pod selector to selector: %v", err))
utilruntime.HandleError(fmt.Errorf("error converting pod selector to selector: %v", err))
return nil
}

@@ -622,7 +667,7 @@ func (rsc *ReplicaSetController) syncReplicaSet(key string) error {
if manageReplicasErr == nil && updatedRS.Spec.MinReadySeconds > 0 &&
updatedRS.Status.ReadyReplicas == *(updatedRS.Spec.Replicas) &&
updatedRS.Status.AvailableReplicas != *(updatedRS.Spec.Replicas) {
- rsc.enqueueReplicaSetAfter(updatedRS, time.Duration(updatedRS.Spec.MinReadySeconds)*time.Second)
+ rsc.queue.AddAfter(key, time.Duration(updatedRS.Spec.MinReadySeconds)*time.Second)
}
return manageReplicasErr
}
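
One subtlety in the updateRS change above is worth spelling out: an informer can deliver a quick delete-plus-recreate of a same-named object as a single update event, and a changed UID is then the only signal that the old object is gone (hence the TODO about always calling the delete handler on re-create). A self-contained sketch of that detection, with toy types standing in for client-go and the real handlers:

package main

import "fmt"

// toyRS stands in for *apps.ReplicaSet; only the name and UID matter here.
type toyRS struct {
	name string
	uid  string
}

// onDelete stands in for deleteRS: it must clear expectations for the key.
func onDelete(key string) { fmt.Println("clearing expectations for", key) }

// onUpdate stands in for updateRS: a UID change means the "update" is really a
// delete of the old object followed by the creation of a new one.
func onUpdate(old, cur toyRS) {
	if old.uid != cur.uid {
		onDelete(old.name) // synthesize the delete event the informer coalesced away
	}
	fmt.Println("enqueue", cur.name)
}

func main() {
	// One update event arrives, but uid-1 -> uid-2 reveals a re-create.
	onUpdate(toyRS{name: "web", uid: "uid-1"}, toyRS{name: "web", uid: "uid-2"})
}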
