Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

prefilter extension point implementation for the scheduler #78005

Merged
merged 1 commit into from
Jun 14, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion pkg/scheduler/core/extender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"k8s.io/kubernetes/pkg/scheduler/algorithm/predicates"
"k8s.io/kubernetes/pkg/scheduler/algorithm/priorities"
schedulerapi "k8s.io/kubernetes/pkg/scheduler/api"
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache"
internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue"
schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
Expand Down Expand Up @@ -552,7 +553,7 @@ func TestGenericSchedulerWithExtenders(t *testing.T) {
schedulerapi.DefaultPercentageOfNodesToScore,
false)
podIgnored := &v1.Pod{}
result, err := scheduler.Schedule(podIgnored, schedulertesting.FakeNodeLister(makeNodeList(test.nodes)))
result, err := scheduler.Schedule(podIgnored, schedulertesting.FakeNodeLister(makeNodeList(test.nodes)), framework.NewPluginContext())
if test.expectsErr {
if err == nil {
t.Errorf("Unexpected non-error, result %+v", result)
Expand Down
10 changes: 8 additions & 2 deletions pkg/scheduler/core/generic_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ func (f *FitError) Error() string {
// onto machines.
// TODO: Rename this type.
type ScheduleAlgorithm interface {
Schedule(*v1.Pod, algorithm.NodeLister) (scheduleResult ScheduleResult, err error)
Schedule(*v1.Pod, algorithm.NodeLister, *framework.PluginContext) (scheduleResult ScheduleResult, err error)
// Preempt receives scheduling errors for a pod and tries to create room for
// the pod by preempting lower priority pods if possible.
// It returns the node where preemption happened, a list of preempted pods, a
Expand Down Expand Up @@ -181,14 +181,20 @@ func (g *genericScheduler) snapshot() error {
// Schedule tries to schedule the given pod to one of the nodes in the node list.
// If it succeeds, it will return the name of the node.
// If it fails, it will return a FitError error with reasons.
func (g *genericScheduler) Schedule(pod *v1.Pod, nodeLister algorithm.NodeLister) (result ScheduleResult, err error) {
func (g *genericScheduler) Schedule(pod *v1.Pod, nodeLister algorithm.NodeLister, pluginContext *framework.PluginContext) (result ScheduleResult, err error) {
trace := utiltrace.New(fmt.Sprintf("Scheduling %s/%s", pod.Namespace, pod.Name))
defer trace.LogIfLong(100 * time.Millisecond)

if err := podPassesBasicChecks(pod, g.pvcLister); err != nil {
return result, err
}

// Run "prefilter" plugins.
prefilterStatus := g.framework.RunPrefilterPlugins(pluginContext, pod)
if !prefilterStatus.IsSuccess() {
return result, prefilterStatus.AsError()
}

nodes, err := nodeLister.List()
if err != nil {
return result, err
Expand Down
3 changes: 1 addition & 2 deletions pkg/scheduler/core/generic_scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -466,8 +466,7 @@ func TestGenericScheduler(t *testing.T) {
false,
schedulerapi.DefaultPercentageOfNodesToScore,
false)
result, err := scheduler.Schedule(test.pod, schedulertesting.FakeNodeLister(makeNodeList(test.nodes)))

result, err := scheduler.Schedule(test.pod, schedulertesting.FakeNodeLister(makeNodeList(test.nodes)), framework.NewPluginContext())
if !reflect.DeepEqual(err, test.wErr) {
t.Errorf("Unexpected error: %v, expected: %v", err, test.wErr)
}
Expand Down
37 changes: 37 additions & 0 deletions pkg/scheduler/framework/v1alpha1/framework.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ type framework struct {
waitingPods *waitingPodsMap
plugins map[string]Plugin // a map of initialized plugins. Plugin name:plugin instance.
queueSortPlugins []QueueSortPlugin
prefilterPlugins []PrefilterPlugin
reservePlugins []ReservePlugin
prebindPlugins []PrebindPlugin
postbindPlugins []PostbindPlugin
Expand Down Expand Up @@ -85,6 +86,20 @@ func NewFramework(r Registry, plugins *config.Plugins, args []config.PluginConfi
f.plugins[name] = p
}

if plugins.PreFilter != nil {
for _, pf := range plugins.PreFilter.Enabled {
if pg, ok := f.plugins[pf.Name]; ok {
p, ok := pg.(PrefilterPlugin)
if !ok {
return nil, fmt.Errorf("plugin %v does not extend prefilter plugin", pf.Name)
}
f.prefilterPlugins = append(f.prefilterPlugins, p)
} else {
return nil, fmt.Errorf("prefilter plugin %v does not exist", pf.Name)
}
}
}

if plugins.Reserve != nil {
for _, r := range plugins.Reserve.Enabled {
if pg, ok := f.plugins[r.Name]; ok {
Expand Down Expand Up @@ -185,6 +200,28 @@ func (f *framework) QueueSortFunc() LessFunc {
return f.queueSortPlugins[0].Less
}

// RunPrefilterPlugins runs the set of configured prefilter plugins. It returns
// *Status and its code is set to non-success if any of the plugins returns
// anything but Success. If a non-success status is returned, then the scheduling
// cycle is aborted.
func (f *framework) RunPrefilterPlugins(
pc *PluginContext, pod *v1.Pod) *Status {
for _, pl := range f.prefilterPlugins {
status := pl.Prefilter(pc, pod)
if !status.IsSuccess() {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please handle "status == nil" case.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if status.Code() == Unschedulable {
msg := fmt.Sprintf("rejected by %v at prefilter: %v", pl.Name(), status.Message())
klog.V(4).Infof(msg)
return NewStatus(status.Code(), msg)
}
msg := fmt.Sprintf("error while running %v prefilter plugin for pod %v: %v", pl.Name(), pod.Name, status.Message())
klog.Error(msg)
return NewStatus(Error, msg)
}
}
return nil
}

// RunPrebindPlugins runs the set of configured prebind plugins. It returns a
// failure (bool) if any of the plugins returns an error. It also returns an
// error containing the rejection message or the error occurred in the plugin.
Expand Down
15 changes: 15 additions & 0 deletions pkg/scheduler/framework/v1alpha1/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,15 @@ type QueueSortPlugin interface {
Less(*PodInfo, *PodInfo) bool
}

// PrefilterPlugin is an interface that must be implemented by "prefilter" plugins.
// These plugins are called at the beginning of the scheduling cycle.
type PrefilterPlugin interface {
Plugin
// Prefilter is called at the beginning of the scheduling cycle. All prefilter
// plugins must return success or the pod will be rejected.
Prefilter(pc *PluginContext, p *v1.Pod) *Status
}

// ReservePlugin is an interface for Reserve plugins. These plugins are called
// at the reservation point. These are meant to update the state of the plugin.
// This concept used to be called 'assume' in the original scheduler.
Expand Down Expand Up @@ -190,6 +199,12 @@ type Framework interface {
// QueueSortFunc returns the function to sort pods in scheduling queue
QueueSortFunc() LessFunc

// RunPrefilterPlugins runs the set of configured prefilter plugins. It returns
// *Status and its code is set to non-success if any of the plugins returns
// anything but Success. If a non-success status is returned, then the scheduling
// cycle is aborted.
RunPrefilterPlugins(pc *PluginContext, pod *v1.Pod) *Status

// RunPrebindPlugins runs the set of configured prebind plugins. It returns
// *Status and its code is set to non-success if any of the plugins returns
// anything but Success. If the Status code is "Unschedulable", it is
Expand Down
4 changes: 4 additions & 0 deletions pkg/scheduler/internal/queue/scheduling_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,10 @@ func (*fakeFramework) NodeInfoSnapshot() *internalcache.NodeInfoSnapshot {
return nil
}

func (*fakeFramework) RunPrefilterPlugins(pc *framework.PluginContext, pod *v1.Pod) *framework.Status {
return nil
}

func (*fakeFramework) RunPrebindPlugins(pc *framework.PluginContext, pod *v1.Pod, nodeName string) *framework.Status {
return nil
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,8 +280,8 @@ func (sched *Scheduler) recordSchedulingFailure(pod *v1.Pod, err error, reason s

// schedule implements the scheduling algorithm and returns the suggested result(host,
// evaluated nodes number,feasible nodes number).
func (sched *Scheduler) schedule(pod *v1.Pod) (core.ScheduleResult, error) {
result, err := sched.config.Algorithm.Schedule(pod, sched.config.NodeLister)
func (sched *Scheduler) schedule(pod *v1.Pod, pluginContext *framework.PluginContext) (core.ScheduleResult, error) {
result, err := sched.config.Algorithm.Schedule(pod, sched.config.NodeLister, pluginContext)
if err != nil {
pod = pod.DeepCopy()
sched.recordSchedulingFailure(pod, err, v1.PodReasonUnschedulable, err.Error())
Expand Down Expand Up @@ -458,7 +458,7 @@ func (sched *Scheduler) scheduleOne() {
// Synchronously attempt to find a fit for the pod.
start := time.Now()
pluginContext := framework.NewPluginContext()
scheduleResult, err := sched.schedule(pod)
scheduleResult, err := sched.schedule(pod, pluginContext)
if err != nil {
// schedule() may have failed because the pod would not fit on any host, so we try to
// preempt, with the expectation that the next time the pod is tried for scheduling it
Expand Down
4 changes: 2 additions & 2 deletions pkg/scheduler/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ import (
)

// EmptyFramework is an empty framework used in tests.
// Note: If the test runs in goroutine, please don't using this variable to avoid a race condition.
// Note: If the test runs in goroutine, please don't use this variable to avoid a race condition.
var EmptyFramework, _ = framework.NewFramework(EmptyPluginRegistry, nil, EmptyPluginConfig)

// EmptyPluginConfig is an empty plugin config used in tests.
Expand Down Expand Up @@ -159,7 +159,7 @@ type mockScheduler struct {
err error
}

func (es mockScheduler) Schedule(pod *v1.Pod, ml algorithm.NodeLister) (core.ScheduleResult, error) {
func (es mockScheduler) Schedule(pod *v1.Pod, ml algorithm.NodeLister, pc *framework.PluginContext) (core.ScheduleResult, error) {
return es.result, es.err
}

Expand Down
111 changes: 111 additions & 0 deletions test/integration/scheduler/framework_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,13 @@ import (
// TesterPlugin is common ancestor for a test plugin that allows injection of
// failures and some other test functionalities.
type TesterPlugin struct {
numPrefilterCalled int
numReserveCalled int
numPrebindCalled int
numPostbindCalled int
numUnreserveCalled int
failPrefilter bool
rejectPrefilter bool
failReserve bool
failPrebind bool
rejectPrebind bool
Expand All @@ -46,6 +49,10 @@ type TesterPlugin struct {
waitAndAllowPermit bool
}

type PrefilterPlugin struct {
TesterPlugin
}

type ReservePlugin struct {
TesterPlugin
}
Expand All @@ -68,13 +75,15 @@ type PermitPlugin struct {
}

const (
prefilterPluginName = "prefilter-plugin"
reservePluginName = "reserve-plugin"
prebindPluginName = "prebind-plugin"
unreservePluginName = "unreserve-plugin"
postbindPluginName = "postbind-plugin"
permitPluginName = "permit-plugin"
)

var _ = framework.PrefilterPlugin(&PrefilterPlugin{})
var _ = framework.ReservePlugin(&ReservePlugin{})
var _ = framework.PrebindPlugin(&PrebindPlugin{})
var _ = framework.PostbindPlugin(&PostbindPlugin{})
Expand Down Expand Up @@ -154,6 +163,30 @@ func NewPostbindPlugin(_ *runtime.Unknown, _ framework.FrameworkHandle) (framewo
return ptbdPlugin, nil
}

var pfPlugin = &PrefilterPlugin{}

// Name returns name of the plugin.
func (pp *PrefilterPlugin) Name() string {
return prefilterPluginName
}

// Prefilter is a test function that returns (true, nil) or errors for testing.
func (pp *PrefilterPlugin) Prefilter(pc *framework.PluginContext, pod *v1.Pod) *framework.Status {
pp.numPrefilterCalled++
if pp.failPrefilter {
return framework.NewStatus(framework.Error, fmt.Sprintf("injecting failure for pod %v", pod.Name))
}
if pp.rejectPrefilter {
return framework.NewStatus(framework.Unschedulable, fmt.Sprintf("reject pod %v", pod.Name))
}
return nil
}

// NewPrebindPlugin is the factory for prebind plugin.
func NewPrefilterPlugin(_ *runtime.Unknown, _ framework.FrameworkHandle) (framework.Plugin, error) {
return pfPlugin, nil
}

var unresPlugin = &UnreservePlugin{}

// Name returns name of the plugin.
Expand Down Expand Up @@ -226,6 +259,84 @@ func NewPermitPlugin(_ *runtime.Unknown, fh framework.FrameworkHandle) (framewor
return perPlugin, nil
}

// TestPrefilterPlugin tests invocation of prefilter plugins.
func TestPrefilterPlugin(t *testing.T) {
// Create a plugin registry for testing. Register only a reserve plugin.
registry := framework.Registry{prefilterPluginName: NewPrefilterPlugin}

// Setup initial prefilter plugin for testing.
prefilterPlugin := &schedulerconfig.Plugins{
PreFilter: &schedulerconfig.PluginSet{
Enabled: []schedulerconfig.Plugin{
{
Name: prefilterPluginName,
},
},
},
}
// Set empty plugin config for testing
emptyPluginConfig := []schedulerconfig.PluginConfig{}

// Create the master and the scheduler with the test plugin set.
context := initTestSchedulerWithOptions(t,
initTestMaster(t, "prefilter-plugin", nil),
false, nil, registry, prefilterPlugin, emptyPluginConfig, false, time.Second)

defer cleanupTest(t, context)

cs := context.clientSet
// Add a few nodes.
_, err := createNodes(cs, "test-node", nil, 2)
if err != nil {
t.Fatalf("Cannot create nodes: %v", err)
}

tests := []struct {
fail bool
reject bool
}{
{
fail: false,
reject: false,
},
{
fail: true,
reject: false,
},
{
fail: false,
reject: true,
},
}

for i, test := range tests {
pfPlugin.failPrefilter = test.fail
pfPlugin.rejectPrefilter = test.reject
// Create a best effort pod.
pod, err := createPausePod(cs,
initPausePod(cs, &pausePodConfig{Name: "test-pod", Namespace: context.ns.Name}))
if err != nil {
t.Errorf("Error while creating a test pod: %v", err)
}

if test.reject || test.fail {
if err = waitForPodUnschedulable(cs, pod); err != nil {
t.Errorf("test #%v: Didn't expect the pod to be scheduled. error: %v", i, err)
}
} else {
if err = waitForPodToSchedule(cs, pod); err != nil {
t.Errorf("test #%v: Expected the pod to be scheduled. error: %v", i, err)
}
}

if pfPlugin.numPrefilterCalled == 0 {
t.Errorf("Expected the prefilter plugin to be called.")
}

cleanupPods(cs, t, []*v1.Pod{pod})
}
}

// TestReservePlugin tests invocation of reserve plugins.
func TestReservePlugin(t *testing.T) {
// Create a plugin registry for testing. Register only a reserve plugin.
Expand Down