Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 7 additions & 8 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ deps:
env GO111MODULE=on go mod vendor

binary:
CGO_ENABLED=0 go build -ldflags "$(GO_LDFLAGS)" -o eru-core
CGO_ENABLED=0 go build -ldflags "$(GO_LDFLAGS)" -gcflags=all=-G=3 -o eru-core

build: deps binary

Expand All @@ -31,7 +31,6 @@ test: deps unit-test
mock: deps
mockery --dir vendor/google.golang.org/grpc --output 3rdmocks --name ServerStream
mockery --dir vendor/github.com/docker/docker/client --output engine/docker/mocks --name APIClient
mockery --dir scheduler --output scheduler/mocks --name Scheduler
mockery --dir source --output source/mocks --name Source
mockery --dir store --output store/mocks --name Store
mockery --dir engine --output engine/mocks --name API
Expand All @@ -41,6 +40,7 @@ mock: deps
mockery --dir store/etcdv3/meta --output store/etcdv3/meta/mocks --all
mockery --dir vendor/go.etcd.io/etcd/client/v3 --output store/etcdv3/meta/mocks --name Txn
mockery --dir rpc/gen/ --output rpc/mocks --name CoreRPC_RunAndWaitServer
mockery --dir resources --output resources/mocks --name Plugin

.ONESHELL:

Expand All @@ -49,22 +49,21 @@ cloc:

unit-test:
go vet `go list ./... | grep -v '/vendor/' | grep -v '/tools'` && \
go test -race -timeout 240s -count=1 -cover ./utils/... \
go test -race -timeout 240s -count=1 -vet=off -cover ./utils/... \
./types/... \
./store/etcdv3/. \
./store/etcdv3/embedded/. \
./store/etcdv3/meta/. \
./source/common/... \
./strategy/... \
./scheduler/complex/... \
./rpc/. \
./lock/etcdlock/... \
./auth/simple/... \
./discovery/helium... \
./resources/types/. \
./resources/storage/... \
./resources/volume/... \
./resources/cpumem/... \
./resources/cpumem/models/... \
./resources/cpumem/schedule/... \
./resources/volume/models/... \
./resources/volume/schedule/... \
./wal/. \
./wal/kv/. \
./store/redis/... \
Expand Down
18 changes: 16 additions & 2 deletions cluster/calcium/build.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,22 @@ func (c *Calcium) selectBuildNode(ctx context.Context) (*types.Node, error) {
return nil, errors.WithStack(types.ErrInsufficientNodes)
}
// get idle max node
node, err := c.scheduler.MaxIdleNode(nodes)
return node, err
return c.getMostIdleNode(ctx, nodes)
}

func (c *Calcium) getMostIdleNode(ctx context.Context, nodes []*types.Node) (*types.Node, error) {
nodeNames := []string{}
nodeMap := map[string]*types.Node{}
for _, node := range nodes {
nodeNames = append(nodeNames, node.Name)
nodeMap[node.Name] = node
}

mostIdleNode, err := c.resource.GetMostIdleNode(ctx, nodeNames)
if err != nil {
return nil, err
}
return nodeMap[mostIdleNode], nil
}

