-
Notifications
You must be signed in to change notification settings - Fork 491
/
manifold.go
184 lines (164 loc) · 5.62 KB
/
manifold.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
// Copyright 2021 Canonical Ltd.
// Licensed under the AGPLv3, see LICENCE file for details.
package dbaccessor
import (
"context"
"github.com/juju/clock"
"github.com/juju/errors"
"github.com/juju/loggo"
"github.com/juju/worker/v3"
"github.com/juju/worker/v3/dependency"
"github.com/prometheus/client_golang/prometheus"
"github.com/juju/juju/agent"
coredatabase "github.com/juju/juju/core/database"
"github.com/juju/juju/database"
"github.com/juju/juju/database/app"
"github.com/juju/juju/worker/common"
)
// Logger represents the logging methods called.
type Logger interface {
Errorf(message string, args ...interface{})
Warningf(message string, args ...interface{})
Infof(message string, args ...interface{})
Debugf(message string, args ...interface{})
Tracef(message string, args ...interface{})
// Logf is used to proxy Dqlite logs via this logger.
Logf(level loggo.Level, msg string, args ...interface{})
IsTraceEnabled() bool
}
// Hub defines the methods of the API server central hub
// that the DB accessor requires.
type Hub interface {
Subscribe(topic string, handler interface{}) (func(), error)
Publish(topic string, data interface{}) (func(), error)
}
// ManifoldConfig contains:
// - The names of other manifolds on which the DB accessor depends.
// - Other dependencies from ManifoldsConfig required by the worker.
type ManifoldConfig struct {
AgentName string
QueryLoggerName string
Clock clock.Clock
Hub Hub
Logger Logger
LogDir string
PrometheusRegisterer prometheus.Registerer
NewApp func(string, ...app.Option) (DBApp, error)
NewDBWorker func(context.Context, DBApp, string, ...TrackedDBWorkerOption) (TrackedDB, error)
NewNodeManager func(agent.Config, Logger, coredatabase.SlowQueryLogger) NodeManager
NewMetricsCollector func() *Collector
}
func (cfg ManifoldConfig) Validate() error {
if cfg.AgentName == "" {
return errors.NotValidf("empty AgentName")
}
if cfg.QueryLoggerName == "" {
return errors.NotValidf("empty QueryLoggerName")
}
if cfg.Clock == nil {
return errors.NotValidf("nil Clock")
}
if cfg.Hub == nil {
return errors.NotValidf("nil Hub")
}
if cfg.Logger == nil {
return errors.NotValidf("nil Logger")
}
if cfg.LogDir == "" {
return errors.NotValidf("empty LogDir")
}
if cfg.PrometheusRegisterer == nil {
return errors.NotValidf("nil PrometheusRegisterer")
}
if cfg.NewApp == nil {
return errors.NotValidf("nil NewApp")
}
if cfg.NewDBWorker == nil {
return errors.NotValidf("nil NewDBWorker")
}
if cfg.NewNodeManager == nil {
return errors.NotValidf("nil NewNodeManager")
}
if cfg.NewMetricsCollector == nil {
return errors.NotValidf("nil NewMetricsCollector")
}
return nil
}
// Manifold returns a dependency manifold that runs the dbaccessor
// worker, using the resource names defined in the supplied config.
func Manifold(config ManifoldConfig) dependency.Manifold {
return dependency.Manifold{
Inputs: []string{
config.AgentName,
config.QueryLoggerName,
},
Output: dbAccessorOutput,
Start: func(context dependency.Context) (worker.Worker, error) {
if err := config.Validate(); err != nil {
return nil, errors.Trace(err)
}
var agent agent.Agent
if err := context.Get(config.AgentName, &agent); err != nil {
return nil, err
}
agentConfig := agent.CurrentConfig()
// Register the metrics collector against the prometheus register.
metricsCollector := config.NewMetricsCollector()
if err := config.PrometheusRegisterer.Register(metricsCollector); err != nil {
return nil, errors.Trace(err)
}
var slowQueryLogger coredatabase.SlowQueryLogger
if err := context.Get(config.QueryLoggerName, &slowQueryLogger); err != nil {
config.PrometheusRegisterer.Unregister(metricsCollector)
return nil, err
}
cfg := WorkerConfig{
NodeManager: config.NewNodeManager(agentConfig, config.Logger, slowQueryLogger),
Clock: config.Clock,
Hub: config.Hub,
ControllerID: agentConfig.Tag().Id(),
MetricsCollector: metricsCollector,
Logger: config.Logger,
NewApp: config.NewApp,
NewDBWorker: config.NewDBWorker,
}
w, err := newWorker(cfg)
if err != nil {
config.PrometheusRegisterer.Unregister(metricsCollector)
return nil, errors.Trace(err)
}
return common.NewCleanupWorker(w, func() {
// Clean up the metrics for the worker, so the next time a
// worker is created we can safely register the metrics again.
config.PrometheusRegisterer.Unregister(metricsCollector)
}), nil
},
}
}
func dbAccessorOutput(in worker.Worker, out interface{}) error {
if w, ok := in.(*common.CleanupWorker); ok {
in = w.Worker
}
w, ok := in.(*dbWorker)
if !ok {
return errors.Errorf("expected input of type dbWorker, got %T", in)
}
switch out := out.(type) {
case *coredatabase.DBGetter:
var target coredatabase.DBGetter = w
*out = target
default:
return errors.Errorf("expected output of *database.DBGetter, got %T", out)
}
return nil
}
// IAASNodeManager returns a NodeManager that is configured to use
// the cloud-local TLS terminated address for Dqlite.
func IAASNodeManager(cfg agent.Config, logger Logger, slowQueryLogger coredatabase.SlowQueryLogger) NodeManager {
return database.NewNodeManager(cfg, false, logger, slowQueryLogger)
}
// CAASNodeManager returns a NodeManager that is configured to use
// the loopback address for Dqlite.
func CAASNodeManager(cfg agent.Config, logger Logger, slowQueryLogger coredatabase.SlowQueryLogger) NodeManager {
return database.NewNodeManager(cfg, true, logger, slowQueryLogger)
}