/
manifold.go
97 lines (86 loc) · 2.78 KB
/
manifold.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
// Copyright 2019 Canonical Ltd.
// Licensed under the AGPLv3, see LICENCE file for details.
package upgradedatabase
import (
"time"
"github.com/juju/errors"
"github.com/juju/retry"
"github.com/juju/version/v2"
"github.com/juju/worker/v3"
"github.com/juju/worker/v3/dependency"
"github.com/juju/juju/agent"
"github.com/juju/juju/state"
"github.com/juju/juju/upgrades"
"github.com/juju/juju/worker/gate"
)
// ManifoldConfig holds the dependency names and collaborators that the
// database upgrade manifold needs in order to start its worker.
type ManifoldConfig struct {
	// AgentName is the dependency name under which the agent.Agent is
	// registered; Manifold lists it as an input and resolves it at start.
	AgentName string

	// UpgradeDBGateName is the dependency name under which the gate.Lock
	// is registered; Manifold lists it as an input and resolves it at start.
	UpgradeDBGateName string

	// Logger is handed unchanged to the worker configuration.
	Logger Logger

	// OpenState produces the state pool; Manifold wraps the result so the
	// worker receives it as a Pool.
	OpenState func() (*state.StatePool, error)

	// Clock is handed to the worker configuration and is also used as the
	// clock of the worker's retry strategy.
	Clock Clock
}
// Validate reports the first missing piece of the manifold configuration.
//
// The checks run in a fixed order: UpgradeDBGateName, Logger, OpenState and
// then Clock. A nil error means all four are set. AgentName is not inspected
// here.
func (cfg ManifoldConfig) Validate() error {
	switch {
	case cfg.UpgradeDBGateName == "":
		return errors.NotValidf("empty UpgradeDBGateName")
	case cfg.Logger == nil:
		return errors.NotValidf("nil Logger")
	case cfg.OpenState == nil:
		return errors.NotValidf("nil OpenState function")
	case cfg.Clock == nil:
		return errors.NotValidf("nil Clock")
	}
	return nil
}
// Manifold returns a dependency manifold that runs a database upgrade worker
// using the resource names defined in the supplied config.
//
// The manifold declares cfg.AgentName and cfg.UpgradeDBGateName as inputs.
// Its Start function validates cfg, resolves the gate lock and the agent
// from the dependency context, and passes them to NewWorker together with a
// wrapped state-pool opener, the upgrade runner and the retry strategy.
// Any failure along the way is returned with a nil worker.
func Manifold(cfg ManifoldConfig) dependency.Manifold {
	// Retry settings passed to the worker through retry.CallArgs.
	const (
		retryDelay    = 2 * time.Minute
		retryAttempts = 5
	)

	return dependency.Manifold{
		Inputs: []string{
			cfg.AgentName,
			cfg.UpgradeDBGateName,
		},
		Start: func(context dependency.Context) (worker.Worker, error) {
			// Reject an incomplete config before touching any dependency,
			// so that e.g. a nil OpenState surfaces as an error here rather
			// than as a nil-function call once the worker is running.
			if err := cfg.Validate(); err != nil {
				return nil, errors.Trace(err)
			}

			// Get the completed lock.
			var upgradeStepsLock gate.Lock
			if err := context.Get(cfg.UpgradeDBGateName, &upgradeStepsLock); err != nil {
				return nil, errors.Trace(err)
			}

			// Determine this controller's agent and tag.
			var controllerAgent agent.Agent
			if err := context.Get(cfg.AgentName, &controllerAgent); err != nil {
				return nil, errors.Trace(err)
			}
			tag := controllerAgent.CurrentConfig().Tag()

			// Wrap the state pool factory to return our implementation.
			openState := func() (Pool, error) {
				p, err := cfg.OpenState()
				if err != nil {
					return nil, errors.Trace(err)
				}
				return &pool{p}, nil
			}

			// Wrap the upgrade steps execution so that we can generate a
			// context lazily: c is only invoked when the upgrade runs.
			performUpgrade := func(v version.Number, t []upgrades.Target, c func() upgrades.Context) error {
				return errors.Trace(upgrades.PerformStateUpgrade(v, t, c()))
			}

			workerCfg := Config{
				UpgradeComplete: upgradeStepsLock,
				Tag:             tag,
				Agent:           controllerAgent,
				Logger:          cfg.Logger,
				OpenState:       openState,
				PerformUpgrade:  performUpgrade,
				RetryStrategy: retry.CallArgs{
					Clock:    cfg.Clock,
					Delay:    retryDelay,
					Attempts: retryAttempts,
				},
				Clock: cfg.Clock,
			}

			w, err := NewWorker(workerCfg)
			if err != nil {
				// Never hand back a worker value alongside an error.
				return nil, errors.Annotate(err, "starting database upgrade worker")
			}
			return w, nil
		},
	}
}