Schedule DaemonSet Pods in scheduler. #63223
@@ -762,7 +762,7 @@ func (dsc *DaemonSetsController) getDaemonPods(ds *apps.DaemonSet) ([]*v1.Pod, e
 	return cm.ClaimPods(pods)
 }

-// getNodesToDaemonPods returns a map from nodes to daemon pods (corresponding to ds) running on the nodes.
+// getNodesToDaemonPods returns a map from nodes to daemon pods (corresponding to ds) created for the nodes.
 // This also reconciles ControllerRef by adopting/orphaning.
 // Note that returned Pods are pointers to objects in the cache.
 // If you want to modify one, you need to deep-copy it first.

@@ -774,9 +774,16 @@ func (dsc *DaemonSetsController) getNodesToDaemonPods(ds *apps.DaemonSet) (map[s
 	// Group Pods by Node name.
 	nodeToDaemonPods := make(map[string][]*v1.Pod)
 	for _, pod := range claimedPods {
-		nodeName := pod.Spec.NodeName
+		nodeName, err := util.GetTargetNodeName(pod)

Comment: Why is it necessary to do this? Isn't nodeName filled once the pod gets scheduled?
Reply: A DaemonSet pod may never get scheduled (if the target node lacks resources, or is tainted, or not ready, etc.). Even in cases where the pod eventually gets scheduled, between the creation of the unscheduled pod and its binding to the node, the DS controller can easily create lots of extra pods targeted at the node if this doesn't accurately return the ones it already created.
Reply: Thanks, that makes sense.

+		if err != nil {
+			glog.Warningf("Failed to get target node name of Pod %v/%v in DaemonSet %v/%v",
+				pod.Namespace, pod.Name, ds.Namespace, ds.Name)
+			continue
+		}
+
 		nodeToDaemonPods[nodeName] = append(nodeToDaemonPods[nodeName], pod)
 	}

Comment: Because …
Reply: Done, sort the pods by phase & creation timestamp.
Comment: Just revert it; the node without …
Reply: I'm not sure if I understand why this is reverted. If a DaemonSet pod is already scheduled on this node, shouldn't other pending pods be deleted first?

 	return nodeToDaemonPods, nil
 }

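To make the review point above concrete, here is a minimal, self-contained Go sketch (not code from this PR) of a daemon pod that has been created for a node but not yet bound. It assumes the node-name field selector key is "metadata.name" (the value algorithm.NodeFieldSelectorKeyNodeName resolves to); the pod and node names are made up.

package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func main() {
	// A daemon pod that the controller has created for node-1 but that the scheduler
	// has not bound yet: spec.nodeName is still empty, and only the stamped required
	// node affinity records the target node.
	pending := &v1.Pod{
		ObjectMeta: metav1.ObjectMeta{Name: "ds-pending"},
		Spec: v1.PodSpec{
			Affinity: &v1.Affinity{
				NodeAffinity: &v1.NodeAffinity{
					RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{
						NodeSelectorTerms: []v1.NodeSelectorTerm{{
							MatchFields: []v1.NodeSelectorRequirement{{
								Key:      "metadata.name", // assumed value of NodeFieldSelectorKeyNodeName
								Operator: v1.NodeSelectorOpIn,
								Values:   []string{"node-1"},
							}},
						}},
					},
				},
			},
		},
	}

	// Grouping by spec.nodeName alone would file this pod under "" and count zero
	// daemon pods for node-1, so every sync would create yet another pod for node-1
	// until the first one is finally bound.
	fmt.Println("spec.nodeName empty:", pending.Spec.NodeName == "")

	// Reading the target from the matchFields term instead attributes the pod to
	// node-1, which is what grouping via util.GetTargetNodeName achieves.
	target := pending.Spec.Affinity.NodeAffinity.
		RequiredDuringSchedulingIgnoredDuringExecution.
		NodeSelectorTerms[0].MatchFields[0].Values[0]
	fmt.Println("target node from affinity:", target)
}
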
@@ -850,7 +857,7 @@ func (dsc *DaemonSetsController) podsShouldBeOnNode(
 		// If daemon pod is supposed to be running on node, but more than 1 daemon pod is running, delete the excess daemon pods.
 		// Sort the daemon pods by creation time, so the oldest is preserved.

Comment: Nit: the scheduled pod is preserved first; if more than one pod can be preserved, the oldest pod is preserved.

 		if len(daemonPodsRunning) > 1 {
-			sort.Sort(podByCreationTimestamp(daemonPodsRunning))
+			sort.Sort(podByCreationTimestampAndPhase(daemonPodsRunning))
 			for i := 1; i < len(daemonPodsRunning); i++ {
 				podsToDelete = append(podsToDelete, daemonPodsRunning[i].Name)
 			}

@@ -870,7 +877,7 @@ func (dsc *DaemonSetsController) podsShouldBeOnNode(
 // which nodes should not run a Pod of ds but currently running one, it calls function
 // syncNodes with a list of pods to remove and a list of nodes to run a Pod of ds.
 func (dsc *DaemonSetsController) manage(ds *apps.DaemonSet, hash string) error {
-	// Find out which nodes are running the daemon pods controlled by ds.
+	// Find out the pods which are created for the nodes by DaemonSet.
 	nodeToDaemonPods, err := dsc.getNodesToDaemonPods(ds)
 	if err != nil {
 		return fmt.Errorf("couldn't get node to daemon pod mapping for daemon set %q: %v", ds.Name, err)

@@ -962,9 +969,12 @@ func (dsc *DaemonSetsController) syncNodes(ds *apps.DaemonSet, podsToDelete, nod
 		podTemplate := &template

-		if false /*disabled for 1.10*/ && utilfeature.DefaultFeatureGate.Enabled(features.ScheduleDaemonSetPods) {
+		if utilfeature.DefaultFeatureGate.Enabled(features.ScheduleDaemonSetPods) {

Comment: We should enable this feature by default in 1.11.
Reply: We can't enable an alpha feature by default.
Reply: This cannot remain disabled in 1.11. Rescheduler is already removed from the code base. If critical DaemonSets cannot be scheduled, preemption must create room for them, and the DS controller is incapable of performing preemption.
Reply: Actually, when I thought about it again, I realized that my concern may not be valid. IIUC, Rescheduler could not help with scheduling critical DS pods anyway, because the DS controller did not create a DS pod before it found a node that could run the pod. So Rescheduler was not even aware that such critical DS pods needed to be scheduled.
Reply: @bsalamat, here's the code about critical pods in the DaemonSet controller: https://github.com/kubernetes/kubernetes/blob/master/pkg/controller/daemon/daemon_controller.go#L1429
Reply: Thanks, Klaus. So my initial concern is valid. The DS controller does not run a "resource check" for critical pods. This means that it creates critical DS Pods regardless of the resources available on the nodes, and it relies on "Rescheduler" to free up resources on the nodes if necessary. In the absence of Rescheduler, it is important to let the default scheduler schedule DS Pods. Otherwise, critical DS pods may never be scheduled when their corresponding nodes are out of resources.
Reply: Your concern is a good point :). I re-checked the code that critical pod (…) Let me also check whether it is enabled specially in test-infra :). If not enabled, I think that's safe for us to remove it; and we need to update the YAML files about critical pods, if any.
Reply: I arranged with @ravisantoshgudimetla to make Rescheduler aware of Pod priority and add it back to help create room for critical DS Pods. So this PR can remain as is (no need to enable the feature in 1.11).
Reply: I have to add that it was @ravisantoshgudimetla's idea to add priority awareness and use Rescheduler in 1.11. It removes a blocker in moving priority and preemption to Beta.

 			podTemplate = template.DeepCopy()
-			podTemplate.Spec.Affinity = util.ReplaceDaemonSetPodHostnameNodeAffinity(
+			// The pod's NodeAffinity will be updated to make sure the Pod is bound
+			// to the target node by default scheduler. It is safe to do so because there
+			// should be no conflicting node affinity with the target node.
+			podTemplate.Spec.Affinity = util.ReplaceDaemonSetPodNodeNameNodeAffinity(
 				podTemplate.Spec.Affinity, nodesNeedingDaemonPods[ix])
 			podTemplate.Spec.Tolerations = util.AppendNoScheduleTolerationIfNotExist(podTemplate.Spec.Tolerations)

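The AppendNoScheduleTolerationIfNotExist call above is only referenced, not shown, in this diff. The sketch below illustrates the general "append only if missing" shape such a helper has, so repeated syncs do not duplicate tolerations; the taint key here is purely illustrative, and the concrete key/effect the controller uses is not shown in this excerpt.

package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
)

// appendTolerationIfNotExist adds the toleration only when an equivalent one is not
// already present, so the pod template stays stable across repeated syncs.
func appendTolerationIfNotExist(tolerations []v1.Toleration, t v1.Toleration) []v1.Toleration {
	for _, existing := range tolerations {
		if existing.Key == t.Key && existing.Operator == t.Operator && existing.Effect == t.Effect {
			return tolerations
		}
	}
	return append(tolerations, t)
}

func main() {
	tol := v1.Toleration{
		Key:      "example.com/some-taint", // illustrative only; not necessarily the key the controller appends
		Operator: v1.TolerationOpExists,
		Effect:   v1.TaintEffectNoSchedule,
	}

	var spec v1.PodSpec
	spec.Tolerations = appendTolerationIfNotExist(spec.Tolerations, tol)
	spec.Tolerations = appendTolerationIfNotExist(spec.Tolerations, tol) // second sync: no duplicate
	fmt.Println(len(spec.Tolerations))                                   // 1
}
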
@@ -1098,7 +1108,7 @@ func (dsc *DaemonSetsController) updateDaemonSetStatus(ds *apps.DaemonSet, hash
 			currentNumberScheduled++
 			// Sort the daemon pods by creation time, so that the oldest is first.
 			daemonPods, _ := nodeToDaemonPods[node.Name]
-			sort.Sort(podByCreationTimestamp(daemonPods))
+			sort.Sort(podByCreationTimestampAndPhase(daemonPods))
 			pod := daemonPods[0]
 			if podutil.IsPodReady(pod) {
 				numberReady++

@@ -1414,7 +1424,7 @@ func Predicates(pod *v1.Pod, nodeInfo *schedulercache.NodeInfo) (bool, []algorit
 	var predicateFails []algorithm.PredicateFailureReason

 	// If ScheduleDaemonSetPods is enabled, only check nodeSelector and nodeAffinity.
-	if false /*disabled for 1.10*/ && utilfeature.DefaultFeatureGate.Enabled(features.ScheduleDaemonSetPods) {
+	if utilfeature.DefaultFeatureGate.Enabled(features.ScheduleDaemonSetPods) {
 		fit, reasons, err := nodeSelectionPredicates(pod, nil, nodeInfo)
 		if err != nil {
 			return false, predicateFails, err

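nodeSelectionPredicates itself is not part of this diff. As a rough, hand-rolled stand-in (it only considers a plain nodeSelector map and the single stamped node-name matchFields term), the reduced check under the feature gate amounts to something like the sketch below; resource, port, and taint checks are deliberately left to the default scheduler, which can also preempt for critical pods.

package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// fitsNodeSelection answers only "does this pod's nodeSelector / node-name affinity
// match this node?"; everything else is intentionally out of scope in this sketch.
func fitsNodeSelection(pod *v1.Pod, node *v1.Node) bool {
	for k, v := range pod.Spec.NodeSelector {
		if node.Labels[k] != v {
			return false
		}
	}
	if pod.Spec.Affinity == nil || pod.Spec.Affinity.NodeAffinity == nil ||
		pod.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution == nil {
		return true
	}
	for _, term := range pod.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms {
		for _, req := range term.MatchFields {
			if req.Key == "metadata.name" && req.Operator == v1.NodeSelectorOpIn {
				for _, v := range req.Values {
					if v == node.Name {
						return true
					}
				}
				return false
			}
		}
	}
	return true
}

func main() {
	node := &v1.Node{ObjectMeta: metav1.ObjectMeta{
		Name:   "node-1",
		Labels: map[string]string{"disktype": "ssd"},
	}}
	pod := &v1.Pod{Spec: v1.PodSpec{NodeSelector: map[string]string{"disktype": "ssd"}}}
	fmt.Println(fitsNodeSelection(pod, node)) // true
}
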
@@ -1466,12 +1476,21 @@ func (o byCreationTimestamp) Less(i, j int) bool {
 	return o[i].CreationTimestamp.Before(&o[j].CreationTimestamp)
 }

-type podByCreationTimestamp []*v1.Pod
+type podByCreationTimestampAndPhase []*v1.Pod

-func (o podByCreationTimestamp) Len() int      { return len(o) }
-func (o podByCreationTimestamp) Swap(i, j int) { o[i], o[j] = o[j], o[i] }
+func (o podByCreationTimestampAndPhase) Len() int      { return len(o) }
+func (o podByCreationTimestampAndPhase) Swap(i, j int) { o[i], o[j] = o[j], o[i] }

-func (o podByCreationTimestamp) Less(i, j int) bool {
+func (o podByCreationTimestampAndPhase) Less(i, j int) bool {
+	// Scheduled Pod first
+	if len(o[i].Spec.NodeName) != 0 && len(o[j].Spec.NodeName) == 0 {
+		return true
+	}
+
+	if len(o[i].Spec.NodeName) == 0 && len(o[j].Spec.NodeName) != 0 {
+		return false
+	}
+
 	if o[i].CreationTimestamp.Equal(&o[j].CreationTimestamp) {
 		return o[i].Name < o[j].Name
 	}

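A sketch of a same-package test that pins down the new ordering: a scheduled pod sorts ahead of a pending one even if it is younger, so it is the pod preserved by podsShouldBeOnNode. The fixture names and timestamps are invented.

package daemon

import (
	"sort"
	"testing"
	"time"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func TestPodByCreationTimestampAndPhaseOrdering(t *testing.T) {
	older := metav1.NewTime(time.Now().Add(-time.Hour))
	newer := metav1.NewTime(time.Now())

	scheduledNewer := &v1.Pod{
		ObjectMeta: metav1.ObjectMeta{Name: "scheduled-newer", CreationTimestamp: newer},
		Spec:       v1.PodSpec{NodeName: "node-1"},
	}
	// Created by the DS controller but not yet bound: spec.nodeName is empty.
	pendingOlder := &v1.Pod{
		ObjectMeta: metav1.ObjectMeta{Name: "pending-older", CreationTimestamp: older},
	}

	pods := []*v1.Pod{pendingOlder, scheduledNewer}
	sort.Sort(podByCreationTimestampAndPhase(pods))

	// The scheduled pod sorts first even though it is newer, so it is the one kept
	// when surplus daemon pods on a node are deleted.
	if pods[0].Name != "scheduled-newer" {
		t.Fatalf("expected the scheduled pod first, got %q", pods[0].Name)
	}
}
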
@@ -29,7 +29,6 @@ import (
 	podutil "k8s.io/kubernetes/pkg/api/v1/pod"
 	v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
 	"k8s.io/kubernetes/pkg/features"
-	kubeletapis "k8s.io/kubernetes/pkg/kubelet/apis"
 	kubelettypes "k8s.io/kubernetes/pkg/kubelet/types"
 	"k8s.io/kubernetes/pkg/scheduler/algorithm"
 )

@@ -136,19 +135,20 @@ func SplitByAvailablePods(minReadySeconds int32, pods []*v1.Pod) ([]*v1.Pod, []*
 	return availablePods, unavailablePods
 }

-// ReplaceDaemonSetPodHostnameNodeAffinity replaces the 'kubernetes.io/hostname' NodeAffinity term with
-// the given "nodeName" in the "affinity" terms.
-func ReplaceDaemonSetPodHostnameNodeAffinity(affinity *v1.Affinity, nodename string) *v1.Affinity {
+// ReplaceDaemonSetPodNodeNameNodeAffinity replaces the RequiredDuringSchedulingIgnoredDuringExecution
+// NodeAffinity of the given affinity with a new NodeAffinity that selects the given nodeName.
+// Note that this function assumes that no NodeAffinity conflicts with the selected nodeName.
+func ReplaceDaemonSetPodNodeNameNodeAffinity(affinity *v1.Affinity, nodename string) *v1.Affinity {
+	nodeSelReq := v1.NodeSelectorRequirement{
+		Key:      algorithm.NodeFieldSelectorKeyNodeName,
+		Operator: v1.NodeSelectorOpIn,
+		Values:   []string{nodename},
+	}
+
 	nodeSelector := &v1.NodeSelector{
 		NodeSelectorTerms: []v1.NodeSelectorTerm{
 			{
-				MatchExpressions: []v1.NodeSelectorRequirement{
-					{
-						Key:      kubeletapis.LabelHostname,
-						Operator: v1.NodeSelectorOpIn,
-						Values:   []string{nodename},
-					},
-				},
+				MatchFields: []v1.NodeSelectorRequirement{nodeSelReq},
 			},
 		},
 	}

@@ -175,28 +175,12 @@ func ReplaceDaemonSetPodHostnameNodeAffinity(affinity *v1.Affinity, nodename str
 		return affinity
 	}

-	nodeSelectorTerms := []v1.NodeSelectorTerm{}
-
-	// Removes hostname node selector, as only the target hostname will take effect.
-	for _, term := range nodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms {
-		exps := []v1.NodeSelectorRequirement{}
-		for _, exp := range term.MatchExpressions {
-			if exp.Key != kubeletapis.LabelHostname {
-				exps = append(exps, exp)
-			}
-		}
-
-		if len(exps) > 0 {
-			term.MatchExpressions = exps
-			nodeSelectorTerms = append(nodeSelectorTerms, term)
-		}
-	}
-
-	// Adds target hostname NodeAffinity term.
-	nodeSelectorTerms = append(nodeSelectorTerms, nodeSelector.NodeSelectorTerms[0])
-
 	// Replace node selector with the new one.
-	nodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms = nodeSelectorTerms
+	nodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms = []v1.NodeSelectorTerm{
+		{
+			MatchFields: []v1.NodeSelectorRequirement{nodeSelReq},
+		},
+	}

 	return affinity
 }

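A sketch of a same-package test for the replacement semantics: any pre-existing required node-affinity terms are overwritten by the single matchFields term, which is why the function's comment warns that it assumes no conflicting node affinity; preferred affinity is left untouched. The zone requirement in the fixture is only an example.

package util

import (
	"testing"

	v1 "k8s.io/api/core/v1"
)

func TestReplaceDaemonSetPodNodeNameNodeAffinity(t *testing.T) {
	// A template that already carries a required node-affinity term (a zone constraint here).
	affinity := &v1.Affinity{
		NodeAffinity: &v1.NodeAffinity{
			RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{
				NodeSelectorTerms: []v1.NodeSelectorTerm{{
					MatchExpressions: []v1.NodeSelectorRequirement{{
						Key:      "failure-domain.beta.kubernetes.io/zone", // illustrative fixture
						Operator: v1.NodeSelectorOpIn,
						Values:   []string{"zone-a"},
					}},
				}},
			},
		},
	}

	got := ReplaceDaemonSetPodNodeNameNodeAffinity(affinity, "node-1")

	// The required terms are replaced wholesale by a single matchFields term on the
	// node name; the zone expression above is gone.
	terms := got.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms
	if len(terms) != 1 || len(terms[0].MatchFields) != 1 || terms[0].MatchFields[0].Values[0] != "node-1" {
		t.Fatalf("unexpected node selector terms: %+v", terms)
	}
}
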
@@ -225,3 +209,42 @@ func AppendNoScheduleTolerationIfNotExist(tolerations []v1.Toleration) []v1.Tole

 	return tolerations
 }

+// GetTargetNodeName get the target node name of DaemonSet pods. If `.spec.NodeName` is not empty (nil),
+// return `.spec.NodeName`; otherwise, retrieve node name of pending pods from NodeAffinity. Return error
+// if failed to retrieve node name from `.spec.NodeName` and NodeAffinity.
+func GetTargetNodeName(pod *v1.Pod) (string, error) {
+	if len(pod.Spec.NodeName) != 0 {
+		return pod.Spec.NodeName, nil
+	}
+
+	// If ScheduleDaemonSetPods was enabled before, retrieve node name of unscheduled pods from NodeAffinity
+	if pod.Spec.Affinity == nil ||
+		pod.Spec.Affinity.NodeAffinity == nil ||
+		pod.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution == nil {
+		return "", fmt.Errorf("no spec.affinity.nodeAffinity.requiredDuringSchedulingIgnoredDuringExecution for pod %s/%s",
+			pod.Namespace, pod.Name)
+	}
+
+	terms := pod.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms
+	if len(terms) < 1 {
+		return "", fmt.Errorf("no nodeSelectorTerms in requiredDuringSchedulingIgnoredDuringExecution of pod %s/%s",
+			pod.Namespace, pod.Name)
+	}
+
+	for _, term := range terms {
+		for _, exp := range term.MatchFields {
+			if exp.Key == algorithm.NodeFieldSelectorKeyNodeName &&
+				exp.Operator == v1.NodeSelectorOpIn {
+				if len(exp.Values) != 1 {
+					return "", fmt.Errorf("the matchFields value of '%s' is not unique for pod %s/%s",
+						algorithm.NodeFieldSelectorKeyNodeName, pod.Namespace, pod.Name)
+				}
+
+				return exp.Values[0], nil
+			}
+		}
+	}
+
+	return "", fmt.Errorf("no node name found for pod %s/%s", pod.Namespace, pod.Name)

Comment: Seems like here we are changing the behavior if the feature is disabled, by returning this error, which was not checked before.
Comment: Here it seems to be changing the behavior if the feature is disabled, by returning an error if pod.Spec.NodeName is empty.
Reply: Updated to get …
Reply: If the …
Reply: Good point :)
Reply: Updated to get …

+}

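A sketch of a same-package test summarizing the three outcomes discussed above: a bound pod returns spec.nodeName, a pending pod created under ScheduleDaemonSetPods returns the node from its matchFields term, and a pod with neither yields the error that makes getNodesToDaemonPods log a warning and skip it. "metadata.name" stands in for algorithm.NodeFieldSelectorKeyNodeName; the fixtures are invented.

package util

import (
	"testing"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func TestGetTargetNodeName(t *testing.T) {
	scheduled := &v1.Pod{
		ObjectMeta: metav1.ObjectMeta{Namespace: "kube-system", Name: "ds-bound"},
		Spec:       v1.PodSpec{NodeName: "node-1"},
	}
	pending := &v1.Pod{
		ObjectMeta: metav1.ObjectMeta{Namespace: "kube-system", Name: "ds-pending"},
		Spec: v1.PodSpec{
			Affinity: &v1.Affinity{
				NodeAffinity: &v1.NodeAffinity{
					RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{
						NodeSelectorTerms: []v1.NodeSelectorTerm{{
							MatchFields: []v1.NodeSelectorRequirement{{
								Key:      "metadata.name", // algorithm.NodeFieldSelectorKeyNodeName
								Operator: v1.NodeSelectorOpIn,
								Values:   []string{"node-2"},
							}},
						}},
					},
				},
			},
		},
	}
	orphan := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Namespace: "kube-system", Name: "ds-orphan"}}

	if name, err := GetTargetNodeName(scheduled); err != nil || name != "node-1" {
		t.Fatalf("scheduled pod: got (%q, %v)", name, err)
	}
	if name, err := GetTargetNodeName(pending); err != nil || name != "node-2" {
		t.Fatalf("pending pod: got (%q, %v)", name, err)
	}
	// No spec.nodeName and no node-name affinity: the caller logs a warning and skips the pod.
	if _, err := GetTargetNodeName(orphan); err == nil {
		t.Fatal("expected an error for a pod with no target node")
	}
}
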