kubelet: fix duplicated status updates at pod cleanup #21732

Merged · 2 commits · Feb 24, 2016
3 changes: 2 additions & 1 deletion pkg/kubelet/dockertools/fake_docker_client.go
@@ -297,6 +297,8 @@ func (f *FakeDockerClient) StopContainer(id string, timeout uint) error {
 		return err
 	}
 	f.Stopped = append(f.Stopped, id)
+	// Container status should be updated before the container is moved to ExitedContainerList
+	f.updateContainerStatus(id, statusExitedPrefix)
 	var newList []docker.APIContainers
 	for _, container := range f.ContainerList {
 		if container.ID == id {
@@ -323,7 +325,6 @@ func (f *FakeDockerClient) StopContainer(id string, timeout uint) error {
 		container.State.Running = false
 	}
 	f.ContainerMap[id] = container
-	f.updateContainerStatus(id, statusExitedPrefix)
 	f.normalSleep(200, 50, 50)
 	return nil
 }
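The reordering in the fake client matters because a caller that stops a container and then immediately inspects the exited list expects the recorded status to already read "Exited". Below is a minimal, self-contained sketch of that ordering; the type and field names are illustrative stand-ins, not the real FakeDockerClient API.

```go
package main

import "fmt"

// fakeClient is an illustrative stand-in for the test double touched in this PR.
type fakeClient struct {
	running map[string]bool   // containers still listed as running
	exited  []string          // containers moved to the "exited" list
	status  map[string]string // recorded container status strings
}

// stopContainer mirrors the fix: record the exited status first, then move the
// container out of the running list, so a reader that finds the container in
// the exited list never observes a stale "Up ..." status.
func (f *fakeClient) stopContainer(id string) {
	f.status[id] = "Exited (0) less than a second ago" // update status first
	delete(f.running, id)                              // then move it to the exited list
	f.exited = append(f.exited, id)
}

func main() {
	f := &fakeClient{
		running: map[string]bool{"c1": true},
		status:  map[string]string{"c1": "Up 5 seconds"},
	}
	f.stopContainer("c1")
	fmt.Println(f.exited, f.status["c1"]) // [c1] Exited (0) less than a second ago
}
```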
36 changes: 7 additions & 29 deletions pkg/kubelet/kubelet.go
@@ -1932,31 +1932,6 @@ func (kl *Kubelet) cleanupOrphanedVolumes(pods []*api.Pod, runningPods []*kubeco
 	return nil
 }
 
-// Delete any pods that are no longer running and are marked for deletion.
-func (kl *Kubelet) cleanupTerminatedPods(pods []*api.Pod, runningPods []*kubecontainer.Pod) error {
-	var terminating []*api.Pod
-	for _, pod := range pods {
-		if pod.DeletionTimestamp != nil {
-			found := false
-			for _, runningPod := range runningPods {
-				if runningPod.ID == pod.UID {
-					found = true
-					break
-				}
-			}
-			if found {
-				glog.V(5).Infof("Keeping terminated pod %q, still running", format.Pod(pod))
-				continue
-			}
-			terminating = append(terminating, pod)
-		}
-	}
-	if !kl.statusManager.TerminatePods(terminating) {
-		return errors.New("not all pods were successfully terminated")
-	}
-	return nil
-}
-
 // pastActiveDeadline returns true if the pod has been active for more than
 // ActiveDeadlineSeconds.
 func (kl *Kubelet) pastActiveDeadline(pod *api.Pod) bool {
@@ -2148,10 +2123,6 @@ func (kl *Kubelet) HandlePodCleanups() error {
 	// Remove any orphaned mirror pods.
 	kl.podManager.DeleteOrphanedMirrorPods()
 
-	if err := kl.cleanupTerminatedPods(allPods, runningPods); err != nil {
-		glog.Errorf("Failed to cleanup terminated pods: %v", err)
-	}
-
 	// Clear out any old bandwidth rules
 	if err = kl.cleanupBandwidthLimits(allPods); err != nil {
 		return err
@@ -2412,6 +2383,13 @@ func (kl *Kubelet) syncLoopIteration(updates <-chan kubetypes.PodUpdate, handler
 
 func (kl *Kubelet) dispatchWork(pod *api.Pod, syncType kubetypes.SyncPodType, mirrorPod *api.Pod, start time.Time) {
 	if kl.podIsTerminated(pod) {
+		if pod.DeletionTimestamp != nil {
+			// If the pod is in a terminated state, there is no pod worker to
+			// handle the work item. Check if the DeletionTimestamp has been
+			// set, and force a status update to trigger a pod deletion request
+			// to the apiserver.
+			kl.statusManager.TerminatePod(pod)
+		}
 		return
 	}
 	// Run the sync in an async worker.
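Roughly, the new guard in dispatchWork behaves like the sketch below: a pod that is already terminated has no pod worker, so a pending deletion instead triggers one forced status update. The types here are simplified stand-ins for the kubelet, pod, and status manager, not the real API.

```go
package main

import (
	"fmt"
	"time"
)

// pod is a stand-in for api.Pod; only the fields used by the sketch exist.
type pod struct {
	name              string
	deletionTimestamp *time.Time
}

type statusManager struct{}

func (s *statusManager) TerminatePod(p *pod) {
	fmt.Printf("forcing terminal status update for %s\n", p.name)
}

type kubelet struct {
	status *statusManager
}

// Assume the pod is terminated for the purposes of this sketch.
func (kl *kubelet) podIsTerminated(p *pod) bool { return true }

// dispatchWork mirrors the control flow added in this PR: terminated pods get
// no pod worker, but a pending deletion still produces one forced status
// update so the apiserver can remove the pod object.
func (kl *kubelet) dispatchWork(p *pod) {
	if kl.podIsTerminated(p) {
		if p.deletionTimestamp != nil {
			kl.status.TerminatePod(p)
		}
		return
	}
	// ...otherwise hand the pod to an async worker.
}

func main() {
	now := time.Now()
	kl := &kubelet{status: &statusManager{}}
	kl.dispatchWork(&pod{name: "nginx", deletionTimestamp: &now})
}
```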
42 changes: 22 additions & 20 deletions pkg/kubelet/status/manager.go
@@ -83,10 +83,9 @@ type Manager interface {
 	// triggers a status update.
 	SetContainerReadiness(podUID types.UID, containerID kubecontainer.ContainerID, ready bool)
 
-	// TerminatePods resets the container status for the provided pods to terminated and triggers
-	// a status update. This function may not enqueue all the provided pods, in which case it will
-	// return false
-	TerminatePods(pods []*api.Pod) bool
+	// TerminatePod resets the container status for the provided pod to terminated and triggers
+	// a status update.
+	TerminatePod(pod *api.Pod)
 
 	// RemoveOrphanedStatuses scans the status cache and removes any entries for pods not included in
 	// the provided podUIDs.
@@ -149,7 +148,7 @@ func (m *manager) SetPodStatus(pod *api.Pod, status api.PodStatus) {
 	if err != nil {
 		return
 	}
-	m.updateStatusInternal(pod, status)
+	m.updateStatusInternal(pod, status, false)
 }
 
 func (m *manager) SetContainerReadiness(podUID types.UID, containerID kubecontainer.ContainerID, ready bool) {
@@ -212,31 +211,32 @@ func (m *manager) SetContainerReadiness(podUID types.UID, containerID kubecontai
 		status.Conditions = append(status.Conditions, readyCondition)
 	}
 
-	m.updateStatusInternal(pod, status)
+	m.updateStatusInternal(pod, status, false)
 }
 
-func (m *manager) TerminatePods(pods []*api.Pod) bool {
-	allSent := true
+func (m *manager) TerminatePod(pod *api.Pod) {
 	m.podStatusesLock.Lock()
 	defer m.podStatusesLock.Unlock()
-	for _, pod := range pods {
-		for i := range pod.Status.ContainerStatuses {
-			pod.Status.ContainerStatuses[i].State = api.ContainerState{
-				Terminated: &api.ContainerStateTerminated{},
-			}
-		}
-		if sent := m.updateStatusInternal(pod, pod.Status); !sent {
-			glog.V(4).Infof("Termination notice for %q was dropped because the status channel is full", format.Pod(pod))
-			allSent = false
+	oldStatus := &pod.Status
+	if cachedStatus, ok := m.podStatuses[pod.UID]; ok {
+		oldStatus = &cachedStatus.status
+	}
+	status, err := copyStatus(oldStatus)
+	if err != nil {
+		return
+	}
+	for i := range status.ContainerStatuses {
+		status.ContainerStatuses[i].State = api.ContainerState{
+			Terminated: &api.ContainerStateTerminated{},
 		}
 	}
-	return allSent
+	m.updateStatusInternal(pod, pod.Status, true)
 }
 
 // updateStatusInternal updates the internal status cache, and queues an update to the api server if
 // necessary. Returns whether an update was triggered.
 // This method IS NOT THREAD SAFE and must be called from a locked function.
-func (m *manager) updateStatusInternal(pod *api.Pod, status api.PodStatus) bool {
+func (m *manager) updateStatusInternal(pod *api.Pod, status api.PodStatus, forceUpdate bool) bool {
 	var oldStatus api.PodStatus
 	cachedStatus, isCached := m.podStatuses[pod.UID]
 	if isCached {
@@ -270,7 +270,7 @@ func (m *manager) updateStatusInternal(pod *api.Pod, status api.PodStatus) bool
 	normalizeStatus(&status)
 	// The intent here is to prevent concurrent updates to a pod's status from
 	// clobbering each other so the phase of a pod progresses monotonically.
-	if isCached && isStatusEqual(&cachedStatus.status, &status) && pod.DeletionTimestamp == nil {
+	if isCached && isStatusEqual(&cachedStatus.status, &status) && !forceUpdate {
 		glog.V(3).Infof("Ignoring same status for pod %q, status: %+v", format.Pod(pod), status)
 		return false // No new status.
 	}
@@ -289,6 +289,8 @@ func (m *manager) updateStatusInternal(pod *api.Pod, status api.PodStatus) bool
 	default:
 		// Let the periodic syncBatch handle the update if the channel is full.
 		// We can't block, since we hold the mutex lock.
+		glog.V(4).Infof("Skipping the status update for pod %q for now because the channel is full; status: %+v",
+			format.Pod(pod), status)
 		return false
 	}
 }
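The forceUpdate flag replaces the old pod.DeletionTimestamp check in the dedup path: identical statuses are normally dropped to keep updates cheap and monotonic, but TerminatePod now forces its terminal status through to the apiserver. A minimal sketch of that dedup-versus-force decision, using a simplified status type in place of api.PodStatus:

```go
package main

import (
	"fmt"
	"reflect"
)

// podStatus is a simplified stand-in for api.PodStatus.
type podStatus struct {
	phase      string
	terminated bool
}

type manager struct {
	cache map[string]podStatus // last status seen per pod UID
}

// updateStatusInternal mirrors the new signature: an unchanged status is
// ignored unless the caller forces the update (as TerminatePod now does).
func (m *manager) updateStatusInternal(uid string, status podStatus, forceUpdate bool) bool {
	if cached, ok := m.cache[uid]; ok && reflect.DeepEqual(cached, status) && !forceUpdate {
		return false // no new status, skip the apiserver round trip
	}
	m.cache[uid] = status
	return true // would be enqueued for syncing to the apiserver
}

func main() {
	m := &manager{cache: map[string]podStatus{}}
	s := podStatus{phase: "Running"}
	fmt.Println(m.updateStatusInternal("uid-1", s, false)) // true: first update
	fmt.Println(m.updateStatusInternal("uid-1", s, false)) // false: duplicate, dropped
	fmt.Println(m.updateStatusInternal("uid-1", s, true))  // true: forced through anyway
}
```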