Scheduler changes for extensibility #2505

Merged · 6 commits · Dec 9, 2014
7 changes: 5 additions & 2 deletions cmd/integration/integration.go
@@ -168,8 +168,11 @@ func startComponents(manifestURL string) (apiServerURL string) {
handler.delegate = m.Handler

// Scheduler
- schedulerConfigFactory := &factory.ConfigFactory{cl}
- schedulerConfig := schedulerConfigFactory.Create()
+ schedulerConfigFactory := factory.NewConfigFactory(cl)
+ schedulerConfig, err := schedulerConfigFactory.Create(nil, nil)
+ if err != nil {
+ glog.Fatalf("Couldn't create scheduler config: %v", err)
+ }
scheduler.New(schedulerConfig).Run()

endpoints := service.NewEndpointController(cl)
68 changes: 54 additions & 14 deletions pkg/scheduler/generic_scheduler.go
@@ -26,46 +26,56 @@ import (
)

type genericScheduler struct {
- predicates []FitPredicate
- prioritizer PriorityFunction
- pods PodLister
- random *rand.Rand
- randomLock sync.Mutex
+ predicates []FitPredicate
+ prioritizers []PriorityConfig
+ pods PodLister
+ random *rand.Rand
+ randomLock sync.Mutex
}
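For reference, PriorityConfig is defined elsewhere in this PR (its definition is not part of this diff); judging from its use in prioritizeNodes below, it presumably pairs a priority function with an integer weight, along the lines of:

	type PriorityConfig struct {
		Function PriorityFunction
		Weight   int
	}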

func (g *genericScheduler) Schedule(pod api.Pod, minionLister MinionLister) (string, error) {
minions, err := minionLister.List()
if err != nil {
return "", err
}
+ if len(minions.Items) == 0 {
Contributor: What was happening before here?

Author: It was being caught towards the end of the function if no minions were found that could fit the predicates. I added this check here to validate that minions existed and were listed prior to running them through the predicates.

return "", fmt.Errorf("no minions available to schedule pods")
}

filteredNodes, err := findNodesThatFit(pod, g.pods, g.predicates, minions)
if err != nil {
return "", err
}
- priorityList, err := g.prioritizer(pod, g.pods, FakeMinionLister(filteredNodes))
+
+ priorityList, err := prioritizeNodes(pod, g.pods, g.prioritizers, FakeMinionLister(filteredNodes))
if err != nil {
return "", err
}
if len(priorityList) == 0 {
return "", fmt.Errorf("failed to find a fit for pod: %v", pod)
}

return g.selectHost(priorityList)
}

// This method takes a prioritized list of minions and sorts them in reverse order based on scores
// and then picks one randomly from the minions that had the highest score
func (g *genericScheduler) selectHost(priorityList HostPriorityList) (string, error) {
if len(priorityList) == 0 {
return "", fmt.Errorf("empty priorityList")
}
- sort.Sort(priorityList)
+ sort.Sort(sort.Reverse(priorityList))
Member: Why do you reverse sort and then forward sort?

Member: [reply not preserved in this capture]

Member: Ah, thanks, I had read that but not carefully enough.
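For context on the exchange above: sort.Reverse does not itself sort; it wraps a sort.Interface and inverts its Less method, so the single sort.Sort call yields descending order. A minimal standalone sketch (plain Go, not part of this PR):

	package main

	import (
		"fmt"
		"sort"
	)

	func main() {
		scores := []int{1, 3, 2}
		// sort.Reverse only wraps the Interface; the one sort.Sort call
		// below sorts the slice in descending order in a single pass.
		sort.Sort(sort.Reverse(sort.IntSlice(scores)))
		fmt.Println(scores) // prints [3 2 1]
	}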


- hosts := getMinHosts(priorityList)
+ hosts := getBestHosts(priorityList)
g.randomLock.Lock()
defer g.randomLock.Unlock()

ix := g.random.Int() % len(hosts)
return hosts[ix], nil
}

// Filters the minions to find the ones that fit based on the given predicate functions
// Each minion is passed through the predicate functions to determine if it is a fit
func findNodesThatFit(pod api.Pod, podLister PodLister, predicates []FitPredicate, nodes api.MinionList) (api.MinionList, error) {
filtered := []api.Minion{}
machineToPods, err := MapPodsToMachines(podLister)
@@ -91,7 +101,37 @@ func findNodesThatFit(pod api.Pod, podLister PodLister, predicates []FitPredicate
return api.MinionList{Items: filtered}, nil
}

- func getMinHosts(list HostPriorityList) []string {
+ // Prioritizes the minions by running the individual priority functions sequentially.
+ // Each priority function is expected to set a score of 0-10
+ // 0 is the lowest priority score (least preferred minion) and 10 is the highest
+ // Each priority function can also have its own weight
+ // The minion scores returned by the priority function are multiplied by the weights to get weighted scores
+ // All scores are finally combined (added) to get the total weighted scores of all minions
+ func prioritizeNodes(pod api.Pod, podLister PodLister, priorityConfigs []PriorityConfig, minionLister MinionLister) (HostPriorityList, error) {
+ result := HostPriorityList{}
+ combinedScores := map[string]int{}
+ for _, priorityConfig := range priorityConfigs {
+ weight := priorityConfig.Weight
+ // skip the priority function if the weight is specified as 0
+ if weight == 0 {
+ continue
+ }
+ priorityFunc := priorityConfig.Function
+ prioritizedList, err := priorityFunc(pod, podLister, minionLister)
+ if err != nil {
+ return HostPriorityList{}, err
+ }
+ for _, hostEntry := range prioritizedList {
+ combinedScores[hostEntry.host] += hostEntry.score * weight
+ }
+ }
+ for host, score := range combinedScores {
+ result = append(result, HostPriority{host: host, score: score})
+ }
+ return result, nil
+ }
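A worked illustration of the weighted combination, using the test helpers from generic_scheduler_test.go below (numericPriority scores each minion by parsing its name as a number; reverseNumericPriority inverts those scores) with weights 1 and 2 over minions "3", "2", "1":

	combinedScores["1"] = 1*1 + 3*2 = 7  // highest total, so minion "1" wins
	combinedScores["2"] = 2*1 + 2*2 = 6
	combinedScores["3"] = 3*1 + 1*2 = 5

This matches "test 6" in the updated TestGenericScheduler, which expects host "1".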

+ func getBestHosts(list HostPriorityList) []string {
result := []string{}
for _, hostEntry := range list {
if hostEntry.score == list[0].score {
@@ -121,11 +161,11 @@ func EqualPriority(pod api.Pod, podLister PodLister, minionLister MinionLister)
return result, nil
}

- func NewGenericScheduler(predicates []FitPredicate, prioritizer PriorityFunction, pods PodLister, random *rand.Rand) Scheduler {
+ func NewGenericScheduler(predicates []FitPredicate, prioritizers []PriorityConfig, pods PodLister, random *rand.Rand) Scheduler {
return &genericScheduler{
- predicates: predicates,
- prioritizer: prioritizer,
- pods: pods,
- random: random,
+ predicates: predicates,
+ prioritizers: prioritizers,
+ pods: pods,
+ random: random,
}
}
95 changes: 67 additions & 28 deletions pkg/scheduler/generic_scheduler_test.go
@@ -18,6 +18,7 @@ package scheduler

import (
"fmt"
"math"
"math/rand"
"strconv"
"testing"
@@ -59,6 +60,29 @@ func numericPriority(pod api.Pod, podLister PodLister, minionLister MinionLister
return result, nil
}

+ func reverseNumericPriority(pod api.Pod, podLister PodLister, minionLister MinionLister) (HostPriorityList, error) {
+ var maxScore float64
+ minScore := math.MaxFloat64
+ reverseResult := []HostPriority{}
+ result, err := numericPriority(pod, podLister, minionLister)
+ if err != nil {
+ return nil, err
+ }
+
+ for _, hostPriority := range result {
+ maxScore = math.Max(maxScore, float64(hostPriority.score))
+ minScore = math.Min(minScore, float64(hostPriority.score))
+ }
+ for _, hostPriority := range result {
+ reverseResult = append(reverseResult, HostPriority{
+ host: hostPriority.host,
+ score: int(maxScore + minScore - float64(hostPriority.score)),
+ })
+ }
+
+ return reverseResult, nil
+ }

func makeMinionList(nodeNames []string) api.MinionList {
result := api.MinionList{
Items: make([]api.Minion, len(nodeNames)),
@@ -81,28 +105,28 @@ func TestSelectHost(t *testing.T) {
{host: "machine1.1", score: 1},
{host: "machine2.1", score: 2},
},
possibleHosts: util.NewStringSet("machine1.1"),
possibleHosts: util.NewStringSet("machine2.1"),
expectsErr: false,
},
// equal scores
{
list: []HostPriority{
{host: "machine1.1", score: 1},
{host: "machine1.2", score: 1},
{host: "machine1.3", score: 1},
{host: "machine1.2", score: 2},
{host: "machine1.3", score: 2},
{host: "machine2.1", score: 2},
},
possibleHosts: util.NewStringSet("machine1.1", "machine1.2", "machine1.3"),
possibleHosts: util.NewStringSet("machine1.2", "machine1.3", "machine2.1"),
expectsErr: false,
},
// out of order scores
{
list: []HostPriority{
{host: "machine1.1", score: 1},
{host: "machine1.2", score: 1},
{host: "machine1.1", score: 3},
{host: "machine1.2", score: 3},
{host: "machine2.1", score: 2},
{host: "machine3.1", score: 3},
{host: "machine1.3", score: 1},
{host: "machine3.1", score: 1},
{host: "machine1.3", score: 3},
},
possibleHosts: util.NewStringSet("machine1.1", "machine1.2", "machine1.3"),
expectsErr: false,
@@ -137,58 +161,73 @@

func TestGenericScheduler(t *testing.T) {
tests := []struct {
+ name string
predicates []FitPredicate
- prioritizer PriorityFunction
+ prioritizers []PriorityConfig
nodes []string
pod api.Pod
expectedHost string
expectsErr bool
}{
{
- predicates: []FitPredicate{falsePredicate},
- prioritizer: EqualPriority,
- nodes: []string{"machine1", "machine2"},
- expectsErr: true,
+ predicates: []FitPredicate{falsePredicate},
+ prioritizers: []PriorityConfig{{Function: EqualPriority, Weight: 1}},
+ nodes: []string{"machine1", "machine2"},
+ expectsErr: true,
+ name: "test 1",
},
{
- predicates: []FitPredicate{truePredicate},
- prioritizer: EqualPriority,
- nodes: []string{"machine1", "machine2"},
- // Random choice between both, the rand seeded above with zero, chooses "machine2"
- expectedHost: "machine2",
+ predicates: []FitPredicate{truePredicate},
+ prioritizers: []PriorityConfig{{Function: EqualPriority, Weight: 1}},
+ nodes: []string{"machine1", "machine2"},
+ // Random choice between both, the rand seeded above with zero, chooses "machine1"
+ expectedHost: "machine1",
+ name: "test 2",
},
{
// Fits on a machine where the pod ID matches the machine name
predicates: []FitPredicate{matchesPredicate},
- prioritizer: EqualPriority,
+ prioritizers: []PriorityConfig{{Function: EqualPriority, Weight: 1}},
nodes: []string{"machine1", "machine2"},
pod: api.Pod{ObjectMeta: api.ObjectMeta{Name: "machine2"}},
expectedHost: "machine2",
name: "test 3",
},
{
predicates: []FitPredicate{truePredicate},
- prioritizer: numericPriority,
+ prioritizers: []PriorityConfig{{Function: numericPriority, Weight: 1}},
nodes: []string{"3", "2", "1"},
expectedHost: "1",
expectedHost: "3",
name: "test 4",
},
{
predicates: []FitPredicate{matchesPredicate},
- prioritizer: numericPriority,
+ prioritizers: []PriorityConfig{{Function: numericPriority, Weight: 1}},
nodes: []string{"3", "2", "1"},
pod: api.Pod{ObjectMeta: api.ObjectMeta{Name: "2"}},
expectedHost: "2",
name: "test 5",
},
+ {
+ predicates: []FitPredicate{truePredicate},
+ prioritizers: []PriorityConfig{{Function: numericPriority, Weight: 1}, {Function: reverseNumericPriority, Weight: 2}},
+ nodes: []string{"3", "2", "1"},
+ pod: api.Pod{ObjectMeta: api.ObjectMeta{Name: "2"}},
+ expectedHost: "1",
+ name: "test 6",
+ },
{
- predicates: []FitPredicate{truePredicate, falsePredicate},
- prioritizer: numericPriority,
- nodes: []string{"3", "2", "1"},
- expectsErr: true,
+ predicates: []FitPredicate{truePredicate, falsePredicate},
+ prioritizers: []PriorityConfig{{Function: numericPriority, Weight: 1}},
+ nodes: []string{"3", "2", "1"},
+ expectsErr: true,
+ name: "test 7",
},
}

for _, test := range tests {
random := rand.New(rand.NewSource(0))
- scheduler := NewGenericScheduler(test.predicates, test.prioritizer, FakePodLister([]api.Pod{}), random)
+ scheduler := NewGenericScheduler(test.predicates, test.prioritizers, FakePodLister([]api.Pod{}), random)
machine, err := scheduler.Schedule(test.pod, FakeMinionLister(makeMinionList(test.nodes)))
if test.expectsErr {
if err == nil {
@@ -199,7 +238,7 @@
t.Errorf("Unexpected error: %v", err)
}
if test.expectedHost != machine {
t.Errorf("Expected: %s, Saw: %s", test.expectedHost, machine)
t.Errorf("Failed : %s, Expected: %s, Saw: %s", test.name, test.expectedHost, machine)
}
}
}
32 changes: 22 additions & 10 deletions pkg/scheduler/priorities.go
@@ -22,32 +22,44 @@ import (
"github.com/golang/glog"
)

- func calculatePercentage(requested, capacity int) int {
+ // the unused capacity is calculated on a scale of 0-10
+ // 0 being the lowest priority and 10 being the highest
+ func calculateScore(requested, capacity int, node string) int {
if capacity == 0 {
return 0
}
- return (requested * 100) / capacity
+ if requested > capacity {
+ glog.Errorf("Combined requested resources from existing pods exceeds capacity on minion: %s", node)
+ return 0
+ }
+ return ((capacity - requested) * 10) / capacity
}
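A few illustrative values for the new scoring (hypothetical numbers, not from this PR):

	calculateScore(0, 1000, "m1")    // == 10: idle minion, all capacity unused, most preferred
	calculateScore(250, 1000, "m1")  // == 7:  ((1000-250)*10)/1000
	calculateScore(1000, 1000, "m1") // == 0:  fully requested, least preferred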

// Calculate the occupancy on a node. 'node' has information about the resources on the node.
// 'pods' is a list of pods currently scheduled on the node.
- func calculateOccupancy(node api.Minion, pods []api.Pod) HostPriority {
+ func calculateOccupancy(pod api.Pod, node api.Minion, pods []api.Pod) HostPriority {
totalCPU := 0
totalMemory := 0
- for _, pod := range pods {
- for _, container := range pod.Spec.Containers {
+ for _, existingPod := range pods {
+ for _, container := range existingPod.Spec.Containers {
totalCPU += container.CPU
totalMemory += container.Memory
}
}
+ // Add the resources requested by the current pod being scheduled.
+ // This also helps differentiate between differently sized, but empty, minions.
+ for _, container := range pod.Spec.Containers {
+ totalCPU += container.CPU
+ totalMemory += container.Memory
+ }

- percentageCPU := calculatePercentage(totalCPU, resources.GetIntegerResource(node.Spec.Capacity, resources.CPU, 0))
- percentageMemory := calculatePercentage(totalMemory, resources.GetIntegerResource(node.Spec.Capacity, resources.Memory, 0))
- glog.V(4).Infof("Least Requested Priority, AbsoluteRequested: (%d, %d) Percentage:(%d\\%m, %d\\%)", totalCPU, totalMemory, percentageCPU, percentageMemory)
+ cpuScore := calculateScore(totalCPU, resources.GetIntegerResource(node.Spec.Capacity, resources.CPU, 0), node.Name)
+ memoryScore := calculateScore(totalMemory, resources.GetIntegerResource(node.Spec.Capacity, resources.Memory, 0), node.Name)
+ glog.V(4).Infof("Least Requested Priority, AbsoluteRequested: (%d, %d) Score:(%d, %d)", totalCPU, totalMemory, cpuScore, memoryScore)

return HostPriority{
host: node.Name,
- score: int((percentageCPU + percentageMemory) / 2),
+ score: int((cpuScore + memoryScore) / 2),
}
}

@@ -64,7 +76,7 @@ func LeastRequestedPriority(pod api.Pod, podLister PodLister, minionLister MinionLister)

list := HostPriorityList{}
for _, node := range nodes.Items {
- list = append(list, calculateOccupancy(node, podsToMachines[node.Name]))
+ list = append(list, calculateOccupancy(pod, node, podsToMachines[node.Name]))
}
return list, nil
}