Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Scheduler: Update the PreFilterExtensions interface to use PodInfo #97599

Merged
merged 1 commit into from Jan 20, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
12 changes: 6 additions & 6 deletions pkg/scheduler/framework/interface.go
Expand Up @@ -246,10 +246,10 @@ type QueueSortPlugin interface {
type PreFilterExtensions interface {
// AddPod is called by the framework while trying to evaluate the impact
// of adding podToAdd to the node while scheduling podToSchedule.
AddPod(ctx context.Context, state *CycleState, podToSchedule *v1.Pod, podToAdd *v1.Pod, nodeInfo *NodeInfo) *Status
AddPod(ctx context.Context, state *CycleState, podToSchedule *v1.Pod, podInfoToAdd *PodInfo, nodeInfo *NodeInfo) *Status
// RemovePod is called by the framework while trying to evaluate the impact
// of removing podToRemove from the node while scheduling podToSchedule.
RemovePod(ctx context.Context, state *CycleState, podToSchedule *v1.Pod, podToRemove *v1.Pod, nodeInfo *NodeInfo) *Status
RemovePod(ctx context.Context, state *CycleState, podToSchedule *v1.Pod, podInfoToRemove *PodInfo, nodeInfo *NodeInfo) *Status
}

// PreFilterPlugin is an interface that must be implemented by "PreFilter" plugins.
Expand Down Expand Up @@ -438,12 +438,12 @@ type Framework interface {
// RunPreFilterExtensionAddPod calls the AddPod interface for the set of configured
// PreFilter plugins. It returns directly if any of the plugins return any
// status other than Success.
RunPreFilterExtensionAddPod(ctx context.Context, state *CycleState, podToSchedule *v1.Pod, podToAdd *v1.Pod, nodeInfo *NodeInfo) *Status
RunPreFilterExtensionAddPod(ctx context.Context, state *CycleState, podToSchedule *v1.Pod, podInfoToAdd *PodInfo, nodeInfo *NodeInfo) *Status

// RunPreFilterExtensionRemovePod calls the RemovePod interface for the set of configured
// PreFilter plugins. It returns directly if any of the plugins return any
// status other than Success.
RunPreFilterExtensionRemovePod(ctx context.Context, state *CycleState, podToSchedule *v1.Pod, podToAdd *v1.Pod, nodeInfo *NodeInfo) *Status
RunPreFilterExtensionRemovePod(ctx context.Context, state *CycleState, podToSchedule *v1.Pod, podInfoToRemove *PodInfo, nodeInfo *NodeInfo) *Status

// RunPreScorePlugins runs the set of configured PreScore plugins. If any
// of these plugins returns any status other than "Success", the given pod is rejected.
Expand Down Expand Up @@ -585,7 +585,7 @@ type PluginsRunner interface {
// RunFilterPlugins runs the set of configured filter plugins for pod on the given node.
RunFilterPlugins(context.Context, *CycleState, *v1.Pod, *NodeInfo) PluginToStatus
// RunPreFilterExtensionAddPod calls the AddPod interface for the set of configured PreFilter plugins.
RunPreFilterExtensionAddPod(ctx context.Context, state *CycleState, podToSchedule *v1.Pod, podToAdd *v1.Pod, nodeInfo *NodeInfo) *Status
RunPreFilterExtensionAddPod(ctx context.Context, state *CycleState, podToSchedule *v1.Pod, podInfoToAdd *PodInfo, nodeInfo *NodeInfo) *Status
// RunPreFilterExtensionRemovePod calls the RemovePod interface for the set of configured PreFilter plugins.
RunPreFilterExtensionRemovePod(ctx context.Context, state *CycleState, podToSchedule *v1.Pod, podToRemove *v1.Pod, nodeInfo *NodeInfo) *Status
RunPreFilterExtensionRemovePod(ctx context.Context, state *CycleState, podToSchedule *v1.Pod, podInfoToRemove *PodInfo, nodeInfo *NodeInfo) *Status
}
Expand Up @@ -603,15 +603,16 @@ func selectVictimsOnNode(
if err := nodeInfo.RemovePod(rp); err != nil {
return err
}
status := ph.RunPreFilterExtensionRemovePod(ctx, state, pod, rp, nodeInfo)
status := ph.RunPreFilterExtensionRemovePod(ctx, state, pod, framework.NewPodInfo(rp), nodeInfo)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like you can change removePod to accept a PodInfo

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also change the following to use podInfo:

  1. potentialVictims should be of type PodInfo
  2. change filterPodsWithPDBViolation to accept and return lists of type PodInfo
  3. change addPod and reprievePod to accept PodInfo

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is fine you were planning to do that in a separate PR, but if so, please update the title and description of the PR to indicate that the goal of this PR is to update the PreFilterExtensions interface to use PodInfo (not reduce struct initialization).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I am working on it locally with one failing test to investigate. Let us keep this PR small and focused.

if !status.IsSuccess() {
return status.AsError()
}
return nil
}
addPod := func(ap *v1.Pod) error {
nodeInfo.AddPod(ap)
status := ph.RunPreFilterExtensionAddPod(ctx, state, pod, ap, nodeInfo)
podInfoToAdd := framework.NewPodInfo(ap)
nodeInfo.AddPodInfo(podInfoToAdd)
status := ph.RunPreFilterExtensionAddPod(ctx, state, pod, podInfoToAdd, nodeInfo)
if !status.IsSuccess() {
return status.AsError()
}
Expand Down
24 changes: 10 additions & 14 deletions pkg/scheduler/framework/plugins/interpodaffinity/filtering.go
Expand Up @@ -36,7 +36,7 @@ const (
// ErrReasonExistingAntiAffinityRulesNotMatch is used for ExistingPodsAntiAffinityRulesNotMatch predicate error.
ErrReasonExistingAntiAffinityRulesNotMatch = "node(s) didn't satisfy existing pods anti-affinity rules"
// ErrReasonAffinityNotMatch is used for MatchInterPodAffinity predicate error.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just thinking out loud, all those comment is outdated, those should be "InterPodAffinity" instead of legacy "MatchInterPodAffinity".

ErrReasonAffinityNotMatch = "node(s) didn't match pod affinity/anti-affinity"
ErrReasonAffinityNotMatch = "node(s) didn't match pod affinity/anti-affinity rules"
// ErrReasonAffinityRulesNotMatch is used for PodAffinityRulesNotMatch predicate error.
ErrReasonAffinityRulesNotMatch = "node(s) didn't match pod affinity rules"
// ErrReasonAntiAffinityRulesNotMatch is used for PodAntiAffinityRulesNotMatch predicate error.
Expand Down Expand Up @@ -71,22 +71,18 @@ func (s *preFilterState) Clone() framework.StateData {
return &copy
}

// updateWithPod updates the preFilterState counters with the (anti)affinity matches for the given pod.
func (s *preFilterState) updateWithPod(updatedPod *v1.Pod, node *v1.Node, multiplier int64) error {
// updateWithPod updates the preFilterState counters with the (anti)affinity matches for the given podInfo.
func (s *preFilterState) updateWithPod(updatedPodInfo *framework.PodInfo, node *v1.Node, multiplier int64) {
if s == nil {
return nil
return
}

// Update matching existing anti-affinity terms.
// TODO(#91058): AddPod/RemovePod should pass a *framework.PodInfo type instead of *v1.Pod.
updatedPodInfo := framework.NewPodInfo(updatedPod)
s.topologyToMatchedExistingAntiAffinityTerms.updateWithAntiAffinityTerms(s.podInfo.Pod, node, updatedPodInfo.RequiredAntiAffinityTerms, multiplier)

// Update matching incoming pod (anti)affinity terms.
s.topologyToMatchedAffinityTerms.updateWithAffinityTerms(updatedPod, node, s.podInfo.RequiredAffinityTerms, multiplier)
s.topologyToMatchedAntiAffinityTerms.updateWithAntiAffinityTerms(updatedPod, node, s.podInfo.RequiredAntiAffinityTerms, multiplier)

return nil
s.topologyToMatchedAffinityTerms.updateWithAffinityTerms(updatedPodInfo.Pod, node, s.podInfo.RequiredAffinityTerms, multiplier)
s.topologyToMatchedAntiAffinityTerms.updateWithAntiAffinityTerms(updatedPodInfo.Pod, node, s.podInfo.RequiredAntiAffinityTerms, multiplier)
}

// TODO(Huang-Wei): It might be possible to use "make(map[topologyPair]*int64)" so that
Expand Down Expand Up @@ -279,22 +275,22 @@ func (pl *InterPodAffinity) PreFilterExtensions() framework.PreFilterExtensions
}

// AddPod from pre-computed data in cycleState.
func (pl *InterPodAffinity) AddPod(ctx context.Context, cycleState *framework.CycleState, podToSchedule *v1.Pod, podToAdd *v1.Pod, nodeInfo *framework.NodeInfo) *framework.Status {
func (pl *InterPodAffinity) AddPod(ctx context.Context, cycleState *framework.CycleState, podToSchedule *v1.Pod, podInfoToAdd *framework.PodInfo, nodeInfo *framework.NodeInfo) *framework.Status {
state, err := getPreFilterState(cycleState)
if err != nil {
return framework.NewStatus(framework.Error, err.Error())
}
state.updateWithPod(podToAdd, nodeInfo.Node(), 1)
state.updateWithPod(podInfoToAdd, nodeInfo.Node(), 1)
return nil
}

// RemovePod from pre-computed data in cycleState.
func (pl *InterPodAffinity) RemovePod(ctx context.Context, cycleState *framework.CycleState, podToSchedule *v1.Pod, podToRemove *v1.Pod, nodeInfo *framework.NodeInfo) *framework.Status {
func (pl *InterPodAffinity) RemovePod(ctx context.Context, cycleState *framework.CycleState, podToSchedule *v1.Pod, podInfoToRemove *framework.PodInfo, nodeInfo *framework.NodeInfo) *framework.Status {
state, err := getPreFilterState(cycleState)
if err != nil {
return framework.NewStatus(framework.Error, err.Error())
}
state.updateWithPod(podToRemove, nodeInfo.Node(), -1)
state.updateWithPod(podInfoToRemove, nodeInfo.Node(), -1)
return nil
}

Expand Down
Expand Up @@ -2061,7 +2061,7 @@ func TestPreFilterStateAddRemovePod(t *testing.T) {

// Add test.addedPod to state1 and verify it is equal to allPodsState.
nodeInfo := mustGetNodeInfo(t, snapshot, test.addedPod.Spec.NodeName)
if err := ipa.AddPod(context.Background(), cycleState, test.pendingPod, test.addedPod, nodeInfo); err != nil {
if err := ipa.AddPod(context.Background(), cycleState, test.pendingPod, framework.NewPodInfo(test.addedPod), nodeInfo); err != nil {
t.Errorf("error adding pod to meta: %v", err)
}

Expand All @@ -2083,7 +2083,7 @@ func TestPreFilterStateAddRemovePod(t *testing.T) {
}

// Remove the added pod pod and make sure it is equal to the original state.
if err := ipa.RemovePod(context.Background(), cycleState, test.pendingPod, test.addedPod, nodeInfo); err != nil {
if err := ipa.RemovePod(context.Background(), cycleState, test.pendingPod, framework.NewPodInfo(test.addedPod), nodeInfo); err != nil {
t.Errorf("error removing pod from meta: %v", err)
}
if !reflect.DeepEqual(originalState, state) {
Expand Down
Expand Up @@ -158,24 +158,24 @@ func (pl *PodTopologySpread) PreFilterExtensions() framework.PreFilterExtensions
}

// AddPod from pre-computed data in cycleState.
func (pl *PodTopologySpread) AddPod(ctx context.Context, cycleState *framework.CycleState, podToSchedule *v1.Pod, podToAdd *v1.Pod, nodeInfo *framework.NodeInfo) *framework.Status {
func (pl *PodTopologySpread) AddPod(ctx context.Context, cycleState *framework.CycleState, podToSchedule *v1.Pod, podInfoToAdd *framework.PodInfo, nodeInfo *framework.NodeInfo) *framework.Status {
s, err := getPreFilterState(cycleState)
if err != nil {
return framework.AsStatus(err)
}

s.updateWithPod(podToAdd, podToSchedule, nodeInfo.Node(), 1)
s.updateWithPod(podInfoToAdd.Pod, podToSchedule, nodeInfo.Node(), 1)
return nil
}

// RemovePod from pre-computed data in cycleState.
func (pl *PodTopologySpread) RemovePod(ctx context.Context, cycleState *framework.CycleState, podToSchedule *v1.Pod, podToRemove *v1.Pod, nodeInfo *framework.NodeInfo) *framework.Status {
func (pl *PodTopologySpread) RemovePod(ctx context.Context, cycleState *framework.CycleState, podToSchedule *v1.Pod, podInfoToRemove *framework.PodInfo, nodeInfo *framework.NodeInfo) *framework.Status {
s, err := getPreFilterState(cycleState)
if err != nil {
return framework.AsStatus(err)
}

s.updateWithPod(podToRemove, podToSchedule, nodeInfo.Node(), -1)
s.updateWithPod(podInfoToRemove.Pod, podToSchedule, nodeInfo.Node(), -1)
return nil
}

Expand Down
Expand Up @@ -838,7 +838,7 @@ func TestPreFilterStateAddPod(t *testing.T) {
if err != nil {
t.Fatal(err)
}
if s := pl.AddPod(ctx, cs, tt.preemptor, tt.addedPod, nodeInfo); !s.IsSuccess() {
if s := pl.AddPod(ctx, cs, tt.preemptor, framework.NewPodInfo(tt.addedPod), nodeInfo); !s.IsSuccess() {
t.Fatal(s.AsError())
}
state, err := getPreFilterState(cs)
Expand Down Expand Up @@ -1050,7 +1050,7 @@ func TestPreFilterStateRemovePod(t *testing.T) {
if err != nil {
t.Fatal(err)
}
if s := pl.RemovePod(ctx, cs, tt.preemptor, deletedPod, nodeInfo); !s.IsSuccess() {
if s := pl.RemovePod(ctx, cs, tt.preemptor, framework.NewPodInfo(deletedPod), nodeInfo); !s.IsSuccess() {
t.Fatal(s.AsError())
}

Expand Down
Expand Up @@ -148,40 +148,40 @@ func (pl *ServiceAffinity) PreFilterExtensions() framework.PreFilterExtensions {
}

// AddPod from pre-computed data in cycleState.
func (pl *ServiceAffinity) AddPod(ctx context.Context, cycleState *framework.CycleState, podToSchedule *v1.Pod, podToAdd *v1.Pod, nodeInfo *framework.NodeInfo) *framework.Status {
func (pl *ServiceAffinity) AddPod(ctx context.Context, cycleState *framework.CycleState, podToSchedule *v1.Pod, podInfoToAdd *framework.PodInfo, nodeInfo *framework.NodeInfo) *framework.Status {
s, err := getPreFilterState(cycleState)
if err != nil {
return framework.AsStatus(err)
}

// If addedPod is in the same namespace as the pod, update the list
// of matching pods if applicable.
if podToAdd.Namespace != podToSchedule.Namespace {
if podInfoToAdd.Pod.Namespace != podToSchedule.Namespace {
return nil
}

selector := createSelectorFromLabels(podToSchedule.Labels)
if selector.Matches(labels.Set(podToAdd.Labels)) {
s.matchingPodList = append(s.matchingPodList, podToAdd)
if selector.Matches(labels.Set(podInfoToAdd.Pod.Labels)) {
s.matchingPodList = append(s.matchingPodList, podInfoToAdd.Pod)
}

return nil
}

// RemovePod from pre-computed data in cycleState.
func (pl *ServiceAffinity) RemovePod(ctx context.Context, cycleState *framework.CycleState, podToSchedule *v1.Pod, podToRemove *v1.Pod, nodeInfo *framework.NodeInfo) *framework.Status {
func (pl *ServiceAffinity) RemovePod(ctx context.Context, cycleState *framework.CycleState, podToSchedule *v1.Pod, podInfoToRemove *framework.PodInfo, nodeInfo *framework.NodeInfo) *framework.Status {
s, err := getPreFilterState(cycleState)
if err != nil {
return framework.AsStatus(err)
}

if len(s.matchingPodList) == 0 ||
podToRemove.Namespace != s.matchingPodList[0].Namespace {
podInfoToRemove.Pod.Namespace != s.matchingPodList[0].Namespace {
return nil
}

for i, pod := range s.matchingPodList {
if pod.Name == podToRemove.Name && pod.Namespace == podToRemove.Namespace {
if pod.Name == podInfoToRemove.Pod.Name && pod.Namespace == podInfoToRemove.Pod.Namespace {
s.matchingPodList = append(s.matchingPodList[:i], s.matchingPodList[i+1:]...)
break
}
Expand Down
Expand Up @@ -539,16 +539,17 @@ func TestPreFilterStateAddRemovePod(t *testing.T) {

// Add test.addedPod to state1 and verify it is equal to allPodsState.
nodeInfo := mustGetNodeInfo(t, snapshot, test.addedPod.Spec.NodeName)
if err := ipa.AddPod(context.Background(), state, test.pendingPod, test.addedPod, nodeInfo); err != nil {
addedPodInfo := framework.NewPodInfo(test.addedPod)
if err := ipa.AddPod(context.Background(), state, test.pendingPod, addedPodInfo, nodeInfo); err != nil {
t.Errorf("error adding pod to preFilterState: %v", err)
}

if !reflect.DeepEqual(sortState(plStateAllPods), sortState(plState)) {
t.Errorf("State is not equal, got: %v, want: %v", plState, plStateAllPods)
}

// Remove the added pod pod and make sure it is equal to the original state.
if err := ipa.RemovePod(context.Background(), state, test.pendingPod, test.addedPod, nodeInfo); err != nil {
// Remove the added pod and make sure it is equal to the original state.
if err := ipa.RemovePod(context.Background(), state, test.pendingPod, addedPodInfo, nodeInfo); err != nil {
t.Errorf("error removing pod from preFilterState: %v", err)
}
if !reflect.DeepEqual(sortState(plStateOriginal), sortState(plState)) {
Expand Down
25 changes: 13 additions & 12 deletions pkg/scheduler/framework/runtime/framework.go
Expand Up @@ -458,14 +458,14 @@ func (f *frameworkImpl) RunPreFilterExtensionAddPod(
ctx context.Context,
state *framework.CycleState,
podToSchedule *v1.Pod,
podToAdd *v1.Pod,
podInfoToAdd *framework.PodInfo,
nodeInfo *framework.NodeInfo,
) (status *framework.Status) {
for _, pl := range f.preFilterPlugins {
if pl.PreFilterExtensions() == nil {
continue
}
status = f.runPreFilterExtensionAddPod(ctx, pl, state, podToSchedule, podToAdd, nodeInfo)
status = f.runPreFilterExtensionAddPod(ctx, pl, state, podToSchedule, podInfoToAdd, nodeInfo)
if !status.IsSuccess() {
err := status.AsError()
klog.ErrorS(err, "Failed running AddPod on PreFilter plugin", "plugin", pl.Name(), "pod", klog.KObj(podToSchedule))
Expand All @@ -476,12 +476,12 @@ func (f *frameworkImpl) RunPreFilterExtensionAddPod(
return nil
}

func (f *frameworkImpl) runPreFilterExtensionAddPod(ctx context.Context, pl framework.PreFilterPlugin, state *framework.CycleState, podToSchedule *v1.Pod, podToAdd *v1.Pod, nodeInfo *framework.NodeInfo) *framework.Status {
func (f *frameworkImpl) runPreFilterExtensionAddPod(ctx context.Context, pl framework.PreFilterPlugin, state *framework.CycleState, podToSchedule *v1.Pod, podInfoToAdd *framework.PodInfo, nodeInfo *framework.NodeInfo) *framework.Status {
if !state.ShouldRecordPluginMetrics() {
return pl.PreFilterExtensions().AddPod(ctx, state, podToSchedule, podToAdd, nodeInfo)
return pl.PreFilterExtensions().AddPod(ctx, state, podToSchedule, podInfoToAdd, nodeInfo)
}
startTime := time.Now()
status := pl.PreFilterExtensions().AddPod(ctx, state, podToSchedule, podToAdd, nodeInfo)
status := pl.PreFilterExtensions().AddPod(ctx, state, podToSchedule, podInfoToAdd, nodeInfo)
f.metricsRecorder.observePluginDurationAsync(preFilterExtensionAddPod, pl.Name(), status, metrics.SinceInSeconds(startTime))
return status
}
Expand All @@ -493,14 +493,14 @@ func (f *frameworkImpl) RunPreFilterExtensionRemovePod(
ctx context.Context,
state *framework.CycleState,
podToSchedule *v1.Pod,
podToRemove *v1.Pod,
podInfoToRemove *framework.PodInfo,
nodeInfo *framework.NodeInfo,
) (status *framework.Status) {
for _, pl := range f.preFilterPlugins {
if pl.PreFilterExtensions() == nil {
continue
}
status = f.runPreFilterExtensionRemovePod(ctx, pl, state, podToSchedule, podToRemove, nodeInfo)
status = f.runPreFilterExtensionRemovePod(ctx, pl, state, podToSchedule, podInfoToRemove, nodeInfo)
if !status.IsSuccess() {
err := status.AsError()
klog.ErrorS(err, "Failed running RemovePod on PreFilter plugin", "plugin", pl.Name(), "pod", klog.KObj(podToSchedule))
Expand All @@ -511,12 +511,12 @@ func (f *frameworkImpl) RunPreFilterExtensionRemovePod(
return nil
}

func (f *frameworkImpl) runPreFilterExtensionRemovePod(ctx context.Context, pl framework.PreFilterPlugin, state *framework.CycleState, podToSchedule *v1.Pod, podToAdd *v1.Pod, nodeInfo *framework.NodeInfo) *framework.Status {
func (f *frameworkImpl) runPreFilterExtensionRemovePod(ctx context.Context, pl framework.PreFilterPlugin, state *framework.CycleState, podToSchedule *v1.Pod, podInfoToRemove *framework.PodInfo, nodeInfo *framework.NodeInfo) *framework.Status {
if !state.ShouldRecordPluginMetrics() {
return pl.PreFilterExtensions().RemovePod(ctx, state, podToSchedule, podToAdd, nodeInfo)
return pl.PreFilterExtensions().RemovePod(ctx, state, podToSchedule, podInfoToRemove, nodeInfo)
}
startTime := time.Now()
status := pl.PreFilterExtensions().RemovePod(ctx, state, podToSchedule, podToAdd, nodeInfo)
status := pl.PreFilterExtensions().RemovePod(ctx, state, podToSchedule, podInfoToRemove, nodeInfo)
f.metricsRecorder.observePluginDurationAsync(preFilterExtensionRemovePod, pl.Name(), status, metrics.SinceInSeconds(startTime))
return status
}
Expand Down Expand Up @@ -668,8 +668,9 @@ func addNominatedPods(ctx context.Context, ph framework.PreemptHandle, pod *v1.P
podsAdded := false
for _, p := range nominatedPods {
if corev1.PodPriority(p) >= corev1.PodPriority(pod) && p.UID != pod.UID {
nodeInfoOut.AddPod(p)
status := ph.RunPreFilterExtensionAddPod(ctx, stateOut, pod, p, nodeInfoOut)
podInfoToAdd := framework.NewPodInfo(p)
nodeInfoOut.AddPodInfo(podInfoToAdd)
status := ph.RunPreFilterExtensionAddPod(ctx, stateOut, pod, podInfoToAdd, nodeInfoOut)
if !status.IsSuccess() {
return false, state, nodeInfo, status.AsError()
}
Expand Down