Skip to content

Commit

Permalink
UPSTREAM: 116833: Fix memory leak in kubelet volume_manager populator…
Browse files Browse the repository at this point in the history
… processedPods

`findAndRemoveDeletedPods()` processes only pods from volume_manager cache: `dswp.desiredStateOfWorld.GetVolumesToMount()`. `podWorker` calls volume_manager `WaitForUnmount()` asynchronously. If it happens after populator cleaned up resources, an entry is added to `processedPods` and will never be seen. Let's cleanup such entries if they don't have a pod and marked for deletion.
  • Loading branch information
mpatlasov committed Apr 20, 2023
1 parent bda0cb5 commit 18baf8a
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,9 @@ func (dswp *desiredStateOfWorldPopulator) findAndAddNewPods() {
// Iterate through all pods in desired state of world, and remove if they no
// longer exist
func (dswp *desiredStateOfWorldPopulator) findAndRemoveDeletedPods() {
podsFromCache := make(map[volumetypes.UniquePodName]struct{})
for _, volumeToMount := range dswp.desiredStateOfWorld.GetVolumesToMount() {
podsFromCache[volumetypes.UniquePodName(volumeToMount.Pod.UID)] = struct{}{}
pod, podExists := dswp.podManager.GetPodByUID(volumeToMount.Pod.UID)
if podExists {

Expand Down Expand Up @@ -273,6 +275,23 @@ func (dswp *desiredStateOfWorldPopulator) findAndRemoveDeletedPods() {
dswp.deleteProcessedPod(volumeToMount.PodName)
}

// Cleanup orphanded entries from processedPods
dswp.pods.Lock()
orphanedPods := make([]volumetypes.UniquePodName, 0, len(dswp.pods.processedPods))
for k := range dswp.pods.processedPods {
if _, ok := podsFromCache[k]; !ok {
orphanedPods = append(orphanedPods, k)
}
}
dswp.pods.Unlock()
for _, orphanedPod := range orphanedPods {
uid := types.UID(orphanedPod)
_, podExists := dswp.podManager.GetPodByUID(uid)
if !podExists && dswp.podStateProvider.ShouldPodRuntimeBeRemoved(uid) {
dswp.deleteProcessedPod(orphanedPod)
}
}

podsWithError := dswp.desiredStateOfWorld.GetPodsWithErrors()
for _, podName := range podsWithError {
if _, podExists := dswp.podManager.GetPodByUID(types.UID(podName)); !podExists {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,15 @@ func TestFindAndAddNewPods_FindAndRemoveDeletedPods(t *testing.T) {
t.Fatalf("Failed to remove pods from desired state of world since they no longer exist")
}

// podWorker may call volume_manager WaitForUnmount() after we processed the pod in findAndRemoveDeletedPods()
dswp.ReprocessPod(podName)
dswp.findAndRemoveDeletedPods()

// findAndRemoveDeletedPods() above must detect orphaned pod and delete it from the map
if _, ok := dswp.pods.processedPods[podName]; ok {
t.Fatalf("Failed to remove orphanded pods from internal map")
}

volumeExists := dswp.desiredStateOfWorld.VolumeExists(expectedVolumeName, "" /* SELinuxContext */)
if volumeExists {
t.Fatalf(
Expand Down

0 comments on commit 18baf8a

Please sign in to comment.