Track deletes in rc manager with a UID expectations cache. #22579

Merged (1 commit) on Mar 6, 2016
105 changes: 104 additions & 1 deletion pkg/controller/controller_utils.go
@@ -35,6 +35,7 @@ import (
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/util"
"k8s.io/kubernetes/pkg/util/integer"
"k8s.io/kubernetes/pkg/util/sets"
)

const (
@@ -243,11 +244,105 @@ func (e *ControlleeExpectations) GetExpectations() (int64, int64) {
return atomic.LoadInt64(&e.add), atomic.LoadInt64(&e.del)
}

// NewControllerExpectations returns a store for ControlleeExpectations.
// NewControllerExpectations returns a store for ControllerExpectations.
func NewControllerExpectations() *ControllerExpectations {
return &ControllerExpectations{cache.NewStore(ExpKeyFunc)}
}

// UIDSetKeyFunc parses out the key from a UIDSet.
var UIDSetKeyFunc = func(obj interface{}) (string, error) {
if u, ok := obj.(*UIDSet); ok {
return u.key, nil
}
return "", fmt.Errorf("Could not find key for obj %#v", obj)
}

// UIDSet holds a key and a set of UIDs. Used by the
// UIDTrackingControllerExpectations to remember which UIDs it has seen or is
// still waiting for.
type UIDSet struct {
Review comment (Member): Calling this UID* is confusing, since we don't actually populate it using metadata.uid. How about just IDSet?

sets.String
key string
}

// UIDTrackingControllerExpectations tracks the UID of the pods it deletes.
// This cache is needed over plain old expectations to safely handle graceful
// deletion. The desired behavior is to treat an update that sets the
// DeletionTimestamp on an object as a delete. To do so consistently, one needs
// to remember the expected deletes so they aren't double counted.
// TODO: Track creates as well (#22599)
type UIDTrackingControllerExpectations struct {
ControllerExpectationsInterface
// TODO: There is a much nicer way to do this that involves a single store,
// a lock per entry, and a ControlleeExpectationsInterface type.
uidStoreLock sync.Mutex
// Store used for the UIDs associated with any expectation tracked via the
// ControllerExpectationsInterface.
uidStore cache.Store
}

// GetUIDs is a convenience method to avoid exposing the set of expected uids.
// The returned set is not thread safe; all modifications must be made holding
// the uidStoreLock.
func (u *UIDTrackingControllerExpectations) GetUIDs(controllerKey string) sets.String {
if uid, exists, err := u.uidStore.GetByKey(controllerKey); err == nil && exists {
return uid.(*UIDSet).String
}
return nil
}

// ExpectDeletions records expectations for the given deleteKeys, against the given controller.
func (u *UIDTrackingControllerExpectations) ExpectDeletions(rcKey string, deletedKeys []string) error {
u.uidStoreLock.Lock()
defer u.uidStoreLock.Unlock()

if existing := u.GetUIDs(rcKey); existing != nil && existing.Len() != 0 {
glog.Errorf("Clobbering existing delete keys: %+v", existing)
}
expectedUIDs := sets.NewString()
for _, k := range deletedKeys {
expectedUIDs.Insert(k)
}
glog.V(4).Infof("Controller %v waiting on deletions for: %+v", rcKey, deletedKeys)
if err := u.uidStore.Add(&UIDSet{expectedUIDs, rcKey}); err != nil {
return err
}
return u.ControllerExpectationsInterface.ExpectDeletions(rcKey, expectedUIDs.Len())
}

// DeletionObserved records the given deleteKey as a deletion, for the given rc.
func (u *UIDTrackingControllerExpectations) DeletionObserved(rcKey, deleteKey string) {
u.uidStoreLock.Lock()
defer u.uidStoreLock.Unlock()

uids := u.GetUIDs(rcKey)
if uids != nil && uids.Has(deleteKey) {
glog.V(4).Infof("Controller %v received delete for pod %v", rcKey, deleteKey)
u.ControllerExpectationsInterface.DeletionObserved(rcKey)
uids.Delete(deleteKey)
}
}

// DeleteExpectations deletes the UID set and invokes DeleteExpectations on the
// underlying ControllerExpectationsInterface.
func (u *UIDTrackingControllerExpectations) DeleteExpectations(rcKey string) {
u.uidStoreLock.Lock()
defer u.uidStoreLock.Unlock()

u.ControllerExpectationsInterface.DeleteExpectations(rcKey)
if uidExp, exists, err := u.uidStore.GetByKey(rcKey); err == nil && exists {
if err := u.uidStore.Delete(uidExp); err != nil {
glog.V(2).Infof("Error deleting uid expectations for controller %v: %v", rcKey, err)
}
}
}

// NewUIDTrackingControllerExpectations returns a wrapper around
// ControllerExpectations that is aware of deleteKeys.
func NewUIDTrackingControllerExpectations(ce ControllerExpectationsInterface) *UIDTrackingControllerExpectations {
return &UIDTrackingControllerExpectations{ControllerExpectationsInterface: ce, uidStore: cache.NewStore(UIDSetKeyFunc)}
}

// PodControlInterface is an interface that knows how to add or delete pods
// created as an interface to allow testing.
type PodControlInterface interface {
@@ -517,6 +612,14 @@ func FilterActiveReplicaSets(replicaSets []*extensions.ReplicaSet) []*extensions
return active
}

// PodKey returns a key unique to the given pod within a cluster.
// It's used so we consistently use the same key scheme in this module.
// It does exactly what cache.MetaNamespaceKeyFunc would have done
// except there's no possibility of error since we know the exact type.
func PodKey(pod *api.Pod) string {
return fmt.Sprintf("%v/%v", pod.Namespace, pod.Name)
}

// ControllersByCreationTimestamp sorts a list of ReplicationControllers by creation timestamp, using their names as a tie breaker.
type ControllersByCreationTimestamp []*api.ReplicationController

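A minimal sketch of the double-count protection the new wrapper provides, using only the API added in this diff (NewUIDTrackingControllerExpectations, ExpectDeletions, DeletionObserved, SatisfiedExpectations); the pod keys are the ns/name strings produced by PodKey. This is an illustration, not code from the PR:

package main

import (
	"fmt"

	"k8s.io/kubernetes/pkg/controller"
)

func main() {
	exp := controller.NewUIDTrackingControllerExpectations(controller.NewControllerExpectations())

	// The rc manager decides to delete two pods and snapshots their keys.
	exp.ExpectDeletions("default/rc-1", []string{"default/pod-a", "default/pod-b"})

	// A graceful deletion surfaces twice: once as the update that sets the
	// DeletionTimestamp, and again as the final delete event. Only the first
	// observation of a key decrements the counter; the repeat is a no-op
	// because the key has already been removed from the UID set.
	exp.DeletionObserved("default/rc-1", "default/pod-a")
	exp.DeletionObserved("default/rc-1", "default/pod-a")

	fmt.Println(exp.SatisfiedExpectations("default/rc-1")) // false: pod-b still pending
	exp.DeletionObserved("default/rc-1", "default/pod-b")
	fmt.Println(exp.SatisfiedExpectations("default/rc-1")) // true
}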
51 changes: 51 additions & 0 deletions pkg/controller/controller_utils_test.go
@@ -183,6 +183,57 @@ func TestControllerExpectations(t *testing.T) {
}
}

func TestUIDExpectations(t *testing.T) {
uidExp := NewUIDTrackingControllerExpectations(NewControllerExpectations())
rcList := []*api.ReplicationController{
newReplicationController(2),
newReplicationController(1),
newReplicationController(0),
newReplicationController(5),
}
rcToPods := map[string][]string{}
rcKeys := []string{}
for i := range rcList {
rc := rcList[i]
rcName := fmt.Sprintf("rc-%v", i)
rc.Name = rcName
rc.Spec.Selector[rcName] = rcName
podList := newPodList(nil, 5, api.PodRunning, rc)
rcKey, err := KeyFunc(rc)
if err != nil {
t.Fatalf("Couldn't get key for object %+v: %v", rc, err)
}
rcKeys = append(rcKeys, rcKey)
rcPodNames := []string{}
for i := range podList.Items {
p := &podList.Items[i]
p.Name = fmt.Sprintf("%v-%v", p.Name, rc.Name)
rcPodNames = append(rcPodNames, PodKey(p))
}
rcToPods[rcKey] = rcPodNames
uidExp.ExpectDeletions(rcKey, rcPodNames)
}
for i := range rcKeys {
j := rand.Intn(i + 1)
rcKeys[i], rcKeys[j] = rcKeys[j], rcKeys[i]
}
for _, rcKey := range rcKeys {
if uidExp.SatisfiedExpectations(rcKey) {
t.Errorf("Controller %v satisfied expectations before deletion", rcKey)
}
for _, p := range rcToPods[rcKey] {
uidExp.DeletionObserved(rcKey, p)
}
if !uidExp.SatisfiedExpectations(rcKey) {
t.Errorf("Controller %v didn't satisfy expectations after deletion", rcKey)
}
uidExp.DeleteExpectations(rcKey)
if uidExp.GetUIDs(rcKey) != nil {
t.Errorf("Failed to delete uid expectations for %v", rcKey)
}
}
}

func TestCreatePods(t *testing.T) {
ns := api.NamespaceDefault
body := runtime.EncodeOrDie(testapi.Default.Codec(), &api.Pod{ObjectMeta: api.ObjectMeta{Name: "empty_pod"}})
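As an aside, the PodKey helper added in controller_utils.go above produces the same ns/name key as cache.MetaNamespaceKeyFunc, minus the error path. A quick check of that claim, assuming the 1.2-era package paths used elsewhere in this PR:

package main

import (
	"fmt"

	"k8s.io/kubernetes/pkg/api"
	"k8s.io/kubernetes/pkg/client/cache"
	"k8s.io/kubernetes/pkg/controller"
)

func main() {
	pod := &api.Pod{ObjectMeta: api.ObjectMeta{Namespace: "default", Name: "pod-a"}}

	k1 := controller.PodKey(pod) // no error possible: the exact type is known
	k2, err := cache.MetaNamespaceKeyFunc(pod)
	if err != nil {
		panic(err)
	}
	fmt.Println(k1 == k2, k1) // true default/pod-a
}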
64 changes: 32 additions & 32 deletions pkg/controller/replicaset/replica_set.go
Expand Up @@ -49,7 +49,7 @@ const (
// happens based on contents in local pod storage.
FullControllerResyncPeriod = 30 * time.Second

// Realistic value of the burstReplica field for the replication manager based off
// Realistic value of the burstReplica field for the replica set manager based off
// performance requirements for kubernetes 1.0.
BurstReplicas = 500

@@ -73,8 +73,8 @@ type ReplicaSetController struct {
// To allow injection of syncReplicaSet for testing.
syncHandler func(rsKey string) error

// A TTLCache of pod creates/deletes each ReplicaSet expects to see
expectations controller.ControllerExpectationsInterface
// A TTLCache of pod creates/deletes each rc expects to see.
Review comment (Member): You changed the comment back from ReplicaSet to rc. :-)

expectations *controller.UIDTrackingControllerExpectations

// A store of ReplicaSets, populated by the rsController
rsStore cache.StoreToReplicaSetLister
@@ -107,7 +107,7 @@ func NewReplicaSetController(kubeClient clientset.Interface, resyncPeriod contro
Recorder: eventBroadcaster.NewRecorder(api.EventSource{Component: "replicaset-controller"}),
},
burstReplicas: burstReplicas,
expectations: controller.NewControllerExpectations(),
expectations: controller.NewUIDTrackingControllerExpectations(controller.NewControllerExpectations()),
queue: workqueue.New(),
}

@@ -297,17 +297,16 @@ func (rsc *ReplicaSetController) addPod(obj interface{}) {
}
rsKey, err := controller.KeyFunc(rs)
if err != nil {
glog.Errorf("Couldn't get key for replication controller %#v: %v", rs, err)
glog.Errorf("Couldn't get key for replica set %#v: %v", rs, err)
return
}
if pod.DeletionTimestamp != nil {
// on a restart of the controller manager, it's possible a new pod shows up in a state that
// is already pending deletion. Prevent the pod from being a creation observation.
glog.V(4).Infof("Add for pod %v with deletion timestamp %+v, counted as new deletion for rs %v", pod.Name, pod.DeletionTimestamp, rsKey)
rsc.expectations.DeletionObserved(rsKey)
} else {
rsc.expectations.CreationObserved(rsKey)
rsc.deletePod(pod)
return
}
rsc.expectations.CreationObserved(rsKey)
rsc.enqueueReplicaSet(rs)
}

@@ -326,22 +325,15 @@ func (rsc *ReplicaSetController) updatePod(old, cur interface{}) {
if rs == nil {
return
}
rsKey, err := controller.KeyFunc(rs)
if err != nil {
glog.Errorf("Couldn't get key for replication controller %#v: %v", rs, err)
return
}

if curPod.DeletionTimestamp != nil && oldPod.DeletionTimestamp == nil {
if curPod.DeletionTimestamp != nil {
// when a pod is deleted gracefully its deletion timestamp is first modified to reflect a grace period,
// and after such time has passed, the kubelet actually deletes it from the store. We receive an update
// for modification of the deletion timestamp and expect an rs to create more replicas asap, not wait
// until the kubelet actually deletes the pod. This is different from the Phase of a pod changing, because
// an rs never initiates a phase change, and so is never asleep waiting for the same.
glog.V(4).Infof("Update to pod %v with deletion timestamp %+v counted as delete for rs %v", curPod.Name, curPod.DeletionTimestamp, rsKey)
rsc.expectations.DeletionObserved(rsKey)
} else {
glog.V(4).Infof("Update to pod %v with deletion timestamp %+v. Not counting it as a new deletion for rs %v", curPod.Name, curPod.DeletionTimestamp, rsKey)
rsc.deletePod(curPod)
return
}

rsc.enqueueReplicaSet(rs)
@@ -375,21 +367,14 @@ func (rsc *ReplicaSetController) deletePod(obj interface{}) {
return
}
}
glog.V(4).Infof("Pod %s deleted: %+v.", pod.Name, pod)
glog.V(4).Infof("Pod %s/%s deleted through %v, timestamp %+v: %+v.", pod.Namespace, pod.Name, utilruntime.GetCaller(), pod.DeletionTimestamp, pod)
if rs := rsc.getPodReplicaSet(pod); rs != nil {
rsKey, err := controller.KeyFunc(rs)
if err != nil {
glog.Errorf("Couldn't get key for ReplicaSet %#v: %v", rs, err)
return
}
// This method only manages expectations for the case where a pod is
// deleted without a grace period.
if pod.DeletionTimestamp == nil {
glog.V(4).Infof("Received new delete for rs %v, pod %v", rsKey, pod.Name)
rsc.expectations.DeletionObserved(rsKey)
} else {
glog.V(4).Infof("Received delete for rs %v pod %v with non nil deletion timestamp %+v. Not counting it as a new deletion.", rsKey, pod.Name, pod.DeletionTimestamp)
}
rsc.expectations.DeletionObserved(rsKey, controller.PodKey(pod))
Review comment (Member): In the Pod %s deleted log message above, it would be useful to also print the namespace and deletion timestamp.

Review comment (Member): It would be nice to know what type of event it was, also: add, update, delete.

rsc.enqueueReplicaSet(rs)
}
}
@@ -442,6 +427,11 @@ func (rsc *ReplicaSetController) manageReplicas(filteredPods []*api.Pod, rs *ext
if diff > rsc.burstReplicas {
diff = rsc.burstReplicas
}
// TODO: Track UIDs of creates just like deletes. The problem currently
// is we'd need to wait on the result of a create to record the pod's
// UID, which would require locking *across* the create, which will turn
// into a performance bottleneck. We should generate a UID for the pod
// beforehand and store it via ExpectCreations.
rsc.expectations.ExpectCreations(rsKey, diff)
wait := sync.WaitGroup{}
wait.Add(diff)
@@ -462,7 +452,6 @@ func (rsc *ReplicaSetController) manageReplicas(filteredPods []*api.Pod, rs *ext
if diff > rsc.burstReplicas {
diff = rsc.burstReplicas
}
rsc.expectations.ExpectDeletions(rsKey, diff)
glog.V(2).Infof("Too many %q/%q replicas, need %d, deleting %d", rs.Namespace, rs.Name, rs.Spec.Replicas, diff)
// No need to sort pods if we are about to delete all of them
if rs.Spec.Replicas != 0 {
@@ -471,16 +460,27 @@ func (rsc *ReplicaSetController) manageReplicas(filteredPods []*api.Pod, rs *ext
// in the earlier stages whenever possible.
sort.Sort(controller.ActivePods(filteredPods))
}

// Snapshot the UIDs (ns/name) of the pods we're expecting to see
// deleted, so we know to record their expectations exactly once either
// when we see it as an update of the deletion timestamp, or as a delete.
// Note that if the labels on a pod/rs change in a way that the pod gets
// orphaned, the rs will only wake up after the expectations have
// expired even if other pods are deleted.
deletedPodKeys := []string{}
for i := 0; i < diff; i++ {
deletedPodKeys = append(deletedPodKeys, controller.PodKey(filteredPods[i]))
}
rsc.expectations.ExpectDeletions(rsKey, deletedPodKeys)
wait := sync.WaitGroup{}
wait.Add(diff)
for i := 0; i < diff; i++ {
go func(ix int) {
defer wait.Done()
if err := rsc.podControl.DeletePod(rs.Namespace, filteredPods[ix].Name, rs); err != nil {
// Decrement the expected number of deletes because the informer won't observe this deletion
glog.V(2).Infof("Failed deletion, decrementing expectations for replica set %q/%q", rs.Namespace, rs.Name)
rsc.expectations.DeletionObserved(rsKey)
podKey := controller.PodKey(filteredPods[ix])
glog.V(2).Infof("Failed to delete %v, decrementing expectations for controller %q/%q", podKey, rs.Namespace, rs.Name)
rsc.expectations.DeletionObserved(rsKey, podKey)
utilruntime.HandleError(err)
}
}(i)
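Finally, a sketch of why the failure branch above calls DeletionObserved: when DeletePod fails, the informer will never deliver the corresponding delete event, so the expectation is rolled back by hand instead of blocking the controller until the expectation TTL expires. Only the API from this diff is used; the error is simulated:

package main

import (
	"errors"
	"fmt"

	"k8s.io/kubernetes/pkg/controller"
)

func main() {
	exp := controller.NewUIDTrackingControllerExpectations(controller.NewControllerExpectations())
	rsKey, podKey := "default/rs-1", "default/pod-a"

	exp.ExpectDeletions(rsKey, []string{podKey})

	// Stand-in for podControl.DeletePod returning an error.
	if err := errors.New("apiserver unavailable"); err != nil {
		// The informer won't observe this deletion, so decrement manually.
		exp.DeletionObserved(rsKey, podKey)
	}

	fmt.Println(exp.SatisfiedExpectations(rsKey)) // true: the rs will sync again
}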