resource.go
package calcium

import (
	"context"
	"sort"

	"github.com/sanity-io/litter"
	log "github.com/sirupsen/logrus"

	"github.com/projecteru2/core/cluster"
	"github.com/projecteru2/core/scheduler"
	"github.com/projecteru2/core/types"
)
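
// allocResource takes the pod lock, gathers the CPU and memory capacity of
// candidate nodes, asks the scheduler for a placement plan according to the
// pod type, divides the deploy count across nodes by the chosen deploy
// method, and deducts the allocated resources from each node in the store.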
func (c *Calcium) allocResource(ctx context.Context, opts *types.DeployOptions, podType string) ([]types.NodeInfo, error) {
	var err error
	var total int
	var nodesInfo []types.NodeInfo
	var nodeCPUPlans map[string][]types.CPUMap

	// Hold the pod lock so concurrent deployments don't race on node resources.
	lock, err := c.Lock(ctx, opts.Podname, c.config.LockTimeout)
	if err != nil {
		return nil, err
	}
	defer lock.Unlock(ctx)

	cpuandmem, err := c.getCPUAndMem(ctx, opts.Podname, opts.Nodename, opts.NodeLabels)
	if err != nil {
		return nil, err
	}
	nodesInfo = getNodesInfo(cpuandmem)

	// Load the status of previous deployments.
	nodesInfo, err = c.store.MakeDeployStatus(ctx, opts, nodesInfo)
	if err != nil {
		return nil, err
	}
	switch podType {
	case scheduler.MEMORY_PRIOR:
		log.Debugf("[allocResource] Input opts.CPUQuota: %f", opts.CPUQuota)
		// Memory stays in bytes; no unit conversion here.
		nodesInfo, total, err = c.scheduler.SelectMemoryNodes(nodesInfo, opts.CPUQuota, opts.Memory)
	case scheduler.CPU_PRIOR:
		nodesInfo, nodeCPUPlans, total, err = c.scheduler.SelectCPUNodes(nodesInfo, opts.CPUQuota, opts.Memory)
	default:
		return nil, types.ErrBadPodType
	}
	if err != nil {
		return nil, err
	}
	// Divide the deploy count across the selected nodes according to the deploy method.
	switch opts.DeployMethod {
	case cluster.DeployAuto:
		nodesInfo, err = c.scheduler.CommonDivision(nodesInfo, opts.Count, total)
	case cluster.DeployEach:
		nodesInfo, err = c.scheduler.EachDivision(nodesInfo, opts.Count, opts.NodesLimit)
	case cluster.DeployFill:
		nodesInfo, err = c.scheduler.FillDivision(nodesInfo, opts.Count, opts.NodesLimit)
	default:
		return nil, types.ErrBadDeployMethod
	}
	if err != nil {
		return nil, err
	}
	// Resource accounting: keep only the nodes that actually got a deployment.
	sort.Slice(nodesInfo, func(i, j int) bool { return nodesInfo[i].Deploy < nodesInfo[j].Deploy })
	p := sort.Search(len(nodesInfo), func(i int) bool { return nodesInfo[i].Deploy > 0 })
	// sort.Search returns len(nodesInfo) when no node was assigned anything.
	if p == len(nodesInfo) {
		return nil, types.ErrInsufficientRes
	}
	nodesInfo = nodesInfo[p:]
	for i, nodeInfo := range nodesInfo {
		cpuCost := types.CPUMap{}
		memoryCost := opts.Memory * int64(nodeInfo.Deploy)
		// For CPU-prior pods, take one CPU plan per deployed instance and sum the cost.
		if _, ok := nodeCPUPlans[nodeInfo.Name]; ok {
			cpuList := nodeCPUPlans[nodeInfo.Name][:nodeInfo.Deploy]
			nodesInfo[i].CPUPlan = cpuList
			for _, cpu := range cpuList {
				cpuCost.Add(cpu)
			}
		}
		// "-" deducts the cost from the node's available resources.
		if err := c.store.UpdateNodeResource(ctx, opts.Podname, nodeInfo.Name, cpuCost, memoryCost, "-"); err != nil {
			return nil, err
		}
	}
	// Log the allocation result asynchronously.
	go func() {
		log.Info("[allocResource] result")
		for _, nodeInfo := range nodesInfo {
			s := litter.Sdump(nodeInfo.CPUPlan)
			log.Infof("[allocResource] deploy %d to %s \n%s", nodeInfo.Deploy, nodeInfo.Name, s)
		}
	}()
	return nodesInfo, c.bindProcessStatus(ctx, opts, nodesInfo)
}
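
// bindProcessStatus stores a processing record for every node that received
// a deployment, marking those containers as in flight.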
func (c *Calcium) bindProcessStatus(ctx context.Context, opts *types.DeployOptions, nodesInfo []types.NodeInfo) error {
	for _, nodeInfo := range nodesInfo {
		if err := c.store.SaveProcessing(ctx, opts, nodeInfo); err != nil {
			return err
		}
	}
	return nil
}
func (c *Calcium) getCPUAndMem(ctx context.Context, podname, nodename string, labels map[string]string) (map[*types.Node]types.CPUAndMem, error) {
	var nodes []*types.Node
	var err error
	if nodename == "" {
		nodes, err = c.ListPodNodes(ctx, podname, false)
		if err != nil {
			return nil, err
		}
		// Keep only the nodes whose labels match the deploy options.
		nodeList := []*types.Node{}
		for _, node := range nodes {
			if filterNode(node, labels) {
				nodeList = append(nodeList, node)
			}
		}
		nodes = nodeList
	} else {
		n, err := c.GetNode(ctx, podname, nodename)
		if err != nil {
			return nil, err
		}
		nodes = append(nodes, n)
	}

	if len(nodes) == 0 {
		return nil, types.ErrInsufficientNodes
	}
	return makeCPUAndMem(nodes), nil
}