231 changes: 136 additions & 95 deletions cluster/calcium/realloc.go
@@ -37,7 +37,10 @@ func (c *Calcium) ReallocResource(ctx context.Context, IDs []string, cpu float64
if !ok {
pod, err = c.store.GetPod(ctx, container.Podname)
if err != nil {
ch <- &types.ReallocResourceMessage{ContainerID: container.ID, Success: false}
ch <- &types.ReallocResourceMessage{
ContainerID: container.ID,
Error: err,
}
continue
}
podCache[container.Podname] = pod
@@ -63,56 +66,72 @@
}); err != nil {
log.Errorf("[ReallocResource] Realloc failed %v", err)
for _, ID := range IDs {
ch <- &types.ReallocResourceMessage{ContainerID: ID, Success: false}
ch <- &types.ReallocResourceMessage{
ContainerID: ID,
Error: err,
}
}
}
}()
return ch, nil
}

func (c *Calcium) doReallocContainer(
ctx context.Context,
ch chan *types.ReallocResourceMessage,
pod *types.Pod,
nodeContainersInfo nodeContainers,
cpu float64, memory int64, volumes types.VolumeBindings) {
func calVolCPUMemNodeContainerInfo(nodename string, container *types.Container, cpu float64, memory int64, volumes types.VolumeBindings, volCPUMemNodeContainersInfo volCPUMemNodeContainers, hardVbsForContainer map[string]types.VolumeBindings) error {
newCPU := utils.Round(container.Quota + cpu)
newMem := container.Memory + memory
if newCPU < 0 || newMem < 0 {
log.Errorf("[doReallocContainer] New resource invalid %s, cpu %f, mem %d", container.ID, newCPU, newMem)
return types.ErrInvalidRes
}

autoVolumes, hardVolumes, err := container.Volumes.Merge(volumes)
hardVbsForContainer[container.ID] = hardVolumes
if err != nil {
log.Errorf("[doReallocContainer] New resource invalid %s, vol %v, err %v", container.ID, volumes, err)
return err
}
newAutoVol := strings.Join(autoVolumes.ToStringSlice(true, false), ",")

if _, ok := volCPUMemNodeContainersInfo[newAutoVol]; !ok {
volCPUMemNodeContainersInfo[newAutoVol] = map[float64]map[int64]nodeContainers{}
}
if _, ok := volCPUMemNodeContainersInfo[newAutoVol][newCPU]; !ok {
volCPUMemNodeContainersInfo[newAutoVol][newCPU] = map[int64]nodeContainers{}
}
if _, ok := volCPUMemNodeContainersInfo[newAutoVol][newCPU][newMem]; !ok {
volCPUMemNodeContainersInfo[newAutoVol][newCPU][newMem] = nodeContainers{}
}
if _, ok := volCPUMemNodeContainersInfo[newAutoVol][newCPU][newMem][nodename]; !ok {
volCPUMemNodeContainersInfo[newAutoVol][newCPU][newMem][nodename] = []*types.Container{}
}
volCPUMemNodeContainersInfo[newAutoVol][newCPU][newMem][nodename] = append(volCPUMemNodeContainersInfo[newAutoVol][newCPU][newMem][nodename], container)
return nil
}

func calVolCPUMemNodeContainersInfo(ch chan *types.ReallocResourceMessage, nodeContainersInfo nodeContainers, cpu float64, memory int64, volumes types.VolumeBindings) (volCPUMemNodeContainers, map[string]types.VolumeBindings) {
volCPUMemNodeContainersInfo := volCPUMemNodeContainers{}
hardVbsForContainer := map[string]types.VolumeBindings{}
for nodename, containers := range nodeContainersInfo {
for _, container := range containers {
newCPU := utils.Round(container.Quota + cpu)
newMem := container.Memory + memory
if newCPU < 0 || newMem < 0 {
log.Errorf("[doReallocContainer] New resource invaild %s, cpu %f, mem %d", container.ID, newCPU, newMem)
ch <- &types.ReallocResourceMessage{ContainerID: container.ID, Success: false}
continue
}

autoVolumes, hardVolumes, err := container.Volumes.Merge(volumes)
hardVbsForContainer[container.ID] = hardVolumes
if err != nil {
log.Errorf("[doReallocContainer] New resource invalid %s, vol %v, err %v", container.ID, volumes, err)
ch <- &types.ReallocResourceMessage{ContainerID: container.ID, Success: false}
continue
}
newAutoVol := strings.Join(autoVolumes.ToStringSlice(true, false), ",")

if _, ok := volCPUMemNodeContainersInfo[newAutoVol]; !ok {
volCPUMemNodeContainersInfo[newAutoVol] = map[float64]map[int64]nodeContainers{}
}
if _, ok := volCPUMemNodeContainersInfo[newAutoVol][newCPU]; !ok {
volCPUMemNodeContainersInfo[newAutoVol][newCPU] = map[int64]nodeContainers{}
}
if _, ok := volCPUMemNodeContainersInfo[newAutoVol][newCPU][newMem]; !ok {
volCPUMemNodeContainersInfo[newAutoVol][newCPU][newMem] = nodeContainers{}
}
if _, ok := volCPUMemNodeContainersInfo[newAutoVol][newCPU][newMem][nodename]; !ok {
volCPUMemNodeContainersInfo[newAutoVol][newCPU][newMem][nodename] = []*types.Container{}
if err := calVolCPUMemNodeContainerInfo(nodename, container, cpu, memory, volumes, volCPUMemNodeContainersInfo, hardVbsForContainer); err != nil {
ch <- &types.ReallocResourceMessage{
ContainerID: container.ID,
Error: err,
}
}
volCPUMemNodeContainersInfo[newAutoVol][newCPU][newMem][nodename] = append(volCPUMemNodeContainersInfo[newAutoVol][newCPU][newMem][nodename], container)
}
}
return volCPUMemNodeContainersInfo, hardVbsForContainer
}

func (c *Calcium) doReallocContainer(
ctx context.Context,
ch chan *types.ReallocResourceMessage,
pod *types.Pod,
nodeContainersInfo nodeContainers,
cpu float64, memory int64, volumes types.VolumeBindings) {

volCPUMemNodeContainersInfo, hardVbsForContainer := calVolCPUMemNodeContainersInfo(ch, nodeContainersInfo, cpu, memory, volumes)

for newAutoVol, cpuMemNodeContainersInfo := range volCPUMemNodeContainersInfo {
for newCPU, memNodesContainers := range cpuMemNodeContainersInfo {
@@ -166,66 +185,15 @@
cpusets = nodeCPUPlans[node.Name][:containerWithCPUBind]
}

autoVbs, _ := types.MakeVolumeBindings(strings.Split(newAutoVol, ","))
planForContainers, err := c.reallocVolume(node, containers, autoVbs)
if err != nil {
return err
newResource := &enginetypes.VirtualizationResource{
Quota: newCPU,
Memory: newMemory,
}

for _, container := range containers {
newResource := &enginetypes.VirtualizationResource{
Quota: newCPU,
Memory: newMemory,
SoftLimit: container.SoftLimit,
Volumes: hardVbsForContainer[container.ID].ToStringSlice(false, false),
}
if len(container.CPU) > 0 {
newResource.CPU = cpusets[0]
newResource.NUMANode = node.GetNUMANode(cpusets[0])
cpusets = cpusets[1:]
}

if newAutoVol != "" {
newResource.VolumePlan = planForContainers[container].ToLiteral()
newResource.Volumes = append(newResource.Volumes, autoVbs.ToStringSlice(false, false)...)
}

newVbs, _ := types.MakeVolumeBindings(newResource.Volumes)
if !newVbs.IsEqual(container.Volumes) {
newResource.VolumeChanged = true
}

updateSuccess := false
setSuccess := false
if err := node.Engine.VirtualizationUpdateResource(ctx, container.ID, newResource); err == nil {
container.CPU = newResource.CPU
container.Quota = newResource.Quota
container.Memory = newResource.Memory
container.Volumes, _ = types.MakeVolumeBindings(newResource.Volumes)
container.VolumePlan = types.MustToVolumePlan(newResource.VolumePlan)
updateSuccess = true
} else {
log.Errorf("[doReallocContainer] Realloc container %s failed %v", container.ID, err)
}
// The node's usage must be updated whether the update succeeded or failed:
// on success the node accounts for the new resources,
// on failure it accounts for the old resources.
node.CPU.Sub(container.CPU)
node.SetCPUUsed(container.Quota, types.IncrUsage)
node.Volume.Sub(container.VolumePlan.IntoVolumeMap())
node.SetVolumeUsed(container.VolumePlan.IntoVolumeMap().Total(), types.IncrUsage)
node.MemCap -= container.Memory
if nodeID := node.GetNUMANode(container.CPU); nodeID != "" {
node.DecrNUMANodeMemory(nodeID, container.Memory)
}
// Update container metadata
if err := c.store.UpdateContainer(ctx, container); err == nil {
setSuccess = true
} else {
log.Errorf("[doReallocContainer] Realloc finish but update container %s failed %v", container.ID, err)
}
ch <- &types.ReallocResourceMessage{ContainerID: container.ID, Success: updateSuccess && setSuccess}
if err := c.updateContainersResources(ctx, ch, node, containers, newResource, cpusets, hardVbsForContainer, newAutoVol); err != nil {
return err
}

if err := c.store.UpdateNode(ctx, node); err != nil {
log.Errorf("[doReallocContainer] Realloc finish but update node %s failed %s", node.Name, err)
litter.Dump(node)
@@ -234,7 +202,10 @@
}); err != nil {
log.Errorf("[doReallocContainer] Realloc container %v failed: %v", containers, err)
for _, container := range containers {
ch <- &types.ReallocResourceMessage{ContainerID: container.ID, Success: false}
ch <- &types.ReallocResourceMessage{
ContainerID: container.ID,
Error: err,
}
}
}
}
@@ -243,6 +214,76 @@
}
}

func (c *Calcium) updateContainersResources(ctx context.Context, ch chan *types.ReallocResourceMessage, node *types.Node, containers []*types.Container,
newResource *enginetypes.VirtualizationResource, cpusets []types.CPUMap, hardVbsForContainer map[string]types.VolumeBindings, newAutoVol string) error {

autoVbs, _ := types.MakeVolumeBindings(strings.Split(newAutoVol, ","))
planForContainers, err := c.reallocVolume(node, containers, autoVbs)
if err != nil {
return err
}

for _, container := range containers {
if len(container.CPU) > 0 {
newResource.CPU = cpusets[0]
newResource.NUMANode = node.GetNUMANode(cpusets[0])
cpusets = cpusets[1:]
}

if newAutoVol != "" {
newResource.VolumePlan = planForContainers[container].ToLiteral()
newResource.Volumes = append(newResource.Volumes, autoVbs.ToStringSlice(false, false)...)
}

newVbs, _ := types.MakeVolumeBindings(newResource.Volumes)
if !newVbs.IsEqual(container.Volumes) {
newResource.VolumeChanged = true
}

newResource.SoftLimit = container.SoftLimit
newResource.Volumes = hardVbsForContainer[container.ID].ToStringSlice(false, false)

if err := c.updateResource(ctx, node, container, newResource); err != nil {
ch <- &types.ReallocResourceMessage{
ContainerID: container.ID,
Error: err,
}
}
}
return nil
}

func (c *Calcium) updateResource(ctx context.Context, node *types.Node, container *types.Container, newResource *enginetypes.VirtualizationResource) error {
if err := node.Engine.VirtualizationUpdateResource(ctx, container.ID, newResource); err == nil {
container.CPU = newResource.CPU
container.Quota = newResource.Quota
container.Memory = newResource.Memory
container.Volumes, _ = types.MakeVolumeBindings(newResource.Volumes)
container.VolumePlan = types.MustToVolumePlan(newResource.VolumePlan)
} else {
log.Errorf("[updateResource] When Realloc container, VirtualizationUpdateResource %s failed %v", container.ID, err)
return err
}
// The node's usage must be updated whether the update succeeded or failed:
// on success the node accounts for the new resources,
// on failure it accounts for the old resources.
node.CPU.Sub(container.CPU)
node.SetCPUUsed(container.Quota, types.IncrUsage)
node.Volume.Sub(container.VolumePlan.IntoVolumeMap())
node.SetVolumeUsed(container.VolumePlan.IntoVolumeMap().Total(), types.IncrUsage)
node.MemCap -= container.Memory
if nodeID := node.GetNUMANode(container.CPU); nodeID != "" {
node.DecrNUMANodeMemory(nodeID, container.Memory)
}
// Update container metadata
if err := c.store.UpdateContainer(ctx, container); err != nil {
log.Errorf("[updateResource] Realloc finish but update container %s failed %v", container.ID, err)

return err
}
return nil
}

func (c *Calcium) reallocVolume(node *types.Node, containers []*types.Container, vbs types.VolumeBindings) (plans map[*types.Container]types.VolumePlan, err error) {
if len(vbs) == 0 {
return
27 changes: 14 additions & 13 deletions cluster/calcium/realloc_test.go
@@ -73,36 +73,36 @@ func TestRealloc(t *testing.T) {
ch, err := c.ReallocResource(ctx, []string{"c1"}, -1, 2*int64(units.GiB), nil)
assert.NoError(t, err)
for r := range ch {
assert.False(t, r.Success)
assert.Error(t, r.Error)
}
store.On("CreateLock", mock.Anything, mock.Anything).Return(lock, nil)
// failed by GetPod
store.On("GetPod", mock.Anything, mock.Anything).Return(pod1, types.ErrNoETCD).Once()
ch, err = c.ReallocResource(ctx, []string{"c1"}, -1, 2*int64(units.GiB), nil)
assert.NoError(t, err)
for r := range ch {
assert.False(t, r.Success)
assert.Error(t, r.Error)
}
store.On("GetPod", mock.Anything, mock.Anything).Return(pod1, nil)
// failed by newCPU < 0
ch, err = c.ReallocResource(ctx, []string{"c1"}, -1, 2*int64(units.GiB), nil)
assert.NoError(t, err)
for r := range ch {
assert.False(t, r.Success)
assert.Error(t, r.Error)
}
// failed by GetNode
store.On("GetNode", mock.Anything, "node1").Return(nil, types.ErrNoETCD).Once()
ch, err = c.ReallocResource(ctx, []string{"c1"}, 0.1, 2*int64(units.GiB), nil)
assert.NoError(t, err)
for r := range ch {
assert.False(t, r.Success)
assert.Error(t, r.Error)
}
store.On("GetNode", mock.Anything, "node1").Return(node1, nil)
// failed by memory not enough
ch, err = c.ReallocResource(ctx, []string{"c1"}, 0.1, 2*int64(units.GiB), nil)
assert.NoError(t, err)
for r := range ch {
assert.False(t, r.Success)
assert.Error(t, r.Error)
}
// failed by no new CPU Plan
simpleMockScheduler := &schedulermocks.Scheduler{}
@@ -115,14 +115,14 @@
ch, err = c.ReallocResource(ctx, []string{"c1"}, 0.1, 2*int64(units.MiB), nil)
assert.NoError(t, err)
for r := range ch {
assert.False(t, r.Success)
assert.Error(t, r.Error)
}
// failed by wrong total
simpleMockScheduler.On("SelectCPUNodes", mock.Anything, mock.Anything, mock.Anything).Return(nil, nil, 0, nil).Once()
ch, err = c.ReallocResource(ctx, []string{"c1"}, 0.1, 2*int64(units.MiB), nil)
assert.NoError(t, err)
for r := range ch {
assert.False(t, r.Success)
assert.Error(t, r.Error)
}
// valid cpu plans
nodeCPUPlans := map[string][]types.CPUMap{
@@ -149,7 +149,7 @@
ch, err = c.ReallocResource(ctx, []string{"c1", "c2"}, 0.1, 2*int64(units.MiB), nil)
assert.NoError(t, err)
for r := range ch {
assert.False(t, r.Success)
assert.Error(t, r.Error)
}
// check node resource as usual
assert.Equal(t, node1.CPU["2"], int64(10))
@@ -161,7 +161,7 @@
ch, err = c.ReallocResource(ctx, []string{"c1", "c2"}, 0.1, 2*int64(units.MiB), nil)
assert.NoError(t, err)
for r := range ch {
assert.False(t, r.Success)
assert.Error(t, r.Error)
}
store.On("UpdateContainer", mock.Anything, mock.Anything).Return(nil)
// failed by volume binding incompatible
@@ -177,21 +177,21 @@
ch, err = c.ReallocResource(ctx, []string{"c1"}, 0.1, int64(units.MiB), types.MustToVolumeBindings([]string{"AUTO:/data:rw:50"}))
assert.NoError(t, err)
for r := range ch {
assert.False(t, r.Success)
assert.Error(t, r.Error)
}
// failed by volume schedule error
simpleMockScheduler.On("SelectVolumeNodes", mock.Anything, mock.Anything).Return(nil, nil, 0, types.ErrInsufficientVolume).Once()
ch, err = c.ReallocResource(ctx, []string{"c1"}, 0.1, int64(units.MiB), types.MustToVolumeBindings([]string{"AUTO:/data:rw:1"}))
assert.NoError(t, err)
for r := range ch {
assert.False(t, r.Success)
assert.Error(t, r.Error)
}
// failed due to re-volume plan less than container number
simpleMockScheduler.On("SelectVolumeNodes", mock.Anything, mock.Anything).Return(nil, nodeVolumePlans, 0, nil).Twice()
ch, err = c.ReallocResource(ctx, []string{"c1", "c2"}, 0.1, int64(units.MiB), types.MustToVolumeBindings([]string{"AUTO:/data:rw:1"}))
assert.NoError(t, err)
for r := range ch {
assert.False(t, r.Success)
assert.Error(t, r.Error)
}
// good to go
// reset everything
@@ -254,7 +254,8 @@
ch, err = c.ReallocResource(ctx, []string{"c3", "c4"}, 0.1, 2*int64(units.MiB), types.MustToVolumeBindings([]string{"AUTO:/data0:rw:-50"}))
assert.NoError(t, err)
for r := range ch {
assert.True(t, r.Success)
// TODO: handle "Received unexpected error: container ID must be length of 64" in this test
assert.Error(t, r.Error)
}
assert.Equal(t, node2.CPU["3"], int64(0))
assert.Equal(t, node2.CPU["2"], int64(100))
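Note on consuming the changed API: ReallocResourceMessage now carries a per-container Error instead of a boolean Success. Below is a minimal caller-side sketch, assuming the ReallocResource signature shown in this diff and the package's existing imports (context, fmt, log, types); the helper name reallocAndReport is hypothetical and not part of this PR.

func reallocAndReport(ctx context.Context, c *Calcium, ids []string, cpu float64, memory int64, volumes types.VolumeBindings) error {
	// Kick off the reallocation; the returned channel yields one message per container.
	ch, err := c.ReallocResource(ctx, ids, cpu, memory, volumes)
	if err != nil {
		return err
	}
	failed := 0
	for msg := range ch {
		// Callers now inspect msg.Error (e.g. types.ErrInvalidRes or a store error)
		// instead of checking a Success flag.
		if msg.Error != nil {
			log.Errorf("[reallocAndReport] realloc %s failed: %v", msg.ContainerID, msg.Error)
			failed++
		}
	}
	if failed > 0 {
		return fmt.Errorf("realloc failed for %d container(s)", failed)
	}
	return nil
}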