forked from juju/juju
-
Notifications
You must be signed in to change notification settings - Fork 0
/
manifold.go
145 lines (127 loc) · 3.66 KB
/
manifold.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
// Copyright 2016 Canonical Ltd.
// Licensed under the AGPLv3, see LICENCE file for details.
package stateconfigwatcher
import (
"github.com/juju/errors"
"github.com/juju/loggo"
"github.com/juju/utils/v3/voyeur"
"github.com/juju/worker/v3"
"github.com/juju/worker/v3/dependency"
"gopkg.in/tomb.v2"
"github.com/juju/juju/agent"
apiagent "github.com/juju/juju/api/agent/agent"
)
var logger = loggo.GetLogger("juju.worker.stateconfigwatcher")
type ManifoldConfig struct {
AgentName string
AgentConfigChanged *voyeur.Value
}
// Manifold returns a dependency.Manifold which wraps the machine
// agent's voyeur.Value which gets set whenever the machine agent's
// config is changed. Whenever the config is updated the presence of
// state serving info is checked and if state serving info was added
// or removed the manifold worker will bounce itself.
//
// The manifold offers a single boolean output which will be true if
// state serving info is available (i.e. the machine agent should be a
// state server) and false otherwise.
//
// This manifold is intended to be used as a dependency for the state
// manifold.
func Manifold(config ManifoldConfig) dependency.Manifold {
return dependency.Manifold{
Inputs: []string{config.AgentName},
Start: func(context dependency.Context) (worker.Worker, error) {
var a agent.Agent
if err := context.Get(config.AgentName, &a); err != nil {
return nil, err
}
if config.AgentConfigChanged == nil {
return nil, errors.NotValidf("nil AgentConfigChanged")
}
tagKind := a.CurrentConfig().Tag().Kind()
if !apiagent.IsAllowedControllerTag(tagKind) {
return nil, errors.New("manifold can only be used with a machine or controller agent")
}
w := &stateConfigWatcher{
agent: a,
agentConfigChanged: config.AgentConfigChanged,
}
w.tomb.Go(w.loop)
return w, nil
},
Output: outputFunc,
}
}
// outputFunc extracts a bool from a *stateConfigWatcher. If true, the
// agent is a state server.
func outputFunc(in worker.Worker, out interface{}) error {
inWorker, _ := in.(*stateConfigWatcher)
if inWorker == nil {
return errors.Errorf("in should be a %T; got %T", inWorker, in)
}
switch outPointer := out.(type) {
case *bool:
*outPointer = inWorker.isStateServer()
default:
return errors.Errorf("out should be *bool; got %T", out)
}
return nil
}
type stateConfigWatcher struct {
tomb tomb.Tomb
agent agent.Agent
agentConfigChanged *voyeur.Value
}
func (w *stateConfigWatcher) isStateServer() bool {
config := w.agent.CurrentConfig()
_, ok := config.StateServingInfo()
return ok
}
func (w *stateConfigWatcher) loop() error {
watch := w.agentConfigChanged.Watch()
defer watch.Close()
lastValue := w.isStateServer()
watchCh := make(chan bool)
go func() {
for {
if watch.Next() {
select {
case <-w.tomb.Dying():
return
case watchCh <- true:
}
} else {
// watcher or voyeur.Value closed.
close(watchCh)
return
}
}
}()
for {
select {
case <-w.tomb.Dying():
logger.Infof("tomb dying")
return tomb.ErrDying
case _, ok := <-watchCh:
if !ok {
return errors.New("config changed value closed")
}
if w.isStateServer() != lastValue {
// State serving info has been set or unset so restart
// so that dependents get notified. ErrBounce ensures
// that the manifold is restarted quickly.
logger.Debugf("state serving info change in agent config")
return dependency.ErrBounce
}
}
}
}
// Kill implements worker.Worker.
func (w *stateConfigWatcher) Kill() {
w.tomb.Kill(nil)
}
// Wait implements worker.Worker.
func (w *stateConfigWatcher) Wait() error {
return w.tomb.Wait()
}