Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Generates DELETE pod update operations #27349

Merged
merged 1 commit into from
Jul 12, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion contrib/mesos/pkg/executor/service/kubelet.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,5 +80,5 @@ func (kl *executorKubelet) Run(mergedUpdates <-chan kubetypes.PodUpdate) {

//TODO(jdef) revisit this if/when executor failover lands
// Force kubelet to delete all pods.
kl.HandlePodDeletions(kl.GetPods())
kl.HandlePodRemoves(kl.GetPods())
}
64 changes: 43 additions & 21 deletions pkg/kubelet/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,10 @@ const (
// PodConfigNotificationSnapshot delivers the full configuration as a SET whenever
// any change occurs.
PodConfigNotificationSnapshot
// PodConfigNotificationSnapshotAndUpdates delivers an UPDATE message whenever pods are
// PodConfigNotificationSnapshotAndUpdates delivers an UPDATE and DELETE message whenever pods are
// changed, and a SET message if there are any additions or removals.
PodConfigNotificationSnapshotAndUpdates
// PodConfigNotificationIncremental delivers ADD, UPDATE, REMOVE, RECONCILE to the update channel.
// PodConfigNotificationIncremental delivers ADD, UPDATE, DELETE, REMOVE, RECONCILE to the update channel.
PodConfigNotificationIncremental
)

