-
Notifications
You must be signed in to change notification settings - Fork 0
/
controller.go
260 lines (222 loc) · 7.16 KB
/
controller.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
package plugin // import "github.com/docker/docker/daemon/cluster/controllers/plugin"
import (
"context"
"io"
"io/ioutil"
"net/http"
"github.com/docker/distribution/reference"
enginetypes "github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/swarm/runtime"
"github.com/docker/docker/errdefs"
"github.com/docker/docker/plugin"
v2 "github.com/docker/docker/plugin/v2"
"github.com/docker/swarmkit/api"
"github.com/gogo/protobuf/proto"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)
// Controller is the controller for the plugin backend.
// Plugins are managed as a singleton object with a desired state (different from containers).
// With the plugin controller instead of having a strict create->start->stop->remove
// task lifecycle like containers, we manage the desired state of the plugin and let
// the plugin manager do what it already does and monitor the plugin.
// We'll also end up with many tasks all pointing to the same plugin ID.
//
// TODO(@cpuguy83): registry auth is intentionally not supported until we work out
// the right way to pass registry credentials via secrets.
type Controller struct {
// backend performs the real plugin operations on behalf of the controller.
backend Backend
// spec is the plugin spec decoded from the task when the controller is created.
// Prepare fills in spec.Name from the remote reference when it is empty.
spec runtime.PluginSpec
// logger is pre-tagged with the controller kind, task ID and plugin name.
logger *logrus.Entry
// pluginID is set by Prepare and used by Start, Wait and Remove to look the plugin up.
pluginID string
// serviceID is the ServiceID of the task this controller was created for.
serviceID string
// hook used to signal tests that `Wait()` is actually ready and waiting
signalWaitReady func()
}
// Backend is the interface for interacting with the plugin manager
// Controller actions are passed to the configured backend to do the real work.
//
// Within this file, Get is called both with the plugin name from the spec and
// with the plugin ID, while Disable, Enable and Remove are called with the
// plugin ID. Pull and Upgrade are called with nil metaHeaders, a zero-value
// auth config and a discarding output stream.
type Backend interface {
Disable(name string, config *enginetypes.PluginDisableConfig) error
Enable(name string, config *enginetypes.PluginEnableConfig) error
Remove(name string, config *enginetypes.PluginRmConfig) error
Pull(ctx context.Context, ref reference.Named, name string, metaHeaders http.Header, authConfig *enginetypes.AuthConfig, privileges enginetypes.PluginPrivileges, outStream io.Writer, opts ...plugin.CreateOpt) error
Upgrade(ctx context.Context, ref reference.Named, name string, metaHeaders http.Header, authConfig *enginetypes.AuthConfig, privileges enginetypes.PluginPrivileges, outStream io.Writer) error
Get(name string) (*v2.Plugin, error)
// SubscribeEvents returns a channel of events and a cancel function; Wait
// defers the cancel function for the duration of its event loop.
SubscribeEvents(buffer int, events ...plugin.Event) (eventCh <-chan interface{}, cancel func())
}
// NewController returns a new cluster plugin controller for the given task.
//
// The plugin spec is read from the task via readSpec; if that fails the error
// is returned and no controller is created. The controller's logger is tagged
// with the controller kind, the task ID and the plugin name from the spec.
func NewController(backend Backend, t *api.Task) (*Controller, error) {
	spec, err := readSpec(t)
	if err != nil {
		return nil, err
	}

	fields := logrus.Fields{
		"controller": "plugin",
		"task":       t.ID,
		"plugin":     spec.Name,
	}
	ctrl := &Controller{
		backend:   backend,
		spec:      spec,
		serviceID: t.ServiceID,
		logger:    logrus.WithFields(fields),
	}
	return ctrl, nil
}
// readSpec decodes the plugin spec carried in the task's generic runtime
// payload.
//
// It returns an error, instead of panicking on a nil dereference, when the
// task is nil or carries no generic runtime payload, and a wrapped error when
// the payload bytes cannot be unmarshalled into a runtime.PluginSpec.
func readSpec(t *api.Task) (runtime.PluginSpec, error) {
	var cfg runtime.PluginSpec

	if t == nil {
		return cfg, errors.New("error reading plugin spec: task is nil")
	}

	// The generic runtime spec and its payload are both pointers that may be
	// unset (for example when the task does not use a generic runtime), so
	// guard them before reaching for the payload bytes.
	generic := t.Spec.GetGeneric()
	if generic == nil || generic.Payload == nil {
		return cfg, errors.New("error reading plugin spec: task has no generic runtime payload")
	}

	if err := proto.Unmarshal(generic.Payload.Value, &cfg); err != nil {
		return cfg, errors.Wrap(err, "error reading plugin spec")
	}
	return cfg, nil
}
// Update is the update phase from swarmkit.
// It only logs at debug level and always returns nil; neither ctx nor the
// task argument is used.
func (p *Controller) Update(ctx context.Context, t *api.Task) error {
p.logger.Debug("Update")
return nil
}
// Prepare is the prepare phase from swarmkit.
//
// It parses the remote reference from the spec, defaults the plugin name to
// the normalized remote when the spec has no name, and then either upgrades
// the plugin (when one with that name already exists and belongs to this
// service) or pulls it. On those paths the plugin ID is recorded on the
// controller. An existing plugin that belongs to a different swarm service is
// reported as an error.
//
// When Prepare returns nil and a plugin was found, the deferred function calls
// Acquire on it; Remove later calls Release on the plugin it looks up.
func (p *Controller) Prepare(ctx context.Context) (err error) {
p.logger.Debug("Prepare")
remote, err := reference.ParseNormalizedNamed(p.spec.Remote)
if err != nil {
return errors.Wrapf(err, "error parsing remote reference %q", p.spec.Remote)
}
if p.spec.Name == "" {
p.spec.Name = remote.String()
}
// authConfig is left at its zero value; no registry credentials are passed.
var authConfig enginetypes.AuthConfig
privs := convertPrivileges(p.spec.Privileges)
pl, err := p.backend.Get(p.spec.Name)
// err is the named result and pl is reassigned further down, so this closure
// sees the final values at return time: Acquire only runs when Prepare
// succeeds and a plugin is available.
defer func() {
if pl != nil && err == nil {
pl.Acquire()
}
}()
// Existing plugin: upgrade path. Any error from Get, or a nil plugin, skips
// this branch and goes on to the pull below.
if err == nil && pl != nil {
if pl.SwarmServiceID != p.serviceID {
return errors.Errorf("plugin already exists: %s", p.spec.Name)
}
if pl.IsEnabled() {
// A failed disable is only logged at debug level; the upgrade is
// still attempted. The inner err shadows the named result.
if err := p.backend.Disable(pl.GetID(), &enginetypes.PluginDisableConfig{ForceDisable: true}); err != nil {
p.logger.WithError(err).Debug("could not disable plugin before running upgrade")
}
}
p.pluginID = pl.GetID()
return p.backend.Upgrade(ctx, remote, p.spec.Name, nil, &authConfig, privs, ioutil.Discard)
}
// Pull path: the pull is tagged with this service's ID and the spec's Env;
// pull output is discarded.
if err := p.backend.Pull(ctx, remote, p.spec.Name, nil, &authConfig, privs, ioutil.Discard, plugin.WithSwarmService(p.serviceID), plugin.WithEnv(p.spec.Env)); err != nil {
return err
}
// Plain assignment (not :=) so the deferred closure observes the pulled plugin.
pl, err = p.backend.Get(p.spec.Name)
if err != nil {
return err
}
p.pluginID = pl.GetID()
return nil
}
// Start is the start phase from swarmkit.
//
// It looks the plugin up by the ID recorded on the controller and reconciles
// its enabled state with the spec: a plugin that should be disabled but is
// enabled gets disabled (without force), and a plugin that should be enabled
// but is not gets enabled with a timeout value of 30. When the state already
// matches, nothing is done.
func (p *Controller) Start(ctx context.Context) error {
	p.logger.Debug("Start")

	pl, err := p.backend.Get(p.pluginID)
	if err != nil {
		return err
	}

	wantDisabled := p.spec.Disabled
	isEnabled := pl.IsEnabled()

	switch {
	case wantDisabled && isEnabled:
		return p.backend.Disable(p.pluginID, &enginetypes.PluginDisableConfig{ForceDisable: false})
	case !wantDisabled && !isEnabled:
		return p.backend.Enable(p.pluginID, &enginetypes.PluginEnableConfig{Timeout: 30})
	}

	// Actual state already matches the desired state.
	return nil
}
// Wait causes the task to wait until returned.
//
// It subscribes to disable, remove and enable events for the plugin, fires the
// test-only ready hook if one is set, and then verifies that the plugin's
// current enabled state matches the spec. After that it blocks until the
// context is done or an event arrives that contradicts the desired state
// (removal always does), returning an error describing what happened.
func (p *Controller) Wait(ctx context.Context) error {
	p.logger.Debug("Wait")

	pl, err := p.backend.Get(p.pluginID)
	if err != nil {
		return err
	}

	eventCh, unsubscribe := p.backend.SubscribeEvents(
		1,
		plugin.EventDisable{Plugin: pl.PluginObj},
		plugin.EventRemove{Plugin: pl.PluginObj},
		plugin.EventEnable{Plugin: pl.PluginObj},
	)
	defer unsubscribe()

	if p.signalWaitReady != nil {
		p.signalWaitReady()
	}

	// The state is checked only after subscribing.
	if wantEnabled := !p.spec.Disabled; wantEnabled != pl.IsEnabled() {
		return errors.New("mismatched plugin state")
	}

	for {
		select {
		case ev := <-eventCh:
			p.logger.Debugf("got event %T", ev)

			switch ev.(type) {
			case plugin.EventRemove:
				return errors.New("plugin removed")
			case plugin.EventDisable:
				if !p.spec.Disabled {
					return errors.New("plugin disabled")
				}
			case plugin.EventEnable:
				if p.spec.Disabled {
					return errors.New("plugin enabled")
				}
			}
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}
// isNotFound reports whether err is a not-found error, as decided by
// errdefs.IsNotFound.
func isNotFound(err error) bool {
return errdefs.IsNotFound(err)
}
// Shutdown is the shutdown phase from swarmkit.
// It only logs at debug level and always returns nil; ctx is not used.
func (p *Controller) Shutdown(ctx context.Context) error {
p.logger.Debug("Shutdown")
return nil
}
// Terminate is the terminate phase from swarmkit.
// It only logs at debug level and always returns nil; ctx is not used.
func (p *Controller) Terminate(ctx context.Context) error {
p.logger.Debug("Terminate")
return nil
}
// Remove is the remove phase from swarmkit.
//
// It looks the plugin up by the ID recorded on the controller; a plugin that
// is already gone counts as success. Otherwise it calls Release on the plugin
// and, if the reference count is still above zero afterwards, leaves the
// plugin in place. Only when the count is not above zero is the plugin
// force-removed through the backend, and a not-found error from that removal
// is again treated as success.
func (p *Controller) Remove(ctx context.Context) error {
	p.logger.Debug("Remove")

	pl, getErr := p.backend.Get(p.pluginID)
	switch {
	case getErr == nil:
		// Plugin is present; continue with release and removal below.
	case isNotFound(getErr):
		return nil
	default:
		return getErr
	}

	pl.Release()
	if refs := pl.GetRefCount(); refs > 0 {
		p.logger.Debug("skipping remove due to ref count")
		return nil
	}

	// This may error because we have exactly 1 plugin, but potentially multiple
	// tasks which are calling remove.
	rmErr := p.backend.Remove(p.pluginID, &enginetypes.PluginRmConfig{ForceRemove: true})
	if rmErr == nil || isNotFound(rmErr) {
		return nil
	}
	return rmErr
}
// Close is the close phase from swarmkit.
// It only logs at debug level and always returns nil.
func (p *Controller) Close() error {
p.logger.Debug("Close")
return nil
}
// convertPrivileges copies the name, description and value of each privilege
// in the swarm plugin spec into the engine API's privilege type, keeping the
// input order. An empty or nil input yields a nil result.
func convertPrivileges(ls []*runtime.PluginPrivilege) enginetypes.PluginPrivileges {
	var converted enginetypes.PluginPrivileges
	for i := range ls {
		src := ls[i]
		converted = append(converted, enginetypes.PluginPrivilege{
			Name:        src.Name,
			Description: src.Description,
			Value:       src.Value,
		})
	}
	return converted
}