Skip to content

Commit

Permalink
Merge branch 'dev' into 'master'
Browse files Browse the repository at this point in the history
增加内存限制

Update
1. node 增加一个字段 memcap,记录节点可用的内存(单位:Byte);
2. container 增加一个字段 memory,记录容器申请了多少内存(单位:Byte);
3. 更新了平均分配的算法:先选出哪些节点能满足 cpu 需求(这些节点被称为可用节点), 然后计算每个可用节点的内存可以分配多少容器,再进行分配,分配算法采用了以前 ComplexScheduler 的分配算法(区别在于 ComplexScheduler的分配算法计算每台节点可以分配多少容器的时候是计算 CPU 资源,现在我们计算的是内存资源);
4. 增删容器的时候更新节点的 memcap 字段,增加容器失败的时候会把对应内存重新加回去;
5. 增加了两个测试用例(见 `unit_test.go`),验证了算法的正确性,以及在资源足够的情况下分配的随机性(避免同样的容器堆积在同一台服务器上);

See merge request !16
  • Loading branch information
CMGS committed Oct 24, 2016
2 parents 5621ae0 + 3d2c515 commit 194b800
Show file tree
Hide file tree
Showing 13 changed files with 225 additions and 49 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,4 @@ core.yaml
core.yml
.ropeproject
*.pyc
default.etcd/*
50 changes: 37 additions & 13 deletions cluster/calcium/create_container.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,16 @@ func (c *calcium) CreateContainer(specs types.Specs, opts *types.DeployOptions)
func (c *calcium) createContainerWithCPUPeriod(specs types.Specs, opts *types.DeployOptions) (chan *types.CreateContainerMessage, error) {
ch := make(chan *types.CreateContainerMessage)

cpumap, _, err := c.getCPUMap(opts.Podname, opts.Nodename, 1.0)
cpuandmem, _, err := c.getCPUAndMem(opts.Podname, opts.Nodename, 1.0)
if err != nil {
return ch, err
}
nodesInfo := utils.GetNodesInfo(cpumap)
nodesInfo := utils.GetNodesInfo(cpuandmem)

cpuQuota := int(opts.CPUQuota * float64(utils.CpuPeriodBase))
plan, err := utils.AllocContainerPlan(nodesInfo, cpuQuota, opts.Count)
// memory := opts.Memory / 1024 / 1024 // 转换成MB
plan, err := utils.AllocContainerPlan(nodesInfo, cpuQuota, opts.Memory, opts.Count) // 还是以 Bytes 作单位, 不转换了

if err != nil {
return ch, err
}
Expand Down Expand Up @@ -68,6 +70,11 @@ func (c *calcium) doCreateContainerWithCPUPeriod(nodename string, connum int, qu
ms[i] = &types.CreateContainerMessage{}
}

// 更新 MemCap
// memory := opts.Memory / 1024 / 1024
memoryTotal := opts.Memory * int64(connum)
c.store.UpdateNodeMem(opts.Podname, nodename, memoryTotal, "-")

node, err := c.GetNode(opts.Podname, nodename)
if err != nil {
return ms
Expand All @@ -81,6 +88,7 @@ func (c *calcium) doCreateContainerWithCPUPeriod(nodename string, connum int, qu
config, hostConfig, networkConfig, containerName, err := c.makeContainerOptions(nil, specs, opts, "cpuperiod", node.GetIP())
if err != nil {
log.Errorf("error when creating CreateContainerOptions, %v", err)
c.store.UpdateNodeMem(opts.Podname, nodename, opts.Memory, "+") // 创建容器失败就要把资源还回去对不对?
ms[i].Error = err.Error()
continue
}
Expand All @@ -90,6 +98,7 @@ func (c *calcium) doCreateContainerWithCPUPeriod(nodename string, connum int, qu
if err != nil {
log.Errorf("error when creating container, %v", err)
ms[i].Error = err.Error()
c.store.UpdateNodeMem(opts.Podname, nodename, opts.Memory, "+")
continue
}

Expand All @@ -102,6 +111,7 @@ func (c *calcium) doCreateContainerWithCPUPeriod(nodename string, connum int, qu
// need to ensure all networks are correctly connected
for networkID, ipv4 := range opts.Networks {
if err = c.network.ConnectToNetwork(ctx, container.ID, networkID, ipv4); err != nil {
c.store.UpdateNodeMem(opts.Podname, nodename, opts.Memory, "+")
log.Errorf("error when connecting container %q to network %q, %q", container.ID, networkID, err.Error())
breaked = true
break
Expand All @@ -119,6 +129,7 @@ func (c *calcium) doCreateContainerWithCPUPeriod(nodename string, connum int, qu
// if any break occurs, then this container needs to be removed
if breaked {
ms[i].Error = err.Error()
c.store.UpdateNodeMem(opts.Podname, nodename, opts.Memory, "+")
go node.Engine.ContainerRemove(context.Background(), container.ID, enginetypes.ContainerRemoveOptions{})
continue
}
Expand All @@ -128,6 +139,7 @@ func (c *calcium) doCreateContainerWithCPUPeriod(nodename string, connum int, qu
if err != nil {
log.Errorf("error when starting container, %v", err)
ms[i].Error = err.Error()
c.store.UpdateNodeMem(opts.Podname, nodename, opts.Memory, "+")
go node.Engine.ContainerRemove(context.Background(), container.ID, enginetypes.ContainerRemoveOptions{})
continue
}
Expand All @@ -140,12 +152,14 @@ func (c *calcium) doCreateContainerWithCPUPeriod(nodename string, connum int, qu
if err != nil {
log.Errorf("error when inspecting container, %v", err)
ms[i].Error = err.Error()
c.store.UpdateNodeMem(opts.Podname, nodename, opts.Memory, "+")
continue
}

_, err = c.store.AddContainer(info.ID, opts.Podname, node.Name, containerName, nil)
_, err = c.store.AddContainer(info.ID, opts.Podname, node.Name, containerName, nil, opts.Memory)
if err != nil {
ms[i].Error = err.Error()
c.store.UpdateNodeMem(opts.Podname, nodename, opts.Memory, "+")
continue
}

Expand All @@ -157,6 +171,7 @@ func (c *calcium) doCreateContainerWithCPUPeriod(nodename string, connum int, qu
Error: "",
Success: true,
CPU: nil,
Memory: opts.Memory,
}

}
Expand Down Expand Up @@ -205,16 +220,24 @@ func (c *calcium) createContainerWithScheduler(specs types.Specs, opts *types.De
return ch, nil
}

func makeCPUMap(nodes []*types.Node) map[string]types.CPUMap {
r := make(map[string]types.CPUMap)
func makeCPUAndMem(nodes []*types.Node) map[string]types.CPUAndMem {
r := make(map[string]types.CPUAndMem)
for _, node := range nodes {
r[node.Name] = node.CPU
r[node.Name] = types.CPUAndMem{node.CPU, node.MemCap}
}
return r
}

// makeCPUMap drops the memory part of every entry: the returned map has the
// same keys as nodes, each mapped to the CpuMap field of the corresponding
// CPUAndMem value. The result is always a non-nil map.
func makeCPUMap(nodes map[string]types.CPUAndMem) map[string]types.CPUMap {
	cpus := make(map[string]types.CPUMap, len(nodes))
	for name := range nodes {
		cpus[name] = nodes[name].CpuMap
	}
	return cpus
}

func (c *calcium) getCPUMap(podname, nodename string, quota float64) (map[string]types.CPUMap, []*types.Node, error) {
result := make(map[string]types.CPUMap)
func (c *calcium) getCPUAndMem(podname, nodename string, quota float64) (map[string]types.CPUAndMem, []*types.Node, error) {
result := make(map[string]types.CPUAndMem)
lock, err := c.store.CreateLock(podname, 30)
if err != nil {
return result, nil, err
Expand Down Expand Up @@ -250,7 +273,7 @@ func (c *calcium) getCPUMap(podname, nodename string, quota float64) (map[string
return result, nil, fmt.Errorf("No available nodes")
}

result = makeCPUMap(nodes)
result = makeCPUAndMem(nodes)
return result, nodes, nil
}

Expand All @@ -259,10 +282,11 @@ func (c *calcium) getCPUMap(podname, nodename string, quota float64) (map[string
func (c *calcium) prepareNodes(podname, nodename string, quota float64, num int) (map[string][]types.CPUMap, error) {
result := make(map[string][]types.CPUMap)

cpumap, nodes, err := c.getCPUMap(podname, nodename, quota)
cpuandmem, nodes, err := c.getCPUAndMem(podname, nodename, quota)
if err != nil {
return result, err
}
cpumap := makeCPUMap(cpuandmem) // 做这个转换,免得改太多
// use podname as lock key to prevent scheduling on the same node at one time
result, changed, err := c.scheduler.SelectNodes(cpumap, quota, num) // 这个接口统一使用float64了
if err != nil {
Expand Down Expand Up @@ -404,7 +428,7 @@ func (c *calcium) doCreateContainerWithScheduler(nodename string, cpumap []types
continue
}

_, err = c.store.AddContainer(info.ID, opts.Podname, node.Name, containerName, quota)
_, err = c.store.AddContainer(info.ID, opts.Podname, node.Name, containerName, quota, opts.Memory)
if err != nil {
ms[i].Error = err.Error()
c.releaseQuota(node, quota)
Expand Down Expand Up @@ -792,7 +816,7 @@ func (c *calcium) doUpgradeContainer(containers []*types.Container, image string
}

// if so, add a new container in etcd
_, err = c.store.AddContainer(newInfo.ID, container.Podname, container.Nodename, containerName, container.CPU)
_, err = c.store.AddContainer(newInfo.ID, container.Podname, container.Nodename, containerName, container.CPU, container.Memory)
if err != nil {
ms[i].Error = err.Error()
go engine.ContainerRemove(context.Background(), newContainer.ID, enginetypes.ContainerRemoveOptions{})
Expand Down
1 change: 1 addition & 0 deletions cluster/calcium/remove_container.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,5 +116,6 @@ func (c *calcium) removeOneContainer(container *types.Container) error {
return err
}
}
c.store.UpdateNodeMem(node.Podname, node.Name, container.Memory, "+")
return c.store.RemoveContainer(info.ID)
}
4 changes: 2 additions & 2 deletions devtools/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -226,8 +226,8 @@ def create_container(ctx):
podname='dev',
entrypoint='web',
cpu_quota=1,
count=2,
memory=0,
count=1,
memory=50*1024*1024,
networks={'zzz': ''}, # 如果不需要指定IP就写空字符串, 写其他的错误的格式会报错失败
env=['ENV_A=1', 'ENV_B=2'])

Expand Down
3 changes: 2 additions & 1 deletion store/etcd/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func (k *krypton) GetContainers(ids []string) ([]*types.Container, error) {
// actually if we already know its node, we will know its pod
// but we still store it
// storage path in etcd is `/eru-core/container/:containerid`
func (k *krypton) AddContainer(id, podname, nodename, name string, cpu types.CPUMap) (*types.Container, error) {
func (k *krypton) AddContainer(id, podname, nodename, name string, cpu types.CPUMap, memory int64) (*types.Container, error) {
// first we check if node really exists
node, err := k.GetNode(podname, nodename)
if err != nil {
Expand All @@ -82,6 +82,7 @@ func (k *krypton) AddContainer(id, podname, nodename, name string, cpu types.CPU
Nodename: nodename,
Name: name,
CPU: cpu,
Memory: memory,
Engine: node.Engine,
}

Expand Down
49 changes: 49 additions & 0 deletions store/etcd/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import (
"golang.org/x/net/context"
)

// GIGABYTE is the number of bytes in 2^30 bytes (1073741824).
const GIGABYTE = 1 << 30

// get a node from etcd
// and construct it's docker client
// a node must belong to a pod
Expand Down Expand Up @@ -103,12 +105,15 @@ func (k *krypton) AddNode(name, endpoint, podname, cafile, certfile, keyfile str
cpumap[strconv.Itoa(i)] = 10
}

memcap := info.MemTotal - GIGABYTE // 可用内存为总内存减 1G

node := &types.Node{
Name: name,
Endpoint: endpoint,
Podname: podname,
Public: public,
CPU: cpumap,
MemCap: memcap,
Engine: engine,
}

Expand Down Expand Up @@ -212,6 +217,50 @@ func (k *krypton) UpdateNode(node *types.Node) error {
return nil
}

// UpdateNodeMem adjusts the MemCap field of the node stored in etcd under
// podname/nodename by mem and writes the node back.
//
// action selects the direction: "add" or "+" increases MemCap by mem, "sub"
// or "-" decreases it by mem. Any other action is rejected with an error
// before the lock is taken, so an invalid call never touches etcd.
//
// The read-modify-write is done while holding the lock created by
// k.CreateLock for the key "<podname>_<nodename>". Errors from creating or
// acquiring the lock, from etcd, and from JSON encoding/decoding are returned
// to the caller unchanged.
func (k *krypton) UpdateNodeMem(podname, nodename string, mem int64, action string) error {
	// Resolve the direction first; an unknown action used to fall through
	// silently and rewrite the node unchanged while reporting success.
	var delta int64
	switch action {
	case "add", "+":
		delta = mem
	case "sub", "-":
		delta = -mem
	default:
		return fmt.Errorf("unknown action %q when updating memory of node %q", action, nodename)
	}

	lock, err := k.CreateLock(fmt.Sprintf("%s_%s", podname, nodename), 30)
	if err != nil {
		return err
	}

	if err := lock.Lock(); err != nil {
		return err
	}
	defer lock.Unlock()

	nodeKey := fmt.Sprintf(nodeInfoKey, podname, nodename)
	resp, err := k.etcd.Get(context.Background(), nodeKey, nil)
	if err != nil {
		return err
	}
	if resp.Node.Dir {
		return fmt.Errorf("Node storage path %q in etcd is a directory", nodeKey)
	}

	node := &types.Node{}
	if err := json.Unmarshal([]byte(resp.Node.Value), node); err != nil {
		return err
	}

	node.MemCap += delta

	data, err := json.Marshal(node)
	if err != nil {
		return err
	}

	// Log the payload as an argument, never as the format string: the JSON
	// may contain '%' characters that would be misread as formatting verbs.
	log.Debugf("%s", data)

	if _, err := k.etcd.Set(context.Background(), nodeKey, string(data), nil); err != nil {
		return err
	}

	return nil
}

// update cpu on a node, either add or substract
// need to lock
func (k *krypton) UpdateNodeCPU(podname, nodename string, cpu types.CPUMap, action string) error {
Expand Down
9 changes: 7 additions & 2 deletions store/mock/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,11 @@ func (m *MockStore) UpdateNodeCPU(podname, nodename string, cpu types.CPUMap, ac
return args.Error(0)
}

// UpdateNodeMem forwards all of its arguments to m.Called and returns the
// error found at index 0 of the resulting arguments.
func (m *MockStore) UpdateNodeMem(podname, nodename string, memory int64, action string) error {
	return m.Called(podname, nodename, memory, action).Error(0)
}

func (m *MockStore) GetContainer(id string) (*types.Container, error) {
args := m.Called(id)
if args.Get(0) != nil {
Expand All @@ -94,8 +99,8 @@ func (m *MockStore) GetContainers(ids []string) ([]*types.Container, error) {
return args.Get(0).([]*types.Container), args.Error(1)
}

func (m *MockStore) AddContainer(id, podname, nodename, name string, cpu types.CPUMap) (*types.Container, error) {
args := m.Called(id, podname, nodename, name, cpu)
func (m *MockStore) AddContainer(id, podname, nodename, name string, cpu types.CPUMap, memory int64) (*types.Container, error) {
args := m.Called(id, podname, nodename, name, cpu, memory)
if args.Get(0) != nil {
return args.Get(0).(*types.Container), args.Error(1)
}
Expand Down
3 changes: 2 additions & 1 deletion store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,10 @@ type Store interface {
GetAllNodes() ([]*types.Node, error)
UpdateNode(*types.Node) error
UpdateNodeCPU(podname, nodename string, cpu types.CPUMap, action string) error
UpdateNodeMem(podname, nodename string, mem int64, action string) error

// container
AddContainer(id, podname, nodename, name string, cpu types.CPUMap) (*types.Container, error)
AddContainer(id, podname, nodename, name string, cpu types.CPUMap, mem int64) (*types.Container, error)
GetContainer(id string) (*types.Container, error)
GetContainers(ids []string) ([]*types.Container, error)
RemoveContainer(id string) error
Expand Down
1 change: 1 addition & 0 deletions types/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ type Container struct {
Nodename string `json:"nodename"`
Name string `json:"name"`
CPU CPUMap `json:"cpu"`
Memory int64 `json:"memory"`
Engine *engineapi.Client `json:"-"`
}

Expand Down
1 change: 1 addition & 0 deletions types/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ type CreateContainerMessage struct {
Error string
Success bool
CPU CPUMap
Memory int64
}

type PullImageMessage struct {
Expand Down
6 changes: 6 additions & 0 deletions types/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,11 @@ import (

// CPUMap maps a string key to an int value.
// NOTE(review): AddNode fills it with strconv.Itoa(i) keys, each set to 10 —
// presumably CPU core index -> quota units; confirm against the scheduler.
type CPUMap map[string]int

// CPUAndMem bundles a CPUMap with a memory capacity value so both can be
// passed around together.
//
// Do not reorder the fields: makeCPUAndMem builds this type with an unkeyed
// composite literal (CPU map first, memory capacity second).
type CPUAndMem struct {
// CpuMap is the CPU map of the node.
CpuMap CPUMap
// MemCap is the memory capacity taken from the node's MemCap field.
// NOTE(review): the commit description states the unit is bytes — confirm.
MemCap int64
}

// total quotas
func (c CPUMap) Total() int {
count := 0
Expand Down Expand Up @@ -48,6 +53,7 @@ type Node struct {
Podname string `json:"podname"`
Public bool `json:"public"`
CPU CPUMap `json:"cpu"`
MemCap int64 `json:"memcap"`
Engine *engineapi.Client `json:"-"`
}

Expand Down
Loading

0 comments on commit 194b800

Please sign in to comment.