Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Automated cherry pick of #88006: Unregister csiplugin even if socket path is gone #89934: Simplify unregistration of csiplugin #90125

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 2 additions & 0 deletions pkg/kubelet/pluginmanager/cache/actual_state_of_world.go
Expand Up @@ -79,6 +79,8 @@ var _ ActualStateOfWorld = &actualStateOfWorld{}
type PluginInfo struct {
SocketPath string
Timestamp time.Time
Handler PluginHandler
Name string
}

func (asw *actualStateOfWorld) AddPlugin(pluginInfo PluginInfo) error {
Expand Down
8 changes: 8 additions & 0 deletions pkg/kubelet/pluginmanager/cache/actual_state_of_world_test.go
Expand Up @@ -30,6 +30,8 @@ func Test_ASW_AddPlugin_Positive_NewPlugin(t *testing.T) {
pluginInfo := PluginInfo{
SocketPath: "/var/lib/kubelet/device-plugins/test-plugin.sock",
Timestamp: time.Now(),
Handler: nil,
Name: "test",
}
asw := NewActualStateOfWorld()
err := asw.AddPlugin(pluginInfo)
Expand Down Expand Up @@ -61,6 +63,8 @@ func Test_ASW_AddPlugin_Negative_EmptySocketPath(t *testing.T) {
pluginInfo := PluginInfo{
SocketPath: "",
Timestamp: time.Now(),
Handler: nil,
Name: "test",
}
err := asw.AddPlugin(pluginInfo)
require.EqualError(t, err, "socket path is empty")
Expand All @@ -86,6 +90,8 @@ func Test_ASW_RemovePlugin_Positive(t *testing.T) {
pluginInfo := PluginInfo{
SocketPath: "/var/lib/kubelet/device-plugins/test-plugin.sock",
Timestamp: time.Now(),
Handler: nil,
Name: "test",
}
err := asw.AddPlugin(pluginInfo)
// Assert
Expand Down Expand Up @@ -116,6 +122,8 @@ func Test_ASW_PluginExistsWithCorrectTimestamp_Negative_WrongTimestamp(t *testin
pluginInfo := PluginInfo{
SocketPath: "/var/lib/kubelet/device-plugins/test-plugin.sock",
Timestamp: time.Now(),
Handler: nil,
Name: "test",
}
err := asw.AddPlugin(pluginInfo)
// Assert
Expand Down
Expand Up @@ -49,7 +49,7 @@ type OperationExecutor interface {

// UnregisterPlugin deregisters the given plugin using a handler in the given plugin handler map.
// It then updates the actual state of the world to reflect that.
UnregisterPlugin(socketPath string, pluginHandlers map[string]cache.PluginHandler, actualStateOfWorld ActualStateOfWorldUpdater) error
UnregisterPlugin(pluginInfo cache.PluginInfo, actualStateOfWorld ActualStateOfWorldUpdater) error
}

// NewOperationExecutor returns a new instance of OperationExecutor.
Expand Down Expand Up @@ -105,12 +105,11 @@ func (oe *operationExecutor) RegisterPlugin(
}

func (oe *operationExecutor) UnregisterPlugin(
socketPath string,
pluginHandlers map[string]cache.PluginHandler,
pluginInfo cache.PluginInfo,
actualStateOfWorld ActualStateOfWorldUpdater) error {
generatedOperation :=
oe.operationGenerator.GenerateUnregisterPluginFunc(socketPath, pluginHandlers, actualStateOfWorld)
oe.operationGenerator.GenerateUnregisterPluginFunc(pluginInfo, actualStateOfWorld)

return oe.pendingOperations.Run(
socketPath, generatedOperation)
pluginInfo.SocketPath, generatedOperation)
}
Expand Up @@ -69,7 +69,8 @@ func TestOperationExecutor_UnregisterPlugin_ConcurrentUnregisterPlugin(t *testin
ch, quit, oe := setup()
for i := 0; i < numPluginsToUnregister; i++ {
socketPath := "socket-path" + strconv.Itoa(i)
oe.UnregisterPlugin(socketPath, nil /* plugin handlers */, nil /* actual state of the world updator */)
pluginInfo := cache.PluginInfo{SocketPath: socketPath}
oe.UnregisterPlugin(pluginInfo, nil /* actual state of the world updator */)

}
if !isOperationRunConcurrently(ch, quit, numPluginsToUnregister) {
Expand All @@ -81,7 +82,8 @@ func TestOperationExecutor_UnregisterPlugin_SerialUnregisterPlugin(t *testing.T)
ch, quit, oe := setup()
socketPath := fmt.Sprintf("%s/plugin-serial.sock", socketDir)
for i := 0; i < numPluginsToUnregister; i++ {
oe.UnregisterPlugin(socketPath, nil /* plugin handlers */, nil /* actual state of the world updator */)
pluginInfo := cache.PluginInfo{SocketPath: socketPath}
oe.UnregisterPlugin(pluginInfo, nil /* actual state of the world updator */)

}
if !isOperationRunSerially(ch, quit) {
Expand Down Expand Up @@ -115,8 +117,7 @@ func (fopg *fakeOperationGenerator) GenerateRegisterPluginFunc(
}

func (fopg *fakeOperationGenerator) GenerateUnregisterPluginFunc(
socketPath string,
pluginHandlers map[string]cache.PluginHandler,
pluginInfo cache.PluginInfo,
actualStateOfWorldUpdater ActualStateOfWorldUpdater) func() error {
opFunc := func() error {
startOperationAndBlock(fopg.ch, fopg.quit)
Expand Down
34 changes: 10 additions & 24 deletions pkg/kubelet/pluginmanager/operationexecutor/operation_generator.go
Expand Up @@ -67,8 +67,7 @@ type OperationGenerator interface {

// Generates the UnregisterPlugin function needed to perform the unregistration of a plugin
GenerateUnregisterPluginFunc(
socketPath string,
pluginHandlers map[string]cache.PluginHandler,
pluginInfo cache.PluginInfo,
actualStateOfWorldUpdater ActualStateOfWorldUpdater) func() error
}

Expand Down Expand Up @@ -115,6 +114,8 @@ func (og *operationGenerator) GenerateRegisterPluginFunc(
err = actualStateOfWorldUpdater.AddPlugin(cache.PluginInfo{
SocketPath: socketPath,
Timestamp: timestamp,
Handler: handler,
Name: infoResp.Name,
})
if err != nil {
klog.Errorf("RegisterPlugin error -- failed to add plugin at socket %s, err: %v", socketPath, err)
Expand All @@ -133,35 +134,20 @@ func (og *operationGenerator) GenerateRegisterPluginFunc(
}

func (og *operationGenerator) GenerateUnregisterPluginFunc(
socketPath string,
pluginHandlers map[string]cache.PluginHandler,
pluginInfo cache.PluginInfo,
actualStateOfWorldUpdater ActualStateOfWorldUpdater) func() error {

unregisterPluginFunc := func() error {
client, conn, err := dial(socketPath, dialTimeoutDuration)
if err != nil {
return fmt.Errorf("UnregisterPlugin error -- dial failed at socket %s, err: %v", socketPath, err)
}
defer conn.Close()

ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()

infoResp, err := client.GetInfo(ctx, &registerapi.InfoRequest{})
if err != nil {
return fmt.Errorf("UnregisterPlugin error -- failed to get plugin info using RPC GetInfo at socket %s, err: %v", socketPath, err)
}

handler, ok := pluginHandlers[infoResp.Type]
if !ok {
return fmt.Errorf("UnregisterPlugin error -- no handler registered for plugin type: %s at socket %s", infoResp.Type, socketPath)
if pluginInfo.Handler == nil {
return fmt.Errorf("UnregisterPlugin error -- failed to get plugin handler for %s", pluginInfo.SocketPath)
}

// We remove the plugin to the actual state of world cache before calling a plugin consumer's Unregister handle
// so that if we receive a register event during Register Plugin, we can process it as a Register call.
actualStateOfWorldUpdater.RemovePlugin(socketPath)
actualStateOfWorldUpdater.RemovePlugin(pluginInfo.SocketPath)

pluginInfo.Handler.DeRegisterPlugin(pluginInfo.Name)

handler.DeRegisterPlugin(infoResp.Name)
klog.V(4).Infof("DeRegisterPlugin called for %s on %v", pluginInfo.Name, pluginInfo.Handler)
return nil
}
return unregisterPluginFunc
Expand Down
2 changes: 1 addition & 1 deletion pkg/kubelet/pluginmanager/reconciler/reconciler.go
Expand Up @@ -127,7 +127,7 @@ func (rc *reconciler) reconcile() {

if unregisterPlugin {
klog.V(5).Infof(registeredPlugin.GenerateMsgDetailed("Starting operationExecutor.UnregisterPlugin", ""))
err := rc.operationExecutor.UnregisterPlugin(registeredPlugin.SocketPath, rc.getHandlers(), rc.actualStateOfWorld)
err := rc.operationExecutor.UnregisterPlugin(registeredPlugin, rc.actualStateOfWorld)
if err != nil &&
!goroutinemap.IsAlreadyExists(err) &&
!exponentialbackoff.IsExponentialBackoff(err) {
Expand Down
1 change: 1 addition & 0 deletions pkg/kubelet/pluginmanager/reconciler/reconciler_test.go
Expand Up @@ -252,6 +252,7 @@ func Test_Run_Positive_RegisterThenUnregister(t *testing.T) {
}

dsw.RemovePlugin(socketPath)
os.Remove(socketPath)
waitForUnregistration(t, socketPath, asw)

// Get asw plugins; it should no longer contain the added plugin
Expand Down