Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Clean up static/mirror pod status logic #16545

Merged
merged 1 commit into from
Nov 5, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
16 changes: 3 additions & 13 deletions pkg/kubelet/kubelet.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,6 @@ func NewMainKubelet(
if err != nil {
return nil, fmt.Errorf("failed to initialize disk manager: %v", err)
}
statusManager := status.NewManager(kubeClient)
containerRefManager := kubecontainer.NewRefManager()

volumeManager := newVolumeManager()
Expand Down Expand Up @@ -288,7 +287,6 @@ func NewMainKubelet(
recorder: recorder,
cadvisor: cadvisorInterface,
diskSpaceManager: diskSpaceManager,
statusManager: statusManager,
volumeManager: volumeManager,
cloud: cloud,
nodeRef: nodeRef,
Expand Down Expand Up @@ -418,6 +416,7 @@ func NewMainKubelet(

klet.runner = klet.containerRuntime
klet.podManager = kubepod.NewBasicPodManager(kubepod.NewBasicMirrorClient(klet.kubeClient))
klet.statusManager = status.NewManager(kubeClient, klet.podManager)

klet.probeManager = prober.NewManager(
klet.resyncInterval,
Expand Down Expand Up @@ -1438,27 +1437,18 @@ func (kl *Kubelet) syncPod(pod *api.Pod, mirrorPod *api.Pod, runningPod kubecont

// Before returning, regenerate status and store it in the cache.
defer func() {
if kubepod.IsStaticPod(pod) && mirrorPod == nil {
// No need to cache the status because the mirror pod does not
// exist yet.
return
}
status, err := kl.generatePodStatus(pod)
if err != nil {
glog.Errorf("Unable to generate status for pod with name %q and uid %q info with error(%v)", podFullName, uid, err)
// Propagate the error upstream.
syncErr = err
} else {
podToUpdate := pod
if mirrorPod != nil {
podToUpdate = mirrorPod
}
existingStatus, ok := kl.statusManager.GetPodStatus(podToUpdate.UID)
existingStatus, ok := kl.statusManager.GetPodStatus(pod.UID)
if !ok || existingStatus.Phase == api.PodPending && status.Phase == api.PodRunning &&
!firstSeenTime.IsZero() {
metrics.PodStartLatency.Observe(metrics.SinceInMicroseconds(firstSeenTime))
}
kl.statusManager.SetPodStatus(podToUpdate, status)
kl.statusManager.SetPodStatus(pod, status)
}
}()

Expand Down
36 changes: 2 additions & 34 deletions pkg/kubelet/kubelet_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,6 @@ func newTestKubelet(t *testing.T) *TestKubelet {
kubelet.serviceLister = testServiceLister{}
kubelet.nodeLister = testNodeLister{}
kubelet.recorder = fakeRecorder
kubelet.statusManager = status.NewManager(fakeKubeClient)
if err := kubelet.setupDataDirs(); err != nil {
t.Fatalf("can't initialize kubelet data dirs: %v", err)
}
Expand All @@ -120,6 +119,7 @@ func newTestKubelet(t *testing.T) *TestKubelet {
kubelet.cadvisor = mockCadvisor
fakeMirrorClient := kubepod.NewFakeMirrorClient()
kubelet.podManager = kubepod.NewBasicPodManager(fakeMirrorClient)
kubelet.statusManager = status.NewManager(fakeKubeClient, kubelet.podManager)
kubelet.containerRefManager = kubecontainer.NewRefManager()
diskSpaceManager, err := newDiskSpaceManager(mockCadvisor, DiskSpacePolicy{})
if err != nil {
Expand Down Expand Up @@ -160,6 +160,7 @@ func newTestPods(count int) []*api.Pod {
},
},
ObjectMeta: api.ObjectMeta{
UID: types.UID(10000 + i),
Name: fmt.Sprintf("pod%d", i),
},
}
Expand Down Expand Up @@ -3249,39 +3250,6 @@ func TestGetContainerInfoForMirrorPods(t *testing.T) {
mockCadvisor.AssertExpectations(t)
}

func TestDoNotCacheStatusForStaticPods(t *testing.T) {
testKubelet := newTestKubelet(t)
testKubelet.fakeCadvisor.On("MachineInfo").Return(&cadvisorapi.MachineInfo{}, nil)
testKubelet.fakeCadvisor.On("DockerImagesFsInfo").Return(cadvisorapiv2.FsInfo{}, nil)
testKubelet.fakeCadvisor.On("RootFsInfo").Return(cadvisorapiv2.FsInfo{}, nil)
kubelet := testKubelet.kubelet

pods := []*api.Pod{
{
ObjectMeta: api.ObjectMeta{
UID: "12345678",
Name: "staticFoo",
Namespace: "new",
Annotations: map[string]string{
kubetypes.ConfigSourceAnnotationKey: "file",
},
},
Spec: api.PodSpec{
Containers: []api.Container{
{Name: "bar"},
},
},
},
}

kubelet.podManager.SetPods(pods)
kubelet.HandlePodSyncs(kubelet.podManager.GetPods())
status, ok := kubelet.statusManager.GetPodStatus(pods[0].UID)
if ok {
t.Errorf("unexpected status %#v found for static pod %q", status, pods[0].UID)
}
}

func TestHostNetworkAllowed(t *testing.T) {
testKubelet := newTestKubelet(t)
kubelet := testKubelet.kubelet
Expand Down
3 changes: 2 additions & 1 deletion pkg/kubelet/prober/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"k8s.io/kubernetes/pkg/client/record"
"k8s.io/kubernetes/pkg/client/unversioned/testclient"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
kubepod "k8s.io/kubernetes/pkg/kubelet/pod"
"k8s.io/kubernetes/pkg/kubelet/prober/results"
"k8s.io/kubernetes/pkg/kubelet/status"
"k8s.io/kubernetes/pkg/probe"
Expand Down Expand Up @@ -268,7 +269,7 @@ func newTestManager() *manager {
const probePeriod = 1
m := NewManager(
probePeriod,
status.NewManager(&testclient.Fake{}),
status.NewManager(&testclient.Fake{}, kubepod.NewBasicPodManager(nil)),
results.NewManager(),
results.NewManager(),
nil, // runner
Expand Down
3 changes: 2 additions & 1 deletion pkg/kubelet/prober/worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"k8s.io/kubernetes/pkg/client/record"
"k8s.io/kubernetes/pkg/client/unversioned/testclient"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
kubepod "k8s.io/kubernetes/pkg/kubelet/pod"
"k8s.io/kubernetes/pkg/kubelet/prober/results"
"k8s.io/kubernetes/pkg/kubelet/status"
"k8s.io/kubernetes/pkg/probe"
Expand Down Expand Up @@ -119,7 +120,7 @@ func TestDoProbe(t *testing.T) {
}

// Clean up.
m.statusManager = status.NewManager(&testclient.Fake{})
m.statusManager = status.NewManager(&testclient.Fake{}, kubepod.NewBasicPodManager(nil))
resultsManager(m, probeType).Remove(containerID)
}
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/kubelet/runonce_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func TestRunOnce(t *testing.T) {
recorder: &record.FakeRecorder{},
cadvisor: cadvisor,
nodeLister: testNodeLister{},
statusManager: status.NewManager(nil),
statusManager: status.NewManager(nil, podManager),
containerRefManager: kubecontainer.NewRefManager(),
podManager: podManager,
os: kubecontainer.FakeOS{},
Expand Down
63 changes: 29 additions & 34 deletions pkg/kubelet/status/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"k8s.io/kubernetes/pkg/api/errors"
"k8s.io/kubernetes/pkg/api/unversioned"
client "k8s.io/kubernetes/pkg/client/unversioned"
kubepod "k8s.io/kubernetes/pkg/kubelet/pod"
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
kubeletutil "k8s.io/kubernetes/pkg/kubelet/util"
"k8s.io/kubernetes/pkg/types"
Expand All @@ -53,6 +54,7 @@ type podStatusSyncRequest struct {
// All methods are thread-safe.
type manager struct {
kubeClient client.Interface
podManager kubepod.Manager
// Map from pod UID to sync status of the corresponding pod.
podStatuses map[types.UID]versionedPodStatus
podStatusesLock sync.RWMutex
Expand Down Expand Up @@ -87,9 +89,10 @@ type Manager interface {

const syncPeriod = 10 * time.Second

func NewManager(kubeClient client.Interface) Manager {
func NewManager(kubeClient client.Interface, podManager kubepod.Manager) Manager {
return &manager{
kubeClient: kubeClient,
podManager: podManager,
podStatuses: make(map[types.UID]versionedPodStatus),
podStatusChannel: make(chan podStatusSyncRequest, 1000), // Buffer up to 1000 statuses
apiStatusVersions: make(map[types.UID]uint64),
Expand Down Expand Up @@ -131,49 +134,41 @@ func (m *manager) Start() {
func (m *manager) GetPodStatus(uid types.UID) (api.PodStatus, bool) {
m.podStatusesLock.RLock()
defer m.podStatusesLock.RUnlock()
status, ok := m.podStatuses[uid]
status, ok := m.podStatuses[m.podManager.TranslatePodUID(uid)]
return status.status, ok
}

func (m *manager) SetPodStatus(pod *api.Pod, status api.PodStatus) {
m.podStatusesLock.Lock()
defer m.podStatusesLock.Unlock()
oldStatus, found := m.podStatuses[pod.UID]

// ensure that the start time does not change across updates.
if found && oldStatus.status.StartTime != nil {
status.StartTime = oldStatus.status.StartTime
var oldStatus api.PodStatus
if cachedStatus, ok := m.podStatuses[pod.UID]; ok {
oldStatus = cachedStatus.status
} else if mirrorPod, ok := m.podManager.GetMirrorPodByPod(pod); ok {
oldStatus = mirrorPod.Status
} else {
oldStatus = pod.Status
}

// Set ReadyCondition.LastTransitionTime.
// Note we cannot do this while generating the status since we do not have oldStatus
// at that time for mirror pods.
if readyCondition := api.GetPodReadyCondition(status); readyCondition != nil {
// Need to set LastTransitionTime.
lastTransitionTime := unversioned.Now()
if found {
oldReadyCondition := api.GetPodReadyCondition(oldStatus.status)
if oldReadyCondition != nil && readyCondition.Status == oldReadyCondition.Status {
lastTransitionTime = oldReadyCondition.LastTransitionTime
}
oldReadyCondition := api.GetPodReadyCondition(oldStatus)
if oldReadyCondition != nil && readyCondition.Status == oldReadyCondition.Status {
lastTransitionTime = oldReadyCondition.LastTransitionTime
}
readyCondition.LastTransitionTime = lastTransitionTime
}

// if the status has no start time, we need to set an initial time
// TODO(yujuhong): Consider setting StartTime when generating the pod
// status instead, which would allow manager to become a simple cache
// again.
if status.StartTime.IsZero() {
if pod.Status.StartTime.IsZero() {
// the pod did not have a previously recorded value so set to now
now := unversioned.Now()
status.StartTime = &now
} else {
// the pod had a recorded value, but the kubelet restarted so we need to rebuild cache
// based on last observed value
status.StartTime = pod.Status.StartTime
}
// ensure that the start time does not change across updates.
if oldStatus.StartTime != nil && !oldStatus.StartTime.IsZero() {
status.StartTime = oldStatus.StartTime
} else if status.StartTime.IsZero() {
// if the status has no start time, we need to set an initial time
now := unversioned.Now()
status.StartTime = &now
}

newStatus := m.updateStatusInternal(pod, status)
Expand Down Expand Up @@ -288,14 +283,14 @@ func (m *manager) syncPod(uid types.UID, status versionedPodStatus) {
// TODO: make me easier to express from client code
pod, err := m.kubeClient.Pods(status.podNamespace).Get(status.podName)
if errors.IsNotFound(err) {
glog.V(3).Infof("Pod %q was deleted on the server", status.podName)
glog.V(3).Infof("Pod %q (%s) was deleted on the server", status.podName, uid)
m.deletePodStatus(uid)
return
}
if err == nil {
if len(pod.UID) > 0 && pod.UID != uid {
glog.V(3).Infof("Pod %q was deleted and then recreated, skipping status update",
kubeletutil.FormatPodName(pod))
translatedUID := m.podManager.TranslatePodUID(pod.UID)
if len(translatedUID) > 0 && translatedUID != uid {
glog.V(3).Infof("Pod %q was deleted and then recreated, skipping status update", kubeletutil.FormatPodName(pod))
m.deletePodStatus(uid)
return
}
Expand All @@ -310,12 +305,12 @@ func (m *manager) syncPod(uid types.UID, status versionedPodStatus) {
return
}
if !notRunning(pod.Status.ContainerStatuses) {
glog.V(3).Infof("Pod %q is terminated, but some containers are still running", pod.Name)
glog.V(3).Infof("Pod %q is terminated, but some containers are still running", kubeletutil.FormatPodName(pod))
return
}
if err := m.kubeClient.Pods(pod.Namespace).Delete(pod.Name, api.NewDeleteOptions(0)); err == nil {
glog.V(3).Infof("Pod %q fully terminated and removed from etcd", pod.Name)
m.deletePodStatus(pod.UID)
glog.V(3).Infof("Pod %q fully terminated and removed from etcd", kubeletutil.FormatPodName(pod))
m.deletePodStatus(uid)
return
}
}
Expand Down