Skip to content

Commit

Permalink
refactor label filter
Browse files Browse the repository at this point in the history
  • Loading branch information
CMGS committed Sep 3, 2018
1 parent f61ff44 commit a1459d9
Show file tree
Hide file tree
Showing 11 changed files with 335 additions and 361 deletions.
31 changes: 2 additions & 29 deletions cluster/calcium/meta.go
Expand Up @@ -7,7 +7,6 @@ import (
"context"

"github.com/projecteru2/core/types"
"github.com/projecteru2/core/utils"
log "github.com/sirupsen/logrus"
)

Expand Down Expand Up @@ -59,24 +58,7 @@ func (c *Calcium) ListPodNodes(ctx context.Context, podname string, all bool) ([

// ListContainers list containers
func (c *Calcium) ListContainers(ctx context.Context, opts *types.ListContainersOptions) ([]*types.Container, error) {
cs, err := c.store.ListContainers(ctx, opts.Appname, opts.Entrypoint, opts.Nodename)
if err != nil {
log.Debugf("[ListContainers] Error during get containers %v", err)
return cs, err
}
result := []*types.Container{}
for i, c := range cs {
cs[i].RawInspect, err = c.Inspect(ctx)
if err != nil {
log.Errorf("[ListContainers] Error during inspect container %s %v", c.ID, err)
continue
}
if !utils.FilterContainer(cs[i].RawInspect.Config.Labels, opts.Labels) {
continue
}
result = append(result, cs[i])
}
return result, nil
return c.store.ListContainers(ctx, opts.Appname, opts.Entrypoint, opts.Nodename)
}

// ListNodeContainers list containers belong to one node
Expand All @@ -96,16 +78,7 @@ func (c *Calcium) GetNode(ctx context.Context, podname, nodename string) (*types

// GetContainer get a container
func (c *Calcium) GetContainer(ctx context.Context, ID string) (*types.Container, error) {
container, err := c.store.GetContainer(ctx, ID)
if err != nil {
return nil, err
}
rawInspect, err := container.Inspect(ctx)
if err != nil {
return nil, err
}
container.RawInspect = rawInspect
return container, nil
return c.store.GetContainer(ctx, ID)
}

// GetContainers get containers
Expand Down
31 changes: 18 additions & 13 deletions cluster/calcium/replace_container.go
Expand Up @@ -7,13 +7,14 @@ import (

enginetypes "github.com/docker/docker/api/types"
"github.com/projecteru2/core/types"
"github.com/projecteru2/core/utils"
log "github.com/sirupsen/logrus"
)

// ReplaceContainer replace containers with same resource
func (c *Calcium) ReplaceContainer(ctx context.Context, opts *types.DeployOptions, replaceOpts *types.ReplaceOptions) (chan *types.ReplaceContainerMessage, error) {
func (c *Calcium) ReplaceContainer(ctx context.Context, opts *types.ReplaceOptions) (chan *types.ReplaceContainerMessage, error) {
oldContainers, err := c.ListContainers(ctx, &types.ListContainersOptions{
Appname: opts.Name, Entrypoint: opts.Entrypoint.Name, Nodename: opts.Nodename, Labels: replaceOpts.FilterLabels,
Appname: opts.Name, Entrypoint: opts.Entrypoint.Name, Nodename: opts.Nodename,
})
if err != nil {
return nil, err
Expand All @@ -32,17 +33,21 @@ func (c *Calcium) ReplaceContainer(ctx context.Context, opts *types.DeployOption
log.Debugf("[ReplaceContainer] Skip not in pod container %s", oldContainer.ID)
continue
}
// 覆盖 podname 如果做全量更新的话
opts.Podname = oldContainer.Podname
log.Debugf("[ReplaceContainer] Replace old container %s", oldContainer.ID)
wg.Add(1)
go func(deployOpts types.DeployOptions, oldContainer *types.Container, index int) {
go func(replaceOpts types.ReplaceOptions, oldContainer *types.Container, index int) {
defer wg.Done()
// 使用复制之后的配置
// 停老的,起新的
deployOpts.Memory = oldContainer.Memory
deployOpts.CPUQuota = oldContainer.Quota
deployOpts.SoftLimit = oldContainer.SoftLimit
replaceOpts.Memory = oldContainer.Memory
replaceOpts.CPUQuota = oldContainer.Quota
replaceOpts.SoftLimit = oldContainer.SoftLimit

createMessage, removeMessage, err := c.doReplaceContainer(ctx, oldContainer, &deployOpts, ib, index, replaceOpts.Force)
createMessage, removeMessage, err := c.doReplaceContainer(
ctx, oldContainer, &replaceOpts, ib, index,
)
ch <- &types.ReplaceContainerMessage{
Create: createMessage,
Remove: removeMessage,
Expand Down Expand Up @@ -71,10 +76,9 @@ func (c *Calcium) ReplaceContainer(ctx context.Context, opts *types.DeployOption
func (c *Calcium) doReplaceContainer(
ctx context.Context,
container *types.Container,
opts *types.DeployOptions,
opts *types.ReplaceOptions,
ib *imageBucket,
index int,
force bool,
) (*types.CreateContainerMessage, *types.RemoveContainerMessage, error) {
removeMessage := &types.RemoveContainerMessage{
ContainerID: container.ID,
Expand All @@ -95,15 +99,16 @@ func (c *Calcium) doReplaceContainer(
return nil, removeMessage, err
}

// 覆盖 podname 如果做全量更新的话
opts.Podname = container.Podname
if !utils.FilterContainer(containerJSON.Config.Labels, opts.Labels) {
return nil, removeMessage, types.ErrNotFitLabels
}

// 记录镜像
if ib != nil {
ib.Add(container.Podname, containerJSON.Config.Image)
}

removeMessage.Message, err = c.doStopContainer(ctx, container, containerJSON, ib, force)
removeMessage.Message, err = c.doStopContainer(ctx, container, containerJSON, ib, opts.Force)
if err != nil {
return nil, removeMessage, err
}
Expand All @@ -120,7 +125,7 @@ func (c *Calcium) doReplaceContainer(

// 不涉及资源消耗,创建容器失败会被回收容器而不回收资源
// 创建成功容器会干掉之前的老容器也不会动资源,实际上实现了动态捆绑
createMessage := c.createAndStartContainer(ctx, index, container.Node, opts, container.CPU)
createMessage := c.createAndStartContainer(ctx, index, container.Node, &opts.DeployOptions, container.CPU)
if createMessage.Error != nil {
// 重启老容器, 并不关心是否启动成功
// 注意要再次激发 hook
Expand Down
2 changes: 1 addition & 1 deletion cluster/cluster.go
Expand Up @@ -56,7 +56,7 @@ type Cluster interface {
RunAndWait(ctx context.Context, opts *types.DeployOptions, stdin io.ReadCloser) (chan *types.RunAndWaitMessage, error)
// this methods will not interrupt by client
CreateContainer(ctx context.Context, opts *types.DeployOptions) (chan *types.CreateContainerMessage, error)
ReplaceContainer(ctx context.Context, opts *types.DeployOptions, replaceOpts *types.ReplaceOptions) (chan *types.ReplaceContainerMessage, error)
ReplaceContainer(ctx context.Context, opts *types.ReplaceOptions) (chan *types.ReplaceContainerMessage, error)
RemoveContainer(ctx context.Context, IDs []string, force bool) (chan *types.RemoveContainerMessage, error)
ReallocResource(ctx context.Context, IDs []string, cpu float64, mem int64) (chan *types.ReallocResourceMessage, error)

Expand Down

0 comments on commit a1459d9

Please sign in to comment.