func (c *Calcium) buildFromSCM(ctx context.Context, node *types.Node, opts *types.BuildOptions) ([]string, io.ReadCloser, error) {
Expand Down
18 changes: 12 additions & 6 deletions cluster/calcium/build_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ import (
"testing"

enginemocks "github.com/projecteru2/core/engine/mocks"
schedulermocks "github.com/projecteru2/core/scheduler/mocks"
"github.com/projecteru2/core/resources"
resourcemocks "github.com/projecteru2/core/resources/mocks"
storemocks "github.com/projecteru2/core/store/mocks"
"github.com/projecteru2/core/types"

Expand Down Expand Up @@ -82,13 +83,18 @@ func TestBuild(t *testing.T) {
Engine: engine,
}
store.On("GetNodesByPod", mock.AnythingOfType("*context.emptyCtx"), mock.Anything, mock.Anything, mock.Anything).Return([]*types.Node{node}, nil)
scheduler := &schedulermocks.Scheduler{}
c.scheduler = scheduler
// failed by MaxIdleNode
scheduler.On("MaxIdleNode", mock.AnythingOfType("[]*types.Node")).Return(nil, types.ErrBadMeta).Once()

plugin := c.resource.GetPlugins()[0].(*resourcemocks.Plugin)

// failed by plugin error
plugin.On("GetMostIdleNode", mock.Anything, mock.Anything).Return(nil, types.ErrGetMostIdleNodeFailed).Once()
ch, err = c.BuildImage(ctx, opts)
assert.Error(t, err)
scheduler.On("MaxIdleNode", mock.AnythingOfType("[]*types.Node")).Return(node, nil)

plugin.On("GetMostIdleNode", mock.Anything, mock.Anything).Return(&resources.GetMostIdleNodeResponse{NodeName: node.Name, Priority: 100}, nil)
plugin.On("GetNodeResourceInfo", mock.Anything, mock.Anything, mock.Anything).Return(&resources.GetNodeResourceInfoResponse{
ResourceInfo: &resources.NodeResourceInfo{},
}, nil)
// create image
c.config.Docker.Hub = "test.com"
c.config.Docker.Namespace = "test"
Expand Down
41 changes: 30 additions & 11 deletions cluster/calcium/calcium.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,9 @@ import (
"github.com/projecteru2/core/discovery"
"github.com/projecteru2/core/discovery/helium"
"github.com/projecteru2/core/log"
"github.com/projecteru2/core/scheduler"
complexscheduler "github.com/projecteru2/core/scheduler/complex"
"github.com/projecteru2/core/resources"
"github.com/projecteru2/core/resources/cpumem"
"github.com/projecteru2/core/resources/volume"
"github.com/projecteru2/core/source"
"github.com/projecteru2/core/source/github"
"github.com/projecteru2/core/source/gitlab"
Expand All @@ -24,7 +25,7 @@ import (
type Calcium struct {
config types.Config
store store.Store
scheduler scheduler.Scheduler
resource *resources.PluginManager
source source.Source
watcher discovery.Service
wal *WAL
Expand All @@ -41,13 +42,6 @@ func New(config types.Config, t *testing.T) (*Calcium, error) {
return nil, logger.Err(context.TODO(), errors.WithStack(err))
}

// set scheduler
potassium, err := complexscheduler.New(config)
if err != nil {
return nil, logger.Err(context.TODO(), errors.WithStack(err))
}
scheduler.InitSchedulerV1(potassium)

// set scm
var scm source.Source
scmtype := strings.ToLower(config.Git.SCMType)
Expand All @@ -67,10 +61,35 @@ func New(config types.Config, t *testing.T) (*Calcium, error) {
// set watcher
watcher := helium.New(config.GRPCConfig, store)

cal := &Calcium{store: store, config: config, scheduler: potassium, source: scm, watcher: watcher}
// set resource plugin manager
resource, err := resources.NewPluginManager(config)
if err != nil {
return nil, logger.Err(context.TODO(), errors.WithStack(err))
}

// load cpumem plugin
cpumem, err := cpumem.NewPlugin(config)
if err != nil {
log.Errorf(context.TODO(), "[NewPluginManager] new cpumem plugin error: %v", err)
return nil, err
}
resource.AddPlugins(cpumem)

// load volume plugin
volume, err := volume.NewPlugin(config)
if err != nil {
log.Errorf(context.TODO(), "[NewPluginManager] new volume plugin error: %v", err)
return nil, err
}
resource.AddPlugins(volume)

cal := &Calcium{store: store, config: config, source: scm, watcher: watcher, resource: resource}

cal.wal, err = newWAL(config, cal)
cal.identifier = config.Identifier()

go cal.InitMetrics()

return cal, logger.Err(nil, errors.WithStack(err)) //nolint
}

Expand Down
36 changes: 13 additions & 23 deletions cluster/calcium/calcium_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,38 +5,21 @@ import (
"io/ioutil"
"os"
"path/filepath"
"sync"
"testing"
"time"

schedulermocks "github.com/projecteru2/core/scheduler/mocks"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"

"github.com/projecteru2/core/resources"
resourcemocks "github.com/projecteru2/core/resources/mocks"
sourcemocks "github.com/projecteru2/core/source/mocks"
storemocks "github.com/projecteru2/core/store/mocks"
"github.com/projecteru2/core/types"
"github.com/projecteru2/core/wal"
walmocks "github.com/projecteru2/core/wal/mocks"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
)

// DummyLock replace lock for testing
type dummyLock struct {
m sync.Mutex
}

// Lock for lock
func (d *dummyLock) Lock(ctx context.Context) (context.Context, error) {
d.m.Lock()
return context.Background(), nil
}

// Unlock for unlock
func (d *dummyLock) Unlock(ctx context.Context) error {
d.m.Unlock()
return nil
}

func NewTestCluster() *Calcium {
walDir, err := ioutil.TempDir(os.TempDir(), "core.wal.*")
if err != nil {
Expand All @@ -61,14 +44,21 @@ func NewTestCluster() *Calcium {
HAKeepaliveInterval: 16 * time.Second,
}
c.store = &storemocks.Store{}
c.scheduler = &schedulermocks.Scheduler{}
c.source = &sourcemocks.Source{}
c.wal = &WAL{WAL: &walmocks.WAL{}}

mwal := c.wal.WAL.(*walmocks.WAL)
commit := wal.Commit(func() error { return nil })
mwal.On("Log", mock.Anything, mock.Anything).Return(commit, nil)

plugin := &resourcemocks.Plugin{}
plugin.On("Name").Return("mock-plugin")
plugin.On("ResolveNodeResourceInfoToMetrics", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil, context.DeadlineExceeded)
if c.resource, err = resources.NewPluginManager(c.config); err != nil {
panic(err)
}
c.resource.AddPlugins(plugin)

return c
}

Expand Down
55 changes: 15 additions & 40 deletions cluster/calcium/capacity.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (

"github.com/projecteru2/core/log"
"github.com/projecteru2/core/resources"
resourcetypes "github.com/projecteru2/core/resources/types"
"github.com/projecteru2/core/strategy"
"github.com/projecteru2/core/types"

Expand All @@ -22,58 +21,34 @@ func (c *Calcium) CalculateCapacity(ctx context.Context, opts *types.DeployOptio
}

return msg, c.withNodesPodLocked(ctx, opts.NodeFilter, func(ctx context.Context, nodeMap map[string]*types.Node) error {
nodes := []string{}
for node := range nodeMap {
nodes = append(nodes, node)
}

if opts.DeployStrategy != strategy.Dummy {
if _, msg.NodeCapacities, err = c.doAllocResource(ctx, nodeMap, opts); err != nil {
logger.Errorf(ctx, "[Calcium.CalculateCapacity] doAllocResource failed: %+v", err)
if msg.NodeCapacities, err = c.doGetDeployMap(ctx, nodes, opts); err != nil {
logger.Errorf(ctx, "[Calcium.CalculateCapacity] doGetDeployMap failed: %+v", err)
return err
}

for _, capacity := range msg.NodeCapacities {
msg.Total += capacity
}
} else {
var infos []strategy.Info
msg.Total, _, infos, err = c.doCalculateCapacity(ctx, nodeMap, opts)
var infos map[string]*resources.NodeCapacityInfo
infos, msg.Total, err = c.resource.GetNodesDeployCapacity(ctx, nodes, opts.ResourceOpts)
if err != nil {
logger.Errorf(ctx, "[Calcium.CalculateCapacity] doCalculateCapacity failed: %+v", err)
logger.Errorf(ctx, "[Calcium.CalculateCapacity] failed to get nodes capacity: %+v", err)
return err
}
for _, info := range infos {
msg.NodeCapacities[info.Nodename] = info.Capacity
if msg.Total <= 0 {
return errors.Wrap(types.ErrInsufficientRes, "no node meets all the resource requirements at the same time")
}
for node, info := range infos {
msg.NodeCapacities[node] = info.Capacity
}
}
return nil
})
}

func (c *Calcium) doCalculateCapacity(ctx context.Context, nodeMap map[string]*types.Node, opts *types.DeployOptions) (
total int,
plans []resourcetypes.ResourcePlans,
infos []strategy.Info,
err error,
) {
if len(nodeMap) == 0 {
return 0, nil, nil, errors.WithStack(types.ErrInsufficientNodes)
}

resourceRequests, err := resources.MakeRequests(opts.ResourceOpts)
if err != nil {
return 0, nil, nil, err
}

// select available nodes
if plans, err = resources.SelectNodesByResourceRequests(ctx, resourceRequests, nodeMap); err != nil {
return 0, nil, nil, err
}

// deploy strategy
infos = strategy.NewInfos(resourceRequests, nodeMap, plans)
for _, info := range infos {
total += info.Capacity
}
log.Debugf(ctx, "[Calcium.doCalculateCapacity] plans: %+v, total: %v", plans, total)
if total <= 0 {
return 0, nil, nil, errors.Wrap(types.ErrInsufficientRes, "no node meets all the resource requirements at the same time")
}
return
}
Loading