Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

A consistent interface for plugin extended functionality #83365

Merged
merged 1 commit into from
Oct 1, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 2 additions & 2 deletions pkg/scheduler/core/generic_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -1119,7 +1119,7 @@ func (g *genericScheduler) selectVictimsOnNode(
return err
}
}
status := g.framework.RunPreFilterUpdaterRemovePod(pluginContext, pod, rp, nodeInfoCopy)
status := g.framework.RunPreFilterExtensionRemovePod(pluginContext, pod, rp, nodeInfoCopy)
if !status.IsSuccess() {
return status.AsError()
}
Expand All @@ -1132,7 +1132,7 @@ func (g *genericScheduler) selectVictimsOnNode(
return err
}
}
status := g.framework.RunPreFilterUpdaterAddPod(pluginContext, pod, ap, nodeInfoCopy)
status := g.framework.RunPreFilterExtensionAddPod(pluginContext, pod, ap, nodeInfoCopy)
if !status.IsSuccess() {
return status.AsError()
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/scheduler/factory/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -634,7 +634,7 @@ func (t *TestPlugin) Score(pc *framework.PluginContext, p *v1.Pod, nodeName stri
return 1, nil
}

func (t *TestPlugin) NormalizeScore(pc *framework.PluginContext, pod *v1.Pod, scores framework.NodeScoreList) *framework.Status {
func (t *TestPlugin) Extensions() framework.ScoreExtensions {
return nil
}

Expand Down
46 changes: 24 additions & 22 deletions pkg/scheduler/framework/v1alpha1/framework.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,40 +297,40 @@ func (f *framework) RunPreFilterPlugins(
return nil
}

// RunPreFilterUpdaterAddPod calls the AddPod interface for the set of configured
// RunPreFilterExtensionAddPod calls the AddPod interface for the set of configured
// PreFilter plugins. It returns directly if any of the plugins return any
// status other than Success.
func (f *framework) RunPreFilterUpdaterAddPod(pc *PluginContext, podToSchedule *v1.Pod,
func (f *framework) RunPreFilterExtensionAddPod(pc *PluginContext, podToSchedule *v1.Pod,
podToAdd *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *Status {
for _, pl := range f.preFilterPlugins {
if updater := pl.Updater(); updater != nil {
status := updater.AddPod(pc, podToSchedule, podToAdd, nodeInfo)
if !status.IsSuccess() {
msg := fmt.Sprintf("error while running AddPod for plugin %q while scheduling pod %q: %v",
pl.Name(), podToSchedule.Name, status.Message())
klog.Error(msg)
return NewStatus(Error, msg)
}
if pl.Extensions() == nil {
continue
}
if status := pl.Extensions().AddPod(pc, podToSchedule, podToAdd, nodeInfo); !status.IsSuccess() {
msg := fmt.Sprintf("error while running AddPod for plugin %q while scheduling pod %q: %v",
pl.Name(), podToSchedule.Name, status.Message())
klog.Error(msg)
return NewStatus(Error, msg)
}
}

return nil
}

// RunPreFilterUpdaterRemovePod calls the RemovePod interface for the set of configured
// RunPreFilterExtensionRemovePod calls the RemovePod interface for the set of configured
// PreFilter plugins. It returns directly if any of the plugins return any
// status other than Success.
func (f *framework) RunPreFilterUpdaterRemovePod(pc *PluginContext, podToSchedule *v1.Pod,
func (f *framework) RunPreFilterExtensionRemovePod(pc *PluginContext, podToSchedule *v1.Pod,
podToRemove *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *Status {
for _, pl := range f.preFilterPlugins {
if updater := pl.Updater(); updater != nil {
status := updater.RemovePod(pc, podToSchedule, podToRemove, nodeInfo)
if !status.IsSuccess() {
msg := fmt.Sprintf("error while running RemovePod for plugin %q while scheduling pod %q: %v",
pl.Name(), podToSchedule.Name, status.Message())
klog.Error(msg)
return NewStatus(Error, msg)
}
if pl.Extensions() == nil {
continue
}
if status := pl.Extensions().RemovePod(pc, podToSchedule, podToRemove, nodeInfo); !status.IsSuccess() {
msg := fmt.Sprintf("error while running RemovePod for plugin %q while scheduling pod %q: %v",
pl.Name(), podToSchedule.Name, status.Message())
klog.Error(msg)
return NewStatus(Error, msg)
}
}

Expand Down Expand Up @@ -417,8 +417,10 @@ func (f *framework) RunScorePlugins(pc *PluginContext, pod *v1.Pod, nodes []*v1.
workqueue.ParallelizeUntil(ctx, 16, len(f.scorePlugins), func(index int) {
pl := f.scorePlugins[index]
nodeScoreList := pluginToNodeScores[pl.Name()]
status := pl.NormalizeScore(pc, pod, nodeScoreList)
if !status.IsSuccess() {
if pl.Extensions() == nil {
return
}
if status := pl.Extensions().NormalizeScore(pc, pod, nodeScoreList); !status.IsSuccess() {
err := fmt.Errorf("normalize score plugin %q failed with error %v", pl.Name(), status.Message())
errCh.SendErrorWithCancel(err, cancel)
return
Expand Down
46 changes: 25 additions & 21 deletions pkg/scheduler/framework/v1alpha1/framework_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,12 @@ import (
)

const (
scoreWithNormalizePlugin1 = "score-with-normalize-plugin-1"
scoreWithNormalizePlugin2 = "score-with-normalize-plugin-2"
scorePlugin1 = "score-plugin-1"
pluginNotImplementingScore = "plugin-not-implementing-score"
preFilterPluginName = "prefilter-plugin"
preFilterWithUpdaterPluginName = "prefilter-with-updater-plugin"
scoreWithNormalizePlugin1 = "score-with-normalize-plugin-1"
scoreWithNormalizePlugin2 = "score-with-normalize-plugin-2"
scorePlugin1 = "score-plugin-1"
pluginNotImplementingScore = "plugin-not-implementing-score"
preFilterPluginName = "prefilter-plugin"
preFilterWithExtensionsPluginName = "prefilter-with-extensions-plugin"
)

// TestScoreWithNormalizePlugin implements ScoreWithNormalizePlugin interface.
Expand Down Expand Up @@ -87,6 +87,10 @@ func (pl *TestScoreWithNormalizePlugin) Score(pc *PluginContext, p *v1.Pod, node
return setScoreRes(pl.inj)
}

func (pl *TestScoreWithNormalizePlugin) Extensions() ScoreExtensions {
return pl
}

// TestScorePlugin only implements ScorePlugin interface.
type TestScorePlugin struct {
name string
Expand All @@ -101,7 +105,7 @@ func (pl *TestScorePlugin) Score(pc *PluginContext, p *v1.Pod, nodeName string)
return setScoreRes(pl.inj)
}

func (pl *TestScorePlugin) NormalizeScore(pc *PluginContext, pod *v1.Pod, scores NodeScoreList) *Status {
func (pl *TestScorePlugin) Extensions() ScoreExtensions {
return nil
}

Expand All @@ -126,39 +130,39 @@ func (pl *TestPreFilterPlugin) PreFilter(pc *PluginContext, p *v1.Pod) *Status {
return nil
}

func (pl *TestPreFilterPlugin) Updater() Updater {
func (pl *TestPreFilterPlugin) Extensions() PreFilterExtensions {
return nil
}

// TestPreFilterWithUpdatePlugin implements Add/Remove interfaces.
type TestPreFilterWithUpdaterPlugin struct {
// TestPreFilterWithExtensionsPlugin implements Add/Remove interfaces.
type TestPreFilterWithExtensionsPlugin struct {
PreFilterCalled int
AddCalled int
RemoveCalled int
}

func (pl *TestPreFilterWithUpdaterPlugin) Name() string {
return preFilterWithUpdaterPluginName
func (pl *TestPreFilterWithExtensionsPlugin) Name() string {
return preFilterWithExtensionsPluginName
}

func (pl *TestPreFilterWithUpdaterPlugin) PreFilter(pc *PluginContext, p *v1.Pod) *Status {
func (pl *TestPreFilterWithExtensionsPlugin) PreFilter(pc *PluginContext, p *v1.Pod) *Status {
pl.PreFilterCalled++
return nil
}

func (pl *TestPreFilterWithUpdaterPlugin) AddPod(pc *PluginContext, podToSchedule *v1.Pod,
func (pl *TestPreFilterWithExtensionsPlugin) AddPod(pc *PluginContext, podToSchedule *v1.Pod,
podToAdd *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *Status {
pl.AddCalled++
return nil
}

func (pl *TestPreFilterWithUpdaterPlugin) RemovePod(pc *PluginContext, podToSchedule *v1.Pod,
func (pl *TestPreFilterWithExtensionsPlugin) RemovePod(pc *PluginContext, podToSchedule *v1.Pod,
podToRemove *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *Status {
pl.RemoveCalled++
return nil
}

func (pl *TestPreFilterWithUpdaterPlugin) Updater() Updater {
func (pl *TestPreFilterWithExtensionsPlugin) Extensions() PreFilterExtensions {
return pl
}

Expand Down Expand Up @@ -424,25 +428,25 @@ func TestRunScorePlugins(t *testing.T) {

func TestPreFilterPlugins(t *testing.T) {
preFilter1 := &TestPreFilterPlugin{}
preFilter2 := &TestPreFilterWithUpdaterPlugin{}
preFilter2 := &TestPreFilterWithExtensionsPlugin{}
r := make(Registry)
r.Register(preFilterPluginName,
func(_ *runtime.Unknown, fh FrameworkHandle) (Plugin, error) {
return preFilter1, nil
})
r.Register(preFilterWithUpdaterPluginName,
r.Register(preFilterWithExtensionsPluginName,
func(_ *runtime.Unknown, fh FrameworkHandle) (Plugin, error) {
return preFilter2, nil
})
plugins := &config.Plugins{PreFilter: &config.PluginSet{Enabled: []config.Plugin{{Name: preFilterWithUpdaterPluginName}, {Name: preFilterPluginName}}}}
plugins := &config.Plugins{PreFilter: &config.PluginSet{Enabled: []config.Plugin{{Name: preFilterWithExtensionsPluginName}, {Name: preFilterPluginName}}}}
t.Run("TestPreFilterPlugin", func(t *testing.T) {
f, err := NewFramework(r, plugins, emptyArgs)
if err != nil {
t.Fatalf("Failed to create framework for testing: %v", err)
}
f.RunPreFilterPlugins(nil, nil)
f.RunPreFilterUpdaterAddPod(nil, nil, nil, nil)
f.RunPreFilterUpdaterRemovePod(nil, nil, nil, nil)
f.RunPreFilterExtensionAddPod(nil, nil, nil, nil)
f.RunPreFilterExtensionRemovePod(nil, nil, nil, nil)

if preFilter1.PreFilterCalled != 1 {
t.Errorf("preFilter1 called %v, expected: 1", preFilter1.PreFilterCalled)
Expand Down
43 changes: 23 additions & 20 deletions pkg/scheduler/framework/v1alpha1/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,10 +165,10 @@ type QueueSortPlugin interface {
Less(*PodInfo, *PodInfo) bool
}

// Updater is an interface that is included in plugins that allow specifying
// PreFilterExtensions is an interface that is included in plugins that allow specifying
// callbacks to make incremental updates to its supposedly pre-calculated
// state.
type Updater interface {
type PreFilterExtensions interface {
// AddPod is called by the framework while trying to evaluate the impact
// of adding podToAdd to the node while scheduling podToSchedule.
AddPod(pc *PluginContext, podToSchedule *v1.Pod, podToAdd *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *Status
Expand All @@ -184,13 +184,13 @@ type PreFilterPlugin interface {
// PreFilter is called at the beginning of the scheduling cycle. All PreFilter
// plugins must return success or the pod will be rejected.
PreFilter(pc *PluginContext, p *v1.Pod) *Status
// Updater returns an updater if the plugin implements one, or nil if it
// does not. A Pre-filter plugin can provide an updater to incrementally
// modify its pre-processed info. The framework guarantees that the updater
// AddPod/RemovePod functions will only be called after PreFilter,
// possibly on a cloned PluginContext, and may call those functions more than
// once before calling Filter again on a specific node.
Updater() Updater
// Extensions returns a PreFilterExtensions interface if the plugin implements one,
// or nil if it does not. A Pre-filter plugin can provide extensions to incrementally
// modify its pre-processed info. The framework guarantees that the extensions
// AddPod/RemovePod will only be called after PreFilter, possibly on a cloned
// PluginContext, and may call those functions more than once before calling
// Filter again on a specific node.
Extensions() PreFilterExtensions
}

// FilterPlugin is an interface for Filter plugins. These plugins are called at the
Expand Down Expand Up @@ -230,6 +230,14 @@ type PostFilterPlugin interface {
PostFilter(pc *PluginContext, pod *v1.Pod, nodes []*v1.Node, filteredNodesStatuses NodeToStatusMap) *Status
}

// ScoreExtensions is an interface for Score extended functionality.
type ScoreExtensions interface {
// NormalizeScore is called for all node scores produced by the same plugin's "Score"
// method. A successful run of NormalizeScore will update the scores list and return
// a success status.
NormalizeScore(pc *PluginContext, p *v1.Pod, scores NodeScoreList) *Status
}

// ScorePlugin is an interface that must be implemented by "score" plugins to rank
// nodes that passed the filtering phase.
type ScorePlugin interface {
Expand All @@ -239,13 +247,8 @@ type ScorePlugin interface {
// the pod will be rejected.
Score(pc *PluginContext, p *v1.Pod, nodeName string) (int, *Status)

// NormalizeScore is called for all node scores produced by the same plugin's "Score"
// method. A successful run of NormalizeScore will update the scores list and return
// a success status.
//
// NOTE: This function is optional, and you could implement it as a no-op by simply
// returning nil.
NormalizeScore(pc *PluginContext, p *v1.Pod, scores NodeScoreList) *Status
// Extensions returns a ScoreExtensions interface if it implements one, or nil if does not.
Extensions() ScoreExtensions
}

// ReservePlugin is an interface for Reserve plugins. These plugins are called
Expand Down Expand Up @@ -342,15 +345,15 @@ type Framework interface {
// schedule the target pod.
RunFilterPlugins(pc *PluginContext, pod *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *Status

// RunPreFilterUpdaterAddPod calls the AddPod interface for the set of configured
// RunPreFilterExtensionAddPod calls the AddPod interface for the set of configured
// PreFilter plugins. It returns directly if any of the plugins return any
// status other than Success.
RunPreFilterUpdaterAddPod(pc *PluginContext, podToSchedule *v1.Pod, podToAdd *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *Status
RunPreFilterExtensionAddPod(pc *PluginContext, podToSchedule *v1.Pod, podToAdd *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *Status

// RunPreFilterUpdaterRemovePod calls the RemovePod interface for the set of configured
// RunPreFilterExtensionRemovePod calls the RemovePod interface for the set of configured
// PreFilter plugins. It returns directly if any of the plugins return any
// status other than Success.
RunPreFilterUpdaterRemovePod(pc *PluginContext, podToSchedule *v1.Pod, podToAdd *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *Status
RunPreFilterExtensionRemovePod(pc *PluginContext, podToSchedule *v1.Pod, podToAdd *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *Status

// RunPostFilterPlugins runs the set of configured post-filter plugins. If any
// of these plugins returns any status other than "Success", the given node is
Expand Down
4 changes: 2 additions & 2 deletions pkg/scheduler/internal/queue/scheduling_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,11 +175,11 @@ func (*fakeFramework) RunFilterPlugins(pc *framework.PluginContext, pod *v1.Pod,
return nil
}

func (*fakeFramework) RunPreFilterUpdaterAddPod(pc *framework.PluginContext, podToSchedule *v1.Pod, podToAdd *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *framework.Status {
func (*fakeFramework) RunPreFilterExtensionAddPod(pc *framework.PluginContext, podToSchedule *v1.Pod, podToAdd *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *framework.Status {
return nil
}

func (*fakeFramework) RunPreFilterUpdaterRemovePod(pc *framework.PluginContext, podToSchedule *v1.Pod, podToAdd *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *framework.Status {
func (*fakeFramework) RunPreFilterExtensionRemovePod(pc *framework.PluginContext, podToSchedule *v1.Pod, podToAdd *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *framework.Status {
return nil
}

Expand Down
10 changes: 7 additions & 3 deletions test/integration/scheduler/framework_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ func (sp *ScorePlugin) Score(pc *framework.PluginContext, p *v1.Pod, nodeName st
return score, nil
}

func (sp *ScorePlugin) NormalizeScore(pc *framework.PluginContext, pod *v1.Pod, scores framework.NodeScoreList) *framework.Status {
func (sp *ScorePlugin) Extensions() framework.ScoreExtensions {
return nil
}

Expand All @@ -189,6 +189,10 @@ func (sp *ScoreWithNormalizePlugin) NormalizeScore(pc *framework.PluginContext,
return nil
}

func (sp *ScoreWithNormalizePlugin) Extensions() framework.ScoreExtensions {
return sp
}

// Name returns name of the plugin.
func (fp *FilterPlugin) Name() string {
return filterPluginName
Expand Down Expand Up @@ -330,8 +334,8 @@ func (pp *PreFilterPlugin) Name() string {
return prefilterPluginName
}

// Updater returns the updater interface.
func (pp *PreFilterPlugin) Updater() framework.Updater {
// Extensions returns the PreFilterExtensions interface.
func (pp *PreFilterPlugin) Extensions() framework.PreFilterExtensions {
return nil
}

Expand Down
2 changes: 1 addition & 1 deletion test/integration/scheduler/preemption_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func (fp *tokenFilter) RemovePod(pc *framework.PluginContext, podToSchedule *v1.
return nil
}

func (fp *tokenFilter) Updater() framework.Updater {
func (fp *tokenFilter) Extensions() framework.PreFilterExtensions {
return fp
}

Expand Down