Migrated pkg/scheduler/framework/preemption & defaultpreemption to use contextual logging #116835

Merged
@@ -142,6 +142,7 @@ func (pl *DefaultPreemption) SelectVictimsOnNode(
pod *v1.Pod,
nodeInfo *framework.NodeInfo,
pdbs []*policy.PodDisruptionBudget) ([]*v1.Pod, int, *framework.Status) {
logger := klog.FromContext(ctx)
var potentialVictims []*framework.PodInfo
removePod := func(rpi *framework.PodInfo) error {
if err := nodeInfo.RemovePod(rpi.Pod); err != nil {
@@ -207,7 +208,7 @@ func (pl *DefaultPreemption) SelectVictimsOnNode(
}
rpi := pi.Pod
victims = append(victims, rpi)
klog.V(5).InfoS("Pod is a potential preemption victim on node", "pod", klog.KObj(rpi), "node", klog.KObj(nodeInfo.Node()))
logger.V(5).Info("Pod is a potential preemption victim on node", "pod", klog.KObj(rpi), "node", klog.KObj(nodeInfo.Node()))
}
return fits, nil
}
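The hunk above captures the core pattern of this migration: inside SelectVictimsOnNode the logger is taken from the incoming context once via klog.FromContext and reused for all structured log calls, replacing the global klog.V(...).InfoS helpers. A minimal standalone sketch of that pattern, outside the scheduler codebase (the function name and the pod/node strings are illustrative, not part of this PR):

```go
package main

import (
	"context"

	"k8s.io/klog/v2"
)

// scheduleStep pulls the logger out of the context once and reuses it for
// every structured log call, mirroring the change in SelectVictimsOnNode.
func scheduleStep(ctx context.Context, pod, node string) {
	logger := klog.FromContext(ctx)
	logger.V(5).Info("Pod is a potential preemption victim on node",
		"pod", pod, "node", node)
}

func main() {
	// Attach a logger to the context; inside the scheduler the framework
	// already stores one there, so plugins only need klog.FromContext.
	ctx := klog.NewContext(context.Background(), klog.Background())
	scheduleStep(ctx, "default/nginx", "node-1")
}
```

The message only shows up when the process runs at verbosity 5 or higher.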
@@ -36,6 +36,7 @@ import (
clientsetfake "k8s.io/client-go/kubernetes/fake"
clienttesting "k8s.io/client-go/testing"
"k8s.io/client-go/tools/events"
"k8s.io/klog/v2/ktesting"
kubeschedulerconfigv1beta2 "k8s.io/kube-scheduler/config/v1beta2"
extenderv1 "k8s.io/kube-scheduler/extender/v1"
"k8s.io/kubernetes/pkg/scheduler/apis/config"
@@ -1337,7 +1338,8 @@ func TestSelectBestCandidate(t *testing.T) {
cs := clientsetfake.NewSimpleClientset(objs...)
informerFactory := informers.NewSharedInformerFactory(cs, 0)
snapshot := internalcache.NewSnapshot(tt.pods, nodes)
ctx, cancel := context.WithCancel(context.Background())
logger, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
fwk, err := st.NewFramework(
[]st.RegisterPluginFunc{
@@ -1380,7 +1382,7 @@ func TestSelectBestCandidate(t *testing.T) {
}
offset, numCandidates := pl.GetOffsetAndNumCandidates(int32(len(nodeInfos)))
candidates, _, _ := pe.DryRunPreemption(ctx, tt.pod, nodeInfos, nil, offset, numCandidates)
s := pe.SelectCandidate(candidates)
s := pe.SelectCandidate(logger, candidates)
if s == nil || len(s.Name()) == 0 {
return
}
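The test hunks above replace context.Background() with ktesting.NewTestContext(t), so that code under test which calls klog.FromContext(ctx) logs through the testing.T instance and the output stays attached to the test that produced it. A hedged sketch of that setup in isolation (the test name and body are illustrative):

```go
package preemption_test

import (
	"context"
	"testing"

	"k8s.io/klog/v2/ktesting"
)

// TestWithTestContext shows the setup pattern used in TestSelectBestCandidate:
// ktesting wires the per-test logger into the context handed to the code
// under test.
func TestWithTestContext(t *testing.T) {
	logger, ctx := ktesting.NewTestContext(t)
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	logger.Info("starting test") // goes to t.Log; shown on failure or with -v
	_ = ctx                      // would be passed to the code under test
}
```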
48 changes: 26 additions & 22 deletions pkg/scheduler/framework/preemption/preemption.go
@@ -150,20 +150,22 @@ type Evaluator struct {
// - <non-nil PostFilterResult, Success>. It's the regular happy path
// and the non-empty nominatedNodeName will be applied to the preemptor pod.
func (ev *Evaluator) Preempt(ctx context.Context, pod *v1.Pod, m framework.NodeToStatusMap) (*framework.PostFilterResult, *framework.Status) {
logger := klog.FromContext(ctx)

// 0) Fetch the latest version of <pod>.
// It's safe to directly fetch pod here. Because the informer cache has already been
// initialized when creating the Scheduler obj.
// However, tests may need to manually initialize the shared pod informer.
podNamespace, podName := pod.Namespace, pod.Name
pod, err := ev.PodLister.Pods(pod.Namespace).Get(pod.Name)
if err != nil {
klog.ErrorS(err, "Getting the updated preemptor pod object", "pod", klog.KRef(podNamespace, podName))
logger.Error(err, "Could not get the updated preemptor pod object", "pod", klog.KRef(podNamespace, podName))
return nil, framework.AsStatus(err)
}

// 1) Ensure the preemptor is eligible to preempt other pods.
if ok, msg := ev.PodEligibleToPreemptOthers(pod, m[pod.Status.NominatedNodeName]); !ok {
klog.V(5).InfoS("Pod is not eligible for preemption", "pod", klog.KObj(pod), "reason", msg)
logger.V(5).Info("Pod is not eligible for preemption", "pod", klog.KObj(pod), "reason", msg)
return nil, framework.NewStatus(framework.Unschedulable, msg)
}

@@ -188,13 +190,13 @@ func (ev *Evaluator) Preempt(ctx context.Context, pod *v1.Pod, m framework.NodeT
}

// 3) Interact with registered Extenders to filter out some candidates if needed.
candidates, status := ev.callExtenders(pod, candidates)
candidates, status := ev.callExtenders(logger, pod, candidates)
if !status.IsSuccess() {
return nil, status
}

// 4) Find the best candidate.
bestCandidate := ev.SelectCandidate(candidates)
bestCandidate := ev.SelectCandidate(logger, candidates)
if bestCandidate == nil || len(bestCandidate.Name()) == 0 {
return nil, framework.NewStatus(framework.Unschedulable, "no candidate node for preemption")
}
@@ -217,12 +219,13 @@ func (ev *Evaluator) findCandidates(ctx context.Context, pod *v1.Pod, m framewor
if len(allNodes) == 0 {
return nil, nil, errors.New("no nodes available")
}
logger := klog.FromContext(ctx)
potentialNodes, unschedulableNodeStatus := nodesWherePreemptionMightHelp(allNodes, m)
if len(potentialNodes) == 0 {
klog.V(3).InfoS("Preemption will not help schedule pod on any node", "pod", klog.KObj(pod))
logger.V(3).Info("Preemption will not help schedule pod on any node", "pod", klog.KObj(pod))
// In this case, we should clean-up any existing nominated node name of the pod.
if err := util.ClearNominatedNodeName(ctx, ev.Handler.ClientSet(), pod); err != nil {
klog.ErrorS(err, "Cannot clear 'NominatedNodeName' field of pod", "pod", klog.KObj(pod))
logger.Error(err, "Could not clear the nominatedNodeName field of pod", "pod", klog.KObj(pod))
// We do not return as this error is not critical.
}
return nil, unschedulableNodeStatus, nil
@@ -234,12 +237,12 @@ func (ev *Evaluator) findCandidates(ctx context.Context, pod *v1.Pod, m framewor
}

offset, numCandidates := ev.GetOffsetAndNumCandidates(int32(len(potentialNodes)))
if klogV := klog.V(5); klogV.Enabled() {
if loggerV := logger.V(5); loggerV.Enabled() {
var sample []string
for i := offset; i < offset+10 && i < int32(len(potentialNodes)); i++ {
sample = append(sample, potentialNodes[i].Node().Name)
}
klogV.InfoS("Selecting candidates from a pool of nodes", "potentialNodesCount", len(potentialNodes), "offset", offset, "sampleLength", len(sample), "sample", sample, "candidates", numCandidates)
loggerV.Info("Selected candidates from a pool of nodes", "potentialNodesCount", len(potentialNodes), "offset", offset, "sampleLength", len(sample), "sample", sample, "candidates", numCandidates)
}
candidates, nodeStatuses, err := ev.DryRunPreemption(ctx, pod, potentialNodes, pdbs, offset, numCandidates)
for node, nodeStatus := range unschedulableNodeStatus {
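findCandidates also adopts the verbosity-guard idiom: logger.V(5) is evaluated once, and its Enabled() result gates the work of building the sample slice, so that allocation only happens when V(5) output is actually emitted. A standalone sketch of the idiom (the function and argument names are illustrative):

```go
package main

import (
	"context"

	"k8s.io/klog/v2"
)

// logCandidateSample builds the (potentially large) sample slice only when
// V(5) logging is enabled, like the guarded block in findCandidates.
func logCandidateSample(ctx context.Context, nodeNames []string, offset int32) {
	logger := klog.FromContext(ctx)
	if loggerV := logger.V(5); loggerV.Enabled() {
		var sample []string
		for i := offset; i < offset+10 && i < int32(len(nodeNames)); i++ {
			sample = append(sample, nodeNames[i])
		}
		loggerV.Info("Selected candidates from a pool of nodes",
			"potentialNodesCount", len(nodeNames), "offset", offset, "sample", sample)
	}
}

func main() {
	logCandidateSample(context.Background(), []string{"node-1", "node-2", "node-3"}, 0)
}
```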
@@ -252,7 +255,7 @@ func (ev *Evaluator) findCandidates(ctx context.Context, pod *v1.Pod, m framewor
// We will only check <candidates> with extenders that support preemption.
// Extenders which do not support preemption may later prevent preemptor from being scheduled on the nominated
// node. In that case, scheduler will find a different host for the preemptor in subsequent scheduling cycles.
func (ev *Evaluator) callExtenders(pod *v1.Pod, candidates []Candidate) ([]Candidate, *framework.Status) {
func (ev *Evaluator) callExtenders(logger klog.Logger, pod *v1.Pod, candidates []Candidate) ([]Candidate, *framework.Status) {
extenders := ev.Handler.Extenders()
nodeLister := ev.Handler.SnapshotSharedLister().NodeInfos()
if len(extenders) == 0 {
@@ -272,8 +275,8 @@ func (ev *Evaluator) callExtenders(pod *v1.Pod, candidates []Candidate) ([]Candi
nodeNameToVictims, err := extender.ProcessPreemption(pod, victimsMap, nodeLister)
if err != nil {
if extender.IsIgnorable() {
klog.InfoS("Skipping extender as it returned error and has ignorable flag set",
"extender", extender, "err", err)
logger.Info("Skipped extender as it returned error and has ignorable flag set",
"extender", extender.Name(), "err", err)
continue
}
return nil, framework.AsStatus(err)
@@ -283,7 +286,7 @@ func (ev *Evaluator) callExtenders(pod *v1.Pod, candidates []Candidate) ([]Candi
if victims == nil || len(victims.Pods) == 0 {
if extender.IsIgnorable() {
delete(nodeNameToVictims, nodeName)
klog.InfoS("Ignoring node without victims", "node", klog.KRef("", nodeName))
logger.Info("Ignored node for which the extender didn't report victims", "node", klog.KRef("", nodeName), "extender", extender.Name())
continue
}
return nil, framework.AsStatus(fmt.Errorf("expected at least one victim pod on node %q", nodeName))
@@ -312,7 +315,7 @@ func (ev *Evaluator) callExtenders(pod *v1.Pod, candidates []Candidate) ([]Candi

// SelectCandidate chooses the best-fit candidate from given <candidates> and return it.
// NOTE: This method is exported for easier testing in default preemption.
func (ev *Evaluator) SelectCandidate(candidates []Candidate) Candidate {
func (ev *Evaluator) SelectCandidate(logger klog.Logger, candidates []Candidate) Candidate {
if len(candidates) == 0 {
return nil
}
@@ -321,7 +324,7 @@ func (ev *Evaluator) SelectCandidate(candidates []Candidate) Candidate {
}

victimsMap := ev.CandidatesToVictimsMap(candidates)
candidateNode := pickOneNodeForPreemption(victimsMap)
candidateNode := pickOneNodeForPreemption(logger, victimsMap)

// Same as candidatesToVictimsMap, this logic is not applicable for out-of-tree
// preemption plugins that exercise different candidates on the same nominated node.
@@ -333,7 +336,7 @@ func (ev *Evaluator) SelectCandidate(candidates []Candidate) Candidate {
}

// We shouldn't reach here.
klog.ErrorS(errors.New("no candidate selected"), "Should not reach here", "candidates", candidates)
logger.Error(errors.New("no candidate selected"), "Should not reach here", "candidates", candidates)
// To not break the whole flow, return the first candidate.
return candidates[0]
}
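Helpers such as callExtenders, SelectCandidate, pickOneNodeForPreemption and getLowerPriorityNominatedPods have no context parameter of their own, so the PR threads the logger through as an explicit argument instead of falling back to the global klog functions. A minimal sketch of that signature style (pickFirst and its candidates are illustrative):

```go
package main

import (
	"errors"

	"k8s.io/klog/v2"
)

// pickFirst takes the logger as an explicit argument, mirroring the new
// signatures of SelectCandidate and pickOneNodeForPreemption.
func pickFirst(logger klog.Logger, candidates []string) string {
	if len(candidates) == 0 {
		logger.Error(errors.New("no candidate selected"), "Should not reach here")
		return ""
	}
	return candidates[0]
}

func main() {
	// A caller inside the scheduler would use klog.FromContext(ctx) instead.
	logger := klog.Background()
	_ = pickFirst(logger, []string{"node-a", "node-b"})
}
```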
@@ -348,6 +351,7 @@ func (ev *Evaluator) prepareCandidate(ctx context.Context, c Candidate, pod *v1.

ctx, cancel := context.WithCancel(ctx)
defer cancel()
logger := klog.FromContext(ctx)
errCh := parallelize.NewErrorChannel()
preemptPod := func(index int) {
victim := c.Victims().Pods[index]
@@ -367,13 +371,13 @@ func (ev *Evaluator) prepareCandidate(ctx context.Context, c Candidate, pod *v1.
)

if _, err := cs.CoreV1().Pods(victim.Namespace).ApplyStatus(ctx, victimPodApply, metav1.ApplyOptions{FieldManager: fieldManager, Force: true}); err != nil {
klog.ErrorS(err, "Preparing pod preemption", "pod", klog.KObj(victim), "preemptor", klog.KObj(pod))
logger.Error(err, "Could not add DisruptionTarget condition due to preemption", "pod", klog.KObj(victim), "preemptor", klog.KObj(pod))
errCh.SendErrorWithCancel(err, cancel)
return
}
}
if err := util.DeletePod(ctx, cs, victim); err != nil {
klog.ErrorS(err, "Preempting pod", "pod", klog.KObj(victim), "preemptor", klog.KObj(pod))
logger.Error(err, "Preempted pod", "pod", klog.KObj(victim), "preemptor", klog.KObj(pod))
errCh.SendErrorWithCancel(err, cancel)
return
}
@@ -392,9 +396,9 @@ func (ev *Evaluator) prepareCandidate(ctx context.Context, c Candidate, pod *v1.
// this node. So, we should remove their nomination. Removing their
// nomination updates these pods and moves them to the active queue. It
// lets scheduler find another place for them.
nominatedPods := getLowerPriorityNominatedPods(fh, pod, c.Name())
nominatedPods := getLowerPriorityNominatedPods(logger, fh, pod, c.Name())
if err := util.ClearNominatedNodeName(ctx, cs, nominatedPods...); err != nil {
klog.ErrorS(err, "Cannot clear 'NominatedNodeName' field")
logger.Error(err, "Cannot clear 'NominatedNodeName' field")
// We do not return as this error is not critical.
}
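In prepareCandidate the logger is again derived from ctx before the parallelized preemptPod closures run, so every per-victim goroutine logs its API errors through the same contextual logger. A simplified sketch of that shape, with plain goroutines standing in for the scheduler's parallelizer and a log call standing in for the delete:

```go
package main

import (
	"context"
	"sync"

	"k8s.io/klog/v2"
)

// deleteVictims derives the logger once and shares it with every goroutine,
// similar to how prepareCandidate logs per-victim preemption errors.
func deleteVictims(ctx context.Context, victims []string) {
	logger := klog.FromContext(ctx)
	var wg sync.WaitGroup
	for _, v := range victims {
		v := v
		wg.Add(1)
		go func() {
			defer wg.Done()
			// The real code issues ApplyStatus and Delete calls here;
			// this sketch only logs.
			logger.V(2).Info("Preempting pod", "pod", v)
		}()
	}
	wg.Wait()
}

func main() {
	deleteVictims(context.Background(), []string{"ns/pod-a", "ns/pod-b"})
}
```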

@@ -437,7 +441,7 @@ func getPodDisruptionBudgets(pdbLister policylisters.PodDisruptionBudgetLister)
// 6. If there are still ties, the first such node is picked (sort of randomly).
// The 'minNodes1' and 'minNodes2' are being reused here to save the memory
// allocation and garbage collection time.
func pickOneNodeForPreemption(nodesToVictims map[string]*extenderv1.Victims) string {
func pickOneNodeForPreemption(logger klog.Logger, nodesToVictims map[string]*extenderv1.Victims) string {
if len(nodesToVictims) == 0 {
return ""
}
@@ -477,7 +481,7 @@ func pickOneNodeForPreemption(nodesToVictims map[string]*extenderv1.Victims) str
// Get earliest start time of all pods on the current node.
earliestStartTimeOnNode := util.GetEarliestPodStartTime(nodesToVictims[node])
if earliestStartTimeOnNode == nil {
klog.ErrorS(errors.New("earliestStartTime is nil for node"), "Should not reach here", "node", node)
logger.Error(errors.New("earliestStartTime is nil for node"), "Should not reach here", "node", node)
return int64(math.MinInt64)
}
// The bigger the earliestStartTimeOnNode, the higher the score.
@@ -530,7 +534,7 @@ func pickOneNodeForPreemption(nodesToVictims map[string]*extenderv1.Victims) str
// manipulation of NodeInfo and PreFilter state per nominated pod. It may not be
// worth the complexity, especially because we generally expect to have a very
// small number of nominated pods per node.
func getLowerPriorityNominatedPods(pn framework.PodNominator, pod *v1.Pod, nodeName string) []*v1.Pod {
func getLowerPriorityNominatedPods(logger klog.Logger, pn framework.PodNominator, pod *v1.Pod, nodeName string) []*v1.Pod {
podInfos := pn.NominatedPodsForNode(nodeName)

if len(podInfos) == 0 {