forked from juju/juju
-
Notifications
You must be signed in to change notification settings - Fork 0
/
machines.go
156 lines (145 loc) · 4.17 KB
/
machines.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
// Copyright 2015 Canonical Ltd.
// Licensed under the AGPLv3, see LICENCE file for details.
package storageprovisioner
import (
"github.com/juju/errors"
"github.com/juju/names/v4"
"github.com/juju/worker/v3"
"github.com/juju/worker/v3/catacomb"
"github.com/juju/juju/core/instance"
"github.com/juju/juju/rpc/params"
)
// watchMachine starts a machine watcher if there is not already one for the
// specified tag. The watcher will notify the worker when the machine changes,
// for example when it is provisioned.
func watchMachine(ctx *context, tag names.MachineTag) {
_, ok := ctx.machines[tag]
if ok {
return
}
w, err := newMachineWatcher(ctx.config.Machines, tag, ctx.machineChanges, ctx.config.Logger)
if err != nil {
ctx.kill(errors.Trace(err))
} else if err := ctx.addWorker(w); err != nil {
ctx.kill(errors.Trace(err))
} else {
ctx.machines[tag] = w
}
}
// refreshMachine refreshes the specified machine's instance ID. If it is set,
// then the machine watcher is stopped and pending entities' parameters are
// updated. If the machine is not provisioned yet, this method is a no-op.
func refreshMachine(ctx *context, tag names.MachineTag) error {
w, ok := ctx.machines[tag]
if !ok {
return errors.Errorf("machine %s is not being watched", tag.Id())
}
stopAndRemove := func() error {
_ = worker.Stop(w)
delete(ctx.machines, tag)
return nil
}
results, err := ctx.config.Machines.InstanceIds([]names.MachineTag{tag})
if err != nil {
return errors.Annotate(err, "getting machine instance ID")
}
if err := results[0].Error; err != nil {
if params.IsCodeNotProvisioned(err) {
return nil
} else if params.IsCodeNotFound(err) {
// Machine is gone, so stop watching.
return stopAndRemove()
}
return errors.Annotate(err, "getting machine instance ID")
}
machineProvisioned(ctx, tag, instance.Id(results[0].Result))
// machine provisioning is the only thing we care about;
// stop the watcher.
return stopAndRemove()
}
// machineProvisioned is called when a watched machine is provisioned.
func machineProvisioned(ctx *context, tag names.MachineTag, instanceId instance.Id) {
for _, params := range ctx.incompleteVolumeParams {
if params.Attachment.Machine != tag || params.Attachment.InstanceId != "" {
continue
}
params.Attachment.InstanceId = instanceId
updatePendingVolume(ctx, params)
}
for id, params := range ctx.incompleteVolumeAttachmentParams {
if params.Machine != tag || params.InstanceId != "" {
continue
}
params.InstanceId = instanceId
updatePendingVolumeAttachment(ctx, id, params)
}
for id, params := range ctx.incompleteFilesystemAttachmentParams {
if params.Machine != tag || params.InstanceId != "" {
continue
}
params.InstanceId = instanceId
updatePendingFilesystemAttachment(ctx, id, params)
}
}
type machineWatcher struct {
catacomb catacomb.Catacomb
accessor MachineAccessor
tag names.MachineTag
instanceId instance.Id
out chan<- names.MachineTag
logger Logger
}
func newMachineWatcher(
accessor MachineAccessor,
tag names.MachineTag,
out chan<- names.MachineTag,
logger Logger,
) (*machineWatcher, error) {
w := &machineWatcher{
accessor: accessor,
tag: tag,
out: out,
logger: logger,
}
err := catacomb.Invoke(catacomb.Plan{
Site: &w.catacomb,
Work: w.loop,
})
if err != nil {
return nil, errors.Trace(err)
}
return w, nil
}
func (mw *machineWatcher) loop() error {
w, err := mw.accessor.WatchMachine(mw.tag)
if err != nil {
return errors.Annotate(err, "watching machine")
}
if err := mw.catacomb.Add(w); err != nil {
return errors.Trace(err)
}
mw.logger.Debugf("watching machine %s", mw.tag.Id())
defer mw.logger.Debugf("finished watching machine %s", mw.tag.Id())
var out chan<- names.MachineTag
for {
select {
case <-mw.catacomb.Dying():
return mw.catacomb.ErrDying()
case _, ok := <-w.Changes():
if !ok {
return errors.New("machine watcher closed")
}
out = mw.out
case out <- mw.tag:
out = nil
}
}
}
// Kill is part of the worker.Worker interface.
func (mw *machineWatcher) Kill() {
mw.catacomb.Kill(nil)
}
// Wait is part of the worker.Worker interface.
func (mw *machineWatcher) Wait() error {
return mw.catacomb.Wait()
}