Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Sync status to new mirror pods #17270

Merged
merged 1 commit into from
Nov 22, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
76 changes: 42 additions & 34 deletions pkg/kubelet/pod/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ type Manager interface {

DeleteOrphanedMirrorPods()
TranslatePodUID(uid types.UID) types.UID
GetUIDTranslations() (podToMirror, mirrorToPod map[types.UID]types.UID)
IsMirrorPodOf(mirrorPod, pod *api.Pod) bool
MirrorClient
}
Expand All @@ -79,6 +80,9 @@ type basicManager struct {
podByFullName map[string]*api.Pod
mirrorPodByFullName map[string]*api.Pod

// Mirror pod UID to pod UID map.
translationByUID map[types.UID]types.UID

// A mirror pod client to create/delete mirror pods.
MirrorClient
}
Expand All @@ -94,30 +98,14 @@ func NewBasicPodManager(client MirrorClient) Manager {
func (pm *basicManager) SetPods(newPods []*api.Pod) {
pm.lock.Lock()
defer pm.lock.Unlock()
pm.setPods(newPods)
}

func (pm *basicManager) setPods(newPods []*api.Pod) {
podByUID := make(map[types.UID]*api.Pod)
mirrorPodByUID := make(map[types.UID]*api.Pod)
podByFullName := make(map[string]*api.Pod)
mirrorPodByFullName := make(map[string]*api.Pod)

for _, pod := range newPods {
podFullName := kubecontainer.GetPodFullName(pod)
if IsMirrorPod(pod) {
mirrorPodByUID[pod.UID] = pod
mirrorPodByFullName[podFullName] = pod
} else {
podByUID[pod.UID] = pod
podByFullName[podFullName] = pod
}
}
pm.podByUID = make(map[types.UID]*api.Pod)
pm.podByFullName = make(map[string]*api.Pod)
pm.mirrorPodByUID = make(map[types.UID]*api.Pod)
pm.mirrorPodByFullName = make(map[string]*api.Pod)
pm.translationByUID = make(map[types.UID]types.UID)

pm.podByUID = podByUID
pm.podByFullName = podByFullName
pm.mirrorPodByUID = mirrorPodByUID
pm.mirrorPodByFullName = mirrorPodByFullName
pm.updatePodsInternal(newPods...)
}

func (pm *basicManager) AddPod(pod *api.Pod) {
Expand All @@ -127,13 +115,22 @@ func (pm *basicManager) AddPod(pod *api.Pod) {
func (pm *basicManager) UpdatePod(pod *api.Pod) {
pm.lock.Lock()
defer pm.lock.Unlock()
podFullName := kubecontainer.GetPodFullName(pod)
if IsMirrorPod(pod) {
pm.mirrorPodByUID[pod.UID] = pod
pm.mirrorPodByFullName[podFullName] = pod
} else {
pm.podByUID[pod.UID] = pod
pm.podByFullName[podFullName] = pod
pm.updatePodsInternal(pod)
}

func (pm *basicManager) updatePodsInternal(pods ...*api.Pod) {
for _, pod := range pods {
podFullName := kubecontainer.GetPodFullName(pod)
if IsMirrorPod(pod) {
pm.mirrorPodByUID[pod.UID] = pod
pm.mirrorPodByFullName[podFullName] = pod
if p, ok := pm.podByFullName[podFullName]; ok {
pm.translationByUID[pod.UID] = p.UID
}
} else {
pm.podByUID[pod.UID] = pod
pm.podByFullName[podFullName] = pod
}
}
}

Expand All @@ -144,6 +141,7 @@ func (pm *basicManager) DeletePod(pod *api.Pod) {
if IsMirrorPod(pod) {
delete(pm.mirrorPodByUID, pod.UID)
delete(pm.mirrorPodByFullName, podFullName)
delete(pm.translationByUID, pod.UID)
} else {
delete(pm.podByUID, pod.UID)
delete(pm.podByFullName, podFullName)
Expand Down Expand Up @@ -207,15 +205,25 @@ func (pm *basicManager) TranslatePodUID(uid types.UID) types.UID {

pm.lock.RLock()
defer pm.lock.RUnlock()
if mirrorPod, ok := pm.mirrorPodByUID[uid]; ok {
podFullName := kubecontainer.GetPodFullName(mirrorPod)
if pod, ok := pm.podByFullName[podFullName]; ok {
return pod.UID
}
if translated, ok := pm.translationByUID[uid]; ok {
return translated
}
return uid
}

func (pm *basicManager) GetUIDTranslations() (podToMirror, mirrorToPod map[types.UID]types.UID) {
pm.lock.RLock()
defer pm.lock.RUnlock()

podToMirror = make(map[types.UID]types.UID, len(pm.translationByUID))
mirrorToPod = make(map[types.UID]types.UID, len(pm.translationByUID))
for k, v := range pm.translationByUID {
podToMirror[k] = v
mirrorToPod[v] = k
}
return podToMirror, mirrorToPod
}

func (pm *basicManager) getOrphanedMirrorPodNames() []string {
pm.lock.RLock()
defer pm.lock.RUnlock()
Expand Down
22 changes: 14 additions & 8 deletions pkg/kubelet/status/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,19 +288,26 @@ func (m *manager) RemoveOrphanedStatuses(podUIDs map[types.UID]bool) {
// syncBatch syncs pods statuses with the apiserver.
func (m *manager) syncBatch() {
var updatedStatuses []podStatusSyncRequest
podToMirror, mirrorToPod := m.podManager.GetUIDTranslations()
func() { // Critical section
m.podStatusesLock.RLock()
defer m.podStatusesLock.RUnlock()

// Clean up orphaned versions.
for uid := range m.apiStatusVersions {
if _, ok := m.podStatuses[uid]; !ok {
_, hasPod := m.podStatuses[uid]
_, hasMirror := podToMirror[uid]
if !hasPod && !hasMirror {
delete(m.apiStatusVersions, uid)
}
}

for uid, status := range m.podStatuses {
if m.needsUpdate(uid, status) {
syncedUID := uid
if translated, ok := mirrorToPod[uid]; ok {
syncedUID = translated
}
if m.needsUpdate(syncedUID, status) {
updatedStatuses = append(updatedStatuses, podStatusSyncRequest{uid, status})
}
}
Expand All @@ -313,11 +320,6 @@ func (m *manager) syncBatch() {

// syncPod syncs the given status with the API server. The caller must not hold the lock.
func (m *manager) syncPod(uid types.UID, status versionedPodStatus) {
if !m.needsUpdate(uid, status) {
glog.Warningf("Status is up-to-date; skipping: %q %+v", uid, status)
return
}

// TODO: make me easier to express from client code
pod, err := m.kubeClient.Pods(status.podNamespace).Get(status.podName)
if errors.IsNotFound(err) {
Expand All @@ -332,12 +334,16 @@ func (m *manager) syncPod(uid types.UID, status versionedPodStatus) {
m.deletePodStatus(uid)
return
}
if !m.needsUpdate(pod.UID, status) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm..determining whether to send an update this late would mean that we need to always get the pod from the apiserver. I don't think we need to move this check.

If a mirror pod gets recreated, we will eventually see it from the watch, triggering a new call to SetPodStatus. At this time, if you check the apiStatusVersions[newMirrorPod.UID], you would find nothing and send the update.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So the reason I moved the check to here, is we need the mirror pod UID. I could look it up in the pod manager, but that would require getting a mutex and either constructing the full name artificially. Moving this method to here means that pod.UID is the UID we need to check (mirror pod, when relevant).

Also, it should be extremely rare that this check ever fails since we only call syncPod if we've already established that an update is needed. The only way this check would fail is if multiple updates were sent in rapid succession. (e.g. status is updated right before syncBatch runs).

glog.Warningf("Status is up-to-date; skipping: %q %+v", uid, status)
return
}
pod.Status = status.status
// TODO: handle conflict as a retry, make that easier too.
pod, err = m.kubeClient.Pods(pod.Namespace).UpdateStatus(pod)
if err == nil {
glog.V(3).Infof("Status for pod %q updated successfully", kubeletutil.FormatPodName(pod))
m.apiStatusVersions[uid] = status.version
m.apiStatusVersions[pod.UID] = status.version

if pod.DeletionTimestamp == nil {
return
Expand Down
28 changes: 16 additions & 12 deletions pkg/kubelet/status/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func getRandomPodStatus() api.PodStatus {
func verifyActions(t *testing.T, kubeClient client.Interface, expectedActions []testclient.Action) {
actions := kubeClient.(*testclient.Fake).Actions()
if len(actions) != len(expectedActions) {
t.Errorf("unexpected actions, got: %s expected: %s", actions, expectedActions)
t.Fatalf("unexpected actions, got: %s expected: %s", actions, expectedActions)
return
}
for i := 0; i < len(actions); i++ {
Expand Down Expand Up @@ -484,21 +484,25 @@ func TestStaticPodStatus(t *testing.T) {
assert.True(t, isStatusEqual(&status, &updatedPod.Status), "Expected: %+v, Got: %+v", status, updatedPod.Status)
client.ClearActions()

otherPod := &api.Pod{
ObjectMeta: api.ObjectMeta{
UID: "other-87654321",
Name: "other",
Namespace: "new",
},
}
m.podManager.AddPod(otherPod)
m.SetPodStatus(otherPod, getRandomPodStatus())
// No changes.
m.syncBatch()
verifyActions(t, m.kubeClient, []testclient.Action{})

// Mirror pod identity changes.
m.podManager.DeletePod(&mirrorPod)
mirrorPod.UID = "new-mirror-pod"
mirrorPod.Status = api.PodStatus{}
m.podManager.AddPod(&mirrorPod)
// Expect update to new mirrorPod.
m.syncBatch()
verifyActions(t, m.kubeClient, []testclient.Action{
testclient.GetActionImpl{ActionImpl: testclient.ActionImpl{Verb: "get", Resource: "pods"}},
testclient.UpdateActionImpl{ActionImpl: testclient.ActionImpl{Verb: "update", Resource: "pods", Subresource: "status"}},
})
_, found := m.GetPodStatus(otherPod.UID)
assert.False(t, found, "otherPod status should have been deleted")
updateAction = client.Actions()[1].(testclient.UpdateActionImpl)
updatedPod = updateAction.Object.(*api.Pod)
assert.Equal(t, mirrorPod.UID, updatedPod.UID, "Expected mirrorPod (%q), but got %q", mirrorPod.UID, updatedPod.UID)
assert.True(t, isStatusEqual(&status, &updatedPod.Status), "Expected: %+v, Got: %+v", status, updatedPod.Status)
}

func TestSetContainerReadiness(t *testing.T) {
Expand Down