/
container_initialisation.go
229 lines (205 loc) · 8.5 KB
/
container_initialisation.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
// Copyright 2012, 2013 Canonical Ltd.
// Licensed under the AGPLv3, see LICENCE file for details.
package provisioner
import (
"fmt"
"sync/atomic"
"github.com/juju/utils/fslock"
"github.com/juju/juju/agent"
"github.com/juju/juju/container"
"github.com/juju/juju/container/kvm"
"github.com/juju/juju/container/lxc"
"github.com/juju/juju/environs"
"github.com/juju/juju/instance"
"github.com/juju/juju/state"
"github.com/juju/juju/state/api/params"
apiprovisioner "github.com/juju/juju/state/api/provisioner"
"github.com/juju/juju/state/api/watcher"
"github.com/juju/juju/worker"
)
// ContainerSetup is a StringsWatchHandler that is notified when containers
// are created on the given machine. It will set up the machine to be able
// to create containers and start a suitable provisioner.
type ContainerSetup struct {
	// runner owns the workers this handler starts and stops.
	runner worker.Runner
	// supportedContainers lists the container types this machine can host.
	supportedContainers []instance.ContainerType
	// provisioner is the API-facing provisioner state used to create brokers.
	provisioner *apiprovisioner.State
	// machine is the host machine whose containers are watched.
	machine *apiprovisioner.Machine
	// config is the machine agent's configuration.
	config agent.Config
	// initLock serialises container infrastructure initialisation on the host.
	initLock *fslock.Lock
	// Save the workerName so the worker thread can be stopped.
	workerName string
	// setupDone[containerType] is non zero if the container setup has been invoked
	// for that container type.
	setupDone map[instance.ContainerType]*int32
	// The number of provisioners started. Once all necessary provisioners have
	// been started, the container watcher can be stopped.
	numberProvisioners int32
}
// NewContainerSetupHandler returns a StringsWatchHandler which is notified when
// containers are created on the given machine.
func NewContainerSetupHandler(runner worker.Runner, workerName string, supportedContainers []instance.ContainerType,
	machine *apiprovisioner.Machine, provisioner *apiprovisioner.State,
	config agent.Config, initLock *fslock.Lock) worker.StringsWatchHandler {
	handler := &ContainerSetup{
		workerName:          workerName,
		runner:              runner,
		supportedContainers: supportedContainers,
		machine:             machine,
		provisioner:         provisioner,
		config:              config,
		initLock:            initLock,
	}
	return handler
}
// SetUp is defined on the StringsWatchHandler interface. It prepares the
// per-container-type semaphores and returns a watcher for all container
// lifecycle events on this machine.
func (cs *ContainerSetup) SetUp() (watcher watcher.StringsWatcher, err error) {
	// One zero-valued semaphore per known container type; a non-zero value
	// later records that setup has run for that type.
	cs.setupDone = make(map[instance.ContainerType]*int32, len(instance.ContainerTypes))
	for _, ctype := range instance.ContainerTypes {
		cs.setupDone[ctype] = new(int32)
	}
	// Listen to all container lifecycle events on our machine.
	w, werr := cs.machine.WatchAllContainers()
	if werr != nil {
		return nil, werr
	}
	return w, nil
}
// Handle is called whenever containers change on the machine being watched.
// Machines start out with no containers so the first time Handle is called,
// it will be because a container has been added.
func (cs *ContainerSetup) Handle(containerIds []string) (resultError error) {
	// The initial watcher event may be empty; just consume it.
	if len(containerIds) == 0 {
		return nil
	}
	logger.Tracef("initial container setup with ids: %v", containerIds)
	for _, id := range containerIds {
		containerType := state.ContainerTypeFromId(id)
		if atomic.LoadInt32(cs.setupDone[containerType]) != 0 {
			// This container type has already been dealt with.
			continue
		}
		err := cs.initialiseAndStartProvisioner(containerType)
		if err == nil {
			continue
		}
		logger.Errorf("starting container provisioner for %v: %v", containerType, err)
		// A failure for one container type must not prevent the others from
		// being attempted, so record the first error and carry on.
		if resultError == nil {
			resultError = err
		}
	}
	return resultError
}
// initialiseAndStartProvisioner marks the given container type as handled,
// stops the container watcher once every supported type has a provisioner,
// initialises the host's container infrastructure for the type, and starts
// the corresponding provisioner worker.
//
// Bug fix: the StopWorker call was previously duplicated — once correctly
// guarded by the "all supported types started" counter check, and once
// unconditionally. The unconditional call stopped the container watcher as
// soon as the FIRST container type was handled, so provisioners for any
// remaining supported types were never started. Only the guarded call is kept.
func (cs *ContainerSetup) initialiseAndStartProvisioner(containerType instance.ContainerType) error {
	// Flag that this container type has been handled.
	atomic.StoreInt32(cs.setupDone[containerType], 1)
	if atomic.AddInt32(&cs.numberProvisioners, 1) == int32(len(cs.supportedContainers)) {
		// We only care about the initial container creation.
		// This worker has done its job so stop it.
		// We do not expect there will be an error, and there's not much we can do anyway.
		if err := cs.runner.StopWorker(cs.workerName); err != nil {
			logger.Warningf("stopping machine agent container watcher: %v", err)
		}
	}
	initialiser, broker, err := cs.getContainerArtifacts(containerType)
	if err != nil {
		return fmt.Errorf("initialising container infrastructure on host machine: %v", err)
	}
	if err := cs.runInitialiser(containerType, initialiser); err != nil {
		return fmt.Errorf("setting up container dependencies on host machine: %v", err)
	}
	return StartProvisioner(cs.runner, containerType, cs.provisioner, cs.config, broker)
}
// runInitialiser runs the container initialiser with the initialisation hook held.
func (cs *ContainerSetup) runInitialiser(containerType instance.ContainerType, initialiser container.Initialiser) error {
	lockMessage := fmt.Sprintf("initialise-%s", containerType)
	if err := cs.initLock.Lock(lockMessage); err != nil {
		return fmt.Errorf("failed to acquire initialization lock: %v", err)
	}
	defer cs.initLock.Unlock()
	return initialiser.Initialise()
}
// TearDown is defined on the StringsWatchHandler interface.
func (cs *ContainerSetup) TearDown() error {
	return nil // no resources to release
}
// getContainerArtifacts returns the initialiser and instance broker needed to
// prepare the host machine and provision containers of the given type.
func (cs *ContainerSetup) getContainerArtifacts(containerType instance.ContainerType) (container.Initialiser, environs.InstanceBroker, error) {
	tools, err := cs.provisioner.Tools(cs.config.Tag())
	if err != nil {
		logger.Errorf("cannot get tools from machine for %s container", containerType)
		return nil, nil, err
	}
	managerConfig, err := containerManagerConfig(containerType, cs.provisioner, cs.config)
	if err != nil {
		return nil, nil, err
	}
	switch containerType {
	case instance.LXC:
		series, err := cs.machine.Series()
		if err != nil {
			return nil, nil, err
		}
		lxcBroker, err := NewLxcBroker(cs.provisioner, tools, cs.config, managerConfig)
		if err != nil {
			return nil, nil, err
		}
		return lxc.NewContainerInitialiser(series), lxcBroker, nil
	case instance.KVM:
		kvmBroker, err := NewKvmBroker(cs.provisioner, tools, cs.config, managerConfig)
		if err != nil {
			logger.Errorf("failed to create new kvm broker")
			return nil, nil, err
		}
		return kvm.NewContainerInitialiser(), kvmBroker, nil
	default:
		return nil, nil, fmt.Errorf("unknown container type: %v", containerType)
	}
}
// containerManagerConfig asks the provisioner API for the container manager
// configuration for the given container type, falling back to the legacy
// configuration when talking to an older server that does not implement the
// call, and applying any agent-configured namespace as the config name.
//
// Bug fix: previously the CodeNotImplemented fallback assignment was followed
// by an unconditional `if err != nil { return nil, err }`, so the fallback was
// dead code and old servers always produced an error. The error return is now
// skipped when the fallback applies.
func containerManagerConfig(
	containerType instance.ContainerType,
	provisioner *apiprovisioner.State,
	agentConfig agent.Config,
) (container.ManagerConfig, error) {
	// Ask the provisioner for the container manager configuration.
	managerConfigResult, err := provisioner.ContainerManagerConfig(
		params.ContainerManagerConfigParams{Type: containerType},
	)
	if params.IsCodeNotImplemented(err) {
		// We currently don't support upgrading;
		// revert to the old configuration.
		managerConfigResult.ManagerConfig = container.ManagerConfig{container.ConfigName: "juju"}
	} else if err != nil {
		return nil, err
	}
	// If a namespace is specified, that should instead be used as the config name.
	if namespace := agentConfig.Value(agent.Namespace); namespace != "" {
		managerConfigResult.ManagerConfig[container.ConfigName] = namespace
	}
	managerConfig := container.ManagerConfig(managerConfigResult.ManagerConfig)
	return managerConfig, nil
}
// Override for testing.
var StartProvisioner = startProvisionerWorker

// startProvisionerWorker kicks off a provisioner task responsible for creating containers
// of the specified type on the machine.
func startProvisionerWorker(runner worker.Runner, containerType instance.ContainerType,
	provisioner *apiprovisioner.State, config agent.Config, broker environs.InstanceBroker) error {
	workerName := fmt.Sprintf("%s-provisioner", containerType)
	// The provisioner task is created after a container record has already been
	// added to the machine. It will see that the container does not have an
	// instance yet and create one.
	newWorker := func() (worker.Worker, error) {
		return NewContainerProvisioner(containerType, provisioner, config, broker), nil
	}
	return runner.StartWorker(workerName, newWorker)
}