-
Notifications
You must be signed in to change notification settings - Fork 0
/
manager.go
364 lines (313 loc) · 10.3 KB
/
manager.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
package plugin // import "github.com/docker/docker/plugin"
import (
"context"
"encoding/json"
"io"
"os"
"path/filepath"
"reflect"
"regexp"
"sort"
"strings"
"sync"
"syscall"
"github.com/containerd/containerd/content"
"github.com/containerd/containerd/content/local"
"github.com/containerd/log"
"github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/events"
"github.com/docker/docker/pkg/authorization"
"github.com/docker/docker/pkg/containerfs"
"github.com/docker/docker/pkg/ioutils"
v2 "github.com/docker/docker/plugin/v2"
"github.com/docker/docker/registry"
"github.com/moby/pubsub"
"github.com/opencontainers/go-digest"
specs "github.com/opencontainers/runtime-spec/specs-go"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)
const (
// configFileName is the name of the per-plugin JSON file, located at
// <Root>/<plugin ID>/config.json, that loadPlugin reads and save writes.
configFileName = "config.json"
// rootFSFileName is presumably the name of a plugin's root filesystem
// directory. NOTE(review): it is not referenced in the visible code (reload
// spells out the "rootfs" literal instead) — confirm its users elsewhere in
// the package.
rootFSFileName = "rootfs"
)
// validFullID matches a full plugin ID: exactly 64 lowercase hexadecimal
// characters. reload uses it to recognise plugin directories under Root.
var validFullID = regexp.MustCompile(`^([a-f0-9]{64})$`)
// Executor is the interface the plugin manager uses to start, stop, and
// inspect plugins. An implementation is obtained through
// ManagerConfig.CreateExecutor. The per-method notes below are inferred from
// the signatures only — confirm them against the implementation.
type Executor interface {
// Create presumably starts the plugin identified by id from the given
// runtime spec, with its output attached to stdout and stderr.
Create(id string, spec specs.Spec, stdout, stderr io.WriteCloser) error
// IsRunning presumably reports whether the plugin identified by id is
// currently running.
IsRunning(id string) (bool, error)
// Restore presumably re-attaches to a plugin that may still be running;
// alive reports whether it was found alive.
Restore(id string, stdout, stderr io.WriteCloser) (alive bool, err error)
// Signal presumably delivers signal to the plugin identified by id.
Signal(id string, signal syscall.Signal) error
}
// EndpointResolver looks up the registry endpoints to use when pulling.
type EndpointResolver interface {
// LookupPullEndpoints returns the registry API endpoints for hostname.
// NOTE(review): ordering and filtering of the returned endpoints are
// defined by the implementation, which is not visible here.
LookupPullEndpoints(hostname string) (endpoints []registry.APIEndpoint, err error)
}
// restorePlugin restores p through pm.restore when the plugin is enabled.
// Disabled plugins need no restoration, so nil is returned for them.
func (pm *Manager) restorePlugin(p *v2.Plugin, c *controller) error {
	if !p.IsEnabled() {
		return nil
	}
	return pm.restore(p, c)
}
// eventLogger is the callback type of ManagerConfig.LogPluginEvent; it
// receives a plugin's ID and name together with the action being reported.
type eventLogger func(id, name string, action events.Action)
// ManagerConfig defines configuration needed to start new manager.
type ManagerConfig struct {
// Store is the plugin store the manager reads plugins from (GetV2Plugin,
// GetAll) and fills on reload (SetAll).
Store *Store // remove
// RegistryService resolves registry endpoints for pulls.
// NOTE(review): not used in the visible code.
RegistryService EndpointResolver
// LiveRestoreEnabled: when false, reload re-enables every plugin that is
// marked as enabled, because it will have been stopped.
LiveRestoreEnabled bool // TODO: remove
// LogPluginEvent is the callback for reporting plugin events.
// NOTE(review): not invoked in the visible code.
LogPluginEvent eventLogger
// Root is the manager's state directory. It holds one directory per plugin
// (named by full plugin ID) plus the "tmp" and "storage" directories.
Root string
// ExecRoot holds one bundle directory per plugin ID; HandleExitEvent
// removes a plugin's entry when the plugin exits.
ExecRoot string
// CreateExecutor is called once by NewManager to build the Executor.
CreateExecutor ExecutorCreator
// AuthzMiddleware is presumably the authorization middleware plugins
// register with. NOTE(review): not used in the visible code.
AuthzMiddleware *authorization.Middleware
}
// ExecutorCreator is used in the manager config to pass in an `Executor`.
// NewManager calls it once with the manager under construction (before the
// blob store, controller map, and publisher are set up) and keeps the
// returned Executor; a non-nil error aborts NewManager.
type ExecutorCreator func(*Manager) (Executor, error)
// Manager controls the plugin subsystem.
type Manager struct {
// config is the configuration passed to NewManager.
config ManagerConfig
mu sync.RWMutex // protects cMap
muGC sync.RWMutex // protects blobstore deletions
// cMap maps each plugin to its controller; reload registers an entry for
// every plugin it loads.
cMap map[*v2.Plugin]*controller
// blobStore is the content store rooted at <Root>/storage; GC deletes the
// blobs in it that no plugin references.
blobStore content.Store
// publisher is created by NewManager via pubsub.NewPublisher(0, 0).
// NOTE(review): not used in the visible code — purpose to be confirmed.
publisher *pubsub.Publisher
// executor is the Executor returned by config.CreateExecutor.
executor Executor
}
// controller represents the manager's control on a plugin.
type controller struct {
// restart is read by HandleExitEvent: when true the plugin is enabled
// again after it exits.
restart bool
// exitChan is closed (and then set to nil) by HandleExitEvent the first
// time an exit event for the plugin is handled.
exitChan chan bool
// timeoutInSecs is presumably a timeout, in seconds, applied to plugin
// operations. NOTE(review): not used in the visible code.
timeoutInSecs int
}
// NewManager returns a new plugin manager.
//
// Setup happens in this order: the Root, ExecRoot and tmp directories are
// created (mode 0700), the executor is built through config.CreateExecutor,
// the blob store is opened under <Root>/storage, the plugins found on disk
// are reloaded, and finally the event publisher is created. The first
// failing step aborts construction and its error is returned.
func NewManager(config ManagerConfig) (*Manager, error) {
	pm := &Manager{config: config}

	requiredDirs := []string{pm.config.Root, pm.config.ExecRoot, pm.tmpDir()}
	for i := range requiredDirs {
		if mkErr := os.MkdirAll(requiredDirs[i], 0o700); mkErr != nil {
			return nil, errors.Wrapf(mkErr, "failed to mkdir %v", requiredDirs[i])
		}
	}

	var err error
	if pm.executor, err = config.CreateExecutor(pm); err != nil {
		return nil, err
	}

	storagePath := filepath.Join(pm.config.Root, "storage")
	if pm.blobStore, err = local.NewStore(storagePath); err != nil {
		return nil, errors.Wrap(err, "error creating plugin blob store")
	}

	// The controller map must exist before reload, which fills it.
	pm.cMap = make(map[*v2.Plugin]*controller)
	if reloadErr := pm.reload(); reloadErr != nil {
		return nil, errors.Wrap(reloadErr, "failed to restore plugins")
	}

	pm.publisher = pubsub.NewPublisher(0, 0)
	return pm, nil
}
// tmpDir returns the path of the manager's temporary directory, which is
// the "tmp" directory directly under the configured Root.
func (pm *Manager) tmpDir() string {
	root := pm.config.Root
	return filepath.Join(root, "tmp")
}
// HandleExitEvent is called when the executor receives the exit event for the
// plugin identified by id. In the future we may change this, but for now all
// we care about is the exit event.
//
// It removes the plugin's bundle directory under ExecRoot (best effort; a
// failure is only logged), closes the controller's exit channel so waiters
// are released, and then either enables the plugin again (when its controller
// is marked for restart) or cleans up the mounts below the plugin's directory
// under Root.
//
// An error is returned when the plugin is not found in the store or when the
// mount cleanup fails. A failure to re-enable the plugin is logged, not
// returned.
func (pm *Manager) HandleExitEvent(id string) error {
	p, err := pm.config.Store.GetV2Plugin(id)
	if err != nil {
		return err
	}

	if err := os.RemoveAll(filepath.Join(pm.config.ExecRoot, id)); err != nil {
		log.G(context.TODO()).WithError(err).WithField("id", id).Error("Could not remove plugin bundle dir")
	}

	// The controller is modified below (exitChan is cleared), so the write
	// lock is required: under a read lock two concurrent exit events could
	// both observe a non-nil channel and close it twice, which panics.
	pm.mu.Lock()
	c := pm.cMap[p]
	restart := false
	// A plugin without a registered controller has nothing to signal or
	// restart; fall through to the mount cleanup instead of dereferencing nil.
	if c != nil {
		if c.exitChan != nil {
			close(c.exitChan)
			c.exitChan = nil // ignore duplicate events (containerd issue #2299)
		}
		restart = c.restart
	}
	pm.mu.Unlock()

	if restart {
		if err := pm.enable(p, c, true); err != nil {
			log.G(context.TODO()).WithError(err).WithField("id", id).Error("failed to enable plugin")
		}
		return nil
	}

	if err := recursiveUnmount(filepath.Join(pm.config.Root, id)); err != nil {
		return errors.Wrap(err, "error cleaning up plugin mounts")
	}
	return nil
}
// handleLoadError logs a failure to load the plugin identified by id.
// A nil err is ignored. A missing config file is reported as a warning,
// since it usually points at an incomplete removal; anything else is
// reported as an error. The plugin is skipped by the caller in either case.
func handleLoadError(err error, id string) {
	if err == nil {
		return
	}

	entry := log.G(context.TODO()).WithError(err).WithField("id", id)
	switch {
	case errors.Is(err, os.ErrNotExist):
		// Likely some error while removing on an older version of docker
		entry.Warn("missing plugin config, skipping: this may be caused due to a failed remove and requires manual cleanup.")
	default:
		entry.Error("error loading plugin, skipping")
	}
}
// reload rebuilds the manager's state from the plugin directories found under
// Root and restores every loaded plugin concurrently.
//
// Directory entries named by a full plugin ID are loaded from their config
// file; entries that fail to load are reported through handleLoadError and
// skipped. Entries named "<full ID>-removing" are leftovers of an interrupted
// removal and are deleted (best effort).
//
// The loaded plugins replace the contents of the store. For each of them a
// controller is registered and a goroutine then:
//   - restores the plugin via restorePlugin (on failure the remaining steps
//     are skipped for that plugin),
//   - rewrites a non-empty Rootfs to <Root>/<ID>/rootfs,
//   - for docker 1.x volumedriver/graphdriver/csinode/csicontroller plugins
//     with a PropagatedMount, migrates propagated-mount storage out of the
//     rootfs if needed and makes sure the directory exists,
//   - persists the plugin config,
//   - enables the plugin again when it is enabled and live restore is off.
//
// reload waits for all goroutines to finish. Only a failure to read Root is
// returned; per-plugin failures are logged.
func (pm *Manager) reload() error { // todo: restore
	dir, err := os.ReadDir(pm.config.Root)
	if err != nil {
		return errors.Wrapf(err, "failed to read %v", pm.config.Root)
	}

	plugins := make(map[string]*v2.Plugin)
	for _, v := range dir {
		name := v.Name()
		if validFullID.MatchString(name) {
			p, err := pm.loadPlugin(name)
			if err != nil {
				handleLoadError(err, name)
				continue
			}
			plugins[p.GetID()] = p
			continue
		}

		if validFullID.MatchString(strings.TrimSuffix(name, "-removing")) {
			// There was likely some error while removing this plugin, let's try to remove again here.
			// Directory entries carry only the base name, so resolve it against
			// Root; the bare name would be looked up in the working directory.
			if err := containerfs.EnsureRemoveAll(filepath.Join(pm.config.Root, name)); err != nil {
				log.G(context.TODO()).WithError(err).WithField("id", name).Warn("error while attempting to clean up previously removed plugin")
			}
		}
	}

	pm.config.Store.SetAll(plugins)

	var wg sync.WaitGroup
	wg.Add(len(plugins))
	for _, p := range plugins {
		c := &controller{exitChan: make(chan bool)}
		pm.mu.Lock()
		pm.cMap[p] = c
		pm.mu.Unlock()

		go func(p *v2.Plugin, c *controller) {
			defer wg.Done()
			if err := pm.restorePlugin(p, c); err != nil {
				log.G(context.TODO()).WithError(err).WithField("id", p.GetID()).Error("Failed to restore plugin")
				return
			}

			if p.Rootfs != "" {
				p.Rootfs = filepath.Join(pm.config.Root, p.PluginObj.ID, "rootfs")
			}

			// We should only enable rootfs propagation for certain plugin types that need it.
			for _, typ := range p.PluginObj.Config.Interface.Types {
				if (typ.Capability == "volumedriver" || typ.Capability == "graphdriver" || typ.Capability == "csinode" || typ.Capability == "csicontroller") && typ.Prefix == "docker" && strings.HasPrefix(typ.Version, "1.") {
					if p.PluginObj.Config.PropagatedMount != "" {
						propRoot := filepath.Join(filepath.Dir(p.Rootfs), "propagated-mount")

						// check if we need to migrate an older propagated mount from before
						// these mounts were stored outside the plugin rootfs
						if _, err := os.Stat(propRoot); os.IsNotExist(err) {
							rootfsProp := filepath.Join(p.Rootfs, p.PluginObj.Config.PropagatedMount)
							if _, err := os.Stat(rootfsProp); err == nil {
								if err := os.Rename(rootfsProp, propRoot); err != nil {
									log.G(context.TODO()).WithError(err).WithField("dir", propRoot).Error("error migrating propagated mount storage")
								}
							}
						}

						if err := os.MkdirAll(propRoot, 0o755); err != nil {
							log.G(context.TODO()).Errorf("failed to create PropagatedMount directory at %s: %v", propRoot, err)
						}
					}
				}
			}

			// Persisting is best effort here, but a failure must not go unnoticed:
			// the on-disk config would keep the stale Rootfs.
			if err := pm.save(p); err != nil {
				log.G(context.TODO()).WithError(err).WithField("id", p.GetID()).Error("failed to save plugin config")
			}

			requiresManualRestore := !pm.config.LiveRestoreEnabled && p.IsEnabled()
			if requiresManualRestore {
				// if liveRestore is not enabled, the plugin will be stopped now so we should enable it
				if err := pm.enable(p, c, true); err != nil {
					log.G(context.TODO()).WithError(err).WithField("id", p.GetID()).Error("failed to enable plugin")
				}
			}
		}(p, c)
	}
	wg.Wait()
	return nil
}
// Get returns the plugin identified by idOrName, as resolved by the
// configured plugin store. The store's error is returned unchanged.
func (pm *Manager) Get(idOrName string) (*v2.Plugin, error) {
	store := pm.config.Store
	return store.GetV2Plugin(idOrName)
}
// loadPlugin reads and decodes the config file of the plugin identified by
// id, i.e. <Root>/<id>/config.json. Read and decode failures are returned
// wrapped with the path of the file.
func (pm *Manager) loadPlugin(id string) (*v2.Plugin, error) {
	configPath := filepath.Join(pm.config.Root, id, configFileName)

	raw, readErr := os.ReadFile(configPath)
	if readErr != nil {
		return nil, errors.Wrapf(readErr, "error reading %v", configPath)
	}

	loaded := new(v2.Plugin)
	if decodeErr := json.Unmarshal(raw, loaded); decodeErr != nil {
		return nil, errors.Wrapf(decodeErr, "error decoding %v", configPath)
	}
	return loaded, nil
}
// save serialises p as JSON and writes it atomically, with mode 0600, to the
// plugin's config file at <Root>/<plugin ID>/config.json.
func (pm *Manager) save(p *v2.Plugin) error {
	encoded, err := json.Marshal(p)
	if err != nil {
		return errors.Wrap(err, "failed to marshal plugin json")
	}

	target := filepath.Join(pm.config.Root, p.GetID(), configFileName)
	err = ioutils.AtomicWriteFile(target, encoded, 0o600)
	if err != nil {
		return errors.Wrap(err, "failed to write atomically plugin json")
	}
	return nil
}
// GC cleans up unreferenced blobs. This is recommended to run in a goroutine.
//
// A blob counts as referenced when its digest is the Config digest or one of
// the Blobsums of any plugin in the store. Every other blob in the manager's
// blob store is deleted. The whole run holds muGC exclusively. GC has no
// return value, so a failure while walking or deleting is logged.
func (pm *Manager) GC() {
	pm.muGC.Lock()
	defer pm.muGC.Unlock()

	used := make(map[digest.Digest]struct{})
	for _, p := range pm.config.Store.GetAll() {
		used[p.Config] = struct{}{}
		for _, b := range p.Blobsums {
			used[b] = struct{}{}
		}
	}

	ctx := context.TODO()
	err := pm.blobStore.Walk(ctx, func(info content.Info) error {
		if _, ok := used[info.Digest]; ok {
			return nil
		}
		return pm.blobStore.Delete(ctx, info.Digest)
	})
	if err != nil {
		// Previously dropped silently; surface it so leaked blobs can be diagnosed.
		log.G(ctx).WithError(err).Warn("error while removing unreferenced plugin blobs")
	}
}
// logHook is a log hook that tags entries with the plugin ID held in id;
// makeLoggerStreams installs one on each per-plugin logger.
type logHook struct{ id string }
// Levels lists the log levels the hook fires for: all seven, from panic down
// to trace. A fresh slice is returned on every call.
func (logHook) Levels() []log.Level {
	levels := make([]log.Level, 0, 7)
	levels = append(levels, log.PanicLevel, log.FatalLevel, log.ErrorLevel)
	levels = append(levels, log.WarnLevel, log.InfoLevel)
	levels = append(levels, log.DebugLevel, log.TraceLevel)
	return levels
}
// Fire replaces the entry's fields with a single "plugin" field carrying the
// hook's plugin ID; any fields already on the entry are dropped. It never
// fails.
func (l logHook) Fire(entry *log.Entry) error {
	pluginFields := log.Fields{"plugin": l.id}
	entry.Data = pluginFields
	return nil
}
// makeLoggerStreams creates a dedicated logger for the plugin identified by
// id and returns two writers into it: stdout logs at info level and stderr
// at error level. Every entry is tagged with the plugin ID through logHook.
func makeLoggerStreams(id string) (stdout, stderr io.WriteCloser) {
	pluginLogger := logrus.New()
	pluginLogger.Hooks.Add(logHook{id})

	stdout = pluginLogger.WriterLevel(log.InfoLevel)
	stderr = pluginLogger.WriterLevel(log.ErrorLevel)
	return stdout, stderr
}
// validatePrivileges checks that privileges is the same set of privileges as
// requiredPrivileges, compared with isEqual and isEqualPrivilege. It returns
// an "incorrect privileges" error when they differ and nil otherwise.
func validatePrivileges(requiredPrivileges, privileges types.PluginPrivileges) error {
	if isEqual(requiredPrivileges, privileges, isEqualPrivilege) {
		return nil
	}
	return errors.New("incorrect privileges")
}
// isEqual reports whether arrOne and arrOther hold pairwise-equal privileges
// regardless of their original order. Lists of different length are never
// equal. Both lists are sorted first (through their sort.Interface
// implementation) and then compared element by element with compare.
//
// Note that sorting happens in place, so the callers' slices are reordered.
func isEqual(arrOne, arrOther types.PluginPrivileges, compare func(x, y types.PluginPrivilege) bool) bool {
	if len(arrOne) != len(arrOther) {
		return false
	}

	sort.Sort(arrOne)
	sort.Sort(arrOther)

	// Start at index 0: starting at 1 would leave the first pair unchecked
	// and make any two single-element lists compare equal.
	for i := 0; i < arrOne.Len(); i++ {
		if !compare(arrOne[i], arrOther[i]) {
			return false
		}
	}
	return true
}
// isEqualPrivilege reports whether a and b have the same Name and deeply
// equal Value. Values are only compared when the names match.
func isEqualPrivilege(a, b types.PluginPrivilege) bool {
	return a.Name == b.Name && reflect.DeepEqual(a.Value, b.Value)
}