/
machiner.go
250 lines (220 loc) · 8.16 KB
/
machiner.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
// Copyright 2012, 2013 Canonical Ltd.
// Licensed under the AGPLv3, see LICENCE file for details.
package machiner
import (
"net"
"github.com/juju/errors"
"github.com/juju/loggo"
"github.com/juju/names/v5"
"github.com/juju/worker/v3"
corelife "github.com/juju/juju/core/life"
corenetwork "github.com/juju/juju/core/network"
"github.com/juju/juju/core/status"
"github.com/juju/juju/core/watcher"
"github.com/juju/juju/network"
"github.com/juju/juju/rpc/params"
jworker "github.com/juju/juju/worker"
)
// logger is the package-level logger used by the machiner worker; it is
// registered under the "juju.worker.machiner" logging module.
var logger = loggo.GetLogger("juju.worker.machiner")
// Config defines the configuration for a machiner worker.
// MachineAccessor and Tag are mandatory; see Validate.
type Config struct {
	// MachineAccessor provides a means of observing and updating the
	// machine's state. It must be non-nil for the config to be valid.
	MachineAccessor MachineAccessor

	// Tag is the machine's tag. It is used to look the machine up through
	// MachineAccessor; the zero value is rejected by Validate.
	Tag names.MachineTag

	// ClearMachineAddressesOnStart indicates whether or not to clear
	// the machine's machine addresses when the worker starts. When false,
	// the addresses are instead populated from the host's interfaces.
	ClearMachineAddressesOnStart bool
}
// Validate checks that the mandatory configuration values are present.
// It returns a NotValid error naming the first missing value, or nil when
// both MachineAccessor and Tag have been supplied.
func (cfg *Config) Validate() error {
	switch {
	case cfg.MachineAccessor == nil:
		return errors.NotValidf("unspecified MachineAccessor")
	case cfg.Tag == (names.MachineTag{}):
		// A zero-valued tag means the caller never set one.
		return errors.NotValidf("unspecified Tag")
	}
	return nil
}
// Machiner is responsible for a machine agent's lifecycle.
type Machiner struct {
	// config is the validated configuration the worker was created with.
	config Config
	// machine is the machine this worker manages. It is assigned in SetUp
	// and is nil before SetUp has succeeded in looking the machine up.
	machine Machine
}
// NewMachiner validates cfg and returns a Worker that will wait for the
// identified machine to become Dying and make it Dead; or until the machine
// becomes Dead by other means.
//
// An invalid configuration is reported as an error annotated with
// "validating config". Any failure to start the underlying notify worker is
// returned traced.
//
// NOTE(review): declared as a variable rather than a plain function,
// presumably so it can be swapped out in tests — confirm.
var NewMachiner = func(cfg Config) (worker.Worker, error) {
	if err := cfg.Validate(); err != nil {
		return nil, errors.Annotate(err, "validating config")
	}

	// The Machiner itself acts as the handler driven by the notify worker.
	notifyCfg := watcher.NotifyConfig{
		Handler: &Machiner{config: cfg},
	}
	notifyWorker, err := watcher.NewNotifyWorker(notifyCfg)
	if err != nil {
		return nil, errors.Trace(err)
	}
	return notifyWorker, nil
}
// getObservedNetworkConfig is a package-level alias for
// corenetwork.GetObservedNetworkConfig. Handle calls it to discover the
// host's current network configuration.
// NOTE(review): presumably kept in a variable so tests can substitute it — confirm.
var getObservedNetworkConfig = corenetwork.GetObservedNetworkConfig
// SetUp looks up the machine identified by the configured tag, remembers it
// on the Machiner, and returns a watcher for it.
//
// If the machine is not found or the agent is unauthorized,
// jworker.ErrTerminateAgent is returned. If the machine is no longer alive,
// only the watcher is returned. Otherwise the machine addresses are either
// cleared (when ClearMachineAddressesOnStart is set) or populated from the
// host, and the machine status is set to Started before the watcher is
// returned.
func (mr *Machiner) SetUp() (watcher.NotifyWatcher, error) {
	tag := mr.config.Tag

	// Find which machine we're responsible for.
	machine, err := mr.config.MachineAccessor.Machine(tag)
	switch {
	case params.IsCodeNotFoundOrCodeUnauthorized(err):
		return nil, jworker.ErrTerminateAgent
	case err != nil:
		return nil, errors.Trace(err)
	}
	mr.machine = machine

	if corelife.IsNotAlive(machine.Life()) {
		// Can happen when the machiner is restarting after a failure in
		// EnsureDead. Since we're dying or dead, there is no need to
		// handle the machine addresses or status.
		logger.Infof("%q not alive", tag)
		return machine.Watch()
	}

	if mr.config.ClearMachineAddressesOnStart {
		logger.Debugf("machiner configured to reset machine %q addresses to empty", tag)
		if err := machine.SetMachineAddresses(nil); err != nil {
			return nil, errors.Annotate(err, "resetting machine addresses")
		}
	} else if err := setMachineAddresses(tag, machine); err != nil {
		// The machine is alive, so its addresses in state are set to
		// the host's addresses; that attempt failed.
		return nil, errors.Annotate(err, "setting machine addresses")
	}

	// Mark the machine as started and log it.
	if err := machine.SetStatus(status.Started, "", nil); err != nil {
		return nil, errors.Annotatef(err, "%s failed to set status started", tag)
	}
	logger.Infof("%q started", tag)

	return machine.Watch()
}
// interfaceAddrs is a package-level alias for net.InterfaceAddrs.
// setMachineAddresses calls it to enumerate the host's interface addresses.
// NOTE(review): presumably kept in a variable so tests can replace it — confirm.
var interfaceAddrs = net.InterfaceAddrs
// setMachineAddresses records the host's interface IP addresses as the
// machine addresses of m.
//
// Addresses are read through interfaceAddrs. Entries that are neither
// *net.IPAddr nor *net.IPNet are ignored, as are addresses whose scope is
// link-local. If nothing remains at that point the machine is left untouched
// and nil is returned. The remaining addresses are passed through
// network.FilterBridgeAddresses before being set on the machine. tag is used
// for logging only.
func setMachineAddresses(tag names.MachineTag, m Machine) error {
	ifaceAddrs, err := interfaceAddrs()
	if err != nil {
		return err
	}

	var candidates corenetwork.ProviderAddresses
	for _, ifaceAddr := range ifaceAddrs {
		var (
			hostIP net.IP
			mask   net.IPMask
		)
		switch v := ifaceAddr.(type) {
		case *net.IPNet:
			hostIP, mask = v.IP, v.Mask
		case *net.IPAddr:
			// No netmask is available for a bare IP address.
			hostIP = v.IP
		default:
			continue
		}

		// If we discovered a netmask for the address, include a CIDR
		// when constructing the provider address.
		var opts []func(corenetwork.AddressMutator)
		cidr := corenetwork.NetworkCIDRFromIPAndMask(hostIP, mask)
		if cidr != "" {
			opts = append(opts, corenetwork.WithCIDR(cidr))
		}
		machineAddr := corenetwork.NewMachineAddress(hostIP.String(), opts...)
		providerAddr := machineAddr.AsProviderAddress()

		// Keep everything except link-local addresses, which we cannot
		// reliably use.
		if providerAddr.Scope != corenetwork.ScopeLinkLocal {
			candidates = append(candidates, providerAddr)
		}
	}
	if len(candidates) == 0 {
		return nil
	}

	// Filter out any LXC or LXD bridge addresses.
	candidates = network.FilterBridgeAddresses(candidates)
	logger.Infof("setting addresses for %q to %v", tag, candidates)

	// TODO (manadart 2019-08-27): This needs refactoring.
	// FilterBridgeAddresses takes a slice of ProviderAddress,
	// so we create an initial slice of that type and extract the machine
	// addresses after filtering.
	// We should work in an appropriate indirection to achieve this logic.
	machineAddrs := make([]corenetwork.MachineAddress, 0, len(candidates))
	for _, providerAddr := range candidates {
		machineAddrs = append(machineAddrs, providerAddr.MachineAddress)
	}
	return m.SetMachineAddresses(machineAddrs)
}
// Handle reacts to a change notification for the machine set up by SetUp.
//
// The machine is refreshed first. If it is not found or the agent is
// unauthorized, jworker.ErrTerminateAgent is returned.
//
// While the machine is Alive, the host's observed network configuration is
// discovered and, when non-empty, recorded via SetObservedNetworkConfig.
//
// Once the machine is no longer Alive, its status is set to Stopped and
// EnsureDead is attempted. On success jworker.ErrTerminateAgent is returned.
// If EnsureDead fails because units or storage are still attached, or the
// call should be retried, nil is returned so a later notification can retry.
// Any other failure is returned as an error.
//
// The channel argument is ignored.
func (mr *Machiner) Handle(_ <-chan struct{}) error {
	if err := mr.machine.Refresh(); params.IsCodeNotFoundOrCodeUnauthorized(err) {
		// NOTE(axw) we can distinguish between NotFound and CodeUnauthorized,
		// so we could call NotifyMachineDead here in case the agent failed to
		// call NotifyMachineDead directly after setting the machine Dead in
		// the first place. We're not doing that to be cautious: the machine
		// could be missing from state due to invalid global state.
		return jworker.ErrTerminateAgent
	} else if err != nil {
		return errors.Trace(err)
	}

	life := mr.machine.Life()
	if life == corelife.Alive {
		interfaceInfos, err := getObservedNetworkConfig(corenetwork.DefaultConfigSource())
		if err != nil {
			return errors.Annotate(err, "cannot discover observed network config")
		}
		if len(interfaceInfos) == 0 {
			logger.Warningf("not updating network config: no observed config found to update")
		}

		observedConfig := params.NetworkConfigFromInterfaceInfo(interfaceInfos)
		if len(observedConfig) > 0 {
			if err := mr.machine.SetObservedNetworkConfig(observedConfig); err != nil {
				return errors.Annotate(err, "cannot update observed network config")
			}
			// Only report an update when one was actually made, so this
			// line never contradicts the "not updating" warning above.
			logger.Debugf("observed network config updated for %q to %+v", mr.config.Tag, observedConfig)
		}
		return nil
	}

	logger.Debugf("%q is now %s", mr.config.Tag, life)
	if err := mr.machine.SetStatus(status.Stopped, "", nil); err != nil {
		return errors.Annotatef(err, "%s failed to set status stopped", mr.config.Tag)
	}

	// Attempt to mark the machine Dead. If the machine still has units
	// assigned, or storage attached, this will fail with
	// CodeHasAssignedUnits or CodeMachineHasAttachedStorage respectively.
	// Once units or storage are removed, the watcher will trigger again
	// and we'll reattempt. If the machine has containers, EnsureDead will
	// fail with CodeMachineHasContainers. However the watcher will not
	// trigger again, so fail and let the machiner restart and try again.
	if err := mr.machine.EnsureDead(); err != nil {
		if params.IsCodeHasAssignedUnits(err) {
			logger.Tracef("machine still has units")
			return nil
		}
		if params.IsCodeMachineHasAttachedStorage(err) {
			logger.Tracef("machine still has storage attached")
			return nil
		}
		if params.IsCodeTryAgain(err) {
			logger.Tracef("waiting for machine to be removed as a controller")
			return nil
		}
		if params.IsCodeMachineHasContainers(err) {
			logger.Tracef("machine still has containers")
			return errors.Annotatef(err, "%q", mr.config.Tag)
		}
		err = errors.Annotatef(err, "%s failed to set machine to dead", mr.config.Tag)
		// Best effort: surface the failure on the machine's status. If that
		// also fails, log both the original error and the status error so
		// neither is lost; the original error is still the one returned.
		if e := mr.machine.SetStatus(status.Error, errors.Annotate(err, "destroying machine").Error(), nil); e != nil {
			logger.Errorf("failed to set status for error %v: %v", err, e)
		}
		return errors.Trace(err)
	}
	return jworker.ErrTerminateAgent
}
// TearDown performs no cleanup and always returns nil; the Machiner holds
// no resources of its own that need releasing.
func (mr *Machiner) TearDown() error {
	// Nothing to do here.
	return nil
}