
Commit

realloc cpu prior container
CMGS committed Aug 12, 2017
1 parent 10d502d commit d3b343b
Showing 3 changed files with 213 additions and 77 deletions.
48 changes: 21 additions & 27 deletions cluster/calcium/create_container.go
@@ -21,10 +21,9 @@ import (
)

const (
MEMORY_PRIOR = "cpuperiod"
CPU_PRIOR = "scheduler"

CPU_SCHEDULER = "CPU"
MEMORY_PRIOR = "MEM"
CPU_PRIOR = "CPU"
RESTART_ALWAYS = "always"
)

// Create Container
@@ -35,7 +34,7 @@ func (c *calcium) CreateContainer(specs types.Specs, opts *types.DeployOptions)
if err != nil {
return nil, err
}
if pod.Scheduler == CPU_SCHEDULER {
if pod.Scheduler == CPU_PRIOR {
return c.createContainerWithCPUPrior(specs, opts)
}
log.Infof("Creating container with options: %v", opts)
@@ -282,7 +281,7 @@ func (c *calcium) doCreateContainerWithCPUPrior(nodeName string, cpuMap []types.

for i, quota := range cpuMap {
// create options
config, hostConfig, networkConfig, containerName, err := c.makeContainerOptions(i+index, nil, specs, opts, node, CPU_PRIOR)
config, hostConfig, networkConfig, containerName, err := c.makeContainerOptions(i+index, quota, specs, opts, node, CPU_PRIOR)
ms[i].ContainerName = containerName
ms[i].Podname = opts.Podname
ms[i].Nodename = node.Name
@@ -386,7 +385,7 @@ func (c *calcium) releaseQuota(node *types.Node, quota types.CPUMap) {
c.store.UpdateNodeCPU(node.Podname, node.Name, quota, "+")
}

func (c *calcium) makeContainerOptions(index int, quota map[string]int, specs types.Specs, opts *types.DeployOptions, node *types.Node, optionMode string) (
func (c *calcium) makeContainerOptions(index int, quota types.CPUMap, specs types.Specs, opts *types.DeployOptions, node *types.Node, optionMode string) (
*enginecontainer.Config,
*enginecontainer.HostConfig,
*enginenetwork.NetworkingConfig,
@@ -409,24 +408,6 @@ func (c *calcium) makeContainerOptions(index int, quota map[string]int, specs ty
slices := utils.MakeCommandLineArgs(entry.Command + " " + opts.ExtraArgs)
cmd := engineslice.StrSlice(slices)

// calculate CPUShares and CPUSet
// scheduler won't return more than 1 share quota
// so the smallest share is the share numerator
var cpuShares int64
var cpuSetCpus string
if optionMode == "scheduler" {
shareQuota := 10
labels := []string{}
for label, share := range quota {
labels = append(labels, label)
if share < shareQuota {
shareQuota = share
}
}
cpuShares = int64(float64(shareQuota) / float64(10) * float64(1024))
cpuSetCpus = strings.Join(labels, ",")
}

// env
nodeIP := node.GetIP()
env := append(opts.Env, fmt.Sprintf("APP_NAME=%s", specs.Appname))
@@ -547,7 +528,20 @@ func (c *calcium) makeContainerOptions(index int, quota map[string]int, specs ty
}

var resource enginecontainer.Resources
if optionMode == "scheduler" {
if optionMode == CPU_PRIOR {
// calculate CPUShares and CPUSet
// scheduler won't return more than 1 share quota
// so the smallest share is the share numerator
var shareQuota int64 = 10
labels := []string{}
for label, share := range quota {
labels = append(labels, label)
if share < shareQuota {
shareQuota = share
}
}
cpuShares := int64(float64(shareQuota) / float64(10) * float64(1024))
cpuSetCpus := strings.Join(labels, ",")
resource = enginecontainer.Resources{
CPUShares: cpuShares,
CpusetCpus: cpuSetCpus,
@@ -565,7 +559,7 @@ func (c *calcium) makeContainerOptions(index int, quota map[string]int, specs ty

restartPolicy := entry.RestartPolicy
maximumRetryCount := 3
if restartPolicy == "always" {
if restartPolicy == RESTART_ALWAYS {
maximumRetryCount = 0
}
hostConfig := &enginecontainer.HostConfig{
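Side note on the CPU_PRIOR branch above: the smallest share in the scheduler's CPUMap becomes the numerator over a base of 10 and is scaled to Docker's default 1024 CPU shares, while the map keys form the cpuset string. Below is a minimal standalone sketch of that derivation, not part of the commit; the cpuResources helper and the local cpuMap type are hypothetical stand-ins, assuming types.CPUMap maps a core label to a share count.

package main

import (
	"fmt"
	"strings"
)

// cpuMap mirrors the shape of types.CPUMap in the diff above: core label -> share count.
type cpuMap map[string]int64

// cpuResources derives CPUShares and CpusetCpus the same way the CPU_PRIOR
// branch of makeContainerOptions does: the smallest share over a base of 10,
// scaled to Docker's default 1024 shares, with the labels joined as the cpuset.
func cpuResources(quota cpuMap) (int64, string) {
	var shareQuota int64 = 10
	labels := []string{}
	for label, share := range quota {
		labels = append(labels, label)
		if share < shareQuota {
			shareQuota = share
		}
	}
	cpuShares := int64(float64(shareQuota) / float64(10) * float64(1024))
	return cpuShares, strings.Join(labels, ",")
}

func main() {
	// half of core "1" plus all of core "3" -> 512 shares, cpuset "1,3"
	// (label order depends on map iteration)
	shares, cpuset := cpuResources(cpuMap{"1": 5, "3": 10})
	fmt.Println(shares, cpuset)
}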
240 changes: 191 additions & 49 deletions cluster/calcium/realloc.go
@@ -3,6 +3,7 @@ package calcium
import (
"context"
"fmt"
"strings"
"time"

log "github.com/Sirupsen/logrus"
@@ -11,7 +12,7 @@ import (
"gitlab.ricebook.net/platform/core/utils"
)

func (c *calcium) ReAllocResource(ids []string, cpu float64, mem int64) (chan *types.ReAllocResourceMessage, error) {
func (c *calcium) ReallocResource(ids []string, cpu float64, mem int64) (chan *types.ReallocResourceMessage, error) {
containers, err := c.store.GetContainers(ids)
if err != nil {
return nil, err
@@ -22,22 +23,22 @@ func (c *calcium) ReAllocResource(ids []string, cpu float64, mem int64) (chan *t
meta[container.Podname][container.Nodename] = append(meta[container.Podname][container.Nodename], container)
}

ch := make(chan *types.ReAllocResourceMessage)
ch := make(chan *types.ReallocResourceMessage)
for podname, containers := range meta {
pod, err := c.store.GetPod(podname)
if err != nil {
return ch, err
}
if pod.Scheduler == CPU_SCHEDULER {
go c.updateContainersWithCPUPrior(ch, podname, containers, cpu, mem)
if pod.Scheduler == CPU_PRIOR {
go c.reallocContainersWithCPUPrior(ch, podname, containers, cpu, mem)
continue
}
go c.updateContainerWithMemoryPrior(ch, podname, containers, cpu, mem)
go c.reallocContainerWithMemoryPrior(ch, podname, containers, cpu, mem)
}
return ch, nil
}

func (c *calcium) checkMemoryResource(podname, nodename string, mem int64, count int) (*types.Node, error) {
func (c *calcium) reallocNodeMemory(podname, nodename string, mem int64, count int) (*types.Node, error) {
lock, err := c.Lock(podname, 30)
if err != nil {
return nil, err
@@ -59,68 +60,209 @@ func (c *calcium) checkMemoryResource(podname, nodename string, mem int64, count
return node, nil
}

func (c *calcium) updateContainerWithMemoryPrior(
ch chan *types.ReAllocResourceMessage,
func (c *calcium) reallocContainerWithMemoryPrior(
ch chan *types.ReallocResourceMessage,
podname string,
containers map[string][]*types.Container, cpu float64, mem int64) {
nodeContainers map[string][]*types.Container, cpu float64, mem int64) {

for nodename, containerList := range containers {
node, err := c.checkMemoryResource(podname, nodename, mem, len(containerList))
for nodename, containers := range nodeContainers {
node, err := c.reallocNodeMemory(podname, nodename, mem, len(containers))
if err != nil {
log.Errorf("[realloc] get node failed %v", err)
continue
}
go c.doUpdateContainerWithMemoryPrior(ch, podname, node, containers, cpu, mem)
}
}

for _, container := range containerList {
containerJSON, err := container.Inspect()
if err != nil {
log.Errorf("[realloc] get container failed %v", err)
ch <- &types.ReAllocResourceMessage{ContainerID: containerJSON.ID, Success: false}
continue
func (c *calcium) doUpdateContainerWithMemoryPrior(
ch chan *types.ReallocResourceMessage,
podname string,
node *types.Node,
containers []*types.Container,
cpu float64, mem int64) {

for _, container := range containers {
containerJSON, err := container.Inspect()
if err != nil {
log.Errorf("[realloc] get container failed %v", err)
ch <- &types.ReallocResourceMessage{ContainerID: containerJSON.ID, Success: false}
continue
}

cpuQuota := int64(cpu * float64(utils.CpuPeriodBase))
newCPUQuota := containerJSON.HostConfig.CPUQuota + cpuQuota
newMemory := containerJSON.HostConfig.Memory + mem
if newCPUQuota <= 0 || newMemory <= 0 {
log.Warnf("[relloc] new resource invaild %s, %d, %d", containerJSON.ID, newCPUQuota, newMemory)
ch <- &types.ReallocResourceMessage{ContainerID: containerJSON.ID, Success: false}
continue
}

// TODO config timeout
newResource := enginecontainer.Resources{
Memory: newMemory,
MemorySwap: newMemory,
CPUPeriod: utils.CpuPeriodBase,
CPUQuota: newCPUQuota,
}
updateConfig := enginecontainer.UpdateConfig{Resources: newResource}
if err := reSetContainer(containerJSON.ID, node, updateConfig, 10*time.Second); err != nil {
log.Errorf("[realloc] update container failed %v, %s", err, containerJSON.ID)
ch <- &types.ReallocResourceMessage{ContainerID: containerJSON.ID, Success: false}
// if memory was being increased, give it back on failure
if mem > 0 {
if err := c.store.UpdateNodeMem(podname, node.Name, mem, "+"); err != nil {
log.Errorf("[realloc] failed to set mem back %s", containerJSON.ID)
}
}
cpuQuota := int64(cpu * float64(utils.CpuPeriodBase))
newCPUQuota := containerJSON.HostConfig.CPUQuota + cpuQuota
newMemory := containerJSON.HostConfig.Memory + mem
if newCPUQuota <= 0 || newMemory <= 0 {
log.Warnf("[relloc] new resource invaild %s, %d, %d", containerJSON.ID, newCPUQuota, newMemory)
ch <- &types.ReAllocResourceMessage{ContainerID: containerJSON.ID, Success: false}
continue
continue
}
// if memory was being decreased, return the freed memory once the update succeeds
if mem < 0 {
if err := c.store.UpdateNodeMem(podname, node.Name, -mem, "+"); err != nil {
log.Errorf("[realloc] failed to set mem back %s", containerJSON.ID)
}
}
ch <- &types.ReallocResourceMessage{ContainerID: containerJSON.ID, Success: true}
}
}

// TODO Async
// TODO config timeout
newResource := enginecontainer.Resources{
Memory: newMemory,
MemorySwap: newMemory,
CPUPeriod: utils.CpuPeriodBase,
CPUQuota: newCPUQuota,
}
updateConfig := enginecontainer.UpdateConfig{Resources: newResource}
if err := reSetContainer(containerJSON.ID, node, updateConfig, 10*time.Second); err != nil {
log.Errorf("[realloc] update container failed %v, %s", err, containerJSON.ID)
ch <- &types.ReAllocResourceMessage{ContainerID: containerJSON.ID, Success: false}
// if memory was being increased, give it back on failure
if mem > 0 {
if err := c.store.UpdateNodeMem(podname, nodename, mem, "+"); err != nil {
log.Errorf("[realloc] failed to set mem back %s", containerJSON.ID)
}
const CPUSHARE_BASE = 10

func calculateCPUUsage(container *types.Container) float64 {
var full, fragment float64
for _, usage := range container.CPU {
if usage == CPUSHARE_BASE {
full += 1.0
continue
}
fragment += float64(usage)
}
return full + fragment/float64(CPUSHARE_BASE)
}

func (c *calcium) reallocNodesCPU(nodesInfoMap map[float64]map[string][]*types.Container, podname string) (map[float64]map[string][]types.CPUMap, error) {
lock, err := c.Lock(podname, 30)
if err != nil {
return nil, err
}
defer lock.Unlock()

// TODO too slow
nodesCPUMap := map[float64]map[string][]types.CPUMap{}
for cpu, nodesInfo := range nodesInfoMap {
for nodename, containers := range nodesInfo {
for _, container := range containers {
// give the CPU back so it becomes usable resource again
// even with concurrent operations this does not affect Create
// worst case the CPU sets overlap, which can be corrected externally
if err := c.store.UpdateNodeCPU(podname, nodename, container.CPU, "+"); err != nil {
return nil, err
}
}
// if memory was being decreased, return the freed memory once the update succeeds
if mem < 0 {
if err := c.store.UpdateNodeMem(podname, nodename, -mem, "+"); err != nil {
log.Errorf("[realloc] failed to set mem back %s", containerJSON.ID)
node, err := c.GetNode(podname, nodename)
if err != nil {
return nil, err
}
containersNum := len(containers)
nodesInfo := []types.NodeInfo{
types.NodeInfo{
CPUAndMem: types.CPUAndMem{
CpuMap: node.CPU,
MemCap: 0,
},
Name: nodename,
},
}
result, changed, err := c.scheduler.SelectCPUNodes(nodesInfo, cpu, containersNum)
if err != nil {
return nil, err
}
nodeCPUMap, isChanged := changed[nodename]
containersCPUMap, hasResult := result[nodename]
if isChanged && hasResult {
node.CPU = nodeCPUMap
if err := c.store.UpdateNode(node); err != nil {
return nil, err
}
nodesCPUMap[cpu][nodename] = containersCPUMap
}
ch <- &types.ReAllocResourceMessage{ContainerID: containerJSON.ID, Success: true}
}
}
return nodesCPUMap, nil
}

func (c *calcium) updateContainersWithCPUPrior(
ch chan *types.ReAllocResourceMessage,
// mem not used in this prior
func (c *calcium) reallocContainersWithCPUPrior(
ch chan *types.ReallocResourceMessage,
podname string,
containers map[string][]*types.Container, cpu float64, mem int64) {
nodeContainers map[string][]*types.Container, cpu float64, mem int64) {

nodesInfoMap := map[float64]map[string][]*types.Container{}
for nodename, containers := range nodeContainers {
for _, container := range containers {
newCPU := calculateCPUUsage(container) + cpu
if newCPU <= 0.0 {
log.Warnf("[realloc] cpu set below zero")
continue
}
if _, ok := nodesInfoMap[newCPU]; !ok {
nodesInfoMap[newCPU] = map[string][]*types.Container{}
}
nodesInfoMap[newCPU][nodename] = append(nodesInfoMap[newCPU][nodename], container)
}
}
nodesCPUMap, err := c.reallocNodesCPU(nodesInfoMap, podname)
if err != nil {
log.Errorf("[realloc] realloc cpu res failed %v", err)
return
}
for cpu, nodesCPUResult := range nodesCPUMap {
go c.doReallocContainersWithCPUPrior(ch, podname, nodesCPUResult, nodesInfoMap[cpu])
}
}

func (c *calcium) doReallocContainersWithCPUPrior(
ch chan *types.ReallocResourceMessage,
podname string,
nodesCPUResult map[string][]types.CPUMap,
nodesInfoMap map[string][]*types.Container,
) {
for nodename, cpuset := range nodesCPUResult {
node, err := c.GetNode(podname, nodename)
if err != nil {
return
}
containers := nodesInfoMap[nodename]
for index, container := range containers {
// TODO dante should refactor this, a common helper could be extracted
// TODO same as create container #543
// TODO make the CPUShare base configurable
var shareQuota int64 = 10
labels := []string{}
for label, share := range cpuset[index] {
labels = append(labels, label)
if share < shareQuota {
shareQuota = share
}
}
cpuShares := int64(float64(shareQuota) / float64(10) * float64(1024))
cpuSetCpus := strings.Join(labels, ",")
resource := enginecontainer.Resources{
CPUShares: cpuShares,
CpusetCpus: cpuSetCpus,
}

updateConfig := enginecontainer.UpdateConfig{Resources: resource}
if err := reSetContainer(container.ID, node, updateConfig, 10*time.Second); err != nil {
log.Errorf("[realloc] update container failed %v", err)
// TODO in theory the CPU allocation table could be restored here: we know both the new allocation and the old one
ch <- &types.ReallocResourceMessage{ContainerID: container.ID, Success: false}
}
ch <- &types.ReallocResourceMessage{ContainerID: container.ID, Success: true}
}
}
}

func reSetContainer(ID string, node *types.Node, config enginecontainer.UpdateConfig, timeout time.Duration) error {
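For reference on the realloc flow above: in the memory-prior path the requested cpu delta is turned into a CFS quota delta (cpu * utils.CpuPeriodBase) and added to the container's existing CPUQuota, while mem is added to Memory/MemorySwap; in the CPU-prior path each container's current CPUMap is first converted back into a fractional core count, the delta is added, and containers are grouped by that target before SelectCPUNodes is called per node. A small sketch of that conversion follows, mirroring calculateCPUUsage; it is illustrative only, and the plain map type stands in for the container's CPU field.

package main

import "fmt"

// cpuShareBase mirrors CPUSHARE_BASE in the diff above.
const cpuShareBase = 10

// cpuUsage mirrors calculateCPUUsage: a share equal to the base counts as a
// full core, smaller shares are summed and divided by the base.
func cpuUsage(cpu map[string]int64) float64 {
	var full, fragment float64
	for _, usage := range cpu {
		if usage == cpuShareBase {
			full += 1.0
			continue
		}
		fragment += float64(usage)
	}
	return full + fragment/float64(cpuShareBase)
}

func main() {
	// a container currently bound to all of core "0" and half of core "2"
	current := map[string]int64{"0": 10, "2": 5}
	fmt.Println(cpuUsage(current))       // 1.5
	fmt.Println(cpuUsage(current) + 0.5) // target passed to SelectCPUNodes after a +0.5 realloc
}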
