Skip to content

Commit

Permalink
Merge branch 'dev' into 'master'
Browse files Browse the repository at this point in the history
update

1. 修改了SelectNode的返回结果,增加了remain部分;
2. 修改了AveragePlan的名字(改成小写);
3. 增加complexscheduler处理public的代码;

See merge request !6
  • Loading branch information
tonic committed Jul 15, 2016
2 parents 4195575 + abb021d commit eae0ba7
Show file tree
Hide file tree
Showing 8 changed files with 75 additions and 38 deletions.
13 changes: 7 additions & 6 deletions cluster/calcium/create_container.go
Expand Up @@ -78,11 +78,12 @@ func (c *calcium) prepareNodes(podname string, quota float64, num int) (map[stri
c.Lock()
defer c.Unlock()

r := make(map[string][]types.CPUMap)
result := make(map[string][]types.CPUMap)
remain := make(map[string]types.CPUMap)

nodes, err := c.ListPodNodes(podname)
if err != nil {
return r, err
return result, err
}

// if public, use only public nodes
Expand All @@ -93,9 +94,9 @@ func (c *calcium) prepareNodes(podname string, quota float64, num int) (map[stri
}

cpumap := makeCPUMap(nodes)
r, err = c.scheduler.SelectNodes(cpumap, quota, num) // 这个接口统一使用float64了
result, remain, err = c.scheduler.SelectNodes(cpumap, quota, num) // 这个接口统一使用float64了
if err != nil {
return r, err
return result, err
}

// if quota is set to 0
Expand All @@ -105,13 +106,13 @@ func (c *calcium) prepareNodes(podname string, quota float64, num int) (map[stri
// update data to etcd
// `SelectNodes` reduces count in cpumap
for _, node := range nodes {
node.CPU = cpumap[node.Name]
node.CPU = remain[node.Name]
// ignore error
c.store.UpdateNode(node)
}
}

return r, err
return result, err
}

// filter nodes
Expand Down
17 changes: 16 additions & 1 deletion scheduler/complex/cpu.go
Expand Up @@ -190,7 +190,7 @@ func abs(a int) int {
return a
}

func AveragePlan(cpu float64, nodes map[string]types.CPUMap, need, maxShareCore, coreShare int) map[string][]types.CPUMap {
func averagePlan(cpu float64, nodes map[string]types.CPUMap, need, maxShareCore, coreShare int) map[string][]types.CPUMap {

var nodecontainer = map[string][]types.CPUMap{}
var result = map[string][]types.CPUMap{}
Expand All @@ -200,6 +200,21 @@ func AveragePlan(cpu float64, nodes map[string]types.CPUMap, need, maxShareCore,
var nodeinfo ByNCon
var nodename string

if cpu < 0.01 {
resultLength := 0
r:
for {
for nodename, _ := range nodes {
result[nodename] = append(result[nodename], nil)
resultLength++
}
if resultLength == need {
break r
}
}
return result
}

for node, cpuInfo := range nodes {
host = newHost(cpuInfo, coreShare)
plan = host.GetContainerCores(cpu, maxShareCore)
Expand Down
18 changes: 13 additions & 5 deletions scheduler/complex/potassium.go
Expand Up @@ -51,21 +51,29 @@ func (m *potassium) RandomNode(nodes map[string]types.CPUMap) (string, error) {
return nodename, nil
}

func (m *potassium) SelectNodes(nodes map[string]types.CPUMap, quota float64, num int) (map[string][]types.CPUMap, error) {
func (m *potassium) SelectNodes(nodes map[string]types.CPUMap, quota float64, num int) (map[string][]types.CPUMap, map[string]types.CPUMap, error) {
m.Lock()
defer m.Unlock()

result := make(map[string][]types.CPUMap)
remain := make(map[string]types.CPUMap)

if len(nodes) == 0 {
return result, fmt.Errorf("No nodes provide to choose some")
return result, nil, fmt.Errorf("No nodes provide to choose some")
}

// all core could be shared
// suppose each core has 10 coreShare
// TODO: change it to be control by parameters
result = AveragePlan(quota, nodes, num, -1, 10)
result = averagePlan(quota, nodes, num, -1, 10)
if result == nil {
return nil, fmt.Errorf("Not enough resource")
return nil, nil, fmt.Errorf("Not enough resource")
}

for node, cpumap := range nodes {
if _, ok := result[node]; !ok {
remain[node] = cpumap
}
}
return result, nil
return result, remain, nil
}
24 changes: 14 additions & 10 deletions scheduler/complex/potassium_test.go
Expand Up @@ -31,7 +31,7 @@ func TestSelectNodes(t *testing.T) {
t.Fatalf("cannot create Potassim instance.", merr)
}

_, err := k.SelectNodes(map[string]types.CPUMap{}, 1, 1)
_, _, err := k.SelectNodes(map[string]types.CPUMap{}, 1, 1)
assert.Error(t, err)
assert.Equal(t, err.Error(), "No nodes provide to choose some")

Expand All @@ -46,20 +46,21 @@ func TestSelectNodes(t *testing.T) {
},
}

_, err = k.SelectNodes(nodes, 2, 3)
_, _, err = k.SelectNodes(nodes, 2, 3)
assert.Error(t, err)
assert.Contains(t, err.Error(), "Not enough")

_, err = k.SelectNodes(nodes, 3, 2)
_, _, err = k.SelectNodes(nodes, 3, 2)
assert.Error(t, err)
assert.Contains(t, err.Error(), "Not enough")

_, err = k.SelectNodes(nodes, 1, 5)
_, _, err = k.SelectNodes(nodes, 1, 5)
assert.Error(t, err)
assert.Contains(t, err.Error(), "Not enough")

r, err := k.SelectNodes(nodes, 1, 2)
r, re, err := k.SelectNodes(nodes, 1, 2)
assert.NoError(t, err)
assert.Equal(t, 1, len(re))

for nodename, cpus := range r {
assert.Contains(t, []string{"node1", "node2"}, nodename)
Expand All @@ -69,7 +70,7 @@ func TestSelectNodes(t *testing.T) {
assert.Equal(t, cpu.Total(), 10)
}

r, err = k.SelectNodes(nodes, 1.3, 1)
r, re, err = k.SelectNodes(nodes, 1.3, 1)
assert.NoError(t, err)

for nodename, cpus := range r {
Expand Down Expand Up @@ -152,28 +153,31 @@ func TestComplexNodes(t *testing.T) {
k, _ = NewPotassim(coreCfg)

// test1
res1, err := k.SelectNodes(nodes, 1.7, 7)
res1, rem1, err := k.SelectNodes(nodes, 1.7, 7)
if err != nil {
t.Fatalf("sth wrong")
}
if check := checkAvgPlan(res1, 1, 2, "res1"); check != nil {
t.Fatalf("something went wrong")
}
assert.Equal(t, len(rem1), 1)

// test2
res2, err := k.SelectNodes(nodes, 1.7, 11)
res2, rem2, err := k.SelectNodes(nodes, 1.7, 11)
if check := checkAvgPlan(res2, 1, 4, "res2"); check != nil {
t.Fatalf("something went wrong")
}
assert.Equal(t, len(rem2), 1)

// test3
res3, err := k.SelectNodes(nodes, 1.7, 23)
res3, rem3, err := k.SelectNodes(nodes, 1.7, 23)
if check := checkAvgPlan(res3, 2, 6, "res3"); check != nil {
t.Fatalf("something went wrong")
}
assert.Equal(t, len(rem3), 0)

// test4
_, newErr := k.SelectNodes(nodes, 1.6, 29)
_, _, newErr := k.SelectNodes(nodes, 1.6, 29)
if newErr == nil {
t.Fatalf("how to alloc 29 containers when you only have 28?")
}
Expand Down
2 changes: 1 addition & 1 deletion scheduler/scheduler.go
Expand Up @@ -11,5 +11,5 @@ type Scheduler interface {
RandomNode(nodes map[string]types.CPUMap) (string, error)
// select nodes from nodes, return a list of nodenames and the corresponding cpumap
// quota and number must be given, typically used to determine where to deploy
SelectNodes(nodes map[string]types.CPUMap, quota float64, num int) (map[string][]types.CPUMap, error)
SelectNodes(nodes map[string]types.CPUMap, quota float64, num int) (map[string][]types.CPUMap, map[string]types.CPUMap, error)
}
2 changes: 1 addition & 1 deletion scheduler/scheduler_test.go
Expand Up @@ -31,6 +31,6 @@ func TestSchedulerInvoke(t *testing.T) {
},
}

_, err := scheduler.SelectNodes(nodes, 1, 2)
_, _, err := scheduler.SelectNodes(nodes, 1, 2)
assert.NoError(t, err)
}
15 changes: 11 additions & 4 deletions scheduler/simple/magnesium.go
Expand Up @@ -40,20 +40,21 @@ func (m *magnesium) RandomNode(nodes map[string]types.CPUMap) (string, error) {
// Select nodes for deploying.
// Use round robin method to select, in order to make scheduler average.
// TODO Outside this method, caller should update corresponding nodes with `nodes` as their CPU, which is weird
func (m *magnesium) SelectNodes(nodes map[string]types.CPUMap, quota float64, num int) (map[string][]types.CPUMap, error) {
func (m *magnesium) SelectNodes(nodes map[string]types.CPUMap, quota float64, num int) (map[string][]types.CPUMap, map[string]types.CPUMap, error) {
m.Lock()
defer m.Unlock()

q := int(quota) // 为了和complexscheduler的接口保持一致,quota改为float64
result := make(map[string][]types.CPUMap)
remain := make(map[string]types.CPUMap)
if len(nodes) == 0 {
return result, fmt.Errorf("No nodes provide to choose some")
return result, nil, fmt.Errorf("No nodes provide to choose some")
}

if q > 0 {
total := totalQuota(nodes)
if total < num*q {
return result, fmt.Errorf("Not enough CPUs, total: %d, require: %d", total, num)
return result, nil, fmt.Errorf("Not enough CPUs, total: %d, require: %d", total, num)
}
}

Expand All @@ -70,7 +71,13 @@ done:
}
}
}
return result, nil

for nodename, cpumap := range nodes {
if _, ok := result[nodename]; !ok {
remain[nodename] = cpumap
}
}
return result, remain, nil
}

// count result length
Expand Down
22 changes: 12 additions & 10 deletions scheduler/simple/magnesium_test.go
Expand Up @@ -42,7 +42,7 @@ func TestRandomNode(t *testing.T) {

func TestSelectNodes(t *testing.T) {
m := &magnesium{}
_, err := m.SelectNodes(map[string]types.CPUMap{}, 1, 1)
_, _, err := m.SelectNodes(map[string]types.CPUMap{}, 1, 1)
assert.Error(t, err)
assert.Equal(t, err.Error(), "No nodes provide to choose some")

Expand All @@ -57,20 +57,21 @@ func TestSelectNodes(t *testing.T) {
},
}

_, err = m.SelectNodes(nodes, 2, 3)
_, _, err = m.SelectNodes(nodes, 2, 3)
assert.Error(t, err)
assert.Contains(t, err.Error(), "Not enough")

_, err = m.SelectNodes(nodes, 3, 2)
_, _, err = m.SelectNodes(nodes, 3, 2)
assert.Error(t, err)
assert.Contains(t, err.Error(), "Not enough")

_, err = m.SelectNodes(nodes, 1, 5)
_, _, err = m.SelectNodes(nodes, 1, 5)
assert.Error(t, err)
assert.Contains(t, err.Error(), "Not enough")

r, err := m.SelectNodes(nodes, 1, 2)
r, re, err := m.SelectNodes(nodes, 1, 2)
assert.NoError(t, err)
assert.Equal(t, len(re), 0)
for nodename, cpus := range r {
assert.Contains(t, []string{"node1", "node2"}, nodename)
assert.Equal(t, len(cpus), 1)
Expand All @@ -79,11 +80,11 @@ func TestSelectNodes(t *testing.T) {
assert.Equal(t, cpu.Total(), 10)
}

_, err = m.SelectNodes(nodes, 1, 4)
_, _, err = m.SelectNodes(nodes, 1, 4)
assert.Error(t, err)
assert.Contains(t, err.Error(), "Not enough")

r, err = m.SelectNodes(nodes, 1, 2)
r, _, err = m.SelectNodes(nodes, 1, 2)
assert.NoError(t, err)
for nodename, cpus := range r {
assert.Contains(t, []string{"node1", "node2"}, nodename)
Expand All @@ -93,7 +94,7 @@ func TestSelectNodes(t *testing.T) {
assert.Equal(t, cpu.Total(), 10)
}

_, err = m.SelectNodes(nodes, 1, 1)
_, _, err = m.SelectNodes(nodes, 1, 1)
assert.Error(t, err)
assert.Contains(t, err.Error(), "Not enough")

Expand Down Expand Up @@ -134,7 +135,7 @@ func TestTotalQuota(t *testing.T) {

func TestSelectPublicNodes(t *testing.T) {
m := &magnesium{}
_, err := m.SelectNodes(map[string]types.CPUMap{}, 1, 1)
_, _, err := m.SelectNodes(map[string]types.CPUMap{}, 1, 1)
assert.Error(t, err)
assert.Equal(t, err.Error(), "No nodes provide to choose some")

Expand All @@ -149,9 +150,10 @@ func TestSelectPublicNodes(t *testing.T) {
},
}

r, err := m.SelectNodes(nodes, 0, 10)
r, re, err := m.SelectNodes(nodes, 0, 10)
assert.NoError(t, err)
assert.Equal(t, resultLength(r), 10)
assert.Equal(t, len(re), 0)
for nodename, cpus := range r {
assert.Contains(t, []string{"node1", "node2"}, nodename)
for _, cpu := range cpus {
Expand Down

0 comments on commit eae0ba7

Please sign in to comment.