Skip to content

Commit

Permalink
use haxmap to speedup
Browse files Browse the repository at this point in the history
  • Loading branch information
CMGS committed Feb 3, 2023
1 parent 2398916 commit ea2d4ac
Show file tree
Hide file tree
Showing 5 changed files with 245 additions and 19 deletions.
6 changes: 3 additions & 3 deletions resource3/plugins/cpumem/calculate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (
func TestCalculateDeploy(t *testing.T) {
ctx := context.Background()
cm := initCPUMEM(ctx, t)
nodes := generateNodes(ctx, t, cm, 1, 2, 4*units.GB, 100)
nodes := generateNodes(ctx, t, cm, 1, 2, 4*units.GB, 100, 0)
node := nodes[0]

// invalid opts
Expand Down Expand Up @@ -127,7 +127,7 @@ func TestCalculateDeploy(t *testing.T) {
func TestCalculateRealloc(t *testing.T) {
ctx := context.Background()
cm := initCPUMEM(ctx, t)
nodes := generateNodes(ctx, t, cm, 1, 2, 4*units.GB, 100)
nodes := generateNodes(ctx, t, cm, 1, 2, 4*units.GB, 100, 0)
node := nodes[0]

// numa node
Expand Down Expand Up @@ -209,7 +209,7 @@ func TestCalculateRealloc(t *testing.T) {
func TestCalculateRemap(t *testing.T) {
ctx := context.Background()
cm := initCPUMEM(ctx, t)
nodes := generateNodes(ctx, t, cm, 1, 4, 4*units.GB, 100)
nodes := generateNodes(ctx, t, cm, 1, 4, 4*units.GB, 100, 0)
node := nodes[0]

resource := &plugintypes.NodeResource{
Expand Down
1 change: 1 addition & 0 deletions resource3/plugins/cpumem/cpumem.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ const (
name = "cpumem"
rate = 8
nodeResourceInfoKey = "/resource/cpumem/%s"
priority = 100
)

// Plugin
Expand Down
8 changes: 4 additions & 4 deletions resource3/plugins/cpumem/cpumem_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,9 @@ func initCPUMEM(ctx context.Context, t *testing.T) *Plugin {

func generateNodes(
ctx context.Context, t *testing.T, cm *Plugin,
nums int, cores int, memory int64, shares int,
nums int, cores int, memory int64, shares, index int,
) []string {
reqs := generateNodeResourceRequests(t, nums, cores, memory, shares)
reqs := generateNodeResourceRequests(t, nums, cores, memory, shares, index)
info := &enginetypes.Info{NCPU: 8, MemTotal: 2048}
names := []string{}
for name, req := range reqs {
Expand All @@ -52,9 +52,9 @@ func generateNodes(
return names
}

func generateNodeResourceRequests(t *testing.T, nums int, cores int, memory int64, shares int) map[string]*plugintypes.NodeResourceRequest {
func generateNodeResourceRequests(t *testing.T, nums int, cores int, memory int64, shares, index int) map[string]*plugintypes.NodeResourceRequest {
infos := map[string]*plugintypes.NodeResourceRequest{}
for i := 0; i < nums; i++ {
for i := index; i < index+nums; i++ {
info := &plugintypes.NodeResourceRequest{
"cpu": cores,
"share": shares,
Expand Down
55 changes: 43 additions & 12 deletions resource3/plugins/cpumem/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"math"
"strconv"

"github.com/alphadose/haxmap"
"github.com/cockroachdb/errors"
"github.com/mitchellh/mapstructure"
enginetypes "github.com/projecteru2/core/engine/types"
Expand All @@ -19,8 +20,6 @@ import (
"github.com/sanity-io/litter"
)

const priority = 100

// AddNode .
func (p Plugin) AddNode(ctx context.Context, nodename string, resource *plugintypes.NodeResourceRequest, info *enginetypes.Info) (*plugintypes.AddNodeResponse, error) {
// try to get the node resource
Expand Down Expand Up @@ -64,7 +63,7 @@ func (p Plugin) AddNode(ctx context.Context, nodename string, resource *pluginty

// if NUMA is set but NUMAMemory is not set
// then divide memory equally according to the number of numa nodes
if len(req.NUMA) > 0 && req.NUMAMemory == nil {
if len(req.NUMA) > 0 && (req.NUMAMemory == nil || len(req.NUMAMemory) == 0) {
averageMemory := req.Memory / int64(len(req.NUMA))
nodeResourceInfo.Capacity.NUMAMemory = cpumemtypes.NUMAMemory{}
for _, ID := range req.NUMA {
Expand Down Expand Up @@ -106,12 +105,13 @@ func (p Plugin) GetNodesDeployCapacity(ctx context.Context, nodenames []string,

nodesDeployCapacityMap := map[string]*plugintypes.NodeDeployCapacity{}
total := 0
for _, nodename := range nodenames {
nodeResourceInfo, err := p.doGetNodeResourceInfo(ctx, nodename)
if err != nil {
logger.WithField("node", nodename).Error(ctx, err)
return nil, err
}

nodesResourceInfos, err := p.doGetNodesResourceInfo(ctx, nodenames)
if err != nil {
return nil, err
}

for nodename, nodeResourceInfo := range nodesResourceInfos {
nodeDeployCapacity := p.doGetNodeDeployCapacity(nodeResourceInfo, req)
if nodeDeployCapacity.Capacity > 0 {
nodesDeployCapacityMap[nodename] = nodeDeployCapacity
Expand Down Expand Up @@ -324,12 +324,43 @@ func (p Plugin) getNodeResourceInfo(ctx context.Context, nodename string, worklo
}

func (p Plugin) doGetNodeResourceInfo(ctx context.Context, nodename string) (*cpumemtypes.NodeResourceInfo, error) {
resourceInfo := &cpumemtypes.NodeResourceInfo{}
resp, err := p.store.GetOne(ctx, fmt.Sprintf(nodeResourceInfoKey, nodename))
resp, err := p.doGetNodesResourceInfo(ctx, []string{nodename})
if err != nil {
return nil, err
}
return resourceInfo, json.Unmarshal(resp.Value, resourceInfo)
return resp[nodename], err
}

func (p Plugin) doGetNodesResourceInfo(ctx context.Context, nodenames []string) (map[string]*cpumemtypes.NodeResourceInfo, error) {
keys := []string{}
for _, nodename := range nodenames {
keys = append(keys, fmt.Sprintf(nodeResourceInfoKey, nodename))
}
resps, err := p.store.GetMulti(ctx, keys)
if err != nil {
return nil, err
}

tmp := haxmap.New[string, *cpumemtypes.NodeResourceInfo]()
result := map[string]*cpumemtypes.NodeResourceInfo{}

for _, resp := range resps {
resp := resp
go func() {
r := &cpumemtypes.NodeResourceInfo{}
if err := json.Unmarshal(resp.Value, r); err != nil {
log.WithFunc("resource.cpumem.doGetNodesResourceInfo").Error(ctx, err)
return
}
nodename := utils.Tail(string(resp.Key))
tmp.Set(nodename, r)
}()
}
tmp.ForEach(func(k string, v *cpumemtypes.NodeResourceInfo) bool {
result[k] = v
return true
})
return result, nil
}

func (p Plugin) doSetNodeResourceInfo(ctx context.Context, nodename string, resourceInfo *cpumemtypes.NodeResourceInfo) error {
Expand Down
194 changes: 194 additions & 0 deletions resource3/plugins/cpumem/node_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,194 @@
package cpumem

import (
"context"
"testing"

"github.com/cockroachdb/errors"
"github.com/docker/go-units"
enginetypes "github.com/projecteru2/core/engine/types"
plugintypes "github.com/projecteru2/core/resource3/plugins/types"
coretypes "github.com/projecteru2/core/types"
"github.com/stretchr/testify/assert"
)

func TestAddNode(t *testing.T) {
ctx := context.Background()
cm := initCPUMEM(ctx, t)
nodes := generateNodes(ctx, t, cm, 1, 2, 4*units.GB, 100, 0)
node := nodes[0]
nodeForAdd := "test2"

req := &plugintypes.NodeResourceRequest{
"numa-cpu": []string{"0", "1"},
}

info := &enginetypes.Info{NCPU: 2, MemTotal: 4 * units.GB}

// existent node
_, err := cm.AddNode(ctx, node, req, info)
assert.Equal(t, err, coretypes.ErrNodeExists)

// normal case
r, err := cm.AddNode(ctx, nodeForAdd, req, info)
assert.Nil(t, err)
assert.Equal(t, (*r.Capacity)["memory"], int64(4*units.GB*rate/10))
}

func TestRemoveNode(t *testing.T) {
ctx := context.Background()
cm := initCPUMEM(ctx, t)
nodes := generateNodes(ctx, t, cm, 1, 2, 4*units.GB, 100, 0)
node := nodes[0]
nodeForDel := "test2"

_, err := cm.RemoveNode(ctx, node)
assert.Nil(t, err)
_, err = cm.RemoveNode(ctx, nodeForDel)
assert.Nil(t, err)
}

func TestGetNodesDeployCapacityWithCPUBind(t *testing.T) {
ctx := context.Background()
cm := initCPUMEM(ctx, t)
nodes := generateNodes(ctx, t, cm, 2, 2, 4*units.GB, 100, 0)

req := &plugintypes.WorkloadResourceRequest{
"cpu-bind": true,
"cpu-request": 0.5,
"memory-request": "1",
}

// non-existent node
_, err := cm.GetNodesDeployCapacity(ctx, []string{"xxx"}, req)
assert.True(t, errors.Is(err, coretypes.ErrInvaildCount))

// normal
r, err := cm.GetNodesDeployCapacity(ctx, nodes, req)
assert.Nil(t, err)
assert.True(t, r.Total >= 1)

// more cpu
req = &plugintypes.WorkloadResourceRequest{
"cpu-bind": true,
"cpu-request": 2,
"memory-request": "1",
}
r, err = cm.GetNodesDeployCapacity(ctx, nodes, req)
assert.Nil(t, err)
assert.True(t, r.Total < 3)

// more
req = &plugintypes.WorkloadResourceRequest{
"cpu-bind": true,
"cpu-request": 3,
"memory-request": "1",
}
r, err = cm.GetNodesDeployCapacity(ctx, nodes, req)
assert.Nil(t, err)
assert.True(t, r.Total < 2)

// less
req = &plugintypes.WorkloadResourceRequest{
"cpu-bind": true,
"cpu-request": 1,
"memory-request": "1",
}
r, err = cm.GetNodesDeployCapacity(ctx, nodes, req)
assert.Nil(t, err)
assert.True(t, r.Total < 5)

// complex
nodes = generateNodes(ctx, t, cm, 1, 4, 12*units.GB, 100, 10)
nodes = append(nodes, generateNodes(ctx, t, cm, 1, 14, 12*units.GB, 100, 11)...)
nodes = append(nodes, generateNodes(ctx, t, cm, 1, 12, 12*units.GB, 100, 12)...)
nodes = append(nodes, generateNodes(ctx, t, cm, 1, 18, 12*units.GB, 100, 13)...)
nodes = append(nodes, generateNodes(ctx, t, cm, 1, 8, 12*units.GB, 100, 14)...)

req = &plugintypes.WorkloadResourceRequest{
"cpu-bind": true,
"cpu-request": 1.7,
"memory-request": "1",
}
r, err = cm.GetNodesDeployCapacity(ctx, nodes, req)
assert.Nil(t, err)
assert.Equal(t, r.Total, 28)
}

func TestGetNodesDeployCapacityWithMemory(t *testing.T) {
ctx := context.Background()
cm := initCPUMEM(ctx, t)
nodes := generateNodes(ctx, t, cm, 2, 2, 1024, 100, 0)

req := &plugintypes.WorkloadResourceRequest{
"cpu-bind": true,
"cpu-request": 0.1,
"memory-request": "1024",
}

r, err := cm.GetNodesDeployCapacity(ctx, nodes, req)
assert.Nil(t, err)
assert.Equal(t, r.Total, 2)

(*req)["memory-request"] = "1025"
r, err = cm.GetNodesDeployCapacity(ctx, nodes, req)
assert.Nil(t, err)
assert.Equal(t, r.Total, 0)
}

func TestGetNodesDeployCapacityWithMaxShareLimit(t *testing.T) {
ctx := context.Background()
cm := initCPUMEM(ctx, t)
cm.config.Scheduler.MaxShare = 2
nodes := generateNodes(ctx, t, cm, 1, 6, 12*units.GB, 100, 0)
node := nodes[0]

req := &plugintypes.WorkloadResourceRequest{
"cpu-bind": true,
"cpu-request": 1.7,
"memory-request": "1",
}

r, err := cm.GetNodesDeployCapacity(ctx, nodes, req)
assert.Nil(t, err)
assert.Equal(t, r.Total, 2)

// numa node
resource := &plugintypes.NodeResource{
"cpu": 4.0,
"cpu_map": map[string]int64{
"0": 0,
"1": 0,
"2": 100,
"3": 100,
},
"memory": 12 * units.GB,
}

_, err = cm.SetNodeResourceCapacity(ctx, node, resource, nil, false, true)
assert.Nil(t, err)

(*req)["cpu-request"] = 1.2
r, err = cm.GetNodesDeployCapacity(ctx, nodes, req)
assert.Nil(t, err)
assert.Equal(t, r.Total, 1)
}

func BenchmarkGetNodesCapacity(b *testing.B) {
b.StopTimer()
t := &testing.T{}
ctx := context.Background()
cm := initCPUMEM(ctx, t)
nodes := generateNodes(ctx, t, cm, 10000, 24, 128*units.GB, 100, 0)
req := &plugintypes.WorkloadResourceRequest{
"cpu-bind": true,
"cpu-request": 1.3,
"memory-request": "1",
}
b.StartTimer()

for i := 0; i < b.N; i++ {
_, err := cm.GetNodesDeployCapacity(ctx, nodes, req)
assert.Nil(b, err)
}
}

0 comments on commit ea2d4ac

Please sign in to comment.