/
runaction.go
173 lines (155 loc) · 4.52 KB
/
runaction.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
// Copyright 2014 Canonical Ltd.
// Licensed under the AGPLv3, see LICENCE file for details.
package operation
import (
"fmt"
"github.com/juju/errors"
"github.com/juju/juju/api/agent/uniter"
"github.com/juju/juju/rpc/params"
"github.com/juju/juju/worker/common/charmrunner"
"github.com/juju/juju/worker/uniter/remotestate"
"github.com/juju/juju/worker/uniter/runner"
)
type runAction struct {
action *uniter.Action
change int
changed chan struct{}
cancel chan struct{}
callbacks Callbacks
runnerFactory runner.Factory
name string
runner runner.Runner
logger Logger
}
// String is part of the Operation interface.
func (ra *runAction) String() string {
return fmt.Sprintf("run action %s", ra.action.ID())
}
// NeedsGlobalMachineLock is part of the Operation interface.
func (ra *runAction) NeedsGlobalMachineLock() bool {
return !ra.action.Parallel() || ra.action.ExecutionGroup() != ""
}
// ExecutionGroup is part of the Operation interface.
func (ra *runAction) ExecutionGroup() string {
return ra.action.ExecutionGroup()
}
// Prepare ensures that the action is valid and can be executed. If not, it
// will return ErrSkipExecute. It preserves any hook recorded in the supplied
// state.
// Prepare is part of the Operation interface.
func (ra *runAction) Prepare(state State) (*State, error) {
ra.changed = make(chan struct{}, 1)
ra.cancel = make(chan struct{})
actionID := ra.action.ID()
rnr, err := ra.runnerFactory.NewActionRunner(ra.action, ra.cancel)
if cause := errors.Cause(err); charmrunner.IsBadActionError(cause) {
if err := ra.callbacks.FailAction(actionID, err.Error()); err != nil {
return nil, err
}
return nil, ErrSkipExecute
} else if cause == charmrunner.ErrActionNotAvailable {
return nil, ErrSkipExecute
} else if err != nil {
return nil, errors.Annotatef(err, "cannot create runner for action %q", actionID)
}
actionData, err := rnr.Context().ActionData()
if err != nil {
// this should *really* never happen, but let's not panic
return nil, errors.Trace(err)
}
err = rnr.Context().Prepare()
if err != nil {
return nil, errors.Trace(err)
}
ra.name = actionData.Name
ra.runner = rnr
return stateChange{
Kind: RunAction,
Step: Pending,
ActionId: &actionID,
Hook: state.Hook,
}.apply(state), nil
}
// Execute runs the action, and preserves any hook recorded in the supplied state.
// Execute is part of the Operation interface.
func (ra *runAction) Execute(state State) (*State, error) {
message := fmt.Sprintf("running action %s", ra.name)
if err := ra.callbacks.SetExecutingStatus(message); err != nil {
return nil, err
}
done := make(chan struct{})
wait := make(chan struct{})
actionID := ra.action.ID()
go func() {
defer close(wait)
for {
select {
case <-done:
return
case <-ra.changed:
}
status, err := ra.callbacks.ActionStatus(actionID)
if err != nil {
ra.logger.Warningf("unable to get action status for %q: %v", actionID, err)
continue
}
if status == params.ActionAborting {
ra.logger.Infof("action %s aborting", actionID)
close(ra.cancel)
return
}
}
}()
handlerType, err := ra.runner.RunAction(ra.name)
close(done)
<-wait
if err != nil {
// This indicates an actual error -- an action merely failing should
// be handled inside the Runner, and returned as nil.
return nil, errors.Annotatef(err, "action %q (via %s) failed", ra.name, handlerType)
}
return stateChange{
Kind: RunAction,
Step: Done,
ActionId: &actionID,
Hook: state.Hook,
}.apply(state), nil
}
// Commit preserves the recorded hook, and returns a neutral state.
// Commit is part of the Operation interface.
func (ra *runAction) Commit(state State) (*State, error) {
return stateChange{
Kind: continuationKind(state),
Step: Pending,
Hook: state.Hook,
}.apply(state), nil
}
// RemoteStateChanged is called when the remote state changed during execution
// of the operation.
func (ra *runAction) RemoteStateChanged(snapshot remotestate.Snapshot) {
actionID := ra.action.ID()
change, ok := snapshot.ActionChanged[actionID]
if !ok {
ra.logger.Errorf("action %s missing action changed entry", actionID)
// Shouldn't happen.
return
}
if ra.change < change {
ra.change = change
ra.logger.Errorf("running action %s changed", actionID)
select {
case ra.changed <- struct{}{}:
default:
}
}
}
// continuationKind determines what State Kind the operation
// should return after Commit.
func continuationKind(state State) Kind {
switch {
case state.Hook != nil:
return RunHook
default:
return Continue
}
}