-
Notifications
You must be signed in to change notification settings - Fork 494
/
manifold.go
134 lines (120 loc) · 3.81 KB
/
manifold.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
// Copyright 2017 Canonical Ltd.
// Licensed under the AGPLv3, see LICENCE file for details.
package peergrouper
import (
"github.com/juju/clock"
"github.com/juju/errors"
"github.com/juju/worker/v3"
"github.com/juju/worker/v3/dependency"
"github.com/prometheus/client_golang/prometheus"
"github.com/juju/juju/agent"
"github.com/juju/juju/state"
"github.com/juju/juju/worker/common"
workerstate "github.com/juju/juju/worker/state"
)
// ManifoldConfig holds the information necessary to run a peergrouper
// in a dependency.Engine.
type ManifoldConfig struct {
AgentName string
ClockName string
ControllerPortName string
StateName string
Hub Hub
PrometheusRegisterer prometheus.Registerer
NewWorker func(Config) (worker.Worker, error)
}
// Validate validates the manifold configuration.
func (config ManifoldConfig) Validate() error {
if config.AgentName == "" {
return errors.NotValidf("empty AgentName")
}
if config.ClockName == "" {
return errors.NotValidf("empty ClockName")
}
if config.ControllerPortName == "" {
return errors.NotValidf("empty ControllerPortName")
}
if config.StateName == "" {
return errors.NotValidf("empty StateName")
}
if config.Hub == nil {
return errors.NotValidf("nil Hub")
}
if config.PrometheusRegisterer == nil {
return errors.NotValidf("nil PrometheusRegisterer")
}
if config.NewWorker == nil {
return errors.NotValidf("nil NewWorker")
}
return nil
}
// Manifold returns a dependency.Manifold that will run a peergrouper.
func Manifold(config ManifoldConfig) dependency.Manifold {
return dependency.Manifold{
Inputs: []string{
config.AgentName,
config.ClockName,
config.ControllerPortName,
config.StateName,
},
Start: config.start,
}
}
// start is a method on ManifoldConfig because it's more readable than a closure.
func (config ManifoldConfig) start(context dependency.Context) (worker.Worker, error) {
if err := config.Validate(); err != nil {
return nil, errors.Trace(err)
}
var agent agent.Agent
if err := context.Get(config.AgentName, &agent); err != nil {
return nil, errors.Trace(err)
}
var clock clock.Clock
if err := context.Get(config.ClockName, &clock); err != nil {
return nil, errors.Trace(err)
}
// Ensure that the controller-port worker is running.
if err := context.Get(config.ControllerPortName, nil); err != nil {
return nil, errors.Trace(err)
}
var stTracker workerstate.StateTracker
if err := context.Get(config.StateName, &stTracker); err != nil {
return nil, errors.Trace(err)
}
statePool, err := stTracker.Use()
if err != nil {
return nil, errors.Trace(err)
}
st := statePool.SystemState()
mongoSession := st.MongoSession()
agentConfig := agent.CurrentConfig()
stateServingInfo, ok := agentConfig.StateServingInfo()
if !ok {
return nil, errors.New("state serving info missing from agent config")
}
model, err := st.Model()
if err != nil {
return nil, errors.Trace(err)
}
supportsHA := model.Type() != state.ModelTypeCAAS
w, err := config.NewWorker(Config{
State: StateShim{st},
MongoSession: MongoSessionShim{mongoSession},
APIHostPortsSetter: &CachingAPIHostPortsSetter{APIHostPortsSetter: st},
Clock: clock,
Hub: config.Hub,
MongoPort: stateServingInfo.StatePort,
APIPort: stateServingInfo.APIPort,
ControllerAPIPort: stateServingInfo.ControllerAPIPort,
SupportsHA: supportsHA,
PrometheusRegisterer: config.PrometheusRegisterer,
// On machine models, the controller id is the same as the machine/agent id.
// TODO(wallyworld) - revisit when we add HA to k8s.
ControllerId: agentConfig.Tag().Id,
})
if err != nil {
_ = stTracker.Done()
return nil, errors.Trace(err)
}
return common.NewCleanupWorker(w, func() { _ = stTracker.Done() }), nil
}