Remove algorithm.NodeLister from scheduler interface #81151

Merged
1 commit merged on Aug 13, 2019
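
This change removes the algorithm.NodeLister parameter from the ScheduleAlgorithm interface: Schedule and Preempt no longer take a lister, and genericScheduler reads node state from the scheduler cache it already holds (g.cache.ListNodes()). Along the way it drops the NodeLister field from factory.Config, the scheduledPodLister field from Configurator, and the nodeLister/podLister arguments of NewInterPodAffinityPriority.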
7 changes: 0 additions & 7 deletions pkg/scheduler/algorithm/priorities/interpod_affinity.go
@@ -24,7 +24,6 @@ import (
 	apierrors "k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/client-go/util/workqueue"
-	"k8s.io/kubernetes/pkg/scheduler/algorithm"
 	"k8s.io/kubernetes/pkg/scheduler/algorithm/predicates"
 	priorityutil "k8s.io/kubernetes/pkg/scheduler/algorithm/priorities/util"
 	schedulerapi "k8s.io/kubernetes/pkg/scheduler/api"
@@ -37,21 +36,15 @@ import (
 // InterPodAffinity contains information to calculate inter pod affinity.
 type InterPodAffinity struct {
 	info                  predicates.NodeInfo
-	nodeLister            algorithm.NodeLister
-	podLister             algorithm.PodLister
 	hardPodAffinityWeight int32
 }
 
 // NewInterPodAffinityPriority creates an InterPodAffinity.
 func NewInterPodAffinityPriority(
 	info predicates.NodeInfo,
-	nodeLister algorithm.NodeLister,
-	podLister algorithm.PodLister,
 	hardPodAffinityWeight int32) PriorityFunction {
 	interPodAffinity := &InterPodAffinity{
 		info:                  info,
-		nodeLister:            nodeLister,
-		podLister:             podLister,
 		hardPodAffinityWeight: hardPodAffinityWeight,
 	}
 	return interPodAffinity.CalculateInterPodAffinityPriority
5 changes: 0 additions & 5 deletions pkg/scheduler/algorithm/priorities/interpod_affinity_test.go
@@ -25,7 +25,6 @@ import (
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	schedulerapi "k8s.io/kubernetes/pkg/scheduler/api"
 	schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
-	schedulertesting "k8s.io/kubernetes/pkg/scheduler/testing"
 )
 
 type FakeNodeListInfo []*v1.Node
@@ -513,8 +512,6 @@ func TestInterPodAffinityPriority(t *testing.T) {
 			nodeNameToInfo := schedulernodeinfo.CreateNodeNameToInfoMap(test.pods, test.nodes)
 			interPodAffinity := InterPodAffinity{
 				info:                  FakeNodeListInfo(test.nodes),
-				nodeLister:            schedulertesting.FakeNodeLister(test.nodes),
-				podLister:             schedulertesting.FakePodLister(test.pods),
 				hardPodAffinityWeight: v1.DefaultHardPodAffinitySymmetricWeight,
 			}
 			list, err := interPodAffinity.CalculateInterPodAffinityPriority(test.pod, nodeNameToInfo, test.nodes)
@@ -603,8 +600,6 @@ func TestHardPodAffinitySymmetricWeight(t *testing.T) {
 			nodeNameToInfo := schedulernodeinfo.CreateNodeNameToInfoMap(test.pods, test.nodes)
 			ipa := InterPodAffinity{
 				info:                  FakeNodeListInfo(test.nodes),
-				nodeLister:            schedulertesting.FakeNodeLister(test.nodes),
-				podLister:             schedulertesting.FakePodLister(test.pods),
 				hardPodAffinityWeight: test.hardPodAffinityWeight,
 			}
 			list, err := ipa.CalculateInterPodAffinityPriority(test.pod, nodeNameToInfo, test.nodes)
@@ -70,7 +70,7 @@ func init() {
 		priorities.InterPodAffinityPriority,
 		factory.PriorityConfigFactory{
 			Function: func(args factory.PluginFactoryArgs) priorities.PriorityFunction {
-				return priorities.NewInterPodAffinityPriority(args.NodeInfo, args.NodeLister, args.PodLister, args.HardPodAffinitySymmetricWeight)
+				return priorities.NewInterPodAffinityPriority(args.NodeInfo, args.HardPodAffinitySymmetricWeight)
 			},
 			Weight: 1,
 		},
2 changes: 1 addition & 1 deletion pkg/scheduler/core/extender_test.go
@@ -553,7 +553,7 @@ func TestGenericSchedulerWithExtenders(t *testing.T) {
 			schedulerapi.DefaultPercentageOfNodesToScore,
 			false)
 		podIgnored := &v1.Pod{}
-		result, err := scheduler.Schedule(podIgnored, schedulertesting.FakeNodeLister(makeNodeList(test.nodes)), framework.NewPluginContext())
+		result, err := scheduler.Schedule(podIgnored, framework.NewPluginContext())
 		if test.expectsErr {
 			if err == nil {
 				t.Errorf("Unexpected non-error, result %+v", result)
16 changes: 8 additions & 8 deletions pkg/scheduler/core/generic_scheduler.go
@@ -132,12 +132,12 @@ func (f *FitError) Error() string {
 // onto machines.
 // TODO: Rename this type.
 type ScheduleAlgorithm interface {
-	Schedule(*v1.Pod, algorithm.NodeLister, *framework.PluginContext) (scheduleResult ScheduleResult, err error)
+	Schedule(*v1.Pod, *framework.PluginContext) (scheduleResult ScheduleResult, err error)
 	// Preempt receives scheduling errors for a pod and tries to create room for
 	// the pod by preempting lower priority pods if possible.
 	// It returns the node where preemption happened, a list of preempted pods, a
 	// list of pods whose nominated node name should be removed, and error if any.
-	Preempt(*v1.Pod, algorithm.NodeLister, error) (selectedNode *v1.Node, preemptedPods []*v1.Pod, cleanupNominatedPods []*v1.Pod, err error)
+	Preempt(*v1.Pod, error) (selectedNode *v1.Node, preemptedPods []*v1.Pod, cleanupNominatedPods []*v1.Pod, err error)
 	// Predicates() returns a pointer to a map of predicate functions. This is
 	// exposed for testing.
 	Predicates() map[string]predicates.FitPredicate
@@ -186,7 +186,7 @@ func (g *genericScheduler) snapshot() error {
 // Schedule tries to schedule the given pod to one of the nodes in the node list.
 // If it succeeds, it will return the name of the node.
 // If it fails, it will return a FitError error with reasons.
-func (g *genericScheduler) Schedule(pod *v1.Pod, nodeLister algorithm.NodeLister, pluginContext *framework.PluginContext) (result ScheduleResult, err error) {
+func (g *genericScheduler) Schedule(pod *v1.Pod, pluginContext *framework.PluginContext) (result ScheduleResult, err error) {
 	trace := utiltrace.New("Scheduling", utiltrace.Field{Key: "namespace", Value: pod.Namespace}, utiltrace.Field{Key: "name", Value: pod.Name})
 	defer trace.LogIfLong(100 * time.Millisecond)
 
@@ -211,7 +211,7 @@ func (g *genericScheduler) Schedule(pod *v1.Pod, nodeLister algorithm.NodeLister
 
 	trace.Step("Basic checks done")
 	startPredicateEvalTime := time.Now()
-	filteredNodes, failedPredicateMap, filteredNodesStatuses, err := g.findNodesThatFit(pluginContext, pod, nodeLister)
+	filteredNodes, failedPredicateMap, filteredNodesStatuses, err := g.findNodesThatFit(pluginContext, pod)
 	if err != nil {
 		return result, err
 	}
@@ -317,7 +317,7 @@ func (g *genericScheduler) selectHost(priorityList schedulerapi.HostPriorityList
 // other pods with the same priority. The nominated pod prevents other pods from
 // using the nominated resources and the nominated pod could take a long time
 // before it is retried after many other pending pods.
-func (g *genericScheduler) Preempt(pod *v1.Pod, nodeLister algorithm.NodeLister, scheduleErr error) (*v1.Node, []*v1.Pod, []*v1.Pod, error) {
+func (g *genericScheduler) Preempt(pod *v1.Pod, scheduleErr error) (*v1.Node, []*v1.Pod, []*v1.Pod, error) {
 	// Scheduler may return various types of errors. Consider preemption only if
 	// the error is of type FitError.
 	fitError, ok := scheduleErr.(*FitError)
@@ -328,7 +328,7 @@ func (g *genericScheduler) Preempt(pod *v1.Pod, nodeLister algorithm.NodeLister,
 		klog.V(5).Infof("Pod %v/%v is not eligible for more preemption.", pod.Namespace, pod.Name)
 		return nil, nil, nil, nil
 	}
-	allNodes := nodeLister.ListNodes()
+	allNodes := g.cache.ListNodes()
 	if len(allNodes) == 0 {
 		return nil, nil, nil, ErrNoNodesAvailable
 	}
@@ -461,13 +461,13 @@ func (g *genericScheduler) numFeasibleNodesToFind(numAllNodes int32) (numNodes i
 
 // Filters the nodes to find the ones that fit based on the given predicate functions
 // Each node is passed through the predicate functions to determine if it is a fit
-func (g *genericScheduler) findNodesThatFit(pluginContext *framework.PluginContext, pod *v1.Pod, nodeLister algorithm.NodeLister) ([]*v1.Node, FailedPredicateMap, framework.NodeToStatusMap, error) {
+func (g *genericScheduler) findNodesThatFit(pluginContext *framework.PluginContext, pod *v1.Pod) ([]*v1.Node, FailedPredicateMap, framework.NodeToStatusMap, error) {
 	var filtered []*v1.Node
 	failedPredicateMap := FailedPredicateMap{}
 	filteredNodesStatuses := framework.NodeToStatusMap{}
 
 	if len(g.predicates) == 0 {
-		filtered = nodeLister.ListNodes()
+		filtered = g.cache.ListNodes()
 	} else {
 		allNodes := int32(g.cache.NodeTree().NumNodes())
 		numNodesToFind := g.numFeasibleNodesToFind(allNodes)
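The hunks above are the heart of the change: Schedule, Preempt, and findNodesThatFit stop receiving a lister and read node state from the cache the genericScheduler already owns. Below is a minimal standalone sketch of that pattern; Node, Pod, and Cache here are toy stand-ins, not the real k8s.io types.

```go
package main

import "fmt"

// Toy stand-ins for the Kubernetes API types; not the real k8s.io packages.
type Node struct{ Name string }
type Pod struct{ Name string }

// Cache plays the role of the scheduler's internal cache (internalcache.Cache
// in the diff): the single source of node state the algorithm reads from.
type Cache struct{ nodes []*Node }

func (c *Cache) ListNodes() []*Node { return c.nodes }

// ScheduleAlgorithm mirrors the post-PR interface shape: no NodeLister parameter.
type ScheduleAlgorithm interface {
	Schedule(pod *Pod) (nodeName string, err error)
}

// genericScheduler lists nodes from its own cache, as g.cache.ListNodes()
// does in the real diff, instead of receiving a lister from the caller.
type genericScheduler struct{ cache *Cache }

func (g *genericScheduler) Schedule(pod *Pod) (string, error) {
	nodes := g.cache.ListNodes() // was: nodeLister.ListNodes()
	if len(nodes) == 0 {
		return "", fmt.Errorf("no nodes available to schedule pod %q", pod.Name)
	}
	// Trivial placement for the sketch; the real scheduler filters and scores.
	return nodes[0].Name, nil
}

func main() {
	sched := &genericScheduler{cache: &Cache{nodes: []*Node{{Name: "node-1"}}}}
	host, err := sched.Schedule(&Pod{Name: "nginx"})
	if err != nil {
		panic(err)
	}
	fmt.Println("scheduled on", host)
}
```

With a single owner for node state, callers can no longer hand the algorithm a lister that disagrees with the cache it filters against.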
10 changes: 5 additions & 5 deletions pkg/scheduler/core/generic_scheduler_test.go
@@ -651,7 +651,7 @@ func TestGenericScheduler(t *testing.T) {
 				false,
 				schedulerapi.DefaultPercentageOfNodesToScore,
 				false)
-			result, err := scheduler.Schedule(test.pod, schedulertesting.FakeNodeLister(makeNodeList(test.nodes)), framework.NewPluginContext())
+			result, err := scheduler.Schedule(test.pod, framework.NewPluginContext())
 			if !reflect.DeepEqual(err, test.wErr) {
 				t.Errorf("Unexpected error: %v, expected: %v", err.Error(), test.wErr)
 			}
@@ -693,7 +693,7 @@ func TestFindFitAllError(t *testing.T) {
 	nodes := makeNodeList([]string{"3", "2", "1"})
 	scheduler := makeScheduler(predicates, nodes)
 
-	_, predicateMap, _, err := scheduler.findNodesThatFit(nil, &v1.Pod{}, schedulertesting.FakeNodeLister(nodes))
+	_, predicateMap, _, err := scheduler.findNodesThatFit(nil, &v1.Pod{})
 
 	if err != nil {
 		t.Errorf("unexpected error: %v", err)
@@ -723,7 +723,7 @@ func TestFindFitSomeError(t *testing.T) {
 	scheduler := makeScheduler(predicates, nodes)
 
 	pod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "1", UID: types.UID("1")}}
-	_, predicateMap, _, err := scheduler.findNodesThatFit(nil, pod, schedulertesting.FakeNodeLister(nodes))
+	_, predicateMap, _, err := scheduler.findNodesThatFit(nil, pod)
 
 	if err != nil {
 		t.Errorf("unexpected error: %v", err)
@@ -1935,7 +1935,7 @@ func TestPreempt(t *testing.T) {
 			if test.failedPredMap != nil {
 				failedPredMap = test.failedPredMap
 			}
-			node, victims, _, err := scheduler.Preempt(test.pod, schedulertesting.FakeNodeLister(makeNodeList(nodeNames)), error(&FitError{Pod: test.pod, FailedPredicates: failedPredMap}))
+			node, victims, _, err := scheduler.Preempt(test.pod, error(&FitError{Pod: test.pod, FailedPredicates: failedPredMap}))
 			if err != nil {
 				t.Errorf("unexpected error in preemption: %v", err)
 			}
@@ -1965,7 +1965,7 @@ func TestPreempt(t *testing.T) {
 				test.pod.Status.NominatedNodeName = node.Name
 			}
 			// Call preempt again and make sure it doesn't preempt any more pods.
-			node, victims, _, err = scheduler.Preempt(test.pod, schedulertesting.FakeNodeLister(makeNodeList(nodeNames)), error(&FitError{Pod: test.pod, FailedPredicates: failedPredMap}))
+			node, victims, _, err = scheduler.Preempt(test.pod, error(&FitError{Pod: test.pod, FailedPredicates: failedPredMap}))
 			if err != nil {
 				t.Errorf("unexpected error in preemption: %v", err)
 			}
13 changes: 3 additions & 10 deletions pkg/scheduler/factory/factory.go
@@ -80,13 +80,10 @@ type PodConditionUpdater interface {
 // Config is an implementation of the Scheduler's configured input data.
 // TODO over time we should make this struct a hidden implementation detail of the scheduler.
 type Config struct {
-	// It is expected that changes made via SchedulerCache will be observed
-	// by NodeLister and Algorithm.
 	SchedulerCache internalcache.Cache
 
-	NodeLister algorithm.NodeLister
-	Algorithm  core.ScheduleAlgorithm
-	GetBinder  func(pod *v1.Pod) Binder
+	Algorithm core.ScheduleAlgorithm
+	GetBinder func(pod *v1.Pod) Binder
 	// PodConditionUpdater is used only in case of scheduling errors. If we succeed
 	// with scheduling, PodScheduled condition will be updated in apiserver in /bind
 	// handler so that binding and setting PodCondition it is atomic.
@@ -140,8 +137,6 @@ type PodPreemptor interface {
 // construct a new scheduler.
 type Configurator struct {
 	client clientset.Interface
-	// a means to list all known scheduled pods.
-	scheduledPodLister corelisters.PodLister
 	// a means to list all PersistentVolumes
 	pVLister corelisters.PersistentVolumeLister
 	// a means to list all PersistentVolumeClaims
@@ -429,9 +424,7 @@ func (c *Configurator) CreateFromKeys(predicateKeys, priorityKeys sets.String, e
 	)
 
 	return &Config{
-		SchedulerCache: c.schedulerCache,
-		// The scheduler only needs to consider schedulable nodes.
-		NodeLister: c.schedulerCache,
+		SchedulerCache:      c.schedulerCache,
 		Algorithm:           algo,
 		GetBinder:           getBinderFunc(c.client, extenders),
 		PodConditionUpdater: &podConditionUpdater{c.client},
4 changes: 2 additions & 2 deletions pkg/scheduler/scheduler.go
@@ -284,7 +284,7 @@ func (sched *Scheduler) recordSchedulingFailure(pod *v1.Pod, err error, reason s
 // schedule implements the scheduling algorithm and returns the suggested result(host,
 // evaluated nodes number,feasible nodes number).
 func (sched *Scheduler) schedule(pod *v1.Pod, pluginContext *framework.PluginContext) (core.ScheduleResult, error) {
-	result, err := sched.config.Algorithm.Schedule(pod, sched.config.NodeLister, pluginContext)
+	result, err := sched.config.Algorithm.Schedule(pod, pluginContext)
 	if err != nil {
 		pod = pod.DeepCopy()
 		sched.recordSchedulingFailure(pod, err, v1.PodReasonUnschedulable, err.Error())
@@ -303,7 +303,7 @@ func (sched *Scheduler) preempt(fwk framework.Framework, preemptor *v1.Pod, sche
 		return "", err
 	}
 
-	node, victims, nominatedPodsToClear, err := sched.config.Algorithm.Preempt(preemptor, sched.config.NodeLister, scheduleErr)
+	node, victims, nominatedPodsToClear, err := sched.config.Algorithm.Preempt(preemptor, scheduleErr)
 	if err != nil {
 		klog.Errorf("Error preempting victims to make room for %v/%v.", preemptor.Namespace, preemptor.Name)
 		return "", err
7 changes: 2 additions & 5 deletions pkg/scheduler/scheduler_test.go
@@ -153,7 +153,7 @@ type mockScheduler struct {
 	err    error
 }
 
-func (es mockScheduler) Schedule(pod *v1.Pod, ml algorithm.NodeLister, pc *framework.PluginContext) (core.ScheduleResult, error) {
+func (es mockScheduler) Schedule(pod *v1.Pod, pc *framework.PluginContext) (core.ScheduleResult, error) {
 	return es.result, es.err
 }
 
@@ -164,7 +164,7 @@ func (es mockScheduler) Prioritizers() []priorities.PriorityConfig {
 	return nil
 }
 
-func (es mockScheduler) Preempt(pod *v1.Pod, nodeLister algorithm.NodeLister, scheduleErr error) (*v1.Node, []*v1.Pod, []*v1.Pod, error) {
+func (es mockScheduler) Preempt(pod *v1.Pod, scheduleErr error) (*v1.Node, []*v1.Pod, []*v1.Pod, error) {
 	return nil, nil, nil, nil
 }
 
@@ -285,7 +285,6 @@ func TestScheduler(t *testing.T) {
 
 	s := NewFromConfig(&factory.Config{
 		SchedulerCache: sCache,
-		NodeLister:     sCache,
 		Algorithm:      item.algo,
 		GetBinder: func(pod *v1.Pod) factory.Binder {
 			return fakeBinder{func(b *v1.Binding) error {
@@ -666,7 +665,6 @@ func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache internalcache.C
 
 	config := &factory.Config{
 		SchedulerCache: scache,
-		NodeLister:     scache,
 		Algorithm:      algo,
 		GetBinder: func(pod *v1.Pod) factory.Binder {
 			return fakeBinder{func(b *v1.Binding) error {
@@ -719,7 +717,6 @@ func setupTestSchedulerLongBindingWithRetry(queuedPodStore *clientcache.FIFO, sc
 
 	sched := NewFromConfig(&factory.Config{
 		SchedulerCache: scache,
-		NodeLister:     scache,
 		Algorithm:      algo,
 		GetBinder: func(pod *v1.Pod) factory.Binder {
 			return fakeBinder{func(b *v1.Binding) error {
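A side effect visible in the test hunks above: doubles like mockScheduler shrink, since there is no lister left to stub. A hypothetical sketch of how a test double looks against the narrowed surface, again with toy stand-in types rather than the real scheduler packages:

```go
package main

import "fmt"

// Toy stand-ins, as in the earlier sketch; not the real scheduler types.
type Pod struct{ Name string }
type Node struct{ Name string }

// ScheduleAlgorithm mirrors the narrowed two-method surface touched here.
type ScheduleAlgorithm interface {
	Schedule(pod *Pod) (string, error)
	Preempt(pod *Pod, scheduleErr error) (*Node, error)
}

// fakeAlgorithm plays the role of mockScheduler: canned answers, and after
// this PR no lister fields or parameters left to wire up.
type fakeAlgorithm struct {
	result string
	err    error
}

func (f fakeAlgorithm) Schedule(*Pod) (string, error)      { return f.result, f.err }
func (f fakeAlgorithm) Preempt(*Pod, error) (*Node, error) { return nil, nil }

// Compile-time check that the fake satisfies the interface.
var _ ScheduleAlgorithm = fakeAlgorithm{}

func main() {
	algo := fakeAlgorithm{result: "node-1"}
	host, _ := algo.Schedule(&Pod{Name: "nginx"})
	fmt.Println("mock scheduled on", host)
}
```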