diff --git a/api/server/router/plugin/plugin.go b/api/server/router/plugin/plugin.go index 22819e27a964c..9e86e90dd201f 100644 --- a/api/server/router/plugin/plugin.go +++ b/api/server/router/plugin/plugin.go @@ -1,6 +1,11 @@ package plugin -import "github.com/docker/docker/api/server/router" +import ( + "context" + + "github.com/docker/docker/api/server/router" + "github.com/docker/docker/component/plugincontroller" +) // pluginRouter is a router to talk with the plugin controller type pluginRouter struct { @@ -9,7 +14,8 @@ type pluginRouter struct { } // NewRouter initializes a new plugin router -func NewRouter(b Backend) router.Router { +func NewRouter() router.Router { + b := plugincontroller.Wait(context.Background()) r := &pluginRouter{ backend: b, } @@ -28,7 +34,7 @@ func (r *pluginRouter) initRoutes() { router.NewGetRoute("/plugins/{name:.*}/json", r.inspectPlugin), router.NewGetRoute("/plugins/privileges", r.getPrivileges), router.NewDeleteRoute("/plugins/{name:.*}", r.removePlugin), - router.NewPostRoute("/plugins/{name:.*}/enable", r.enablePlugin), // PATCH? + router.NewPostRoute("/plugins/{name:.*}/enable", r.enablePlugin), router.NewPostRoute("/plugins/{name:.*}/disable", r.disablePlugin), router.NewPostRoute("/plugins/pull", r.pullPlugin, router.WithCancel), router.NewPostRoute("/plugins/{name:.*}/push", r.pushPlugin, router.WithCancel), diff --git a/cmd/dockerd/daemon.go b/cmd/dockerd/daemon.go index ad937ede7c159..11ebcecc3a8ee 100644 --- a/cmd/dockerd/daemon.go +++ b/cmd/dockerd/daemon.go @@ -253,7 +253,6 @@ func (cli *DaemonCli) start(opts *daemonOptions) (err error) { Root: cli.Config.Root, Name: name, Backend: d, - PluginBackend: d.PluginManager(), NetworkSubnetsProvider: d, DefaultAdvertiseAddr: cli.Config.SwarmDefaultAdvertiseAddr, RuntimeRoot: cli.getSwarmRunRoot(), @@ -518,7 +517,7 @@ func initRouter(opts routerOptions) { build.NewRouter(opts.buildBackend, opts.daemon), sessionrouter.NewRouter(opts.sessionManager), swarmrouter.NewRouter(opts.cluster), - pluginrouter.NewRouter(opts.daemon.PluginManager()), + pluginrouter.NewRouter(), distributionrouter.NewRouter(opts.daemon), } diff --git a/component/containerstore/component.go b/component/containerstore/component.go new file mode 100644 index 0000000000000..3f27ec28de47e --- /dev/null +++ b/component/containerstore/component.go @@ -0,0 +1,24 @@ +package containerstore + +import ( + "github.com/docker/docker/component" + "github.com/docker/docker/container" + "golang.org/x/net/context" +) + +const name = "containerstore" + +func Set(s container.Store) (cancel func(), err error) { + return component.Register(name, s) +} + +func Get(ctx context.Context) (container.Store, error) { + c := component.Wait(ctx, name) + if c == nil { + return nil, ctx.Err() + } + + // This could panic... but I think this is ok. + // This should never be anything else + return c.(container.Store), nil +} \ No newline at end of file diff --git a/component/plugincontroller/component.go b/component/plugincontroller/component.go new file mode 100644 index 0000000000000..4d9bbb78da281 --- /dev/null +++ b/component/plugincontroller/component.go @@ -0,0 +1,34 @@ +package plugincontroller + +import ( + "github.com/docker/docker/component" + "github.com/docker/docker/plugin" + "golang.org/x/net/context" +) + +const name = "plugincontroller" + +// Set sets the plugin controller component +func Set(s *plugin.Manager) (cancel func(), err error) { + return component.Register(name, s) +} + +// Wait waits for the plugin controller component to be available +func Wait(ctx context.Context) *plugin.Manager { + c := component.Wait(ctx, name) + if c == nil { + return nil + } + + // This could panic... but I think this is ok. + // This should never be anything else + return c.(*plugin.Manager) +} + +func Get() *plugin.Manager { + c := component.Get(name) + if c == nil { + return nil + } + return c.(*plugin.Manager) +} diff --git a/component/store.go b/component/store.go new file mode 100644 index 0000000000000..6fbe14349f207 --- /dev/null +++ b/component/store.go @@ -0,0 +1,137 @@ +package component + +import ( + "sync" + + "github.com/pkg/errors" + "golang.org/x/net/context" +) + +var ( + existsErr = errors.New("component exists") + nilComponentErr = errors.New("cannot store nil component") + + defaultStore = NewStore() +) + +// Component is the type stored in the component store +type Component interface{} + +// NewStore creates a new +func NewStore() *Store { + return &Store{ + components: make(map[string]Component), + waiters: make(map[string]map[chan Component]struct{}), + } +} + +// Store stores components. +type Store struct { + mu sync.Mutex + components map[string]Component + waiters map[string]map[chan Component]struct{} +} + +// Register registers a component with the default store +func Register(name string, c Component) (cancel func(), err error) { + return defaultStore.Register(name, c) +} + +// Get looks up the passed in component name in the default store. +// Get returns nil if the component does not exist. +func Get(name string) Component { + return defaultStore.Get(name) +} + +// Wait waits for a component with the given name in the default store +func Wait(ctx context.Context, name string) Component { + return defaultStore.Wait(ctx, name) +} + +// Register registers a component with the given name in the store. +func (s Store) Register(name string, c Component) (cancel func(), err error) { + if c == nil { + return nil, errors.Wrap(nilComponentErr, name) + } + + s.mu.Lock() + defer s.mu.Unlock() + if _, exists := s.components[name]; exists { + return nil, errors.Wrap(existsErr, name) + } + + s.components[name] = c + s.notifyWaiters(name, c) + + return func() { + s.mu.Lock() + delete(s.components, name) + s.mu.Unlock() + }, nil +} + +func (s Store) notifyWaiters(name string, c Component) { + for waiter := range s.waiters[name] { + waiter <- c + } +} + +func (s Store) unregister(name string) { + s.mu.Lock() + defer s.mu.Unlock() + delete(s.components, name) +} + +// Get gets a component from the store. +// If the component does not exist it returns nil. +func (s Store) Get(name string) Component { + s.mu.Lock() + defer s.mu.Unlock() + return s.components[name] +} + +func (s Store) addWaiter(name string, ch chan Component) { + idx, exists := s.waiters[name] + if !exists { + idx = make(map[chan Component]struct{}) + s.waiters[name] = idx + + } + idx[ch] = struct{}{} +} + +func (s Store) removeWaiter(name string, ch chan Component) { + delete(s.waiters[name], ch) + if len(s.waiters) == 0 { + delete(s.waiters, name) + } +} + +// Wait waits for a component with the given name to be added to the store. +// If the component already exists it is returned immediately. +// Wait only returns nil if the passed in context is cancelled. +func (s Store) Wait(ctx context.Context, name string) Component { + s.mu.Lock() + if c := s.components[name]; c != nil { + s.mu.Unlock() + return c + } + + wait := make(chan Component, 1) + + s.addWaiter(name, wait) + defer func() { + s.mu.Lock() + s.removeWaiter(name, wait) + s.mu.Unlock() + }() + + s.mu.Unlock() + + select { + case <-ctx.Done(): + return nil + case c := <-wait: + return c + } +} diff --git a/component/store_test.go b/component/store_test.go new file mode 100644 index 0000000000000..bf0909fd25e1c --- /dev/null +++ b/component/store_test.go @@ -0,0 +1,127 @@ +package component + +import ( + "testing" + "time" + + "context" + + "github.com/pkg/errors" +) + +type testComponent struct{} + +func TestStoreRegister(t *testing.T) { + s := NewStore() + + c := testComponent{} + cancel, err := s.Register("test", c) + if err != nil { + t.Fatal(err) + } + + _, err = s.Register("test", c) + if errors.Cause(err) != existsErr { + t.Fatal(err) + } + + cancel() + cancel, err = s.Register("test", c) + if err != nil { + t.Fatal(err) + } + cancel() + + if _, err := s.Register("niltest", nil); errors.Cause(err) != nilComponentErr { + t.Fatal(err) + } +} + +func TestStoreGet(t *testing.T) { + s := NewStore() + + var c testComponent + cancel, err := s.Register("test", c) + if err != nil { + t.Fatal(err) + } + + service := s.Get("test") + if service == nil { + t.Fatal("expected non-nil service") + } + + if service != c { + t.Fatal("got wrong service after get") + } + + cancel() + service = s.Get("test") + if service != nil { + t.Fatal("expected nil service") + } +} + +func TestStoreWait(t *testing.T) { + s := NewStore() + + ch := make(chan interface{}) + go func() { + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + ch <- s.Wait(ctx, "test") + }() + + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + // make sure the wait is in place + for { + select { + case <-ctx.Done(): + t.Fatal(ctx.Err()) + default: + } + + s.mu.Lock() + ready := len(s.waiters["test"]) > 0 + s.mu.Unlock() + if ready { + break + } + time.Sleep(10 * time.Millisecond) + } + + // nothing added yet, so there shouldn't be anything in this channel + select { + case <-ch: + t.Fatal("wait returned unexpectedly") + default: + } + + var c testComponent + _, err := s.Register("test", c) + if err != nil { + t.Fatal(err) + } + + ctx, cancel = context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + select { + case <-ctx.Done(): + t.Fatal(ctx.Err()) + case s := <-ch: + if s != c { + t.Fatalf("got unexpected service: %v", s) + } + } + + if len(s.waiters["test"]) != 0 { + t.Fatalf("unpexected waiters: %d", len(s.waiters)) + } +} + +func TestComponentTransoform(t *testing.T) { + +} diff --git a/component/volumestore/component.go b/component/volumestore/component.go new file mode 100644 index 0000000000000..db148886caac0 --- /dev/null +++ b/component/volumestore/component.go @@ -0,0 +1,24 @@ +package volumestore + +import ( + "github.com/docker/docker/component" + "github.com/docker/docker/volume/store" + "golang.org/x/net/context" +) + +const name = "volumestore" + +func Set(s *store.VolumeStore) (cancel func(), err error) { + return component.Register(name, s) +} + +func Get(ctx context.Context) (*store.VolumeStore, error) { + c := component.Wait(ctx, name) + if c == nil { + return nil, ctx.Err() + } + + // This could panic... but I think this is ok. + // This should never be anything else + return c.(*store.VolumeStore), nil +} diff --git a/daemon/cluster/cluster.go b/daemon/cluster/cluster.go index 57fc4d2d6f611..6874dbf0eecd6 100644 --- a/daemon/cluster/cluster.go +++ b/daemon/cluster/cluster.go @@ -49,7 +49,6 @@ import ( "github.com/Sirupsen/logrus" "github.com/docker/docker/api/types/network" types "github.com/docker/docker/api/types/swarm" - "github.com/docker/docker/daemon/cluster/controllers/plugin" executorpkg "github.com/docker/docker/daemon/cluster/executor" "github.com/docker/docker/pkg/signal" lncluster "github.com/docker/libnetwork/cluster" @@ -98,7 +97,6 @@ type Config struct { Root string Name string Backend executorpkg.Backend - PluginBackend plugin.Backend NetworkSubnetsProvider NetworkSubnetsProvider // DefaultAdvertiseAddr is the default host/IP or network interface to use diff --git a/daemon/cluster/executor/backend.go b/daemon/cluster/executor/backend.go index fbe9006561730..8dc3f994a5644 100644 --- a/daemon/cluster/executor/backend.go +++ b/daemon/cluster/executor/backend.go @@ -59,6 +59,5 @@ type Backend interface { WaitForDetachment(context.Context, string, string, string, string) error GetRepository(context.Context, reference.Named, *types.AuthConfig) (distribution.Repository, bool, error) LookupImage(name string) (*types.ImageInspect, error) - PluginManager() *plugin.Manager PluginGetter() *plugin.Store } diff --git a/daemon/cluster/executor/container/executor.go b/daemon/cluster/executor/container/executor.go index a71a9412e3c03..c6d371999a70f 100644 --- a/daemon/cluster/executor/container/executor.go +++ b/daemon/cluster/executor/container/executor.go @@ -10,6 +10,7 @@ import ( "github.com/docker/docker/api/types/filters" "github.com/docker/docker/api/types/network" swarmtypes "github.com/docker/docker/api/types/swarm" + "github.com/docker/docker/component/plugincontroller" "github.com/docker/docker/daemon/cluster/controllers/plugin" executorpkg "github.com/docker/docker/daemon/cluster/executor" clustertypes "github.com/docker/docker/daemon/cluster/provider" @@ -28,10 +29,10 @@ type executor struct { } // NewExecutor returns an executor from the docker client. -func NewExecutor(b executorpkg.Backend, p plugin.Backend) exec.Executor { +func NewExecutor(b executorpkg.Backend) exec.Executor { return &executor{ backend: b, - pluginBackend: p, + pluginBackend: plugincontroller.Wait(context.Background()), dependencies: agent.NewDependencyManager(), } } @@ -62,7 +63,8 @@ func (e *executor) Describe(ctx context.Context) (*api.NodeDescription, error) { addPlugins("Log", info.Plugins.Log) // add v2 plugins - v2Plugins, err := e.backend.PluginManager().List(filters.NewArgs()) + pm := plugincontroller.Wait(ctx) + v2Plugins, err := pm.List(filters.NewArgs()) if err == nil { for _, plgn := range v2Plugins { for _, typ := range plgn.Config.Interface.Types { diff --git a/daemon/cluster/noderunner.go b/daemon/cluster/noderunner.go index b970e7b217954..0affb81b4a65d 100644 --- a/daemon/cluster/noderunner.go +++ b/daemon/cluster/noderunner.go @@ -118,7 +118,7 @@ func (n *nodeRunner) start(conf nodeStartConfig) error { JoinAddr: joinAddr, StateDir: n.cluster.root, JoinToken: conf.joinToken, - Executor: container.NewExecutor(n.cluster.config.Backend, n.cluster.config.PluginBackend), + Executor: container.NewExecutor(n.cluster.config.Backend), HeartbeatTick: 1, ElectionTick: 3, UnlockKey: conf.lockKey, diff --git a/daemon/daemon.go b/daemon/daemon.go index 8359ef31ca18d..416198d0ee1c6 100644 --- a/daemon/daemon.go +++ b/daemon/daemon.go @@ -30,6 +30,8 @@ import ( "github.com/docker/docker/daemon/exec" "github.com/docker/docker/daemon/logger" // register graph drivers + + "github.com/docker/docker/component/plugincontroller" _ "github.com/docker/docker/daemon/graphdriver/register" "github.com/docker/docker/daemon/initlayer" "github.com/docker/docker/daemon/stats" @@ -103,7 +105,6 @@ type Daemon struct { idMappings *idtools.IDMappings stores map[string]daemonStore // By container target platform PluginStore *plugin.Store // todo: remove - pluginManager *plugin.Manager nameIndex *registrar.Registrar linkIndex *linkIndex containerd libcontainerd.Client @@ -637,7 +638,7 @@ func NewDaemon(config *config.Config, registryService registry.Service, containe registerMetricsPluginCallback(d.PluginStore, metricsSockPath) // Plugin system initialization should happen before restore. Do not change order. - d.pluginManager, err = plugin.NewManager(plugin.ManagerConfig{ + pluginManager, err := plugin.NewManager(plugin.ManagerConfig{ Root: filepath.Join(config.Root, "plugins"), ExecRoot: getPluginExecRoot(config.Root), Store: d.PluginStore, @@ -650,6 +651,7 @@ func NewDaemon(config *config.Config, registryService registry.Service, containe if err != nil { return nil, errors.Wrap(err, "couldn't create plugin manager") } + plugincontroller.Set(pluginManager) var graphDrivers []string for platform, ds := range d.stores { @@ -929,7 +931,9 @@ func (daemon *Daemon) Shutdown() error { daemon.cleanupMetricsPlugins() // Shutdown plugins after containers and layerstore. Don't change the order. - daemon.pluginShutdown() + if pc := plugincontroller.Get(); pc != nil { + pc.Shutdown() + } // trigger libnetwork Stop only if it's initialized if daemon.netController != nil { @@ -1159,20 +1163,6 @@ func (daemon *Daemon) SetCluster(cluster Cluster) { daemon.cluster = cluster } -func (daemon *Daemon) pluginShutdown() { - manager := daemon.pluginManager - // Check for a valid manager object. In error conditions, daemon init can fail - // and shutdown called, before plugin manager is initialized. - if manager != nil { - manager.Shutdown() - } -} - -// PluginManager returns current pluginManager associated with the daemon -func (daemon *Daemon) PluginManager() *plugin.Manager { // set up before daemon to avoid this method - return daemon.pluginManager -} - // PluginGetter returns current pluginStore associated with the daemon func (daemon *Daemon) PluginGetter() *plugin.Store { return daemon.PluginStore