Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Simplify the unregistration of csiplugin #89934

Merged
merged 1 commit into from Apr 13, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
14 changes: 2 additions & 12 deletions pkg/kubelet/pluginmanager/cache/actual_state_of_world.go
Expand Up @@ -75,22 +75,12 @@ type actualStateOfWorld struct {

var _ ActualStateOfWorld = &actualStateOfWorld{}

// NamedPluginHandler holds information for handler and the name of the plugin
type NamedPluginHandler struct {
Handler PluginHandler
Name string
}

// SocketPluginHandlers contains the map from socket path to NamedPluginHandler
type SocketPluginHandlers struct {
Handlers map[string]NamedPluginHandler
sync.Mutex
}

// PluginInfo holds information of a plugin
type PluginInfo struct {
SocketPath string
Timestamp time.Time
Handler PluginHandler
Name string
}

func (asw *actualStateOfWorld) AddPlugin(pluginInfo PluginInfo) error {
Expand Down
8 changes: 8 additions & 0 deletions pkg/kubelet/pluginmanager/cache/actual_state_of_world_test.go
Expand Up @@ -30,6 +30,8 @@ func Test_ASW_AddPlugin_Positive_NewPlugin(t *testing.T) {
pluginInfo := PluginInfo{
SocketPath: "/var/lib/kubelet/device-plugins/test-plugin.sock",
Timestamp: time.Now(),
Handler: nil,
Name: "test",
}
asw := NewActualStateOfWorld()
err := asw.AddPlugin(pluginInfo)
Expand Down Expand Up @@ -61,6 +63,8 @@ func Test_ASW_AddPlugin_Negative_EmptySocketPath(t *testing.T) {
pluginInfo := PluginInfo{
SocketPath: "",
Timestamp: time.Now(),
Handler: nil,
Name: "test",
}
err := asw.AddPlugin(pluginInfo)
require.EqualError(t, err, "socket path is empty")
Expand All @@ -86,6 +90,8 @@ func Test_ASW_RemovePlugin_Positive(t *testing.T) {
pluginInfo := PluginInfo{
SocketPath: "/var/lib/kubelet/device-plugins/test-plugin.sock",
Timestamp: time.Now(),
Handler: nil,
Name: "test",
}
err := asw.AddPlugin(pluginInfo)
// Assert
Expand Down Expand Up @@ -116,6 +122,8 @@ func Test_ASW_PluginExistsWithCorrectTimestamp_Negative_WrongTimestamp(t *testin
pluginInfo := PluginInfo{
SocketPath: "/var/lib/kubelet/device-plugins/test-plugin.sock",
Timestamp: time.Now(),
Handler: nil,
Name: "test",
}
err := asw.AddPlugin(pluginInfo)
// Assert
Expand Down
Expand Up @@ -45,11 +45,11 @@ import (
type OperationExecutor interface {
// RegisterPlugin registers the given plugin using the a handler in the plugin handler map.
// It then updates the actual state of the world to reflect that.
RegisterPlugin(socketPath string, timestamp time.Time, pluginHandlers map[string]cache.PluginHandler, pathToHandlers *cache.SocketPluginHandlers, actualStateOfWorld ActualStateOfWorldUpdater) error
RegisterPlugin(socketPath string, timestamp time.Time, pluginHandlers map[string]cache.PluginHandler, actualStateOfWorld ActualStateOfWorldUpdater) error

// UnregisterPlugin deregisters the given plugin using a handler in the given plugin handler map.
// It then updates the actual state of the world to reflect that.
UnregisterPlugin(socketPath string, pluginHandlers map[string]cache.PluginHandler, pathToHandlers *cache.SocketPluginHandlers, actualStateOfWorld ActualStateOfWorldUpdater) error
UnregisterPlugin(pluginInfo cache.PluginInfo, actualStateOfWorld ActualStateOfWorldUpdater) error
}

// NewOperationExecutor returns a new instance of OperationExecutor.
Expand Down Expand Up @@ -96,23 +96,20 @@ func (oe *operationExecutor) RegisterPlugin(
socketPath string,
timestamp time.Time,
pluginHandlers map[string]cache.PluginHandler,
saad-ali marked this conversation as resolved.
Show resolved Hide resolved
pathToHandlers *cache.SocketPluginHandlers,
actualStateOfWorld ActualStateOfWorldUpdater) error {
generatedOperation :=
oe.operationGenerator.GenerateRegisterPluginFunc(socketPath, timestamp, pluginHandlers, pathToHandlers, actualStateOfWorld)
oe.operationGenerator.GenerateRegisterPluginFunc(socketPath, timestamp, pluginHandlers, actualStateOfWorld)

return oe.pendingOperations.Run(
socketPath, generatedOperation)
}

func (oe *operationExecutor) UnregisterPlugin(
socketPath string,
pluginHandlers map[string]cache.PluginHandler,
pathToHandlers *cache.SocketPluginHandlers,
pluginInfo cache.PluginInfo,
actualStateOfWorld ActualStateOfWorldUpdater) error {
generatedOperation :=
oe.operationGenerator.GenerateUnregisterPluginFunc(socketPath, pluginHandlers, pathToHandlers, actualStateOfWorld)
oe.operationGenerator.GenerateUnregisterPluginFunc(pluginInfo, actualStateOfWorld)

return oe.pendingOperations.Run(
socketPath, generatedOperation)
pluginInfo.SocketPath, generatedOperation)
}
Expand Up @@ -44,10 +44,9 @@ func init() {

func TestOperationExecutor_RegisterPlugin_ConcurrentRegisterPlugin(t *testing.T) {
ch, quit, oe := setup()
hdlr := cache.SocketPluginHandlers{}
for i := 0; i < numPluginsToRegister; i++ {
socketPath := fmt.Sprintf("%s/plugin-%d.sock", socketDir, i)
oe.RegisterPlugin(socketPath, time.Now(), nil /* plugin handlers */, &hdlr, nil /* actual state of the world updator */)
oe.RegisterPlugin(socketPath, time.Now(), nil /* plugin handlers */, nil /* actual state of the world updator */)
}
if !isOperationRunConcurrently(ch, quit, numPluginsToRegister) {
t.Fatalf("Unable to start register operations in Concurrent for plugins")
Expand All @@ -57,9 +56,8 @@ func TestOperationExecutor_RegisterPlugin_ConcurrentRegisterPlugin(t *testing.T)
func TestOperationExecutor_RegisterPlugin_SerialRegisterPlugin(t *testing.T) {
ch, quit, oe := setup()
socketPath := fmt.Sprintf("%s/plugin-serial.sock", socketDir)
hdlr := cache.SocketPluginHandlers{}
for i := 0; i < numPluginsToRegister; i++ {
oe.RegisterPlugin(socketPath, time.Now(), nil /* plugin handlers */, &hdlr, nil /* actual state of the world updator */)
oe.RegisterPlugin(socketPath, time.Now(), nil /* plugin handlers */, nil /* actual state of the world updator */)

}
if !isOperationRunSerially(ch, quit) {
Expand All @@ -69,10 +67,10 @@ func TestOperationExecutor_RegisterPlugin_SerialRegisterPlugin(t *testing.T) {

func TestOperationExecutor_UnregisterPlugin_ConcurrentUnregisterPlugin(t *testing.T) {
ch, quit, oe := setup()
hdlr := cache.SocketPluginHandlers{}
for i := 0; i < numPluginsToUnregister; i++ {
socketPath := "socket-path" + strconv.Itoa(i)
oe.UnregisterPlugin(socketPath, nil /* plugin handlers */, &hdlr, nil /* actual state of the world updator */)
pluginInfo := cache.PluginInfo{SocketPath: socketPath}
oe.UnregisterPlugin(pluginInfo, nil /* actual state of the world updator */)

}
if !isOperationRunConcurrently(ch, quit, numPluginsToUnregister) {
Expand All @@ -83,9 +81,9 @@ func TestOperationExecutor_UnregisterPlugin_ConcurrentUnregisterPlugin(t *testin
func TestOperationExecutor_UnregisterPlugin_SerialUnregisterPlugin(t *testing.T) {
ch, quit, oe := setup()
socketPath := fmt.Sprintf("%s/plugin-serial.sock", socketDir)
hdlr := cache.SocketPluginHandlers{}
for i := 0; i < numPluginsToUnregister; i++ {
oe.UnregisterPlugin(socketPath, nil /* plugin handlers */, &hdlr, nil /* actual state of the world updator */)
pluginInfo := cache.PluginInfo{SocketPath: socketPath}
oe.UnregisterPlugin(pluginInfo, nil /* actual state of the world updator */)

}
if !isOperationRunSerially(ch, quit) {
Expand All @@ -109,7 +107,6 @@ func (fopg *fakeOperationGenerator) GenerateRegisterPluginFunc(
socketPath string,
timestamp time.Time,
pluginHandlers map[string]cache.PluginHandler,
pathToHandlers *cache.SocketPluginHandlers,
actualStateOfWorldUpdater ActualStateOfWorldUpdater) func() error {

opFunc := func() error {
Expand All @@ -120,9 +117,7 @@ func (fopg *fakeOperationGenerator) GenerateRegisterPluginFunc(
}

func (fopg *fakeOperationGenerator) GenerateUnregisterPluginFunc(
socketPath string,
pluginHandlers map[string]cache.PluginHandler,
pathToHandlers *cache.SocketPluginHandlers,
pluginInfo cache.PluginInfo,
actualStateOfWorldUpdater ActualStateOfWorldUpdater) func() error {
opFunc := func() error {
startOperationAndBlock(fopg.ch, fopg.quit)
Expand Down
43 changes: 9 additions & 34 deletions pkg/kubelet/pluginmanager/operationexecutor/operation_generator.go
Expand Up @@ -63,22 +63,18 @@ type OperationGenerator interface {
socketPath string,
timestamp time.Time,
pluginHandlers map[string]cache.PluginHandler,
pathToHandlers *cache.SocketPluginHandlers,
actualStateOfWorldUpdater ActualStateOfWorldUpdater) func() error

// Generates the UnregisterPlugin function needed to perform the unregistration of a plugin
GenerateUnregisterPluginFunc(
socketPath string,
pluginHandlers map[string]cache.PluginHandler,
pathToHandlers *cache.SocketPluginHandlers,
pluginInfo cache.PluginInfo,
actualStateOfWorldUpdater ActualStateOfWorldUpdater) func() error
}

func (og *operationGenerator) GenerateRegisterPluginFunc(
socketPath string,
timestamp time.Time,
pluginHandlers map[string]cache.PluginHandler,
pathToHandlers *cache.SocketPluginHandlers,
actualStateOfWorldUpdater ActualStateOfWorldUpdater) func() error {

registerPluginFunc := func() error {
Expand Down Expand Up @@ -118,19 +114,15 @@ func (og *operationGenerator) GenerateRegisterPluginFunc(
err = actualStateOfWorldUpdater.AddPlugin(cache.PluginInfo{
SocketPath: socketPath,
Timestamp: timestamp,
Handler: handler,
Name: infoResp.Name,
})
if err != nil {
klog.Errorf("RegisterPlugin error -- failed to add plugin at socket %s, err: %v", socketPath, err)
}
if err := handler.RegisterPlugin(infoResp.Name, infoResp.Endpoint, infoResp.SupportedVersions); err != nil {
return og.notifyPlugin(client, false, fmt.Sprintf("RegisterPlugin error -- plugin registration failed with err: %v", err))
}
pathToHandlers.Lock()
if pathToHandlers.Handlers == nil {
pathToHandlers.Handlers = make(map[string]cache.NamedPluginHandler)
}
pathToHandlers.Handlers[socketPath] = cache.NamedPluginHandler{Handler: handler, Name: infoResp.Name}
pathToHandlers.Unlock()

// Notify is called after register to guarantee that even if notify throws an error Register will always be called after validate
if err := og.notifyPlugin(client, true, ""); err != nil {
Expand All @@ -142,37 +134,20 @@ func (og *operationGenerator) GenerateRegisterPluginFunc(
}

func (og *operationGenerator) GenerateUnregisterPluginFunc(
socketPath string,
pluginHandlers map[string]cache.PluginHandler,
pathToHandlers *cache.SocketPluginHandlers,
pluginInfo cache.PluginInfo,
actualStateOfWorldUpdater ActualStateOfWorldUpdater) func() error {

unregisterPluginFunc := func() error {
_, conn, err := dial(socketPath, dialTimeoutDuration)
if err != nil {
klog.V(4).Infof("unable to dial: %v", err)
} else {
conn.Close()
}

var handlerWithName cache.NamedPluginHandler
pathToHandlers.Lock()
handlerWithName, handlerFound := pathToHandlers.Handlers[socketPath]
pathToHandlers.Unlock()

if !handlerFound {
return fmt.Errorf("UnregisterPlugin error -- failed to get plugin handler for %s", socketPath)
if pluginInfo.Handler == nil {
return fmt.Errorf("UnregisterPlugin error -- failed to get plugin handler for %s", pluginInfo.SocketPath)
}
// We remove the plugin to the actual state of world cache before calling a plugin consumer's Unregister handle
// so that if we receive a register event during Register Plugin, we can process it as a Register call.
actualStateOfWorldUpdater.RemovePlugin(socketPath)
actualStateOfWorldUpdater.RemovePlugin(pluginInfo.SocketPath)

handlerWithName.Handler.DeRegisterPlugin(handlerWithName.Name)
pluginInfo.Handler.DeRegisterPlugin(pluginInfo.Name)

pathToHandlers.Lock()
delete(pathToHandlers.Handlers, socketPath)
pathToHandlers.Unlock()
klog.V(4).Infof("DeRegisterPlugin called for %s on %v", handlerWithName.Name, handlerWithName.Handler)
klog.V(4).Infof("DeRegisterPlugin called for %s on %v", pluginInfo.Name, pluginInfo.Handler)
return nil
}
return unregisterPluginFunc
Expand Down
13 changes: 2 additions & 11 deletions pkg/kubelet/pluginmanager/reconciler/reconciler.go
Expand Up @@ -67,7 +67,6 @@ func NewReconciler(
desiredStateOfWorld: desiredStateOfWorld,
actualStateOfWorld: actualStateOfWorld,
handlers: make(map[string]cache.PluginHandler),
pathToHandlers: cache.SocketPluginHandlers{Handlers: make(map[string]cache.NamedPluginHandler)},
}
}

Expand All @@ -77,7 +76,6 @@ type reconciler struct {
desiredStateOfWorld cache.DesiredStateOfWorld
actualStateOfWorld cache.ActualStateOfWorld
handlers map[string]cache.PluginHandler
pathToHandlers cache.SocketPluginHandlers
sync.RWMutex
}

Expand Down Expand Up @@ -105,13 +103,6 @@ func (rc *reconciler) getHandlers() map[string]cache.PluginHandler {
return rc.handlers
}

func (rc *reconciler) getPathToHandlers() *cache.SocketPluginHandlers {
rc.RLock()
defer rc.RUnlock()

return &rc.pathToHandlers
}

func (rc *reconciler) reconcile() {
// Unregisterations are triggered before registrations

Expand All @@ -136,7 +127,7 @@ func (rc *reconciler) reconcile() {

if unregisterPlugin {
klog.V(5).Infof(registeredPlugin.GenerateMsgDetailed("Starting operationExecutor.UnregisterPlugin", ""))
err := rc.operationExecutor.UnregisterPlugin(registeredPlugin.SocketPath, rc.getHandlers(), rc.getPathToHandlers(), rc.actualStateOfWorld)
err := rc.operationExecutor.UnregisterPlugin(registeredPlugin, rc.actualStateOfWorld)
if err != nil &&
!goroutinemap.IsAlreadyExists(err) &&
!exponentialbackoff.IsExponentialBackoff(err) {
Expand All @@ -154,7 +145,7 @@ func (rc *reconciler) reconcile() {
for _, pluginToRegister := range rc.desiredStateOfWorld.GetPluginsToRegister() {
if !rc.actualStateOfWorld.PluginExistsWithCorrectTimestamp(pluginToRegister) {
klog.V(5).Infof(pluginToRegister.GenerateMsgDetailed("Starting operationExecutor.RegisterPlugin", ""))
err := rc.operationExecutor.RegisterPlugin(pluginToRegister.SocketPath, pluginToRegister.Timestamp, rc.getHandlers(), rc.getPathToHandlers(), rc.actualStateOfWorld)
err := rc.operationExecutor.RegisterPlugin(pluginToRegister.SocketPath, pluginToRegister.Timestamp, rc.getHandlers(), rc.actualStateOfWorld)
if err != nil &&
!goroutinemap.IsAlreadyExists(err) &&
!exponentialbackoff.IsExponentialBackoff(err) {
Expand Down