Expand Down Expand Up @@ -152,24 +152,27 @@ func (s *podStorage) Merge(source string, change interface{}) error {
defer s.updateLock.Unlock()

seenBefore := s.sourcesSeen.Has(source)
adds, updates, deletes, reconciles := s.merge(source, change)
adds, updates, deletes, removes, reconciles := s.merge(source, change)
firstSet := !seenBefore && s.sourcesSeen.Has(source)

// deliver update notifications
switch s.mode {
case PodConfigNotificationIncremental:
if len(deletes.Pods) > 0 {
s.updates <- *deletes
if len(removes.Pods) > 0 {
s.updates <- *removes
}
if len(adds.Pods) > 0 {
s.updates <- *adds
}
if len(updates.Pods) > 0 {
s.updates <- *updates
}
if firstSet && len(adds.Pods) == 0 && len(updates.Pods) == 0 {
if len(deletes.Pods) > 0 {
s.updates <- *deletes
}
if firstSet && len(adds.Pods) == 0 && len(updates.Pods) == 0 && len(deletes.Pods) == 0 {
// Send an empty update when first seeing the source and there are
// no ADD or UPDATE pods from the source. This signals kubelet that
// no ADD or UPDATE or DELETE pods from the source. This signals kubelet that
// the source is ready.
s.updates <- *adds
}
Expand All @@ -179,15 +182,18 @@ func (s *podStorage) Merge(source string, change interface{}) error {
}

case PodConfigNotificationSnapshotAndUpdates:
if len(deletes.Pods) > 0 || len(adds.Pods) > 0 || firstSet {
if len(removes.Pods) > 0 || len(adds.Pods) > 0 || firstSet {
s.updates <- kubetypes.PodUpdate{Pods: s.MergedState().([]*api.Pod), Op: kubetypes.SET, Source: source}
}
if len(updates.Pods) > 0 {
s.updates <- *updates
}
if len(deletes.Pods) > 0 {
s.updates <- *deletes
}

case PodConfigNotificationSnapshot:
if len(updates.Pods) > 0 || len(deletes.Pods) > 0 || len(adds.Pods) > 0 || firstSet {
if len(updates.Pods) > 0 || len(deletes.Pods) > 0 || len(adds.Pods) > 0 || len(removes.Pods) > 0 || firstSet {
s.updates <- kubetypes.PodUpdate{Pods: s.MergedState().([]*api.Pod), Op: kubetypes.SET, Source: source}
}

Expand All @@ -200,13 +206,14 @@ func (s *podStorage) Merge(source string, change interface{}) error {
return nil
}

func (s *podStorage) merge(source string, change interface{}) (adds, updates, deletes, reconciles *kubetypes.PodUpdate) {
func (s *podStorage) merge(source string, change interface{}) (adds, updates, deletes, removes, reconciles *kubetypes.PodUpdate) {
s.podLock.Lock()
defer s.podLock.Unlock()

addPods := []*api.Pod{}
updatePods := []*api.Pod{}
deletePods := []*api.Pod{}
removePods := []*api.Pod{}
reconcilePods := []*api.Pod{}

pods := s.pods[source]
Expand All @@ -228,11 +235,13 @@ func (s *podStorage) merge(source string, change interface{}) (adds, updates, de
ref.Annotations[kubetypes.ConfigSourceAnnotationKey] = source
if existing, found := oldPods[name]; found {
pods[name] = existing
needUpdate, needReconcile := checkAndUpdatePod(existing, ref)
needUpdate, needReconcile, needGracefulDelete := checkAndUpdatePod(existing, ref)
if needUpdate {
updatePods = append(updatePods, existing)
} else if needReconcile {
reconcilePods = append(reconcilePods, existing)
} else if needGracefulDelete {
deletePods = append(deletePods, existing)
}
continue
}
Expand All @@ -244,9 +253,11 @@ func (s *podStorage) merge(source string, change interface{}) (adds, updates, de

update := change.(kubetypes.PodUpdate)
switch update.Op {
case kubetypes.ADD, kubetypes.UPDATE:
case kubetypes.ADD, kubetypes.UPDATE, kubetypes.DELETE:
if update.Op == kubetypes.ADD {
glog.V(4).Infof("Adding new pods from source %s : %v", source, update.Pods)
} else if update.Op == kubetypes.DELETE {
glog.V(4).Infof("Graceful deleting pods from source %s : %v", source, update.Pods)
} else {
glog.V(4).Infof("Updating pods from source %s : %v", source, update.Pods)
}
Expand All @@ -259,7 +270,7 @@ func (s *podStorage) merge(source string, change interface{}) (adds, updates, de
if existing, found := pods[name]; found {
// this is a delete
delete(pods, name)
deletePods = append(deletePods, existing)
removePods = append(removePods, existing)
continue
}
// this is a no-op
Expand All @@ -275,7 +286,7 @@ func (s *podStorage) merge(source string, change interface{}) (adds, updates, de
for name, existing := range oldPods {
if _, found := pods[name]; !found {
// this is a delete
deletePods = append(deletePods, existing)
removePods = append(removePods, existing)
}
}

Expand All @@ -288,10 +299,11 @@ func (s *podStorage) merge(source string, change interface{}) (adds, updates, de

adds = &kubetypes.PodUpdate{Op: kubetypes.ADD, Pods: copyPods(addPods), Source: source}
updates = &kubetypes.PodUpdate{Op: kubetypes.UPDATE, Pods: copyPods(updatePods), Source: source}
deletes = &kubetypes.PodUpdate{Op: kubetypes.REMOVE, Pods: copyPods(deletePods), Source: source}
deletes = &kubetypes.PodUpdate{Op: kubetypes.DELETE, Pods: copyPods(deletePods), Source: source}
removes = &kubetypes.PodUpdate{Op: kubetypes.REMOVE, Pods: copyPods(removePods), Source: source}
reconciles = &kubetypes.PodUpdate{Op: kubetypes.RECONCILE, Pods: copyPods(reconcilePods), Source: source}

return adds, updates, deletes, reconciles
return adds, updates, deletes, removes, reconciles
}

func (s *podStorage) markSourceSet(source string) {
Expand Down Expand Up @@ -413,10 +425,13 @@ func podsDifferSemantically(existing, ref *api.Pod) bool {

// checkAndUpdatePod updates existing, and:
// * if ref makes a meaningful change, returns needUpdate=true
// * if ref makes a meaningful change, and this change is graceful deletion, returns needGracefulDelete=true
// * if ref makes no meaningful change, but changes the pod status, returns needReconcile=true
// * else return both false
// Now, needUpdate and needReconcile should never be both true
func checkAndUpdatePod(existing, ref *api.Pod) (needUpdate, needReconcile bool) {
// * else return all false
// Now, needUpdate, needGracefulDelete and needReconcile should never be both true
func checkAndUpdatePod(existing, ref *api.Pod) (needUpdate, needReconcile, needGracefulDelete bool) {

// 1. this is a reconcile
// TODO: it would be better to update the whole object and only preserve certain things
// like the source annotation or the UID (to ensure safety)
if !podsDifferSemantically(existing, ref) {
Expand All @@ -431,7 +446,6 @@ func checkAndUpdatePod(existing, ref *api.Pod) (needUpdate, needReconcile bool)
}
return
}
// this is an update

// Overwrite the first-seen time with the existing one. This is our own
// internal annotation, there is no need to update.
Expand All @@ -443,7 +457,15 @@ func checkAndUpdatePod(existing, ref *api.Pod) (needUpdate, needReconcile bool)
existing.DeletionGracePeriodSeconds = ref.DeletionGracePeriodSeconds
existing.Status = ref.Status
updateAnnotations(existing, ref)
needUpdate = true

// 2. this is an graceful delete
if ref.DeletionTimestamp != nil {
needGracefulDelete = true
} else {
// 3. this is an update
needUpdate = true
}

return
}

Expand Down
21 changes: 21 additions & 0 deletions pkg/kubelet/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,10 @@ import (
"sort"
"strconv"
"testing"
"time"

"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/client/record"
"k8s.io/kubernetes/pkg/conversion"
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
Expand Down Expand Up @@ -248,6 +250,25 @@ func TestNewPodAddedUpdatedRemoved(t *testing.T) {
expectPodUpdate(t, ch, CreatePodUpdate(kubetypes.REMOVE, TestSource, pod))
}

func TestNewPodAddedDelete(t *testing.T) {
channel, ch, _ := createPodConfigTester(PodConfigNotificationIncremental)

// should register an add
addedPod := CreateValidPod("foo", "new")
podUpdate := CreatePodUpdate(kubetypes.ADD, TestSource, addedPod)
channel <- podUpdate
expectPodUpdate(t, ch, CreatePodUpdate(kubetypes.ADD, TestSource, addedPod))

// mark this pod as deleted
timestamp := unversioned.NewTime(time.Now())
deletedPod := CreateValidPod("foo", "new")
deletedPod.ObjectMeta.DeletionTimestamp = &timestamp
podUpdate = CreatePodUpdate(kubetypes.DELETE, TestSource, deletedPod)
channel <- podUpdate
// the existing pod should be gracefully deleted
expectPodUpdate(t, ch, CreatePodUpdate(kubetypes.DELETE, TestSource, addedPod))
}

func TestNewPodAddedUpdatedSet(t *testing.T) {
channel, ch, _ := createPodConfigTester(PodConfigNotificationIncremental)

Expand Down
17 changes: 11 additions & 6 deletions pkg/kubelet/kubelet.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ const (
type SyncHandler interface {
HandlePodAdditions(pods []*api.Pod)
HandlePodUpdates(pods []*api.Pod)
HandlePodDeletions(pods []*api.Pod)
HandlePodRemoves(pods []*api.Pod)
HandlePodReconcile(pods []*api.Pod)
HandlePodSyncs(pods []*api.Pod)
HandlePodCleanups() error
Expand Down Expand Up @@ -1774,7 +1774,7 @@ func (kl *Kubelet) makePodDataDirs(pod *api.Pod) error {
// pod - the pod to sync
// mirrorPod - the mirror pod for the pod to sync, if it is a static pod
// podStatus - the current status (TODO: always from the status manager?)
// updateType - the type of update (ADD, UPDATE, REMOVE, RECONCILE)
// updateType - the type of update (ADD, UPDATE, REMOVE, RECONCILE, DELETE)
//
// The workflow is:
// * If the pod is being created, record pod worker start latency
Expand Down Expand Up @@ -2642,13 +2642,18 @@ func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handle
handler.HandlePodUpdates(u.Pods)
case kubetypes.REMOVE:
glog.V(2).Infof("SyncLoop (REMOVE, %q): %q", u.Source, format.Pods(u.Pods))
handler.HandlePodDeletions(u.Pods)
handler.HandlePodRemoves(u.Pods)
case kubetypes.RECONCILE:
glog.V(4).Infof("SyncLoop (RECONCILE, %q): %q", u.Source, format.Pods(u.Pods))
handler.HandlePodReconcile(u.Pods)
case kubetypes.DELETE:
glog.V(2).Infof("SyncLoop (DELETE, %q): %q", u.Source, format.Pods(u.Pods))
// DELETE is treated as a UPDATE because of graceful deletion.
handler.HandlePodUpdates(u.Pods)
case kubetypes.SET:
// TODO: Do we want to support this?
glog.Errorf("Kubelet does not support snapshot update")

}
case e := <-plegCh:
// PLEG event for a pod; sync it.
Expand Down Expand Up @@ -2784,9 +2789,9 @@ func (kl *Kubelet) HandlePodUpdates(pods []*api.Pod) {
}
}

// HandlePodDeletions is the callback in the SyncHandler interface for pods
// being deleted from a config source.
func (kl *Kubelet) HandlePodDeletions(pods []*api.Pod) {
// HandlePodRemoves is the callback in the SyncHandler interface for pods
// being removed from a config source.
func (kl *Kubelet) HandlePodRemoves(pods []*api.Pod) {
start := kl.clock.Now()
for _, pod := range pods {
kl.podManager.DeletePod(pod)
Expand Down
2 changes: 2 additions & 0 deletions pkg/kubelet/types/pod_update.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ const (
SET PodOperation = iota
// Pods with the given ids are new to this source
ADD
// Pods with the given ids are gracefully deleted from this source
DELETE
// Pods with the given ids have been removed from this source
REMOVE
// Pods with the given ids have been updated in this source
Expand Down