/
manifold.go
162 lines (142 loc) · 5.08 KB
/
manifold.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
// Copyright 2015 Canonical Ltd.
// Licensed under the AGPLv3, see LICENCE file for details.
package meterstatus
import (
"os"
"path"
"time"
"github.com/juju/errors"
"github.com/juju/loggo"
"github.com/juju/names/v5"
"github.com/juju/worker/v3"
"github.com/juju/worker/v3/dependency"
"github.com/juju/juju/agent"
"github.com/juju/juju/api/agent/meterstatus"
"github.com/juju/juju/api/base"
"github.com/juju/juju/api/common"
"github.com/juju/juju/core/machinelock"
)
// logger is here to stop the desire of creating a package level Logger.
// Don't do this, instead use the logger passed into the manifold.
type logger interface{}
// The blank assignment references the logger type so the declaration is
// used at least once; presumably this keeps unused-code linters quiet.
var _ logger = struct{}{}
// Logger represents the logging methods used in this package.
//
// A Logger is supplied through ManifoldConfig.Logger. newStatusWorker calls
// Tracef, Infof and Warningf on it directly and also hands the same value to
// the hook runner and to the isolated/connected worker configurations.
type Logger interface {
// The level-named methods each take a string followed by variadic
// arguments (presumably a printf-style format and its operands).
Errorf(string, ...interface{})
Warningf(string, ...interface{})
Infof(string, ...interface{})
Debugf(string, ...interface{})
Tracef(string, ...interface{})
// Root returns a loggo.Logger; it is not called anywhere in this file.
Root() loggo.Logger
}
// Clock defines the time methods used by this package.
//
// A Clock is supplied through ManifoldConfig.Clock; the manifold refuses to
// start when it is nil.
type Clock interface {
// Now returns a time.Time (presumably the clock's notion of the
// current time).
Now() time.Time
// After returns a receive-only channel of time.Time for the given
// duration (presumably mirroring time.After; confirm against the
// implementation that is injected).
After(time.Duration) <-chan time.Time
}
// ManifoldConfig identifies the resource names upon which the status manifold
// depends, and carries the collaborators and constructor functions that the
// manifold's Start function uses to build a meter status worker.
type ManifoldConfig struct {
// AgentName is the dependency name used to look up the agent.Agent.
AgentName string
// APICallerName is the dependency name used to look up the
// base.APICaller. When that dependency is missing, an isolated worker is
// started instead of a connected one.
APICallerName string
// MachineLock is required (Start returns a not-valid error when it is
// nil) and is passed to the hook runner.
MachineLock machinelock.Lock
// Clock is required (Start returns a not-valid error when it is nil) and
// is passed to the hook runner and to the isolated worker.
Clock Clock
// Logger receives this package's log output and is passed on to the hook
// runner and to both worker kinds. It is not nil-checked by Start.
Logger Logger
// NewHookRunner builds the HookRunner handed to whichever worker is
// started.
NewHookRunner func(HookRunnerConfig) HookRunner
// NewMeterStatusAPIClient builds the meter status client used by the
// connected worker.
NewMeterStatusAPIClient func(base.APICaller, names.UnitTag) meterstatus.MeterStatusClient
// NewUniterStateAPIClient builds the unit state API client that backs the
// connected worker's controller-backed state.
NewUniterStateAPIClient func(base.FacadeCaller, names.UnitTag) *common.UnitStateAPI
// NewConnectedStatusWorker builds the worker used when an API caller is
// available.
NewConnectedStatusWorker func(ConnectedConfig) (worker.Worker, error)
// NewIsolatedStatusWorker builds the worker used when the API caller
// dependency is missing.
NewIsolatedStatusWorker func(IsolatedConfig) (worker.Worker, error)
}
// Manifold returns a dependency.Manifold for the meter status worker.
//
// The manifold lists config.AgentName and config.APICallerName as its inputs.
// Its Start function rejects a configuration whose Clock or MachineLock is
// nil (checked in that order) with a not-valid error, and otherwise delegates
// to newStatusWorker.
func Manifold(config ManifoldConfig) dependency.Manifold {
	start := func(context dependency.Context) (worker.Worker, error) {
		// Only Clock and MachineLock are validated here; the remaining
		// fields are used as-is by newStatusWorker.
		switch {
		case config.Clock == nil:
			return nil, errors.NotValidf("missing Clock")
		case config.MachineLock == nil:
			return nil, errors.NotValidf("missing MachineLock")
		}
		return newStatusWorker(config, context)
	}

	return dependency.Manifold{
		Inputs: []string{config.AgentName, config.APICallerName},
		Start:  start,
	}
}
// newStatusWorker builds the meter status worker for the unit agent found in
// the dependency context.
//
// It fetches the agent registered under config.AgentName and fails unless the
// agent's tag is a names.UnitTag. A hook runner is then created from the
// agent configuration. What happens next depends on the API caller
// dependency (config.APICallerName):
//
//   - missing: an isolated worker is started with a disk-backed state stored
//     in "meter-status.yaml" under the agent's data directory;
//   - any other lookup error: that error is returned unchanged;
//   - available: a connected worker is started with controller-backed state,
//     after any state left on disk by an earlier isolated worker has been
//     written to the controller.
func newStatusWorker(config ManifoldConfig, context dependency.Context) (worker.Worker, error) {
	var unitAgent agent.Agent
	if err := context.Get(config.AgentName, &unitAgent); err != nil {
		return nil, err
	}

	agentTag := unitAgent.CurrentConfig().Tag()
	unitTag, isUnit := agentTag.(names.UnitTag)
	if !isUnit {
		return nil, errors.Errorf("expected unit tag, got %v", agentTag)
	}

	currentConfig := unitAgent.CurrentConfig()
	hookRunner := config.NewHookRunner(HookRunnerConfig{
		MachineLock: config.MachineLock,
		AgentConfig: currentConfig,
		Tag:         unitTag,
		Clock:       config.Clock,
		Logger:      config.Logger,
	})
	stateFile := path.Join(currentConfig.DataDir(), "meter-status.yaml")

	// Without a valid APICaller the worker cannot talk to the controller to
	// persist its state, so it gets a disk-backed StateReadWriter instead.
	// The data is pushed back to the controller once a connection is
	// available (see the migration step further down).
	var caller base.APICaller
	switch getErr := context.Get(config.APICallerName, &caller); {
	case errors.Cause(getErr) == dependency.ErrMissing:
		config.Logger.Tracef("API caller dependency not available, starting isolated meter status worker.")
		return config.NewIsolatedStatusWorker(IsolatedConfig{
			Runner:           hookRunner,
			StateReadWriter:  NewDiskBackedState(stateFile),
			Clock:            config.Clock,
			Logger:           config.Logger,
			AmberGracePeriod: defaultAmberGracePeriod,
			RedGracePeriod:   defaultRedGracePeriod,
			TriggerFactory:   GetTriggers,
		})
	case getErr != nil:
		return nil, getErr
	}

	config.Logger.Tracef("Starting connected meter status worker.")
	statusClient := config.NewMeterStatusAPIClient(caller, unitTag)
	facade := base.NewFacadeCaller(caller, "MeterStatus")
	controllerState := NewControllerBackedState(config.NewUniterStateAPIClient(facade, unitTag))

	// A state file on disk means an isolated worker ran previously. Move
	// that state to the controller and drop the file; this doubles as an
	// automatic migration step.
	previous, readErr := NewDiskBackedState(stateFile).Read()
	switch {
	case readErr == nil:
		config.Logger.Infof("detected locally persisted worker state; migrating to the controller")
		if writeErr := controllerState.Write(previous); writeErr != nil {
			return nil, errors.Trace(writeErr)
		}
		// The controller now holds the state, so failing to delete the
		// local copy is non-fatal and only worth a warning.
		if removeErr := os.Remove(stateFile); removeErr != nil {
			config.Logger.Warningf("unable to remove existing local state file: %v", removeErr)
		}
	case !errors.IsNotFound(readErr):
		return nil, errors.Annotate(readErr, "reading locally persisted worker state")
	}

	return config.NewConnectedStatusWorker(ConnectedConfig{
		Runner:          hookRunner,
		StateReadWriter: controllerState,
		Status:          statusClient,
		Logger:          config.Logger,
	})
}