forked from projecteru2/core
/
replace_container.go
148 lines (129 loc) · 4.43 KB
/
replace_container.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
package calcium
import (
"context"
"fmt"
"sync"
enginetypes "github.com/docker/docker/api/types"
"github.com/projecteru2/core/types"
log "github.com/sirupsen/logrus"
)
// ReplaceContainer replaces every container returned by ListContainers for
// opts.Name / opts.Entrypoint.Name / opts.Nodename with a new container built
// from opts, carrying over the old container's Memory, Quota and SoftLimit.
//
// Replacements run concurrently in batches of opts.Count; a non-positive
// opts.Count is treated as 1. For each old container exactly one
// ReplaceContainerMessage is sent on the returned channel, which is closed
// once all replacements have finished. The channel is unbuffered, so workers
// block until the caller receives their message.
//
// A non-nil error is returned only when listing the old containers fails;
// per-container failures are reported through the message's Error field.
func (c *Calcium) ReplaceContainer(ctx context.Context, opts *types.DeployOptions, force bool) (chan *types.ReplaceContainerMessage, error) {
	oldContainers, err := c.ListContainers(ctx, opts.Name, opts.Entrypoint.Name, opts.Nodename)
	if err != nil {
		return nil, err
	}
	ch := make(chan *types.ReplaceContainerMessage)
	go func() {
		defer close(ch)
		// Concurrency control: at most step replacements are in flight.
		// Clamp to 1 so a zero count cannot make the modulo below panic
		// with an integer divide by zero.
		step := opts.Count
		if step < 1 {
			step = 1
		}
		wg := sync.WaitGroup{}
		ib := newImageBucket()
		for index, oldContainer := range oldContainers {
			log.Debugf("[ReplaceContainer] Replace old container %s", oldContainer.ID)
			wg.Add(1)
			go func(deployOpts types.DeployOptions, oldContainer *types.Container, index int) {
				defer wg.Done()
				// Work on the copied options: stop the old container,
				// start the new one with the same resource settings.
				deployOpts.Memory = oldContainer.Memory
				deployOpts.CPUQuota = oldContainer.Quota
				deployOpts.SoftLimit = oldContainer.SoftLimit
				createMessage, removeMessage, err := c.doReplaceContainer(ctx, oldContainer, &deployOpts, ib, index, force)
				ch <- &types.ReplaceContainerMessage{
					Create: createMessage,
					Remove: removeMessage,
					Error:  err,
				}
				if err != nil {
					log.Errorf("[ReplaceContainer] Replace and remove failed %v, old container restarted", err)
					return
				}
				log.Infof("[ReplaceContainer] Replace and remove success %s", oldContainer.ID)
				// opts is passed by value so every goroutine gets its own copy.
			}(*opts, oldContainer, index)
			// End of a batch: wait for it before starting the next one.
			if (index+1)%step == 0 {
				wg.Wait()
			}
		}
		// Wait for the last (possibly partial) batch before cleaning up, so
		// that every worker has finished adding images to ib and no message
		// is sent after the channel is closed.
		wg.Wait()
		// Clean up the images collected during replacement.
		// TODO: if remove becomes asynchronous, ctx can no longer be used
		// here; once the gRPC connection drops this would die.
		go c.cleanCachedImage(ctx, ib)
	}()
	return ch, nil
}
// doReplaceContainer replaces a single container on its current node.
//
// While holding the "rmcontainer_<ID>" lock it inspects the old container,
// records its image in ib (when ib is non-nil), pulls opts.Image, stops the
// old container, then creates and starts the new one on container.Node with
// container.CPU. If creation fails, the old container is started again and
// its AfterStart hook (if any) is re-run. If creation succeeds, the old
// container is removed.
//
// The image is pulled before the old container is stopped, so an auth or
// pull failure leaves the old container running instead of stopped.
//
// The returned RemoveContainerMessage is always non-nil; its Success field is
// true only when the old container was removed. The CreateContainerMessage is
// nil unless the new container was created successfully.
func (c *Calcium) doReplaceContainer(
	ctx context.Context,
	container *types.Container,
	opts *types.DeployOptions,
	ib *imageBucket,
	index int,
	force bool,
) (*types.CreateContainerMessage, *types.RemoveContainerMessage, error) {
	removeMessage := &types.RemoveContainerMessage{
		ContainerID: container.ID,
		Success:     false,
		Message:     "",
	}
	// Take the lock so the container cannot be removed concurrently.
	lock, err := c.Lock(ctx, fmt.Sprintf("rmcontainer_%s", container.ID), int(c.config.GlobalTimeout.Seconds()))
	if err != nil {
		return nil, removeMessage, err
	}
	defer lock.Unlock(ctx)
	// Make sure the container really exists.
	containerJSON, err := container.Inspect(ctx)
	if err != nil {
		return nil, removeMessage, err
	}
	// Record the old image so it can be cleaned up later.
	if ib != nil {
		ib.Add(container.Podname, containerJSON.Config.Image)
	}
	// Pull the new image first: failing here must not cost us the running
	// old container, because nothing below this point would restart it.
	auth, err := makeEncodedAuthConfigFromRemote(c.config.Docker.AuthConfigs, opts.Image)
	if err != nil {
		return nil, removeMessage, err
	}
	if err = pullImage(ctx, container.Node, opts.Image, auth); err != nil {
		return nil, removeMessage, err
	}
	// Stop the old container only once the new image is available.
	removeMessage.Message, err = c.doStopContainer(ctx, container, containerJSON, ib, force)
	if err != nil {
		return nil, removeMessage, err
	}
	// No resource accounting happens here: if creation fails the new
	// container is reclaimed but resources are not; if it succeeds the old
	// container is removed, again without touching resources. In effect the
	// resources are dynamically re-bound to the new container.
	createMessage := c.createAndStartContainer(ctx, index, container.Node, opts, container.CPU)
	if createMessage.Error != nil {
		// Roll back: restart the old container, best effort (a failure is
		// only logged). The after-start hook has to be triggered again.
		if err = container.Engine.ContainerStart(ctx, container.ID, enginetypes.ContainerStartOptions{}); err != nil {
			log.Errorf("[replaceAndRemove] Old container %s restart failed %v", container.ID, err)
		}
		if container.Hook != nil && len(container.Hook.AfterStart) > 0 {
			output, hookErr := c.doContainerAfterStartHook(
				ctx, container,
				containerJSON.Config.User,
				containerJSON.Config.Env,
				container.Privileged,
			)
			log.Infof("[replaceAndRemove] Do after start hook %s", output)
			removeMessage.Message += string(output)
			if hookErr != nil {
				log.Errorf("[replaceAndRemove] Old container %s after hook failed %v", container.ID, hookErr)
			}
		}
		return nil, removeMessage, createMessage.Error
	}
	// TODO: healthcheck
	// Remove the old container.
	if err = c.doRemoveContainer(ctx, container); err != nil {
		log.Errorf("[replaceAndRemove] Old container %s remove failed %v", container.ID, err)
		return createMessage, removeMessage, err
	}
	removeMessage.Success = true
	return createMessage, removeMessage, nil
}