Commit 2f175c1
Merge pull request #70290 from tossmilestone/scheduler-test-refactor
Refactor scheduler_test.go to use Clientset
k8s-ci-robot committed Oct 30, 2018
2 parents 070a35f + 12634bf
Showing 2 changed files with 65 additions and 29 deletions.
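
What this change replaces, in miniature: rather than the hand-rolled fakes from pkg/scheduler/testing (FakeNodeLister, FakePersistentVolumeClaimLister, FakePDBLister), the tests now seed a fake Clientset with API objects and pull real listers out of a SharedInformerFactory built on top of it. A minimal, self-contained sketch of that pattern follows (the test name is illustrative and not part of the diff; the node name "machine1" mirrors the tests below):

package scheduler_test

import (
    "testing"

    v1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/labels"
    "k8s.io/client-go/informers"
    clientsetfake "k8s.io/client-go/kubernetes/fake"
)

// TestListerFromFakeClientset shows the fixture pattern this PR adopts:
// seed a fake Clientset, start a SharedInformerFactory against it, and
// read the seeded objects back through a real lister.
func TestListerFromFakeClientset(t *testing.T) {
    stop := make(chan struct{})
    defer close(stop)

    // Objects passed to NewSimpleClientset pre-populate the fake API server.
    client := clientsetfake.NewSimpleClientset(
        &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "machine1"}},
    )

    // A resync period of 0 disables periodic resyncs; tests rarely need them.
    informerFactory := informers.NewSharedInformerFactory(client, 0)
    nodeLister := informerFactory.Core().V1().Nodes().Lister()

    // Start the informers and block until their caches have synced.
    // Listers read from the informer cache, so listing before the sync
    // completes can silently return nothing.
    informerFactory.Start(stop)
    informerFactory.WaitForCacheSync(stop)

    nodes, err := nodeLister.List(labels.Everything())
    if err != nil || len(nodes) != 1 {
        t.Fatalf("want 1 node, got %d (err: %v)", len(nodes), err)
    }
}

That cache-sync requirement is why the diff threads informerFactory.Start(stop) and informerFactory.WaitForCacheSync(stop) ahead of every scheduler run below.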
pkg/scheduler/BUILD (3 changes: 2 additions & 1 deletion)
@@ -54,19 +54,20 @@ go_test(
         "//pkg/scheduler/factory:go_default_library",
         "//pkg/scheduler/internal/cache:go_default_library",
         "//pkg/scheduler/internal/cache/fake:go_default_library",
-        "//pkg/scheduler/testing:go_default_library",
         "//pkg/scheduler/volumebinder:go_default_library",
         "//staging/src/k8s.io/api/core/v1:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library",
+        "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/util/diff:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
         "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
         "//staging/src/k8s.io/client-go/informers:go_default_library",
         "//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library",
+        "//staging/src/k8s.io/client-go/listers/core/v1:go_default_library",
         "//staging/src/k8s.io/client-go/tools/cache:go_default_library",
         "//staging/src/k8s.io/client-go/tools/record:go_default_library",
     ],
pkg/scheduler/scheduler_test.go (91 changes: 63 additions & 28 deletions)
@@ -27,13 +27,15 @@ import (
     "k8s.io/apimachinery/pkg/api/resource"
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
     "k8s.io/apimachinery/pkg/labels"
+    "k8s.io/apimachinery/pkg/runtime"
     "k8s.io/apimachinery/pkg/types"
     "k8s.io/apimachinery/pkg/util/diff"
     "k8s.io/apimachinery/pkg/util/sets"
     "k8s.io/apimachinery/pkg/util/wait"
     utilfeature "k8s.io/apiserver/pkg/util/feature"
     "k8s.io/client-go/informers"
     clientsetfake "k8s.io/client-go/kubernetes/fake"
+    corelister "k8s.io/client-go/listers/core/v1"
     clientcache "k8s.io/client-go/tools/cache"
     "k8s.io/client-go/tools/record"
     "k8s.io/kubernetes/pkg/api/legacyscheme"
@@ -47,7 +49,6 @@ import (
     "k8s.io/kubernetes/pkg/scheduler/factory"
     schedulerinternalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache"
     fakecache "k8s.io/kubernetes/pkg/scheduler/internal/cache/fake"
-    schedulertesting "k8s.io/kubernetes/pkg/scheduler/testing"
     "k8s.io/kubernetes/pkg/scheduler/volumebinder"
 )

@@ -81,12 +82,20 @@ func (fp fakePodPreemptor) RemoveNominatedNodeName(pod *v1.Pod) error {
     return nil
 }

+type nodeLister struct {
+    corelister.NodeLister
+}
+
+func (n *nodeLister) List() ([]*v1.Node, error) {
+    return n.NodeLister.List(labels.Everything())
+}
+
 func podWithID(id, desiredHost string) *v1.Pod {
     return &v1.Pod{
         ObjectMeta: metav1.ObjectMeta{
             Name:     id,
             UID:      types.UID(id),
-            SelfLink: schedulertesting.Test.SelfLink(string(v1.ResourcePods), id),
+            SelfLink: fmt.Sprintf("/api/v1/%s/%s", string(v1.ResourcePods), id),
         },
         Spec: v1.PodSpec{
             NodeName: desiredHost,
@@ -100,8 +109,8 @@ func deletingPod(id string) *v1.Pod {
         ObjectMeta: metav1.ObjectMeta{
             Name:              id,
             UID:               types.UID(id),
-            SelfLink:          schedulertesting.Test.SelfLink(string(v1.ResourcePods), id),
             DeletionTimestamp: &deletionTimestamp,
+            SelfLink:          fmt.Sprintf("/api/v1/%s/%s", string(v1.ResourcePods), id),
         },
         Spec: v1.PodSpec{
             NodeName: "",
@@ -239,6 +248,15 @@ func TestScheduler(t *testing.T) {
         },
     }

+    stop := make(chan struct{})
+    defer close(stop)
+    client := clientsetfake.NewSimpleClientset(&testNode)
+    informerFactory := informers.NewSharedInformerFactory(client, 0)
+    nl := informerFactory.Core().V1().Nodes().Lister()
+
+    informerFactory.Start(stop)
+    informerFactory.WaitForCacheSync(stop)
+
     for _, item := range table {
         t.Run(item.name, func(t *testing.T) {
             var gotError error
@@ -256,10 +274,8 @@ func TestScheduler(t *testing.T) {
                     gotAssumedPod = pod
                 },
             },
-            NodeLister: schedulertesting.FakeNodeLister(
-                []*v1.Node{&testNode},
-            ),
-            Algorithm: item.algo,
+            NodeLister: &nodeLister{nl},
+            Algorithm:  item.algo,
             GetBinder: func(pod *v1.Pod) factory.Binder {
                 return fakeBinder{func(b *v1.Binding) error {
                     gotBinding = b
@@ -317,9 +333,10 @@ func TestSchedulerNoPhantomPodAfterExpire(t *testing.T) {
     pod := podWithPort("pod.Name", "", 8080)
     node := v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "machine1", UID: types.UID("machine1")}}
     scache.AddNode(&node)
-    nodeLister := schedulertesting.FakeNodeLister([]*v1.Node{&node})
+    client := clientsetfake.NewSimpleClientset(&node)
+    informerFactory := informers.NewSharedInformerFactory(client, 0)
     predicateMap := map[string]algorithm.FitPredicate{"PodFitsHostPorts": predicates.PodFitsHostPorts}
-    scheduler, bindingChan, _ := setupTestSchedulerWithOnePodOnNode(t, queuedPodStore, scache, nodeLister, predicateMap, pod, &node)
+    scheduler, bindingChan, _ := setupTestSchedulerWithOnePodOnNode(t, queuedPodStore, scache, informerFactory, stop, predicateMap, pod, &node)

     waitPodExpireChan := make(chan struct{})
     timeout := make(chan struct{})
@@ -375,9 +392,10 @@ func TestSchedulerNoPhantomPodAfterDelete(t *testing.T) {
     firstPod := podWithPort("pod.Name", "", 8080)
     node := v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "machine1", UID: types.UID("machine1")}}
     scache.AddNode(&node)
-    nodeLister := schedulertesting.FakeNodeLister([]*v1.Node{&node})
+    client := clientsetfake.NewSimpleClientset(&node)
+    informerFactory := informers.NewSharedInformerFactory(client, 0)
     predicateMap := map[string]algorithm.FitPredicate{"PodFitsHostPorts": predicates.PodFitsHostPorts}
-    scheduler, bindingChan, errChan := setupTestSchedulerWithOnePodOnNode(t, queuedPodStore, scache, nodeLister, predicateMap, firstPod, &node)
+    scheduler, bindingChan, errChan := setupTestSchedulerWithOnePodOnNode(t, queuedPodStore, scache, informerFactory, stop, predicateMap, firstPod, &node)

     // We use conflicted pod ports to incur fit predicate failure.
     secondPod := podWithPort("bar", "", 8080)
@@ -463,11 +481,16 @@ func TestSchedulerErrorWithLongBinding(t *testing.T) {
         node := v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "machine1", UID: types.UID("machine1")}}
         scache.AddNode(&node)

-        nodeLister := schedulertesting.FakeNodeLister([]*v1.Node{&node})
+        client := clientsetfake.NewSimpleClientset(&node)
+        informerFactory := informers.NewSharedInformerFactory(client, 0)
         predicateMap := map[string]algorithm.FitPredicate{"PodFitsHostPorts": predicates.PodFitsHostPorts}

         scheduler, bindingChan := setupTestSchedulerLongBindingWithRetry(
-            queuedPodStore, scache, nodeLister, predicateMap, stop, test.BindingDuration)
+            queuedPodStore, scache, informerFactory, predicateMap, stop, test.BindingDuration)
+
+        informerFactory.Start(stop)
+        informerFactory.WaitForCacheSync(stop)
+
         scheduler.Run()
         queuedPodStore.Add(firstPod)
         queuedPodStore.Add(conflictPod)
@@ -495,9 +518,12 @@
 // queuedPodStore: pods queued before processing.
 // cache: scheduler cache that might contain assumed pods.
 func setupTestSchedulerWithOnePodOnNode(t *testing.T, queuedPodStore *clientcache.FIFO, scache schedulerinternalcache.Cache,
-    nodeLister schedulertesting.FakeNodeLister, predicateMap map[string]algorithm.FitPredicate, pod *v1.Pod, node *v1.Node) (*Scheduler, chan *v1.Binding, chan error) {
+    informerFactory informers.SharedInformerFactory, stop chan struct{}, predicateMap map[string]algorithm.FitPredicate, pod *v1.Pod, node *v1.Node) (*Scheduler, chan *v1.Binding, chan error) {

-    scheduler, bindingChan, errChan := setupTestScheduler(queuedPodStore, scache, nodeLister, predicateMap, nil)
+    scheduler, bindingChan, errChan := setupTestScheduler(queuedPodStore, scache, informerFactory, predicateMap, nil)

+    informerFactory.Start(stop)
+    informerFactory.WaitForCacheSync(stop)
+
     queuedPodStore.Add(pod)
     // queuedPodStore: [foo:8080]
@@ -540,7 +566,8 @@ func TestSchedulerFailedSchedulingReasons(t *testing.T) {
     })

     // create several nodes which cannot schedule the above pod
-    nodes := []*v1.Node{}
+    var nodes []*v1.Node
+    var objects []runtime.Object
     for i := 0; i < 100; i++ {
         uid := fmt.Sprintf("machine%v", i)
         node := v1.Node{
@@ -559,8 +586,10 @@ func TestSchedulerFailedSchedulingReasons(t *testing.T) {
         }
         scache.AddNode(&node)
         nodes = append(nodes, &node)
+        objects = append(objects, &node)
     }
-    nodeLister := schedulertesting.FakeNodeLister(nodes)
+    client := clientsetfake.NewSimpleClientset(objects...)
+    informerFactory := informers.NewSharedInformerFactory(client, 0)
     predicateMap := map[string]algorithm.FitPredicate{
         "PodFitsResources": predicates.PodFitsResources,
     }
@@ -573,7 +602,10 @@ func TestSchedulerFailedSchedulingReasons(t *testing.T) {
             predicates.NewInsufficientResourceError(v1.ResourceMemory, 500, 0, 100),
         }
     }
-    scheduler, _, errChan := setupTestScheduler(queuedPodStore, scache, nodeLister, predicateMap, nil)
+    scheduler, _, errChan := setupTestScheduler(queuedPodStore, scache, informerFactory, predicateMap, nil)
+
+    informerFactory.Start(stop)
+    informerFactory.WaitForCacheSync(stop)

     queuedPodStore.Add(podWithTooBigResourceRequests)
     scheduler.scheduleOne()
@@ -597,7 +629,7 @@ func TestSchedulerFailedSchedulingReasons(t *testing.T) {

 // queuedPodStore: pods queued before processing.
 // scache: scheduler cache that might contain assumed pods.
-func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache schedulerinternalcache.Cache, nodeLister schedulertesting.FakeNodeLister, predicateMap map[string]algorithm.FitPredicate, recorder record.EventRecorder) (*Scheduler, chan *v1.Binding, chan error) {
+func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache schedulerinternalcache.Cache, informerFactory informers.SharedInformerFactory, predicateMap map[string]algorithm.FitPredicate, recorder record.EventRecorder) (*Scheduler, chan *v1.Binding, chan error) {
     algo := core.NewGenericScheduler(
         scache,
         nil,
@@ -608,8 +640,8 @@ func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache schedulerintern
         algorithm.EmptyPriorityMetadataProducer,
         []algorithm.SchedulerExtender{},
         nil,
-        schedulertesting.FakePersistentVolumeClaimLister{},
-        schedulertesting.FakePDBLister{},
+        informerFactory.Core().V1().PersistentVolumeClaims().Lister(),
+        informerFactory.Policy().V1beta1().PodDisruptionBudgets().Lister(),
         false,
         false,
         api.DefaultPercentageOfNodesToScore)
@@ -618,7 +650,7 @@ func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache schedulerintern
     configurator := &FakeConfigurator{
         Config: &factory.Config{
             SchedulerCache: scache,
-            NodeLister:     nodeLister,
+            NodeLister:     &nodeLister{informerFactory.Core().V1().Nodes().Lister()},
             Algorithm:      algo,
             GetBinder: func(pod *v1.Pod) factory.Binder {
                 return fakeBinder{func(b *v1.Binding) error {
@@ -648,7 +680,7 @@ func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache schedulerintern
     return sched, bindingChan, errChan
 }

-func setupTestSchedulerLongBindingWithRetry(queuedPodStore *clientcache.FIFO, scache schedulerinternalcache.Cache, nodeLister schedulertesting.FakeNodeLister, predicateMap map[string]algorithm.FitPredicate, stop chan struct{}, bindingTime time.Duration) (*Scheduler, chan *v1.Binding) {
+func setupTestSchedulerLongBindingWithRetry(queuedPodStore *clientcache.FIFO, scache schedulerinternalcache.Cache, informerFactory informers.SharedInformerFactory, predicateMap map[string]algorithm.FitPredicate, stop chan struct{}, bindingTime time.Duration) (*Scheduler, chan *v1.Binding) {
     algo := core.NewGenericScheduler(
         scache,
         nil,
@@ -659,16 +691,16 @@ func setupTestSchedulerLongBindingWithRetry(queuedPodStore *clientcache.FIFO, sc
         algorithm.EmptyPriorityMetadataProducer,
         []algorithm.SchedulerExtender{},
         nil,
-        schedulertesting.FakePersistentVolumeClaimLister{},
-        schedulertesting.FakePDBLister{},
+        informerFactory.Core().V1().PersistentVolumeClaims().Lister(),
+        informerFactory.Policy().V1beta1().PodDisruptionBudgets().Lister(),
         false,
         false,
         api.DefaultPercentageOfNodesToScore)
     bindingChan := make(chan *v1.Binding, 2)
     configurator := &FakeConfigurator{
         Config: &factory.Config{
             SchedulerCache: scache,
-            NodeLister:     nodeLister,
+            NodeLister:     &nodeLister{informerFactory.Core().V1().Nodes().Lister()},
             Algorithm:      algo,
             GetBinder: func(pod *v1.Pod) factory.Binder {
                 return fakeBinder{func(b *v1.Binding) error {
@@ -701,18 +733,21 @@ func setupTestSchedulerLongBindingWithRetry(queuedPodStore *clientcache.FIFO, sc

 func setupTestSchedulerWithVolumeBinding(fakeVolumeBinder *volumebinder.VolumeBinder, stop <-chan struct{}, broadcaster record.EventBroadcaster) (*Scheduler, chan *v1.Binding, chan error) {
     testNode := v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "machine1", UID: types.UID("machine1")}}
-    nodeLister := schedulertesting.FakeNodeLister([]*v1.Node{&testNode})
     queuedPodStore := clientcache.NewFIFO(clientcache.MetaNamespaceKeyFunc)
     queuedPodStore.Add(podWithID("foo", ""))
     scache := schedulerinternalcache.New(10*time.Minute, stop)
     scache.AddNode(&testNode)
+    client := clientsetfake.NewSimpleClientset(&testNode)
+    informerFactory := informers.NewSharedInformerFactory(client, 0)

     predicateMap := map[string]algorithm.FitPredicate{
         predicates.CheckVolumeBindingPred: predicates.NewVolumeBindingPredicate(fakeVolumeBinder),
     }

     recorder := broadcaster.NewRecorder(legacyscheme.Scheme, v1.EventSource{Component: "scheduler"})
-    s, bindingChan, errChan := setupTestScheduler(queuedPodStore, scache, nodeLister, predicateMap, recorder)
+    s, bindingChan, errChan := setupTestScheduler(queuedPodStore, scache, informerFactory, predicateMap, recorder)
+    informerFactory.Start(stop)
+    informerFactory.WaitForCacheSync(stop)
     s.config.VolumeBinder = fakeVolumeBinder
     return s, bindingChan, errChan
 }
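
A note on the small nodeLister wrapper the diff adds at the top of scheduler_test.go: the scheduler consumes factory.Config.NodeLister through its own lister interface, whose List method takes no arguments, while the client-go corelister.NodeLister requires a label selector. The embedded-struct adapter bridges the two by pinning the selector to labels.Everything(). A standalone sketch of the same idea (schedulerNodeLister is an illustrative stand-in for the scheduler-side interface, which this sketch does not import):

package main

import (
    v1 "k8s.io/api/core/v1"
    "k8s.io/apimachinery/pkg/labels"
    corelister "k8s.io/client-go/listers/core/v1"
)

// schedulerNodeLister mirrors the zero-argument List signature implied by
// the wrapper's method set in the diff.
type schedulerNodeLister interface {
    List() ([]*v1.Node, error)
}

// nodeLister embeds the selector-taking client-go lister and exposes a
// selector-free List by fixing the selector to "match everything".
type nodeLister struct {
    corelister.NodeLister
}

func (n *nodeLister) List() ([]*v1.Node, error) {
    return n.NodeLister.List(labels.Everything())
}

// Compile-time check that the adapter satisfies the scheduler-side shape.
var _ schedulerNodeLister = &nodeLister{}

func main() {}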
