diff --git a/pkg/kubelet/pluginmanager/cache/actual_state_of_world.go b/pkg/kubelet/pluginmanager/cache/actual_state_of_world.go index 46c9ada1fc54..f95b815e89cf 100644 --- a/pkg/kubelet/pluginmanager/cache/actual_state_of_world.go +++ b/pkg/kubelet/pluginmanager/cache/actual_state_of_world.go @@ -75,22 +75,12 @@ type actualStateOfWorld struct { var _ ActualStateOfWorld = &actualStateOfWorld{} -// NamedPluginHandler holds information for handler and the name of the plugin -type NamedPluginHandler struct { - Handler PluginHandler - Name string -} - -// SocketPluginHandlers contains the map from socket path to NamedPluginHandler -type SocketPluginHandlers struct { - Handlers map[string]NamedPluginHandler - sync.Mutex -} - // PluginInfo holds information of a plugin type PluginInfo struct { SocketPath string Timestamp time.Time + Handler PluginHandler + Name string } func (asw *actualStateOfWorld) AddPlugin(pluginInfo PluginInfo) error { diff --git a/pkg/kubelet/pluginmanager/cache/actual_state_of_world_test.go b/pkg/kubelet/pluginmanager/cache/actual_state_of_world_test.go index 15d470b8d1a0..aacd5f002276 100644 --- a/pkg/kubelet/pluginmanager/cache/actual_state_of_world_test.go +++ b/pkg/kubelet/pluginmanager/cache/actual_state_of_world_test.go @@ -30,6 +30,8 @@ func Test_ASW_AddPlugin_Positive_NewPlugin(t *testing.T) { pluginInfo := PluginInfo{ SocketPath: "/var/lib/kubelet/device-plugins/test-plugin.sock", Timestamp: time.Now(), + Handler: nil, + Name: "test", } asw := NewActualStateOfWorld() err := asw.AddPlugin(pluginInfo) @@ -61,6 +63,8 @@ func Test_ASW_AddPlugin_Negative_EmptySocketPath(t *testing.T) { pluginInfo := PluginInfo{ SocketPath: "", Timestamp: time.Now(), + Handler: nil, + Name: "test", } err := asw.AddPlugin(pluginInfo) require.EqualError(t, err, "socket path is empty") @@ -86,6 +90,8 @@ func Test_ASW_RemovePlugin_Positive(t *testing.T) { pluginInfo := PluginInfo{ SocketPath: "/var/lib/kubelet/device-plugins/test-plugin.sock", Timestamp: time.Now(), + Handler: nil, + Name: "test", } err := asw.AddPlugin(pluginInfo) // Assert @@ -116,6 +122,8 @@ func Test_ASW_PluginExistsWithCorrectTimestamp_Negative_WrongTimestamp(t *testin pluginInfo := PluginInfo{ SocketPath: "/var/lib/kubelet/device-plugins/test-plugin.sock", Timestamp: time.Now(), + Handler: nil, + Name: "test", } err := asw.AddPlugin(pluginInfo) // Assert diff --git a/pkg/kubelet/pluginmanager/operationexecutor/operation_executor.go b/pkg/kubelet/pluginmanager/operationexecutor/operation_executor.go index 603bb2e4abfb..b6e6019a46fa 100644 --- a/pkg/kubelet/pluginmanager/operationexecutor/operation_executor.go +++ b/pkg/kubelet/pluginmanager/operationexecutor/operation_executor.go @@ -45,11 +45,11 @@ import ( type OperationExecutor interface { // RegisterPlugin registers the given plugin using the a handler in the plugin handler map. // It then updates the actual state of the world to reflect that. - RegisterPlugin(socketPath string, timestamp time.Time, pluginHandlers map[string]cache.PluginHandler, pathToHandlers *cache.SocketPluginHandlers, actualStateOfWorld ActualStateOfWorldUpdater) error + RegisterPlugin(socketPath string, timestamp time.Time, pluginHandlers map[string]cache.PluginHandler, actualStateOfWorld ActualStateOfWorldUpdater) error // UnregisterPlugin deregisters the given plugin using a handler in the given plugin handler map. // It then updates the actual state of the world to reflect that. - UnregisterPlugin(socketPath string, pluginHandlers map[string]cache.PluginHandler, pathToHandlers *cache.SocketPluginHandlers, actualStateOfWorld ActualStateOfWorldUpdater) error + UnregisterPlugin(pluginInfo cache.PluginInfo, actualStateOfWorld ActualStateOfWorldUpdater) error } // NewOperationExecutor returns a new instance of OperationExecutor. @@ -96,23 +96,20 @@ func (oe *operationExecutor) RegisterPlugin( socketPath string, timestamp time.Time, pluginHandlers map[string]cache.PluginHandler, - pathToHandlers *cache.SocketPluginHandlers, actualStateOfWorld ActualStateOfWorldUpdater) error { generatedOperation := - oe.operationGenerator.GenerateRegisterPluginFunc(socketPath, timestamp, pluginHandlers, pathToHandlers, actualStateOfWorld) + oe.operationGenerator.GenerateRegisterPluginFunc(socketPath, timestamp, pluginHandlers, actualStateOfWorld) return oe.pendingOperations.Run( socketPath, generatedOperation) } func (oe *operationExecutor) UnregisterPlugin( - socketPath string, - pluginHandlers map[string]cache.PluginHandler, - pathToHandlers *cache.SocketPluginHandlers, + pluginInfo cache.PluginInfo, actualStateOfWorld ActualStateOfWorldUpdater) error { generatedOperation := - oe.operationGenerator.GenerateUnregisterPluginFunc(socketPath, pluginHandlers, pathToHandlers, actualStateOfWorld) + oe.operationGenerator.GenerateUnregisterPluginFunc(pluginInfo, actualStateOfWorld) return oe.pendingOperations.Run( - socketPath, generatedOperation) + pluginInfo.SocketPath, generatedOperation) } diff --git a/pkg/kubelet/pluginmanager/operationexecutor/operation_executor_test.go b/pkg/kubelet/pluginmanager/operationexecutor/operation_executor_test.go index 0af0df4fcbd6..b63a56a12116 100644 --- a/pkg/kubelet/pluginmanager/operationexecutor/operation_executor_test.go +++ b/pkg/kubelet/pluginmanager/operationexecutor/operation_executor_test.go @@ -44,10 +44,9 @@ func init() { func TestOperationExecutor_RegisterPlugin_ConcurrentRegisterPlugin(t *testing.T) { ch, quit, oe := setup() - hdlr := cache.SocketPluginHandlers{} for i := 0; i < numPluginsToRegister; i++ { socketPath := fmt.Sprintf("%s/plugin-%d.sock", socketDir, i) - oe.RegisterPlugin(socketPath, time.Now(), nil /* plugin handlers */, &hdlr, nil /* actual state of the world updator */) + oe.RegisterPlugin(socketPath, time.Now(), nil /* plugin handlers */, nil /* actual state of the world updator */) } if !isOperationRunConcurrently(ch, quit, numPluginsToRegister) { t.Fatalf("Unable to start register operations in Concurrent for plugins") @@ -57,9 +56,8 @@ func TestOperationExecutor_RegisterPlugin_ConcurrentRegisterPlugin(t *testing.T) func TestOperationExecutor_RegisterPlugin_SerialRegisterPlugin(t *testing.T) { ch, quit, oe := setup() socketPath := fmt.Sprintf("%s/plugin-serial.sock", socketDir) - hdlr := cache.SocketPluginHandlers{} for i := 0; i < numPluginsToRegister; i++ { - oe.RegisterPlugin(socketPath, time.Now(), nil /* plugin handlers */, &hdlr, nil /* actual state of the world updator */) + oe.RegisterPlugin(socketPath, time.Now(), nil /* plugin handlers */, nil /* actual state of the world updator */) } if !isOperationRunSerially(ch, quit) { @@ -69,10 +67,10 @@ func TestOperationExecutor_RegisterPlugin_SerialRegisterPlugin(t *testing.T) { func TestOperationExecutor_UnregisterPlugin_ConcurrentUnregisterPlugin(t *testing.T) { ch, quit, oe := setup() - hdlr := cache.SocketPluginHandlers{} for i := 0; i < numPluginsToUnregister; i++ { socketPath := "socket-path" + strconv.Itoa(i) - oe.UnregisterPlugin(socketPath, nil /* plugin handlers */, &hdlr, nil /* actual state of the world updator */) + pluginInfo := cache.PluginInfo{SocketPath: socketPath} + oe.UnregisterPlugin(pluginInfo, nil /* actual state of the world updator */) } if !isOperationRunConcurrently(ch, quit, numPluginsToUnregister) { @@ -83,9 +81,9 @@ func TestOperationExecutor_UnregisterPlugin_ConcurrentUnregisterPlugin(t *testin func TestOperationExecutor_UnregisterPlugin_SerialUnregisterPlugin(t *testing.T) { ch, quit, oe := setup() socketPath := fmt.Sprintf("%s/plugin-serial.sock", socketDir) - hdlr := cache.SocketPluginHandlers{} for i := 0; i < numPluginsToUnregister; i++ { - oe.UnregisterPlugin(socketPath, nil /* plugin handlers */, &hdlr, nil /* actual state of the world updator */) + pluginInfo := cache.PluginInfo{SocketPath: socketPath} + oe.UnregisterPlugin(pluginInfo, nil /* actual state of the world updator */) } if !isOperationRunSerially(ch, quit) { @@ -109,7 +107,6 @@ func (fopg *fakeOperationGenerator) GenerateRegisterPluginFunc( socketPath string, timestamp time.Time, pluginHandlers map[string]cache.PluginHandler, - pathToHandlers *cache.SocketPluginHandlers, actualStateOfWorldUpdater ActualStateOfWorldUpdater) func() error { opFunc := func() error { @@ -120,9 +117,7 @@ func (fopg *fakeOperationGenerator) GenerateRegisterPluginFunc( } func (fopg *fakeOperationGenerator) GenerateUnregisterPluginFunc( - socketPath string, - pluginHandlers map[string]cache.PluginHandler, - pathToHandlers *cache.SocketPluginHandlers, + pluginInfo cache.PluginInfo, actualStateOfWorldUpdater ActualStateOfWorldUpdater) func() error { opFunc := func() error { startOperationAndBlock(fopg.ch, fopg.quit) diff --git a/pkg/kubelet/pluginmanager/operationexecutor/operation_generator.go b/pkg/kubelet/pluginmanager/operationexecutor/operation_generator.go index 407da39e7067..c1eb242f4524 100644 --- a/pkg/kubelet/pluginmanager/operationexecutor/operation_generator.go +++ b/pkg/kubelet/pluginmanager/operationexecutor/operation_generator.go @@ -63,14 +63,11 @@ type OperationGenerator interface { socketPath string, timestamp time.Time, pluginHandlers map[string]cache.PluginHandler, - pathToHandlers *cache.SocketPluginHandlers, actualStateOfWorldUpdater ActualStateOfWorldUpdater) func() error // Generates the UnregisterPlugin function needed to perform the unregistration of a plugin GenerateUnregisterPluginFunc( - socketPath string, - pluginHandlers map[string]cache.PluginHandler, - pathToHandlers *cache.SocketPluginHandlers, + pluginInfo cache.PluginInfo, actualStateOfWorldUpdater ActualStateOfWorldUpdater) func() error } @@ -78,7 +75,6 @@ func (og *operationGenerator) GenerateRegisterPluginFunc( socketPath string, timestamp time.Time, pluginHandlers map[string]cache.PluginHandler, - pathToHandlers *cache.SocketPluginHandlers, actualStateOfWorldUpdater ActualStateOfWorldUpdater) func() error { registerPluginFunc := func() error { @@ -118,6 +114,8 @@ func (og *operationGenerator) GenerateRegisterPluginFunc( err = actualStateOfWorldUpdater.AddPlugin(cache.PluginInfo{ SocketPath: socketPath, Timestamp: timestamp, + Handler: handler, + Name: infoResp.Name, }) if err != nil { klog.Errorf("RegisterPlugin error -- failed to add plugin at socket %s, err: %v", socketPath, err) @@ -125,12 +123,6 @@ func (og *operationGenerator) GenerateRegisterPluginFunc( if err := handler.RegisterPlugin(infoResp.Name, infoResp.Endpoint, infoResp.SupportedVersions); err != nil { return og.notifyPlugin(client, false, fmt.Sprintf("RegisterPlugin error -- plugin registration failed with err: %v", err)) } - pathToHandlers.Lock() - if pathToHandlers.Handlers == nil { - pathToHandlers.Handlers = make(map[string]cache.NamedPluginHandler) - } - pathToHandlers.Handlers[socketPath] = cache.NamedPluginHandler{Handler: handler, Name: infoResp.Name} - pathToHandlers.Unlock() // Notify is called after register to guarantee that even if notify throws an error Register will always be called after validate if err := og.notifyPlugin(client, true, ""); err != nil { @@ -142,37 +134,20 @@ func (og *operationGenerator) GenerateRegisterPluginFunc( } func (og *operationGenerator) GenerateUnregisterPluginFunc( - socketPath string, - pluginHandlers map[string]cache.PluginHandler, - pathToHandlers *cache.SocketPluginHandlers, + pluginInfo cache.PluginInfo, actualStateOfWorldUpdater ActualStateOfWorldUpdater) func() error { unregisterPluginFunc := func() error { - _, conn, err := dial(socketPath, dialTimeoutDuration) - if err != nil { - klog.V(4).Infof("unable to dial: %v", err) - } else { - conn.Close() - } - - var handlerWithName cache.NamedPluginHandler - pathToHandlers.Lock() - handlerWithName, handlerFound := pathToHandlers.Handlers[socketPath] - pathToHandlers.Unlock() - - if !handlerFound { - return fmt.Errorf("UnregisterPlugin error -- failed to get plugin handler for %s", socketPath) + if pluginInfo.Handler == nil { + return fmt.Errorf("UnregisterPlugin error -- failed to get plugin handler for %s", pluginInfo.SocketPath) } // We remove the plugin to the actual state of world cache before calling a plugin consumer's Unregister handle // so that if we receive a register event during Register Plugin, we can process it as a Register call. - actualStateOfWorldUpdater.RemovePlugin(socketPath) + actualStateOfWorldUpdater.RemovePlugin(pluginInfo.SocketPath) - handlerWithName.Handler.DeRegisterPlugin(handlerWithName.Name) + pluginInfo.Handler.DeRegisterPlugin(pluginInfo.Name) - pathToHandlers.Lock() - delete(pathToHandlers.Handlers, socketPath) - pathToHandlers.Unlock() - klog.V(4).Infof("DeRegisterPlugin called for %s on %v", handlerWithName.Name, handlerWithName.Handler) + klog.V(4).Infof("DeRegisterPlugin called for %s on %v", pluginInfo.Name, pluginInfo.Handler) return nil } return unregisterPluginFunc diff --git a/pkg/kubelet/pluginmanager/reconciler/reconciler.go b/pkg/kubelet/pluginmanager/reconciler/reconciler.go index d23f4b0c3b63..11d02116be2b 100644 --- a/pkg/kubelet/pluginmanager/reconciler/reconciler.go +++ b/pkg/kubelet/pluginmanager/reconciler/reconciler.go @@ -67,7 +67,6 @@ func NewReconciler( desiredStateOfWorld: desiredStateOfWorld, actualStateOfWorld: actualStateOfWorld, handlers: make(map[string]cache.PluginHandler), - pathToHandlers: cache.SocketPluginHandlers{Handlers: make(map[string]cache.NamedPluginHandler)}, } } @@ -77,7 +76,6 @@ type reconciler struct { desiredStateOfWorld cache.DesiredStateOfWorld actualStateOfWorld cache.ActualStateOfWorld handlers map[string]cache.PluginHandler - pathToHandlers cache.SocketPluginHandlers sync.RWMutex } @@ -105,13 +103,6 @@ func (rc *reconciler) getHandlers() map[string]cache.PluginHandler { return rc.handlers } -func (rc *reconciler) getPathToHandlers() *cache.SocketPluginHandlers { - rc.RLock() - defer rc.RUnlock() - - return &rc.pathToHandlers -} - func (rc *reconciler) reconcile() { // Unregisterations are triggered before registrations @@ -136,7 +127,7 @@ func (rc *reconciler) reconcile() { if unregisterPlugin { klog.V(5).Infof(registeredPlugin.GenerateMsgDetailed("Starting operationExecutor.UnregisterPlugin", "")) - err := rc.operationExecutor.UnregisterPlugin(registeredPlugin.SocketPath, rc.getHandlers(), rc.getPathToHandlers(), rc.actualStateOfWorld) + err := rc.operationExecutor.UnregisterPlugin(registeredPlugin, rc.actualStateOfWorld) if err != nil && !goroutinemap.IsAlreadyExists(err) && !exponentialbackoff.IsExponentialBackoff(err) { @@ -154,7 +145,7 @@ func (rc *reconciler) reconcile() { for _, pluginToRegister := range rc.desiredStateOfWorld.GetPluginsToRegister() { if !rc.actualStateOfWorld.PluginExistsWithCorrectTimestamp(pluginToRegister) { klog.V(5).Infof(pluginToRegister.GenerateMsgDetailed("Starting operationExecutor.RegisterPlugin", "")) - err := rc.operationExecutor.RegisterPlugin(pluginToRegister.SocketPath, pluginToRegister.Timestamp, rc.getHandlers(), rc.getPathToHandlers(), rc.actualStateOfWorld) + err := rc.operationExecutor.RegisterPlugin(pluginToRegister.SocketPath, pluginToRegister.Timestamp, rc.getHandlers(), rc.actualStateOfWorld) if err != nil && !goroutinemap.IsAlreadyExists(err) && !exponentialbackoff.IsExponentialBackoff(err) {