Automated cherry pick of #117019: kubelet: Mark new terminal pods as non-finished in pod worker #117433

Merged
13 changes: 10 additions & 3 deletions pkg/kubelet/pod_workers.go
@@ -775,16 +775,23 @@ func (p *podWorkers) UpdatePod(options UpdatePodOptions) {
}
// if this pod is being synced for the first time, we need to make sure it is an active pod
if options.Pod != nil && (options.Pod.Status.Phase == v1.PodFailed || options.Pod.Status.Phase == v1.PodSucceeded) {
// check to see if the pod is not running and the pod is terminal.
// If this succeeds then record in the podWorker that it is terminated.
// Check to see if the pod is not running and the pod is terminal; if this succeeds then record in the podWorker that it is terminated.
// This is needed because after a kubelet restart, we need to ensure terminal pods will NOT be considered active in Pod Admission. See http://issues.k8s.io/105523
// However, `filterOutInactivePods` considers pods that are actively terminating as active. As a result, `IsPodKnownTerminated()` needs to return true, and thus `terminatedAt` needs to be set.
if statusCache, err := p.podCache.Get(uid); err == nil {
if isPodStatusCacheTerminal(statusCache) {
// At this point we know:
// (1) The pod is terminal based on the config source.
// (2) The pod is terminal based on the runtime cache.
// This implies that this pod had already completed `SyncTerminatingPod` sometime in the past. The pod is likely being synced for the first time due to a kubelet restart.
// These pods need to complete SyncTerminatedPod to ensure that all resources are cleaned and that the status manager makes the final status updates for the pod.
// As a result, set finished: false, to ensure a Terminated event will be sent and `SyncTerminatedPod` will run.
status = &podSyncStatus{
terminatedAt: now,
terminatingAt: now,
syncedAt: now,
startedTerminating: true,
finished: true,
finished: false,
fullname: kubecontainer.BuildPodFullName(name, ns),
}
}
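The hunk above is the core of the fix: for a pod that is already terminal in both the config source and the runtime cache, the worker records `terminatedAt` (so `IsPodKnownTerminated()` returns true and the pod is excluded from Pod Admission) but now leaves `finished` false (so `SyncTerminatedPod` still runs). A minimal, standalone sketch of the recorded state follows; the field names are taken from the diff, the real `podSyncStatus` in pkg/kubelet/pod_workers.go has many more fields, and `isTerminated` below only approximates what `IsPodKnownTerminated()` consults.

package main

import (
	"fmt"
	"time"
)

// podSyncStatus models only the fields touched in the hunk above; the real
// struct in pkg/kubelet/pod_workers.go carries many more.
type podSyncStatus struct {
	syncedAt           time.Time
	terminatingAt      time.Time
	terminatedAt       time.Time
	startedTerminating bool
	finished           bool
}

// isTerminated approximates what IsPodKnownTerminated() ultimately consults:
// the pod is known terminated once terminatedAt has been recorded.
func (s *podSyncStatus) isTerminated() bool { return !s.terminatedAt.IsZero() }

func main() {
	now := time.Now()
	// State recorded for a pod that is terminal in both the config source and the
	// runtime cache: terminatedAt is set so the pod is excluded from Pod Admission,
	// while finished stays false so that a Terminated event is still dispatched and
	// SyncTerminatedPod runs after a kubelet restart.
	status := &podSyncStatus{
		syncedAt:           now,
		terminatingAt:      now,
		terminatedAt:       now,
		startedTerminating: true,
		finished:           false,
	}
	fmt.Println("known terminated:", status.isTerminated()) // true
	fmt.Println("finished:", status.finished)               // false: SyncTerminatedPod still pending
}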
24 changes: 21 additions & 3 deletions pkg/kubelet/pod_workers_test.go
@@ -754,23 +754,41 @@ func TestUpdatePod(t *testing.T) {
expectKnownTerminated: true,
},
{
name: "a pod that is terminal and has never started is finished immediately if the runtime has a cached terminal state",
name: "a pod that is terminal and has never started advances to finished if the runtime has a cached terminal state",
update: UpdatePodOptions{
UpdateType: kubetypes.SyncPodCreate,
Pod: newPodWithPhase("1", "done-pod", v1.PodSucceeded),
},
runtimeStatus: &kubecontainer.PodStatus{ /* we know about this pod */ },
expect: &podSyncStatus{
expectBeforeWorker: &podSyncStatus{
fullname: "done-pod_ns",
syncedAt: time.Unix(1, 0),
terminatingAt: time.Unix(1, 0),
terminatedAt: time.Unix(1, 0),
pendingUpdate: &UpdatePodOptions{
UpdateType: kubetypes.SyncPodCreate,
Pod: newPodWithPhase("1", "done-pod", v1.PodSucceeded),
},
finished: false, // Should be marked as not finished initially (to ensure `SyncTerminatedPod` will run) and status will progress to terminated.
startedTerminating: true,
working: true,
},
expect: hasContext(&podSyncStatus{
fullname: "done-pod_ns",
syncedAt: time.Unix(1, 0),
terminatingAt: time.Unix(1, 0),
terminatedAt: time.Unix(1, 0),
startedAt: time.Unix(3, 0),
startedTerminating: true,
finished: true,
activeUpdate: &UpdatePodOptions{
UpdateType: kubetypes.SyncPodSync,
Pod: newPodWithPhase("1", "done-pod", v1.PodSucceeded),
},

// if we have never seen the pod before, a restart makes no sense
restartRequested: false,
},
}),
expectKnownTerminated: true,
},
{
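The updated test case asserts the worker status at two points: `expectBeforeWorker` is the state recorded synchronously by `UpdatePod` (a `pendingUpdate` queued, `working: true`, `finished: false`), while `expect`, wrapped in `hasContext`, is the state after the per-pod worker goroutine has processed the update (`startedAt` set, `finished: true`, the update moved to `activeUpdate`). A rough standalone sketch of that progression, using only the fields shown in the diff and a hypothetical `runWorkerOnce` stand-in for the worker goroutine:

package main

import (
	"fmt"
	"time"
)

// Only the fields asserted in the test above; the real podSyncStatus has many more.
type podSyncStatus struct {
	syncedAt, terminatingAt, terminatedAt, startedAt time.Time
	startedTerminating, finished, working            bool
}

// runWorkerOnce is a hypothetical stand-in for the per-pod worker goroutine
// draining the pending update: it syncs the already-terminal pod and only then
// marks the status finished.
func runWorkerOnce(s *podSyncStatus, now time.Time) {
	s.startedAt = now
	s.finished = true
	s.working = false
}

func main() {
	before := &podSyncStatus{
		syncedAt:           time.Unix(1, 0),
		terminatingAt:      time.Unix(1, 0),
		terminatedAt:       time.Unix(1, 0),
		startedTerminating: true,
		finished:           false, // SyncTerminatedPod has not run yet
		working:            true,  // an update is queued for the worker
	}
	after := *before
	runWorkerOnce(&after, time.Unix(3, 0))
	fmt.Printf("before worker: finished=%v working=%v\n", before.finished, before.working)
	fmt.Printf("after worker:  finished=%v startedAt=%v\n", after.finished, after.startedAt.Unix())
}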
117 changes: 117 additions & 0 deletions test/e2e_node/restart_test.go
@@ -28,7 +28,11 @@ import (
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/tools/cache"
watchtools "k8s.io/client-go/tools/watch"
"k8s.io/kubernetes/test/e2e/framework"
e2enode "k8s.io/kubernetes/test/e2e/framework/node"
e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
e2eskipper "k8s.io/kubernetes/test/e2e/framework/skipper"
testutils "k8s.io/kubernetes/test/utils"
@@ -37,6 +41,7 @@ import (

"github.com/onsi/ginkgo/v2"
"github.com/onsi/gomega"
"k8s.io/apimachinery/pkg/util/uuid"
)

type podCondition func(pod *v1.Pod) (bool, error)
@@ -265,5 +270,117 @@ var _ = SIGDescribe("Restart [Serial] [Slow] [Disruptive]", func() {
}
}
})
// Regression test for https://issues.k8s.io/116925
ginkgo.It("should delete pods which are marked as terminal and have a deletion timestamp set after restart", func(ctx context.Context) {
podName := "terminal-restart-pod" + string(uuid.NewUUID())
gracePeriod := int64(30)
podSpec := e2epod.MustMixinRestrictedPodSecurity(&v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: podName,
},
Spec: v1.PodSpec{
TerminationGracePeriodSeconds: &gracePeriod,
RestartPolicy: v1.RestartPolicyNever,
Containers: []v1.Container{
{
Name: podName,
Image: imageutils.GetE2EImage(imageutils.BusyBox),
Command: []string{"sh", "-c"},
Args: []string{`
sleep 9999999 &
PID=$!

_term () {
kill $PID
echo "Caught SIGTERM!"
}

trap _term SIGTERM
wait $PID
trap - TERM

# Wait for the long running sleep to exit
wait $PID

exit 0
`,
},
},
},
},
})
ginkgo.By(fmt.Sprintf("Creating a pod (%v/%v) with restart policy: %v", f.Namespace.Name, podName, podSpec.Spec.RestartPolicy))
pod := e2epod.NewPodClient(f).Create(ctx, podSpec)

ginkgo.By(fmt.Sprintf("Waiting for the pod (%v/%v) to be running", f.Namespace.Name, pod.Name))
err := e2epod.WaitForPodNameRunningInNamespace(ctx, f.ClientSet, pod.Name, f.Namespace.Name)
framework.ExpectNoError(err, "Failed to await for the pod to be running: (%v/%v)", f.Namespace.Name, pod.Name)

w := &cache.ListWatch{
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
return f.ClientSet.CoreV1().Pods(f.Namespace.Name).Watch(ctx, options)
},
}

podsList, err := f.ClientSet.CoreV1().Pods(f.Namespace.Name).List(ctx, metav1.ListOptions{})
framework.ExpectNoError(err, "Failed to list pods in namespace: %s", f.Namespace.Name)

ginkgo.By(fmt.Sprintf("Deleting the pod (%v/%v) to set a deletion timestamp", pod.Namespace, pod.Name))
time.Sleep(time.Second)
err = e2epod.NewPodClient(f).Delete(ctx, pod.Name, metav1.DeleteOptions{GracePeriodSeconds: &gracePeriod})
framework.ExpectNoError(err, "Failed to delete the pod: %q", pod.Name)

ctxUntil, cancel := context.WithTimeout(ctx, f.Timeouts.PodStart)
defer cancel()

ginkgo.By(fmt.Sprintf("Started watch for pod (%v/%v) to enter succeeded phase", pod.Namespace, pod.Name))
_, err = watchtools.Until(ctxUntil, podsList.ResourceVersion, w, func(event watch.Event) (bool, error) {
if pod, ok := event.Object.(*v1.Pod); ok {
found := pod.ObjectMeta.Name == podName &&
pod.ObjectMeta.Namespace == f.Namespace.Name &&
pod.Status.Phase == v1.PodSucceeded
if !found {
ginkgo.By(fmt.Sprintf("Observed Pod (%s/%s) in phase %v", pod.ObjectMeta.Namespace, pod.ObjectMeta.Name, pod.Status.Phase))
return false, nil
}
ginkgo.By(fmt.Sprintf("Found Pod (%s/%s) in phase %v", pod.ObjectMeta.Namespace, pod.ObjectMeta.Name, pod.Status.Phase))
return found, nil
}
ginkgo.By(fmt.Sprintf("Observed event: %+v", event.Object))
return false, nil
})
ginkgo.By("Ended watch for pod entering succeeded phase")
framework.ExpectNoError(err, "failed to see event that pod (%s/%s) enter succeeded phase: %v", pod.Namespace, pod.Name, err)

// As soon as the pod enters the succeeded phase (detected by the watch above), kill the kubelet.
// This is a bit racy, but the goal is to stop the kubelet before it is able to delete the pod from the API server, in order to reproduce https://issues.k8s.io/116925
ginkgo.By("Stopping the kubelet")
startKubelet := stopKubelet()
// wait until the kubelet health check fails
gomega.Eventually(ctx, func() bool {
return kubeletHealthCheck(kubeletHealthCheckURL)
}, f.Timeouts.PodStart, f.Timeouts.Poll).Should(gomega.BeFalse())

ginkgo.By("Starting the kubelet")
startKubelet()

// wait until the kubelet health check succeeds
gomega.Eventually(ctx, func() bool {
return kubeletHealthCheck(kubeletHealthCheckURL)
}, f.Timeouts.PodStart, f.Timeouts.Poll).Should(gomega.BeTrue())

// Wait for the Kubelet to be ready.
gomega.Eventually(ctx, func(ctx context.Context) bool {
nodes, err := e2enode.TotalReady(ctx, f.ClientSet)
framework.ExpectNoError(err)
return nodes == 1
}, time.Minute, f.Timeouts.Poll).Should(gomega.BeTrue())

ginkgo.By(fmt.Sprintf("After the kubelet is restarted, verify the pod (%s/%s) is deleted by kubelet", pod.Namespace, pod.Name))
gomega.Eventually(ctx, func(ctx context.Context) error {
return checkMirrorPodDisappear(ctx, f.ClientSet, pod.Name, pod.Namespace)
}, f.Timeouts.PodDelete, f.Timeouts.Poll).Should(gomega.BeNil())
})
})

})
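One note on the final assertion: the pod created by this test is a regular API pod, not a static/mirror pod, yet the check reuses `checkMirrorPodDisappear`. That is presumably fine because the helper (defined elsewhere in test/e2e_node) is assumed to simply poll the API server for the pod's absence by name and namespace. A hedged sketch of what such a check looks like, with a hypothetical name and package so as not to be confused with the upstream helper:

package example

import (
	"context"
	"fmt"

	apierrors "k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	clientset "k8s.io/client-go/kubernetes"
)

// checkPodGone returns nil once the pod can no longer be fetched from the API
// server. This is what checkMirrorPodDisappear is assumed to do; the "mirror"
// in its name refers to its usual callers, not to anything specific about
// mirror pods.
func checkPodGone(ctx context.Context, cl clientset.Interface, name, namespace string) error {
	_, err := cl.CoreV1().Pods(namespace).Get(ctx, name, metav1.GetOptions{})
	if apierrors.IsNotFound(err) {
		return nil
	}
	if err == nil {
		return fmt.Errorf("pod %s/%s still exists", namespace, name)
	}
	return err
}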