Investigate and fix the handling of Succeeded pods in DaemonSet
mimowo committed Apr 4, 2023
1 parent d89d5ab commit b5dd5f1
Showing 2 changed files with 87 additions and 0 deletions.
pkg/controller/daemon/daemon_controller.go (8 additions, 0 deletions)
@@ -75,6 +75,8 @@ const (
FailedPlacementReason = "FailedPlacement"
// FailedDaemonPodReason is added to an event when the status of a Pod of a DaemonSet is 'Failed'.
FailedDaemonPodReason = "FailedDaemonPod"
// SucceededDaemonPodReason is added to an event when the status of a Pod of a DaemonSet is 'Succeeded'.
SucceededDaemonPodReason = "SucceededDaemonPod"
)

// controllerKind contains the schema.GroupVersionKind for this controller type.
@@ -842,6 +844,12 @@ func (dsc *DaemonSetsController) podsShouldBeOnNode(
// Emit an event so that it's discoverable to users.
dsc.eventRecorder.Eventf(ds, v1.EventTypeWarning, FailedDaemonPodReason, msg)
podsToDelete = append(podsToDelete, pod.Name)
} else if pod.Status.Phase == v1.PodSucceeded {
msg := fmt.Sprintf("Found succeeded daemon pod %s/%s on node %s, will try to delete it", pod.Namespace, pod.Name, node.Name)
logger.V(2).Info("Found succeeded daemon pod on node, will try to delete it", "pod", klog.KObj(pod), "node", klog.KObj(node))
// Emit an event so that it's discoverable to users.
dsc.eventRecorder.Eventf(ds, v1.EventTypeNormal, SucceededDaemonPodReason, msg)
podsToDelete = append(podsToDelete, pod.Name)
} else {
daemonPodsRunning = append(daemonPodsRunning, pod)
}
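Taken together with the pre-existing Failed branch, the hunk above makes podsShouldBeOnNode treat any terminal phase as grounds for deletion, so the DaemonSet controller can recreate a fresh daemon pod on the node. A condensed, standalone sketch of that decision, assuming the same package and event-reason constants (shouldDeleteDaemonPod is a hypothetical helper, not the controller's real code path):

package daemon

import v1 "k8s.io/api/core/v1"

// shouldDeleteDaemonPod condenses the phase handling shown above:
// terminal pods are deleted so the controller can recreate them.
func shouldDeleteDaemonPod(pod *v1.Pod) (deletePod bool, eventType, reason string) {
	switch pod.Status.Phase {
	case v1.PodFailed:
		// A failed daemon pod is unexpected, hence a Warning event.
		return true, v1.EventTypeWarning, FailedDaemonPodReason
	case v1.PodSucceeded:
		// A succeeded daemon pod can be a legitimate outcome, hence Normal.
		return true, v1.EventTypeNormal, SucceededDaemonPodReason
	default:
		// Running and Pending pods keep counting as active daemon pods.
		return false, "", ""
	}
}

Note the asymmetry in event type: a Succeeded pod is reported as a Normal event, while a Failed pod remains a Warning.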
test/integration/daemonset/daemonset_test.go (79 additions, 0 deletions)
@@ -312,6 +312,41 @@ func validateDaemonSetPodsAndMarkReady(
}
}

func validateDaemonSetPodsActive(
podClient corev1client.PodInterface,
podInformer cache.SharedIndexInformer,
numberPods int,
t *testing.T,
) {
if err := wait.Poll(time.Second, 60*time.Second, func() (bool, error) {
objects := podInformer.GetIndexer().List()
if len(objects) < numberPods {
return false, nil
}
podsActiveCount := 0
for _, object := range objects {
pod := object.(*v1.Pod)
ownerReferences := pod.ObjectMeta.OwnerReferences
if len(ownerReferences) != 1 {
return false, fmt.Errorf("Pod %s has %d OwnerReferences, expected only 1", pod.Name, len(ownerReferences))
}
controllerRef := ownerReferences[0]
if got, want := controllerRef.Kind, "DaemonSet"; got != want {
t.Errorf("controllerRef.Kind = %q, want %q", got, want)
}
if controllerRef.Controller == nil || *controllerRef.Controller != true {
t.Errorf("controllerRef.Controller is not set to true")
}
if pod.Status.Phase == v1.PodRunning || pod.Status.Phase == v1.PodPending {
podsActiveCount += 1
}
}
return podsActiveCount == numberPods, nil
}); err != nil {
t.Fatal(err)
}
}
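The helper above counts a pod as active when its phase is Running or Pending, and polls for up to a minute until exactly numberPods such pods exist. A minimal usage sketch, mirroring the call in the test added below:

// Wait until all numNodes daemon pods are active again: either still
// Running, or Pending while being recreated after deletion.
validateDaemonSetPodsActive(podClient, podInformer, numNodes, t)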

// podUnschedulable returns a condition function that returns true if the given pod
// gets unschedulable status.
func podUnschedulable(c clientset.Interface, podNamespace, podName string) wait.ConditionFunc {
@@ -498,6 +533,50 @@ func TestSimpleDaemonSetLaunchesPods(t *testing.T) {
})
}

func TestSimpleDaemonSetRestartsPodsOnTerminalPhase(t *testing.T) {
for _, podPhase := range []v1.PodPhase{v1.PodSucceeded, v1.PodFailed} {
t.Run(string(podPhase), func(tt *testing.T) {
forEachStrategy(tt, func(t *testing.T, strategy *apps.DaemonSetUpdateStrategy) {
ctx, closeFn, dc, informers, clientset := setup(t)
defer closeFn()
ns := framework.CreateNamespaceOrDie(clientset, "daemonset-restart-terminal-pod-test", t)
defer framework.DeleteNamespaceOrDie(clientset, ns, t)

dsClient := clientset.AppsV1().DaemonSets(ns.Name)
podClient := clientset.CoreV1().Pods(ns.Name)
nodeClient := clientset.CoreV1().Nodes()
podInformer := informers.Core().V1().Pods().Informer()

informers.Start(ctx.Done())
go dc.Run(ctx, 2)

ds := newDaemonSet("restart-terminal-pod", ns.Name)
ds.Spec.UpdateStrategy = *strategy
if _, err := dsClient.Create(ctx, ds, metav1.CreateOptions{}); err != nil {
t.Fatalf("Failed to create DaemonSet: %v", err)
}
defer cleanupDaemonSets(t, clientset, ds)

numNodes := 3
addNodes(nodeClient, 0, numNodes, nil, t)

validateDaemonSetPodsAndMarkReady(podClient, podInformer, numNodes, t)
validateDaemonSetStatus(dsClient, ds.Name, int32(numNodes), t)
podToMarkAsTerminal := podInformer.GetIndexer().List()[0].(*v1.Pod)
podCopy := podToMarkAsTerminal.DeepCopy()
podCopy.Status.Phase = podPhase
if _, err := podClient.UpdateStatus(ctx, podCopy, metav1.UpdateOptions{}); err != nil {
t.Fatalf("Failed to mark the pod as terminal with phase: %v. Error: %v", podPhase, err)
}
// Verify all pods are active: they either continue Running or are Pending after the restart.
validateDaemonSetPodsActive(podClient, podInformer, numNodes, t)
validateDaemonSetPodsAndMarkReady(podClient, podInformer, numNodes, t)
validateDaemonSetStatus(dsClient, ds.Name, int32(numNodes), t)
})
})
}
}
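The test flips a pod into a terminal phase through the status subresource: Phase lives under Status, and an update through the main resource endpoint ignores status changes. A minimal sketch of that step (markPodPhase is a hypothetical helper; imports shown for self-containment):

import (
	"context"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
)

// markPodPhase sets a pod's phase via the status subresource.
// The DeepCopy avoids mutating the object cached by the informer.
func markPodPhase(ctx context.Context, podClient corev1client.PodInterface, pod *v1.Pod, phase v1.PodPhase) error {
	podCopy := pod.DeepCopy()
	podCopy.Status.Phase = phase
	_, err := podClient.UpdateStatus(ctx, podCopy, metav1.UpdateOptions{})
	return err
}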

func TestDaemonSetWithNodeSelectorLaunchesPods(t *testing.T) {
forEachStrategy(t, func(t *testing.T, strategy *apps.DaemonSetUpdateStrategy) {
ctx, closeFn, dc, informers, clientset := setup(t)
