Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

OCPBUGS-13854: UPSTREAM: 117371: kubelet: Don't reference the pod manager interface directly from components #1578

Merged
merged 2 commits into from
May 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 0 additions & 4 deletions pkg/kubelet/eviction/eviction_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,6 @@ type managerImpl struct {
config Config
// the function to invoke to kill a pod
killPodFunc KillPodFunc
// the function to get the mirror pod by a given static pod
mirrorPodFunc MirrorPodFunc
// the interface that knows how to do image gc
imageGC ImageGC
// the interface that knows how to do container gc
Expand Down Expand Up @@ -112,7 +110,6 @@ func NewManager(
summaryProvider stats.SummaryProvider,
config Config,
killPodFunc KillPodFunc,
mirrorPodFunc MirrorPodFunc,
imageGC ImageGC,
containerGC ContainerGC,
recorder record.EventRecorder,
Expand All @@ -123,7 +120,6 @@ func NewManager(
manager := &managerImpl{
clock: clock,
killPodFunc: killPodFunc,
mirrorPodFunc: mirrorPodFunc,
imageGC: imageGC,
containerGC: containerGC,
config: config,
Expand Down
6 changes: 0 additions & 6 deletions pkg/kubelet/eviction/eviction_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1451,11 +1451,6 @@ func TestStaticCriticalPodsAreNotEvicted(t *testing.T) {
activePodsFunc := func() []*v1.Pod {
return pods
}
mirrorPodFunc := func(staticPod *v1.Pod) (*v1.Pod, bool) {
mirrorPod := staticPod.DeepCopy()
mirrorPod.Annotations[kubelettypes.ConfigSourceAnnotationKey] = kubelettypes.ApiserverSource
return mirrorPod, true
}

fakeClock := testingclock.NewFakeClock(time.Now())
podKiller := &mockPodKiller{}
Expand Down Expand Up @@ -1490,7 +1485,6 @@ func TestStaticCriticalPodsAreNotEvicted(t *testing.T) {
manager := &managerImpl{
clock: fakeClock,
killPodFunc: podKiller.killPodNow,
mirrorPodFunc: mirrorPodFunc,
imageGC: diskGC,
containerGC: diskGC,
config: config,
Expand Down
282 changes: 202 additions & 80 deletions pkg/kubelet/kubelet.go

Large diffs are not rendered by default.

84 changes: 45 additions & 39 deletions pkg/kubelet/kubelet_pods.go
Original file line number Diff line number Diff line change
Expand Up @@ -975,37 +975,17 @@ func (kl *Kubelet) isAdmittedPodTerminal(pod *v1.Pod) bool {

// removeOrphanedPodStatuses removes obsolete entries in podStatus where
// the pod is no longer considered bound to this node.
func (kl *Kubelet) removeOrphanedPodStatuses(pods []*v1.Pod, mirrorPods []*v1.Pod, possiblyRunningPods map[types.UID]sets.Empty) {
func (kl *Kubelet) removeOrphanedPodStatuses(pods []*v1.Pod, mirrorPods []*v1.Pod) {
podUIDs := make(map[types.UID]bool)
for _, pod := range pods {
podUIDs[pod.UID] = true
}
for _, pod := range mirrorPods {
podUIDs[pod.UID] = true
}
for uid := range possiblyRunningPods {
podUIDs[uid] = true
}
kl.statusManager.RemoveOrphanedStatuses(podUIDs)
}

// deleteOrphanedMirrorPods deletes orphaned mirror pods from the API server
// once the pod worker is done with them.
//
// It walks the full names returned by podManager.GetOrphanedMirrorPodNames()
// and, for every name that podWorkers.IsPodForMirrorPodTerminatingByFullName()
// does not report as still terminating, calls podManager.DeleteMirrorPod().
// Names that are still terminating are skipped on this pass. Deletion errors
// are logged and not returned to the caller.
func (kl *Kubelet) deleteOrphanedMirrorPods() {
mirrorPods := kl.podManager.GetOrphanedMirrorPodNames()
for _, podFullname := range mirrorPods {
// Only delete the mirror pod when the pod worker is not still terminating
// the pod with this full name.
if !kl.podWorkers.IsPodForMirrorPodTerminatingByFullName(podFullname) {
// The first return value is discarded; only the error is inspected.
_, err := kl.podManager.DeleteMirrorPod(podFullname, nil)
if err != nil {
klog.ErrorS(err, "Encountered error when deleting mirror pod", "podName", podFullname)
} else {
klog.V(3).InfoS("Deleted mirror pod", "podName", podFullname)
}
}
}
}

// HandlePodCleanups performs a series of cleanup work, including terminating
// pod workers, killing unwanted pods, and removing orphaned volumes/pod
// directories. No config changes are sent to pod workers while this method
Expand Down Expand Up @@ -1036,15 +1016,7 @@ func (kl *Kubelet) HandlePodCleanups(ctx context.Context) error {
}
}

allPods, mirrorPods := kl.podManager.GetPodsAndMirrorPods()
activePods := kl.filterOutInactivePods(allPods)
allRegularPods, allStaticPods := splitPodsByStatic(allPods)
activeRegularPods, activeStaticPods := splitPodsByStatic(activePods)
metrics.DesiredPodCount.WithLabelValues("").Set(float64(len(allRegularPods)))
metrics.DesiredPodCount.WithLabelValues("true").Set(float64(len(allStaticPods)))
metrics.ActivePodCount.WithLabelValues("").Set(float64(len(activeRegularPods)))
metrics.ActivePodCount.WithLabelValues("true").Set(float64(len(activeStaticPods)))
metrics.MirrorPodCount.Set(float64(len(mirrorPods)))
allPods, mirrorPods, orphanedMirrorPodFullnames := kl.podManager.GetPodsAndMirrorPods()

// Pod phase progresses monotonically. Once a pod has reached a final state,
// it should never leave regardless of the restart policy. The statuses
Expand Down Expand Up @@ -1113,7 +1085,7 @@ func (kl *Kubelet) HandlePodCleanups(ctx context.Context) error {

// Remove orphaned pod statuses not in the total list of known config pods
klog.V(3).InfoS("Clean up orphaned pod statuses")
kl.removeOrphanedPodStatuses(allPods, mirrorPods, possiblyRunningPods)
kl.removeOrphanedPodStatuses(allPods, mirrorPods)

// Remove orphaned pod user namespace allocations (if any).
klog.V(3).InfoS("Clean up orphaned pod user namespace allocations")
Expand All @@ -1140,7 +1112,27 @@ func (kl *Kubelet) HandlePodCleanups(ctx context.Context) error {
// Remove any orphaned mirror pods (mirror pods are tracked by name via the
// pod worker)
klog.V(3).InfoS("Clean up orphaned mirror pods")
kl.deleteOrphanedMirrorPods()
for _, podFullname := range orphanedMirrorPodFullnames {
if !kl.podWorkers.IsPodForMirrorPodTerminatingByFullName(podFullname) {
_, err := kl.mirrorPodClient.DeleteMirrorPod(podFullname, nil)
if err != nil {
klog.ErrorS(err, "Encountered error when deleting mirror pod", "podName", podFullname)
} else {
klog.V(3).InfoS("Deleted mirror pod", "podName", podFullname)
}
}
}

// After pruning pod workers for terminated pods, get the list of active pods for
// metrics and to determine restarts.
activePods := kl.filterOutInactivePods(allPods)
allRegularPods, allStaticPods := splitPodsByStatic(allPods)
activeRegularPods, activeStaticPods := splitPodsByStatic(activePods)
metrics.DesiredPodCount.WithLabelValues("").Set(float64(len(allRegularPods)))
metrics.DesiredPodCount.WithLabelValues("true").Set(float64(len(allStaticPods)))
metrics.ActivePodCount.WithLabelValues("").Set(float64(len(activeRegularPods)))
metrics.ActivePodCount.WithLabelValues("true").Set(float64(len(activeStaticPods)))
metrics.MirrorPodCount.Set(float64(len(mirrorPods)))

// At this point, the pod worker is aware of which pods are not desired (SyncKnownPods).
// We now look through the set of active pods for those that the pod worker is not aware of
Expand All @@ -1158,10 +1150,14 @@ func (kl *Kubelet) HandlePodCleanups(ctx context.Context) error {

klog.V(3).InfoS("Pod will be restarted because it is in the desired set and not known to the pod workers (likely due to UID reuse)", "podUID", desiredPod.UID)
isStatic := kubetypes.IsStaticPod(desiredPod)
mirrorPod, _ := kl.podManager.GetMirrorPodByPod(desiredPod)
pod, mirrorPod, wasMirror := kl.podManager.GetPodAndMirrorPod(desiredPod)
if pod == nil || wasMirror {
klog.V(2).InfoS("Programmer error, restartable pod was a mirror pod but activePods should never contain a mirror pod", "podUID", desiredPod.UID)
continue
}
kl.podWorkers.UpdatePod(UpdatePodOptions{
UpdateType: kubetypes.SyncPodCreate,
Pod: desiredPod,
Pod: pod,
MirrorPod: mirrorPod,
})

Expand Down Expand Up @@ -1248,7 +1244,6 @@ func (kl *Kubelet) HandlePodCleanups(ctx context.Context) error {

// Cleanup any backoff entries.
kl.backOff.GC()

return nil
}

Expand Down Expand Up @@ -1354,15 +1349,26 @@ func (kl *Kubelet) GetKubeletContainerLogs(ctx context.Context, podFullName, con
return fmt.Errorf("pod %q cannot be found - no logs available", name)
}

podUID := pod.UID
if mirrorPod, ok := kl.podManager.GetMirrorPodByPod(pod); ok {
// TODO: this should be using the podWorker's pod store as authoritative, since
// the mirrorPod might still exist, the pod may have been force deleted but
// is still terminating (users should be able to view logs of force deleted static pods
// based on full name).
var podUID types.UID
pod, mirrorPod, wasMirror := kl.podManager.GetPodAndMirrorPod(pod)
if wasMirror {
if pod == nil {
return fmt.Errorf("mirror pod %q does not have a corresponding pod", name)
}
podUID = mirrorPod.UID
} else {
podUID = pod.UID
}

podStatus, found := kl.statusManager.GetPodStatus(podUID)
if !found {
// If there is no cached status, use the status from the
// apiserver. This is useful if kubelet has recently been
// restarted.
// config source (apiserver). This is useful if kubelet
// has recently been restarted.
podStatus = pod.Status
}

Expand Down
23 changes: 16 additions & 7 deletions pkg/kubelet/kubelet_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,8 @@ func newTestKubeletWithImageList(
kubelet.secretManager = secretManager
configMapManager := configmap.NewSimpleConfigMapManager(kubelet.kubeClient)
kubelet.configMapManager = configMapManager
kubelet.podManager = kubepod.NewBasicPodManager(fakeMirrorClient)
kubelet.mirrorPodClient = fakeMirrorClient
kubelet.podManager = kubepod.NewBasicPodManager()
podStartupLatencyTracker := kubeletutil.NewPodStartupLatencyTracker()
kubelet.statusManager = status.NewManager(fakeKubeClient, kubelet.podManager, &statustest.FakePodDeletionSafetyProvider{}, podStartupLatencyTracker, kubelet.getRootDir())

Expand Down Expand Up @@ -338,7 +339,7 @@ func newTestKubeletWithImageList(
}
// setup eviction manager
evictionManager, evictionAdmitHandler := eviction.NewManager(kubelet.resourceAnalyzer, eviction.Config{},
killPodNow(kubelet.podWorkers, fakeRecorder), kubelet.podManager.GetMirrorPodByPod, kubelet.imageManager, kubelet.containerGC, fakeRecorder, nodeRef, kubelet.clock, kubelet.supportLocalStorageCapacityIsolation())
killPodNow(kubelet.podWorkers, fakeRecorder), kubelet.imageManager, kubelet.containerGC, fakeRecorder, nodeRef, kubelet.clock, kubelet.supportLocalStorageCapacityIsolation())

kubelet.evictionManager = evictionManager
kubelet.admitHandlers.AddPodAdmitHandler(evictionAdmitHandler)
Expand Down Expand Up @@ -595,7 +596,11 @@ func TestDispatchWorkOfCompletedPod(t *testing.T) {
},
}
for _, pod := range pods {
kubelet.dispatchWork(pod, kubetypes.SyncPodSync, nil, time.Now())
kubelet.podWorkers.UpdatePod(UpdatePodOptions{
Pod: pod,
UpdateType: kubetypes.SyncPodSync,
StartTime: time.Now(),
})
if !got {
t.Errorf("Should not skip completed pod %q", pod.Name)
}
Expand Down Expand Up @@ -649,7 +654,11 @@ func TestDispatchWorkOfActivePod(t *testing.T) {
}

for _, pod := range pods {
kubelet.dispatchWork(pod, kubetypes.SyncPodSync, nil, time.Now())
kubelet.podWorkers.UpdatePod(UpdatePodOptions{
Pod: pod,
UpdateType: kubetypes.SyncPodSync,
StartTime: time.Now(),
})
if !got {
t.Errorf("Should not skip active pod %q", pod.Name)
}
Expand Down Expand Up @@ -2519,9 +2528,9 @@ func TestHandlePodResourcesResize(t *testing.T) {
testPod2.UID: true,
testPod3.UID: true,
}
defer kubelet.podManager.DeletePod(testPod3)
defer kubelet.podManager.DeletePod(testPod2)
defer kubelet.podManager.DeletePod(testPod1)
defer kubelet.podManager.RemovePod(testPod3)
defer kubelet.podManager.RemovePod(testPod2)
defer kubelet.podManager.RemovePod(testPod1)

tests := []struct {
name string
Expand Down
1 change: 0 additions & 1 deletion pkg/kubelet/pod/mirror_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,6 @@ func (mc *basicMirrorClient) CreateMirrorPod(pod *v1.Pod) error {
return nil
}
}
klog.V(2).InfoS("Created mirror pod", "static_pod", klog.KObj(pod), "static_pod_uid", pod.UID, "mirror_pod", klog.KObj(apiPod), "mirror_pod_uid", apiPod.UID)
return err
}

Expand Down