diff --git a/cluster/calcium/create_container.go b/cluster/calcium/create_container.go
index 511a893a6..c411102e4 100644
--- a/cluster/calcium/create_container.go
+++ b/cluster/calcium/create_container.go
@@ -21,10 +21,9 @@ import (
 )
 
 const (
-	MEMORY_PRIOR = "cpuperiod"
-	CPU_PRIOR    = "scheduler"
-
-	CPU_SCHEDULER = "CPU"
+	MEMORY_PRIOR   = "MEM"
+	CPU_PRIOR      = "CPU"
+	RESTART_ALWAYS = "always"
 )
 
 // Create Container
@@ -35,7 +34,7 @@ func (c *calcium) CreateContainer(specs types.Specs, opts *types.DeployOptions)
 	if err != nil {
 		return nil, err
 	}
-	if pod.Scheduler == CPU_SCHEDULER {
+	if pod.Scheduler == CPU_PRIOR {
 		return c.createContainerWithCPUPrior(specs, opts)
 	}
 	log.Infof("Creating container with options: %v", opts)
@@ -282,7 +281,7 @@ func (c *calcium) doCreateContainerWithCPUPrior(nodeName string, cpuMap []types.
 
 	for i, quota := range cpuMap {
 		// create options
-		config, hostConfig, networkConfig, containerName, err := c.makeContainerOptions(i+index, nil, specs, opts, node, CPU_PRIOR)
+		config, hostConfig, networkConfig, containerName, err := c.makeContainerOptions(i+index, quota, specs, opts, node, CPU_PRIOR)
 		ms[i].ContainerName = containerName
 		ms[i].Podname = opts.Podname
 		ms[i].Nodename = node.Name
@@ -386,7 +385,7 @@ func (c *calcium) releaseQuota(node *types.Node, quota types.CPUMap) {
 	c.store.UpdateNodeCPU(node.Podname, node.Name, quota, "+")
 }
 
-func (c *calcium) makeContainerOptions(index int, quota map[string]int, specs types.Specs, opts *types.DeployOptions, node *types.Node, optionMode string) (
+func (c *calcium) makeContainerOptions(index int, quota types.CPUMap, specs types.Specs, opts *types.DeployOptions, node *types.Node, optionMode string) (
 	*enginecontainer.Config,
 	*enginecontainer.HostConfig,
 	*enginenetwork.NetworkingConfig,
@@ -409,24 +408,6 @@ func (c *calcium) makeContainerOptions(index int, quota map[string]int, specs ty
 	slices := utils.MakeCommandLineArgs(entry.Command + " " + opts.ExtraArgs)
 	cmd := engineslice.StrSlice(slices)
 
-	// calculate CPUShares and CPUSet
-	// scheduler won't return more than 1 share quota
-	// so the smallest share is the share numerator
-	var cpuShares int64
-	var cpuSetCpus string
-	if optionMode == "scheduler" {
-		shareQuota := 10
-		labels := []string{}
-		for label, share := range quota {
-			labels = append(labels, label)
-			if share < shareQuota {
-				shareQuota = share
-			}
-		}
-		cpuShares = int64(float64(shareQuota) / float64(10) * float64(1024))
-		cpuSetCpus = strings.Join(labels, ",")
-	}
-
 	// env
 	nodeIP := node.GetIP()
 	env := append(opts.Env, fmt.Sprintf("APP_NAME=%s", specs.Appname))
@@ -547,7 +528,20 @@ func (c *calcium) makeContainerOptions(index int, quota map[string]int, specs ty
 	}
 
 	var resource enginecontainer.Resources
-	if optionMode == "scheduler" {
+	if optionMode == CPU_PRIOR {
+		// calculate CPUShares and CPUSet
+		// scheduler won't return more than 1 share quota
+		// so the smallest share is the share numerator
+		var shareQuota int64 = 10
+		labels := []string{}
+		for label, share := range quota {
+			labels = append(labels, label)
+			if share < shareQuota {
+				shareQuota = share
+			}
+		}
+		cpuShares := int64(float64(shareQuota) / float64(10) * float64(1024))
+		cpuSetCpus := strings.Join(labels, ",")
 		resource = enginecontainer.Resources{
 			CPUShares:  cpuShares,
 			CpusetCpus: cpuSetCpus,
@@ -565,7 +559,7 @@ func (c *calcium) makeContainerOptions(index int, quota map[string]int, specs ty
 	}
 	restartPolicy := entry.RestartPolicy
 	maximumRetryCount := 3
-	if restartPolicy == "always" {
+	if restartPolicy == RESTART_ALWAYS {
 		maximumRetryCount = 0
 	}
 	hostConfig := &enginecontainer.HostConfig{
diff --git a/cluster/calcium/realloc.go b/cluster/calcium/realloc.go
index 104bffc5f..8846538a6 100644
--- a/cluster/calcium/realloc.go
+++ b/cluster/calcium/realloc.go
@@ -3,6 +3,7 @@ package calcium
 import (
 	"context"
 	"fmt"
+	"strings"
 	"time"
 
 	log "github.com/Sirupsen/logrus"
@@ -11,7 +12,7 @@ import (
 	"gitlab.ricebook.net/platform/core/utils"
 )
 
-func (c *calcium) ReAllocResource(ids []string, cpu float64, mem int64) (chan *types.ReAllocResourceMessage, error) {
+func (c *calcium) ReallocResource(ids []string, cpu float64, mem int64) (chan *types.ReallocResourceMessage, error) {
 	containers, err := c.store.GetContainers(ids)
 	if err != nil {
 		return nil, err
 	}
@@ -22,22 +23,22 @@ func (c *calcium) ReAllocResource(ids []string, cpu float64, mem int64) (chan *t
 		meta[container.Podname][container.Nodename] = append(meta[container.Podname][container.Nodename], container)
 	}
 
-	ch := make(chan *types.ReAllocResourceMessage)
+	ch := make(chan *types.ReallocResourceMessage)
 	for podname, containers := range meta {
 		pod, err := c.store.GetPod(podname)
 		if err != nil {
 			return ch, err
 		}
-		if pod.Scheduler == CPU_SCHEDULER {
-			go c.updateContainersWithCPUPrior(ch, podname, containers, cpu, mem)
+		if pod.Scheduler == CPU_PRIOR {
+			go c.reallocContainersWithCPUPrior(ch, podname, containers, cpu, mem)
 			continue
 		}
-		go c.updateContainerWithMemoryPrior(ch, podname, containers, cpu, mem)
+		go c.reallocContainerWithMemoryPrior(ch, podname, containers, cpu, mem)
 	}
 	return ch, nil
 }
 
-func (c *calcium) checkMemoryResource(podname, nodename string, mem int64, count int) (*types.Node, error) {
+func (c *calcium) reallocNodeMemory(podname, nodename string, mem int64, count int) (*types.Node, error) {
 	lock, err := c.Lock(podname, 30)
 	if err != nil {
 		return nil, err
 	}
@@ -59,68 +60,209 @@ func (c *calcium) checkMemoryResource(podname, nodename string, mem int64, count
 	return node, nil
 }
 
-func (c *calcium) updateContainerWithMemoryPrior(
-	ch chan *types.ReAllocResourceMessage,
+func (c *calcium) reallocContainerWithMemoryPrior(
+	ch chan *types.ReallocResourceMessage,
 	podname string,
-	containers map[string][]*types.Container, cpu float64, mem int64) {
+	nodeContainers map[string][]*types.Container, cpu float64, mem int64) {
 
-	for nodename, containerList := range containers {
-		node, err := c.checkMemoryResource(podname, nodename, mem, len(containerList))
+	for nodename, containers := range nodeContainers {
+		node, err := c.reallocNodeMemory(podname, nodename, mem, len(containers))
 		if err != nil {
 			log.Errorf("[realloc] get node failed %v", err)
 			continue
 		}
+		go c.doUpdateContainerWithMemoryPrior(ch, podname, node, containers, cpu, mem)
+	}
+}
 
-		for _, container := range containerList {
-			containerJSON, err := container.Inspect()
-			if err != nil {
-				log.Errorf("[realloc] get container failed %v", err)
-				ch <- &types.ReAllocResourceMessage{ContainerID: containerJSON.ID, Success: false}
-				continue
+func (c *calcium) doUpdateContainerWithMemoryPrior(
+	ch chan *types.ReallocResourceMessage,
+	podname string,
+	node *types.Node,
+	containers []*types.Container,
+	cpu float64, mem int64) {
+
+	for _, container := range containers {
+		containerJSON, err := container.Inspect()
+		if err != nil {
+			log.Errorf("[realloc] get container failed %v", err)
+			ch <- &types.ReallocResourceMessage{ContainerID: containerJSON.ID, Success: false}
+			continue
+		}
+
+		cpuQuota := int64(cpu * float64(utils.CpuPeriodBase))
+		newCPUQuota := containerJSON.HostConfig.CPUQuota + cpuQuota
+		newMemory := containerJSON.HostConfig.Memory + mem
+		if newCPUQuota <= 0 || newMemory <= 0 {
+			log.Warnf("[relloc] new resource invaild %s, %d, %d", containerJSON.ID, newCPUQuota, newMemory)
+			ch <- &types.ReallocResourceMessage{ContainerID: containerJSON.ID, Success: false}
+			continue
+		}
+
+		// TODO config timeout
+		newResource := enginecontainer.Resources{
+			Memory:     newMemory,
+			MemorySwap: newMemory,
+			CPUPeriod:  utils.CpuPeriodBase,
+			CPUQuota:   newCPUQuota,
+		}
+		updateConfig := enginecontainer.UpdateConfig{Resources: newResource}
+		if err := reSetContainer(containerJSON.ID, node, updateConfig, 10*time.Second); err != nil {
+			log.Errorf("[realloc] update container failed %v, %s", err, containerJSON.ID)
+			ch <- &types.ReallocResourceMessage{ContainerID: containerJSON.ID, Success: false}
+			// 如果是增加内存,失败的时候应该把内存还回去
+			if mem > 0 {
+				if err := c.store.UpdateNodeMem(podname, node.Name, mem, "+"); err != nil {
+					log.Errorf("[realloc] failed to set mem back %s", containerJSON.ID)
+				}
+			}
 			}
-			cpuQuota := int64(cpu * float64(utils.CpuPeriodBase))
-			newCPUQuota := containerJSON.HostConfig.CPUQuota + cpuQuota
-			newMemory := containerJSON.HostConfig.Memory + mem
-			if newCPUQuota <= 0 || newMemory <= 0 {
-				log.Warnf("[relloc] new resource invaild %s, %d, %d", containerJSON.ID, newCPUQuota, newMemory)
-				ch <- &types.ReAllocResourceMessage{ContainerID: containerJSON.ID, Success: false}
-				continue
+			continue
+		}
+		// 如果是要降低内存,当执行成功的时候需要把内存还回去
+		if mem < 0 {
+			if err := c.store.UpdateNodeMem(podname, node.Name, -mem, "+"); err != nil {
+				log.Errorf("[realloc] failed to set mem back %s", containerJSON.ID)
 			}
+		}
+		ch <- &types.ReallocResourceMessage{ContainerID: containerJSON.ID, Success: true}
+	}
+}
 
-			// TODO Async
-			// TODO config timeout
-			newResource := enginecontainer.Resources{
-				Memory:     newMemory,
-				MemorySwap: newMemory,
-				CPUPeriod:  utils.CpuPeriodBase,
-				CPUQuota:   newCPUQuota,
-			}
-			updateConfig := enginecontainer.UpdateConfig{Resources: newResource}
-			if err := reSetContainer(containerJSON.ID, node, updateConfig, 10*time.Second); err != nil {
-				log.Errorf("[realloc] update container failed %v, %s", err, containerJSON.ID)
-				ch <- &types.ReAllocResourceMessage{ContainerID: containerJSON.ID, Success: false}
-				// 如果是增加内存,失败的时候应该把内存还回去
-				if mem > 0 {
-					if err := c.store.UpdateNodeMem(podname, nodename, mem, "+"); err != nil {
-						log.Errorf("[realloc] failed to set mem back %s", containerJSON.ID)
-					}
+const CPUSHARE_BASE = 10
+
+func calculateCPUUsage(container *types.Container) float64 {
+	var full, fragment float64
+	for _, usage := range container.CPU {
+		if usage == CPUSHARE_BASE {
+			full += 1.0
+			continue
+		}
+		fragment += float64(usage)
+	}
+	return full + fragment/float64(CPUSHARE_BASE)
+}
+
+func (c *calcium) reallocNodesCPU(nodesInfoMap map[float64]map[string][]*types.Container, podname string) (map[float64]map[string][]types.CPUMap, error) {
+	lock, err := c.Lock(podname, 30)
+	if err != nil {
+		return nil, err
+	}
+	defer lock.Unlock()
+
+	// TODO too slow
+	nodesCPUMap := map[float64]map[string][]types.CPUMap{}
+	for cpu, nodesInfo := range nodesInfoMap {
+		for nodename, containers := range nodesInfo {
+			for _, container := range containers {
+				// 把 CPU 还回去,变成新的可用资源
+				// 即便有并发操作,不影响 Create 操作
+				// 最坏情况就是 CPU 重叠了,可以外部纠正
+				if err := c.store.UpdateNodeCPU(podname, nodename, container.CPU, "+"); err != nil {
+					return nil, err
 				}
 			}
-			// 如果是要降低内存,当执行成功的时候需要把内存还回去
-			if mem < 0 {
-				if err := c.store.UpdateNodeMem(podname, nodename, -mem, "+"); err != nil {
-					log.Errorf("[realloc] failed to set mem back %s", containerJSON.ID)
back %s", containerJSON.ID) + node, err := c.GetNode(podname, nodename) + if err != nil { + return nil, err + } + containersNum := len(containers) + nodesInfo := []types.NodeInfo{ + types.NodeInfo{ + CPUAndMem: types.CPUAndMem{ + CpuMap: node.CPU, + MemCap: 0, + }, + Name: nodename, + }, + } + result, changed, err := c.scheduler.SelectCPUNodes(nodesInfo, cpu, containersNum) + if err != nil { + return nil, err + } + nodeCPUMap, isChanged := changed[nodename] + containersCPUMap, hasResult := result[nodename] + if isChanged && hasResult { + node.CPU = nodeCPUMap + if err := c.store.UpdateNode(node); err != nil { + return nil, err } + nodesCPUMap[cpu][nodename] = containersCPUMap } - ch <- &types.ReAllocResourceMessage{ContainerID: containerJSON.ID, Success: true} } } + return nodesCPUMap, nil } -func (c *calcium) updateContainersWithCPUPrior( - ch chan *types.ReAllocResourceMessage, +// mem not used in this prior +func (c *calcium) reallocContainersWithCPUPrior( + ch chan *types.ReallocResourceMessage, podname string, - containers map[string][]*types.Container, cpu float64, mem int64) { + nodeContainers map[string][]*types.Container, cpu float64, mem int64) { + + nodesInfoMap := map[float64]map[string][]*types.Container{} + for nodename, containers := range nodeContainers { + for _, container := range containers { + newCPU := calculateCPUUsage(container) + cpu + if newCPU <= 0.0 { + log.Warnf("[realloc] cpu set below zero") + continue + } + if _, ok := nodesInfoMap[newCPU]; !ok { + nodesInfoMap[newCPU] = map[string][]*types.Container{} + } + nodesInfoMap[newCPU][nodename] = append(nodesInfoMap[newCPU][nodename], container) + } + } + nodesCPUMap, err := c.reallocNodesCPU(nodesInfoMap, podname) + if err != nil { + log.Errorf("[realloc] realloc cpu res failed %v", err) + return + } + for cpu, nodesCPUResult := range nodesCPUMap { + go c.doReallocContainersWithCPUPrior(ch, podname, nodesCPUResult, nodesInfoMap[cpu]) + } +} + +func (c *calcium) doReallocContainersWithCPUPrior( + ch chan *types.ReallocResourceMessage, + podname string, + nodesCPUResult map[string][]types.CPUMap, + nodesInfoMap map[string][]*types.Container, +) { + for nodename, cpuset := range nodesCPUResult { + node, err := c.GetNode(podname, nodename) + if err != nil { + return + } + containers := nodesInfoMap[nodename] + for index, container := range containers { + // TODO dante 来重构吧,应该能抽出一个公共函数 + // TODO 同 create container #543 + // TODO 抽出 CPUShare 的配置 + var shareQuota int64 = 10 + labels := []string{} + for label, share := range cpuset[index] { + labels = append(labels, label) + if share < shareQuota { + shareQuota = share + } + } + cpuShares := int64(float64(shareQuota) / float64(10) * float64(1024)) + cpuSetCpus := strings.Join(labels, ",") + resource := enginecontainer.Resources{ + CPUShares: cpuShares, + CpusetCpus: cpuSetCpus, + } + + updateConfig := enginecontainer.UpdateConfig{Resources: resource} + if err := reSetContainer(container.ID, node, updateConfig, 10*time.Second); err != nil { + log.Errorf("[realloc] update container failed %v", err) + // TODO 这里理论上是可以恢复 CPU 占用表的,一来我们知道新的占用是怎样,二来我们也晓得老的占用是啥样 + ch <- &types.ReallocResourceMessage{ContainerID: container.ID, Success: false} + } + ch <- &types.ReallocResourceMessage{ContainerID: container.ID, Success: true} + } + } } func reSetContainer(ID string, node *types.Node, config enginecontainer.UpdateConfig, timeout time.Duration) error { diff --git a/types/message.go b/types/message.go index 61f3e5c8d..b56412a15 100644 --- a/types/message.go +++ b/types/message.go @@ 
@@ -52,7 +52,7 @@ type PullImageMessage struct {
 	BuildImageMessage
 }
 
-type ReAllocResourceMessage struct {
+type ReallocResourceMessage struct {
 	ContainerID string
 	Success     bool
 }
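
For context on the CPU share arithmetic this patch moves into the `CPU_PRIOR` branch of `makeContainerOptions` (and repeats in `doReallocContainersWithCPUPrior`): the scheduler hands back a CPU map whose values are shares out of a base of 10, the smallest share scales Docker's 1024 CPU shares, and the map keys joined with commas become the cpuset. A minimal standalone sketch of that derivation; the `CPUMap` alias and `cpuResources` helper here are illustrative stand-ins, not names from the patch:

```go
package main

import (
	"fmt"
	"strings"
)

// CPUMap stands in for types.CPUMap here: CPU label -> share out of 10.
type CPUMap map[string]int64

// cpuResources mirrors the patch's min-share logic: the smallest share
// (base 10) scales 1024 Docker CPU shares, and the labels form the cpuset.
func cpuResources(quota CPUMap) (int64, string) {
	var shareQuota int64 = 10
	labels := []string{}
	for label, share := range quota {
		labels = append(labels, label)
		if share < shareQuota {
			shareQuota = share
		}
	}
	cpuShares := int64(float64(shareQuota) / float64(10) * float64(1024))
	cpuSetCpus := strings.Join(labels, ",")
	return cpuShares, cpuSetCpus
}

func main() {
	// A container holding core "0" fully and half of core "3":
	// the smallest share is 5, so CPUShares = 5/10 * 1024 = 512.
	shares, cpuset := cpuResources(CPUMap{"0": 10, "3": 5})
	fmt.Println(shares, cpuset) // 512 and "0,3" or "3,0" (map order is not fixed)
}
```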
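Similarly, `calculateCPUUsage` converts a container's existing CPU map back into the fractional figure `reallocContainersWithCPUPrior` uses to group containers before calling `SelectCPUNodes`: a core held at the full `CPUSHARE_BASE` share counts as one whole CPU, partial shares are summed as tenths. A self-contained sketch of that conversion, with a plain map standing in for `types.CPUMap`:

```go
package main

import "fmt"

// cpuShareBase mirrors the patch's CPUSHARE_BASE constant.
const cpuShareBase = 10

// cpuUsage mirrors calculateCPUUsage: a core held at the full share counts as
// one whole CPU; partial shares are summed and divided by the base.
func cpuUsage(cpu map[string]int64) float64 {
	var full, fragment float64
	for _, usage := range cpu {
		if usage == cpuShareBase {
			full += 1.0
			continue
		}
		fragment += float64(usage)
	}
	return full + fragment/float64(cpuShareBase)
}

func main() {
	// A container with core "0" fully and half of core "3" is using 1.5 CPUs,
	// so a realloc delta of +0.5 asks the scheduler for 2.0 per container.
	current := map[string]int64{"0": 10, "3": 5}
	fmt.Println(cpuUsage(current) + 0.5) // 2
}
```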