Skip terminal Pods with a deletion timestamp from the Daemonset sync #118716

Merged (3 commits) on Jun 27, 2023
pkg/controller/daemon/daemon_controller.go (9 additions, 3 deletions)
@@ -752,7 +752,7 @@ func (dsc *DaemonSetsController) getDaemonPods(ctx context.Context, ds *apps.Dae
// This also reconciles ControllerRef by adopting/orphaning.
// Note that returned Pods are pointers to objects in the cache.
// If you want to modify one, you need to deep-copy it first.
func (dsc *DaemonSetsController) getNodesToDaemonPods(ctx context.Context, ds *apps.DaemonSet) (map[string][]*v1.Pod, error) {
func (dsc *DaemonSetsController) getNodesToDaemonPods(ctx context.Context, ds *apps.DaemonSet, includeDeletedTerminal bool) (map[string][]*v1.Pod, error) {
claimedPods, err := dsc.getDaemonPods(ctx, ds)
if err != nil {
return nil, err
@@ -761,6 +761,12 @@ func (dsc *DaemonSetsController) getNodesToDaemonPods(ctx context.Context, ds *a
nodeToDaemonPods := make(map[string][]*v1.Pod)
logger := klog.FromContext(ctx)
for _, pod := range claimedPods {
if !includeDeletedTerminal && podutil.IsPodTerminal(pod) && pod.DeletionTimestamp != nil {
// This Pod has a finalizer or is already scheduled for deletion from the
// store by the kubelet or the Pod GC. The DS controller doesn't have
// anything else to do with it.
continue
}
nodeName, err := util.GetTargetNodeName(pod)
if err != nil {
logger.Info("Failed to get target node name of Pod in DaemonSet",
@@ -953,7 +959,7 @@ func (dsc *DaemonSetsController) updateDaemonSet(ctx context.Context, ds *apps.D
// syncNodes with a list of pods to remove and a list of nodes to run a Pod of ds.
func (dsc *DaemonSetsController) manage(ctx context.Context, ds *apps.DaemonSet, nodeList []*v1.Node, hash string) error {
// Find out the pods which are created for the nodes by DaemonSet.
nodeToDaemonPods, err := dsc.getNodesToDaemonPods(ctx, ds)
nodeToDaemonPods, err := dsc.getNodesToDaemonPods(ctx, ds, false)
if err != nil {
return fmt.Errorf("couldn't get node to daemon pod mapping for daemon set %q: %v", ds.Name, err)
}
@@ -1154,7 +1160,7 @@ func storeDaemonSetStatus(
func (dsc *DaemonSetsController) updateDaemonSetStatus(ctx context.Context, ds *apps.DaemonSet, nodeList []*v1.Node, hash string, updateObservedGen bool) error {
logger := klog.FromContext(ctx)
logger.V(4).Info("Updating daemon set status")
nodeToDaemonPods, err := dsc.getNodesToDaemonPods(ctx, ds)
nodeToDaemonPods, err := dsc.getNodesToDaemonPods(ctx, ds, false)
if err != nil {
return fmt.Errorf("couldn't get node to daemon pod mapping for daemon set %q: %v", ds.Name, err)
}
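For readers skimming the diff, the whole behavioral change in the controller is the guard added above: a pod that is both terminal and already marked for deletion is only waiting on a finalizer or on kubelet/PodGC cleanup, so the sync path ignores it. A minimal, self-contained sketch of that rule, using simplified stand-ins for v1.Pod and the podutil helpers (none of the names below come from the PR):

```go
package main

import (
	"fmt"
	"time"
)

// Minimal stand-ins for the Kubernetes API types used by the real controller.
type PodPhase string

const (
	PodRunning   PodPhase = "Running"
	PodSucceeded PodPhase = "Succeeded"
	PodFailed    PodPhase = "Failed"
)

type Pod struct {
	Name              string
	Phase             PodPhase
	DeletionTimestamp *time.Time // non-nil once the pod is marked for deletion
}

// isTerminal roughly mirrors podutil.IsPodTerminal: Succeeded or Failed.
func isTerminal(p *Pod) bool {
	return p.Phase == PodSucceeded || p.Phase == PodFailed
}

// skipPod reproduces the new filter in getNodesToDaemonPods: skip a pod that
// is terminal and already being deleted, unless the caller explicitly asks
// for those pods to be included.
func skipPod(p *Pod, includeDeletedTerminal bool) bool {
	return !includeDeletedTerminal && isTerminal(p) && p.DeletionTimestamp != nil
}

func main() {
	now := time.Now()
	pods := []*Pod{
		{Name: "running", Phase: PodRunning},
		{Name: "failed", Phase: PodFailed},
		{Name: "failed-deleted", Phase: PodFailed, DeletionTimestamp: &now},
	}
	for _, p := range pods {
		fmt.Printf("%s skipped=%v\n", p.Name, skipPod(p, false))
	}
	// Output:
	// running skipped=false
	// failed skipped=false
	// failed-deleted skipped=true
}
```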
pkg/controller/daemon/daemon_controller_test.go (115 additions, 62 deletions)
@@ -2739,75 +2739,128 @@ func TestDeleteUnscheduledPodForNotExistingNode(t *testing.T) {
}

func TestGetNodesToDaemonPods(t *testing.T) {
for _, strategy := range updateStrategies() {
ds := newDaemonSet("foo")
ds.Spec.UpdateStrategy = *strategy
ds2 := newDaemonSet("foo2")
ds2.Spec.UpdateStrategy = *strategy
_, ctx := ktesting.NewTestContext(t)
manager, _, _, err := newTestController(ctx, ds, ds2)
if err != nil {
t.Fatalf("error creating DaemonSets controller: %v", err)
}
err = manager.dsStore.Add(ds)
if err != nil {
t.Fatal(err)
}
err = manager.dsStore.Add(ds2)
if err != nil {
t.Fatal(err)
}
addNodes(manager.nodeStore, 0, 2, nil)

// These pods should be returned.
wantedPods := []*v1.Pod{
newPod("matching-owned-0-", "node-0", simpleDaemonSetLabel, ds),
newPod("matching-orphan-0-", "node-0", simpleDaemonSetLabel, nil),
newPod("matching-owned-1-", "node-1", simpleDaemonSetLabel, ds),
newPod("matching-orphan-1-", "node-1", simpleDaemonSetLabel, nil),
}
failedPod := newPod("matching-owned-failed-pod-1-", "node-1", simpleDaemonSetLabel, ds)
failedPod.Status = v1.PodStatus{Phase: v1.PodFailed}
wantedPods = append(wantedPods, failedPod)
for _, pod := range wantedPods {
manager.podStore.Add(pod)
}

// These pods should be ignored.
ignoredPods := []*v1.Pod{
newPod("non-matching-owned-0-", "node-0", simpleDaemonSetLabel2, ds),
newPod("non-matching-orphan-1-", "node-1", simpleDaemonSetLabel2, nil),
newPod("matching-owned-by-other-0-", "node-0", simpleDaemonSetLabel, ds2),
}
for _, pod := range ignoredPods {
err = manager.podStore.Add(pod)
ds := newDaemonSet("foo")
ds2 := newDaemonSet("foo2")
cases := map[string]struct {
includeDeletedTerminal bool
wantedPods []*v1.Pod
ignoredPods []*v1.Pod
}{
"exclude deleted terminal pods": {
wantedPods: []*v1.Pod{
newPod("matching-owned-0-", "node-0", simpleDaemonSetLabel, ds),
newPod("matching-orphan-0-", "node-0", simpleDaemonSetLabel, nil),
newPod("matching-owned-1-", "node-1", simpleDaemonSetLabel, ds),
newPod("matching-orphan-1-", "node-1", simpleDaemonSetLabel, nil),
func() *v1.Pod {
pod := newPod("matching-owned-succeeded-pod-0-", "node-0", simpleDaemonSetLabel, ds)
pod.Status = v1.PodStatus{Phase: v1.PodSucceeded}
return pod
}(),
func() *v1.Pod {
pod := newPod("matching-owned-failed-pod-1-", "node-1", simpleDaemonSetLabel, ds)
pod.Status = v1.PodStatus{Phase: v1.PodFailed}
return pod
}(),
},
ignoredPods: []*v1.Pod{
newPod("non-matching-owned-0-", "node-0", simpleDaemonSetLabel2, ds),
newPod("non-matching-orphan-1-", "node-1", simpleDaemonSetLabel2, nil),
newPod("matching-owned-by-other-0-", "node-0", simpleDaemonSetLabel, ds2),
func() *v1.Pod {
pod := newPod("matching-owned-succeeded-deleted-pod-0-", "node-0", simpleDaemonSetLabel, ds)
now := metav1.Now()
pod.DeletionTimestamp = &now
pod.Status = v1.PodStatus{Phase: v1.PodSucceeded}
return pod
}(),
func() *v1.Pod {
pod := newPod("matching-owned-failed-deleted-pod-1-", "node-1", simpleDaemonSetLabel, ds)
now := metav1.Now()
pod.DeletionTimestamp = &now
pod.Status = v1.PodStatus{Phase: v1.PodFailed}
return pod
}(),
},
},
"include deleted terminal pods": {
includeDeletedTerminal: true,
wantedPods: []*v1.Pod{
newPod("matching-owned-0-", "node-0", simpleDaemonSetLabel, ds),
newPod("matching-orphan-0-", "node-0", simpleDaemonSetLabel, nil),
newPod("matching-owned-1-", "node-1", simpleDaemonSetLabel, ds),
newPod("matching-orphan-1-", "node-1", simpleDaemonSetLabel, nil),
func() *v1.Pod {
pod := newPod("matching-owned-succeeded-pod-0-", "node-0", simpleDaemonSetLabel, ds)
pod.Status = v1.PodStatus{Phase: v1.PodSucceeded}
return pod
}(),
func() *v1.Pod {
pod := newPod("matching-owned-failed-deleted-pod-1-", "node-1", simpleDaemonSetLabel, ds)
now := metav1.Now()
pod.DeletionTimestamp = &now
pod.Status = v1.PodStatus{Phase: v1.PodFailed}
return pod
}(),
},
ignoredPods: []*v1.Pod{
newPod("non-matching-owned-0-", "node-0", simpleDaemonSetLabel2, ds),
newPod("non-matching-orphan-1-", "node-1", simpleDaemonSetLabel2, nil),
newPod("matching-owned-by-other-0-", "node-0", simpleDaemonSetLabel, ds2),
},
},
}
for name, tc := range cases {
t.Run(name, func(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
manager, _, _, err := newTestController(ctx, ds, ds2)
if err != nil {
t.Fatalf("error creating DaemonSets controller: %v", err)
}
err = manager.dsStore.Add(ds)
if err != nil {
t.Fatal(err)
}
}
err = manager.dsStore.Add(ds2)
if err != nil {
t.Fatal(err)
}
addNodes(manager.nodeStore, 0, 2, nil)

nodesToDaemonPods, err := manager.getNodesToDaemonPods(context.TODO(), ds)
if err != nil {
t.Fatalf("getNodesToDaemonPods() error: %v", err)
}
gotPods := map[string]bool{}
for node, pods := range nodesToDaemonPods {
for _, pod := range pods {
if pod.Spec.NodeName != node {
t.Errorf("pod %v grouped into %v but belongs in %v", pod.Name, node, pod.Spec.NodeName)
for _, pod := range tc.wantedPods {
manager.podStore.Add(pod)
}

for _, pod := range tc.ignoredPods {
err = manager.podStore.Add(pod)
if err != nil {
t.Fatal(err)
}
gotPods[pod.Name] = true
}
}
for _, pod := range wantedPods {
if !gotPods[pod.Name] {
t.Errorf("expected pod %v but didn't get it", pod.Name)

nodesToDaemonPods, err := manager.getNodesToDaemonPods(context.TODO(), ds, tc.includeDeletedTerminal)
if err != nil {
t.Fatalf("getNodesToDaemonPods() error: %v", err)
}
delete(gotPods, pod.Name)
}
for podName := range gotPods {
t.Errorf("unexpected pod %v was returned", podName)
}
gotPods := map[string]bool{}
for node, pods := range nodesToDaemonPods {
for _, pod := range pods {
if pod.Spec.NodeName != node {
t.Errorf("pod %v grouped into %v but belongs in %v", pod.Name, node, pod.Spec.NodeName)
}
gotPods[pod.Name] = true
}
}
for _, pod := range tc.wantedPods {
if !gotPods[pod.Name] {
t.Errorf("expected pod %v but didn't get it", pod.Name)
}
delete(gotPods, pod.Name)
}
for podName := range gotPods {
t.Errorf("unexpected pod %v was returned", podName)
}
})
}
}

pkg/controller/daemon/update.go (3 additions, 2 deletions)
@@ -43,7 +43,7 @@ import (
// remaining within the constraints imposed by the update strategy.
func (dsc *DaemonSetsController) rollingUpdate(ctx context.Context, ds *apps.DaemonSet, nodeList []*v1.Node, hash string) error {
logger := klog.FromContext(ctx)
nodeToDaemonPods, err := dsc.getNodesToDaemonPods(ctx, ds)
nodeToDaemonPods, err := dsc.getNodesToDaemonPods(ctx, ds, false)
if err != nil {
return fmt.Errorf("couldn't get node to daemon pod mapping for daemon set %q: %v", ds.Name, err)
}
@@ -294,7 +294,8 @@ func (dsc *DaemonSetsController) constructHistory(ctx context.Context, ds *apps.
}

func (dsc *DaemonSetsController) cleanupHistory(ctx context.Context, ds *apps.DaemonSet, old []*apps.ControllerRevision) error {
nodesToDaemonPods, err := dsc.getNodesToDaemonPods(ctx, ds)
// Include deleted terminal pods when maintaining history.
nodesToDaemonPods, err := dsc.getNodesToDaemonPods(ctx, ds, true)
if err != nil {
return fmt.Errorf("couldn't get node to daemon pod mapping for daemon set %q: %v", ds.Name, err)
}
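The asymmetry between the call sites is the point of the new parameter: manage, updateDaemonSetStatus, and rollingUpdate pass false so a deleted terminal pod is treated as already gone, while cleanupHistory passes true, presumably so a ControllerRevision whose hash is still carried by a lingering (for example finalizer-blocked) pod is not cleaned up early. A rough, self-contained sketch of that kind of hash bookkeeping; only the controller-revision-hash label key matches the real code, everything else is simplified:

```go
package main

import "fmt"

// pod is a trimmed-down stand-in for v1.Pod: just the fields this sketch needs.
type pod struct {
	name   string
	labels map[string]string
}

// liveHashes collects the revision hashes still referenced by any pod on any
// node, including terminal pods that a finalizer keeps in the store. Passing
// includeDeletedTerminal=true to getNodesToDaemonPods is what keeps those
// pods visible to this kind of computation.
func liveHashes(nodesToPods map[string][]*pod) map[string]bool {
	hashes := map[string]bool{}
	for _, pods := range nodesToPods {
		for _, p := range pods {
			if h := p.labels["controller-revision-hash"]; h != "" {
				hashes[h] = true
			}
		}
	}
	return hashes
}

func main() {
	nodesToPods := map[string][]*pod{
		"node-0": {{name: "running", labels: map[string]string{"controller-revision-hash": "rev-2"}}},
		"node-1": {{name: "failed-but-finalized", labels: map[string]string{"controller-revision-hash": "rev-1"}}},
	}
	fmt.Println(liveHashes(nodesToPods)) // map[rev-1:true rev-2:true]
}
```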
test/integration/daemonset/daemonset_test.go (34 additions, 8 deletions)
@@ -49,6 +49,7 @@ import (
"k8s.io/kubernetes/pkg/scheduler/profile"
labelsutil "k8s.io/kubernetes/pkg/util/labels"
"k8s.io/kubernetes/test/integration/framework"
testutils "k8s.io/kubernetes/test/integration/util"
"k8s.io/kubernetes/test/utils/ktesting"
)

@@ -155,6 +156,7 @@ func newDaemonSet(name, namespace string) *apps.DaemonSet {
}

func cleanupDaemonSets(t *testing.T, cs clientset.Interface, ds *apps.DaemonSet) {
t.Helper()
ds, err := cs.AppsV1().DaemonSets(ds.Namespace).Get(context.TODO(), ds.Name, metav1.GetOptions{})
if err != nil {
t.Errorf("Failed to get DaemonSet %s/%s: %v", ds.Namespace, ds.Name, err)
@@ -176,6 +178,10 @@ func cleanupDaemonSets(t *testing.T, cs clientset.Interface, ds *apps.DaemonSet)
return
}

if len(ds.Spec.Template.Finalizers) > 0 {
testutils.RemovePodFinalizersInNamespace(context.TODO(), cs, t, ds.Namespace)
}

// Wait for the daemon set controller to kill all the daemon pods.
if err := wait.Poll(100*time.Millisecond, 30*time.Second, func() (bool, error) {
updatedDS, err := cs.AppsV1().DaemonSets(ds.Namespace).Get(context.TODO(), ds.Name, metav1.GetOptions{})
@@ -275,9 +281,7 @@ func validateDaemonSetPodsAndMarkReady(
) {
Inline review discussion on validateDaemonSetPodsAndMarkReady:

Member: might be worth it to pass and verify the number of expected terminalPods

Member (author): I don't think so. The controller should treat those pods as if they no longer exist. No point in checking how many there are IMO.

Member: Yeah, there is no real need, since we are marking the pods as terminal. It would just make the intent of the assert function clearer.

Member (author): I prefer to keep the diff as small as possible for cherry-picking purposes.

Member: ok

if err := wait.Poll(time.Second, 60*time.Second, func() (bool, error) {
objects := podInformer.GetIndexer().List()
if len(objects) != numberPods {
return false, nil
}
nonTerminatedPods := 0

for _, object := range objects {
pod := object.(*v1.Pod)
Expand All @@ -294,6 +298,10 @@ func validateDaemonSetPodsAndMarkReady(
t.Errorf("controllerRef.Controller is not set to true")
}

if podutil.IsPodPhaseTerminal(pod.Status.Phase) {
continue
}
nonTerminatedPods++
if !podutil.IsPodReady(pod) && len(pod.Spec.NodeName) != 0 {
podCopy := pod.DeepCopy()
podCopy.Status = v1.PodStatus{
Expand All @@ -307,7 +315,7 @@ func validateDaemonSetPodsAndMarkReady(
}
}

return true, nil
return nonTerminatedPods == numberPods, nil
}); err != nil {
t.Fatal(err)
}
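Coming back to the inline review thread above: a rough, standalone sketch of the stricter assertion the reviewer floated (counting terminal pods explicitly rather than only skipping them). It was not adopted in the PR, and every name below is illustrative:

```go
package main

import "fmt"

// phase is a stand-in for v1.PodPhase.
type phase string

const (
	podRunning   phase = "Running"
	podSucceeded phase = "Succeeded"
	podFailed    phase = "Failed"
)

// isTerminal roughly mirrors podutil.IsPodPhaseTerminal.
func isTerminal(p phase) bool { return p == podSucceeded || p == podFailed }

// countPods splits an informer snapshot into non-terminated and terminal
// pods; the stricter assertion would then require both counts to match
// expectations instead of only nonTerminated == numberPods.
func countPods(phases []phase) (nonTerminated, terminal int) {
	for _, p := range phases {
		if isTerminal(p) {
			terminal++
			continue
		}
		nonTerminated++
	}
	return nonTerminated, terminal
}

func main() {
	nonTerminated, terminal := countPods([]phase{podRunning, podRunning, podFailed})
	// e.g. require nonTerminated == numberPods && terminal == expectedTerminalPods
	fmt.Println(nonTerminated, terminal) // 2 1
}
```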
@@ -536,8 +544,23 @@ func TestSimpleDaemonSetLaunchesPods(t *testing.T) {
}

func TestSimpleDaemonSetRestartsPodsOnTerminalPhase(t *testing.T) {
for _, podPhase := range []v1.PodPhase{v1.PodSucceeded, v1.PodFailed} {
t.Run(string(podPhase), func(t *testing.T) {
cases := map[string]struct {
phase v1.PodPhase
finalizer bool
}{
"Succeeded": {
phase: v1.PodSucceeded,
},
"Failed": {
phase: v1.PodFailed,
},
"Succeeded with finalizer": {
phase: v1.PodSucceeded,
finalizer: true,
},
}
for name, tc := range cases {
t.Run(name, func(t *testing.T) {
forEachStrategy(t, func(t *testing.T, strategy *apps.DaemonSetUpdateStrategy) {
ctx, closeFn, dc, informers, clientset := setup(t)
defer closeFn()
@@ -553,6 +576,9 @@ func TestSimpleDaemonSetRestartsPodsOnTerminalPhase(t *testing.T) {
go dc.Run(ctx, 2)

ds := newDaemonSet("restart-terminal-pod", ns.Name)
if tc.finalizer {
ds.Spec.Template.Finalizers = append(ds.Spec.Template.Finalizers, "test.k8s.io/finalizer")
}
ds.Spec.UpdateStrategy = *strategy
if _, err := dsClient.Create(ctx, ds, metav1.CreateOptions{}); err != nil {
t.Fatalf("Failed to create DaemonSet: %v", err)
@@ -566,9 +592,9 @@ func TestSimpleDaemonSetRestartsPodsOnTerminalPhase(t *testing.T) {
validateDaemonSetStatus(dsClient, ds.Name, int32(numNodes), t)
podToMarkAsTerminal := podInformer.GetIndexer().List()[0].(*v1.Pod)
podCopy := podToMarkAsTerminal.DeepCopy()
podCopy.Status.Phase = podPhase
podCopy.Status.Phase = tc.phase
if _, err := podClient.UpdateStatus(ctx, podCopy, metav1.UpdateOptions{}); err != nil {
t.Fatalf("Failed to mark the pod as terminal with phase: %v. Error: %v", podPhase, err)
t.Fatalf("Failed to mark the pod as terminal with phase: %v. Error: %v", tc.phase, err)
}
// verify all pods are active. They either continue Running or are Pending after restart
validateDaemonSetPodsActive(podClient, podInformer, numNodes, t)
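The new "Succeeded with finalizer" case works because the finalizer keeps the terminal, deleted pod visible in the API server, which is exactly the state the controller must now ignore; the test cleanup then has to strip finalizers before the namespace can drain. A rough, standalone sketch of that style of cleanup, assuming a cluster reachable through $KUBECONFIG and using the namespace name only as an example (the real helper is testutils.RemovePodFinalizersInNamespace, whose implementation may differ):

```go
package main

import (
	"context"
	"fmt"
	"os"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"
)

// removePodFinalizers clears the finalizers of every pod in a namespace so
// that pods which are already terminal and marked for deletion can be removed.
func removePodFinalizers(ctx context.Context, cs kubernetes.Interface, namespace string) error {
	pods, err := cs.CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{})
	if err != nil {
		return err
	}
	patch := []byte(`{"metadata":{"finalizers":null}}`)
	for _, pod := range pods.Items {
		if len(pod.Finalizers) == 0 {
			continue
		}
		if _, err := cs.CoreV1().Pods(namespace).Patch(ctx, pod.Name,
			types.StrategicMergePatchType, patch, metav1.PatchOptions{}); err != nil {
			return fmt.Errorf("removing finalizers from %s/%s: %w", namespace, pod.Name, err)
		}
	}
	return nil
}

func main() {
	cfg, err := clientcmd.BuildConfigFromFlags("", os.Getenv("KUBECONFIG"))
	if err != nil {
		panic(err)
	}
	cs, err := kubernetes.NewForConfig(cfg)
	if err != nil {
		panic(err)
	}
	// "default" is only an example namespace for this sketch.
	if err := removePodFinalizers(context.Background(), cs, "default"); err != nil {
		panic(err)
	}
}
```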
test/integration/podgc/podgc_test.go (2 additions, 2 deletions)
@@ -116,7 +116,7 @@ func TestPodGcOrphanedPodsWithFinalizer(t *testing.T) {
if err != nil {
t.Fatalf("Error %v, while creating pod: %v", err, klog.KObj(pod))
}
defer testutils.RemovePodFinalizers(testCtx.ClientSet, t, []*v1.Pod{pod})
defer testutils.RemovePodFinalizers(testCtx.Ctx, testCtx.ClientSet, t, *pod)

pod.Status.Phase = test.phase
if _, err := testCtx.ClientSet.CoreV1().Pods(testCtx.NS.Name).UpdateStatus(testCtx.Ctx, pod, metav1.UpdateOptions{}); err != nil {
@@ -224,7 +224,7 @@ func TestTerminatingOnOutOfServiceNode(t *testing.T) {
t.Fatalf("Error %v, while creating pod: %v", err, klog.KObj(pod))
}
if test.withFinalizer {
defer testutils.RemovePodFinalizers(testCtx.ClientSet, t, []*v1.Pod{pod})
defer testutils.RemovePodFinalizers(testCtx.Ctx, testCtx.ClientSet, t, *pod)
}

// trigger termination of the pod, but with long grace period so that it is not removed immediately