Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CSI: remove global vars #74966

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
7 changes: 7 additions & 0 deletions pkg/kubelet/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ go_library(
"//pkg/util/taints:go_default_library",
"//pkg/volume:go_default_library",
"//pkg/volume/csi:go_default_library",
"//pkg/volume/csi/constants:go_default_library",
"//pkg/volume/util:go_default_library",
"//pkg/volume/util/hostutil:go_default_library",
"//pkg/volume/util/subpath:go_default_library",
Expand Down Expand Up @@ -166,6 +167,7 @@ go_test(
name = "go_default_test",
srcs = [
"active_deadline_test.go",
"kubelet_csi_handler_test.go",
"kubelet_getters_test.go",
"kubelet_network_test.go",
"kubelet_node_status_test.go",
Expand Down Expand Up @@ -202,6 +204,8 @@ go_test(
"//pkg/kubelet/nodestatus:go_default_library",
"//pkg/kubelet/pleg:go_default_library",
"//pkg/kubelet/pluginmanager:go_default_library",
"//pkg/kubelet/pluginmanager/cache:go_default_library",
"//pkg/kubelet/pluginmanager/pluginmanagerfakes:go_default_library",
"//pkg/kubelet/pod:go_default_library",
"//pkg/kubelet/pod/testing:go_default_library",
"//pkg/kubelet/prober/results:go_default_library",
Expand All @@ -221,12 +225,15 @@ go_test(
"//pkg/volume:go_default_library",
"//pkg/volume/awsebs:go_default_library",
"//pkg/volume/azure_dd:go_default_library",
"//pkg/volume/csi:go_default_library",
"//pkg/volume/csi/constants:go_default_library",
"//pkg/volume/gcepd:go_default_library",
"//pkg/volume/hostpath:go_default_library",
"//pkg/volume/testing:go_default_library",
"//pkg/volume/util:go_default_library",
"//pkg/volume/util/hostutil:go_default_library",
"//pkg/volume/util/subpath:go_default_library",
"//pkg/volume/volumefakes:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/equality:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
Expand Down
32 changes: 30 additions & 2 deletions pkg/kubelet/kubelet.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,6 @@ import (
"k8s.io/kubernetes/pkg/util/oom"
"k8s.io/kubernetes/pkg/util/selinux"
"k8s.io/kubernetes/pkg/volume"
"k8s.io/kubernetes/pkg/volume/csi"
"k8s.io/kubernetes/pkg/volume/util/hostutil"
"k8s.io/kubernetes/pkg/volume/util/subpath"
"k8s.io/kubernetes/pkg/volume/util/volumepathhandler"
Expand Down Expand Up @@ -170,6 +169,10 @@ const (

// Minimum number of dead containers to keep in a pod
minDeadContainerInPod = 1

// csiPluginHandlerRegistrationPollInterval is the interval that is used for
// retrying the registration of the CSI plugin as a volume plugin handler.
csiPluginHandlerRegistrationPollInterval = 10 * time.Second
)

// SyncHandler is an interface implemented by Kubelet, for testability
Expand Down Expand Up @@ -1298,15 +1301,40 @@ func (kl *Kubelet) initializeRuntimeDependentModules() {
// container log manager must start after container runtime is up to retrieve information from container runtime
// and inform container to reopen log file after log rotation.
kl.containerLogManager.Start()

// Adding Registration Callback function for CSI Driver
kl.pluginManager.AddHandler(pluginwatcherapi.CSIPlugin, plugincache.PluginHandler(csi.PluginHandler))
kl.registerCSIPluginHandler(kl.volumePluginMgr.GetCSIKubeletPluginHandler, csiPluginHandlerRegistrationPollInterval)

// Adding Registration Callback function for Device Manager
kl.pluginManager.AddHandler(pluginwatcherapi.DevicePlugin, kl.containerManager.GetPluginRegistrationHandler())
// Start the plugin manager
klog.V(4).Infof("starting plugin manager")
go kl.pluginManager.Run(kl.sourcesReady, wait.NeverStop)
}

func (kl *Kubelet) registerCSIPluginHandler(getHandler func() (plugincache.PluginHandler, error), pollInterval time.Duration) {
tryRegister := func() (bool, error) {
handler, err := getHandler()
if err != nil {
kl.recorder.Eventf(kl.nodeRef, v1.EventTypeWarning, events.KubeletSetupFailed, err.Error())
klog.Warningf("failed to get CSI Plugin Handler: %v", err)
return false, nil
}

kl.pluginManager.AddHandler(pluginwatcherapi.CSIPlugin, handler)
klog.Info("CSI Plugin Handler successfully registered with the Plugin Manager")
return true, nil
}

// try sync once
if ok, _ := tryRegister(); ok {
return
}

// retry in the background
go wait.PollUntil(pollInterval, tryRegister, wait.NeverStop)
}

// Run starts the kubelet reacting to config updates
func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
if kl.logServer == nil {
Expand Down
156 changes: 156 additions & 0 deletions pkg/kubelet/kubelet_csi_handler_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
/*
Copyright 2020 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package kubelet

import (
"fmt"
"sync"
"testing"
"time"

"github.com/stretchr/testify/require"

"k8s.io/client-go/tools/record"
"k8s.io/kubernetes/pkg/kubelet/pluginmanager/cache"
"k8s.io/kubernetes/pkg/kubelet/pluginmanager/pluginmanagerfakes"
)

func TestPluginHandlerRegistration(t *testing.T) {
// Setup
testKubelet := newTestKubeletWithImageList(t, nil /*imageList*/, false /*controllerAttachDetachEnabled*/, defaultVolumePlugins())
defer testKubelet.Cleanup()

pluginManager := &pluginmanagerfakes.FakePluginManager{}
kubelet := testKubelet.kubelet
kubelet.pluginManager = pluginManager

// Run
// kubelet.initializeRuntimeDependentModules() calls kubelet.registerCSIPluginHandler()
kubelet.initializeRuntimeDependentModules()

// Assert
addHandlerCallData := pluginManager.Invocations()["AddHandler"]
registeredPlugins := []string{}
for i := 0; i < len(addHandlerCallData); i++ {
// the first arguments to AddHandler is the plugin name
registeredPlugins = append(registeredPlugins, addHandlerCallData[i][0].(string))
}

require.Containsf(t, registeredPlugins, "CSIPlugin", "Expected the CSIPlugin to have been registered as a PluginHandler")
require.Containsf(t, registeredPlugins, "DevicePlugin", "Expected the Devicelugin to have been registered as a PluginHandler")
}

func TestRegisterCSIPluginHandler(t *testing.T) {
tests := map[string]struct {
errors []error
expectedCalls int
expectedEvents []string
}{
"successful on first registration attempt": {
errors: []error{nil},
expectedCalls: 1,
},
"successful on later registration attempt": {
errors: []error{fmt.Errorf("not yet"), fmt.Errorf("still not yet"), nil, fmt.Errorf("we should never hit that")},
expectedCalls: 3,
expectedEvents: []string{
"Warning KubeletSetupFailed not yet",
"Warning KubeletSetupFailed still not yet",
},
},
}

for name, test := range tests {
name, test := name, test

t.Run(name, func(t *testing.T) {
// Setup
done := make(chan error, 1)

k := &Kubelet{}

pluginManager := &pluginmanagerfakes.FakePluginManager{}
k.pluginManager = pluginManager

// we should, worst case, generate as many events as we trigger registration errors
events := make(chan string, len(test.errors))
k.recorder = &record.FakeRecorder{Events: events}

pluginManager.AddHandlerCalls(func(name string, _ cache.PluginHandler) {
var err error
if a, e := name, "CSIPlugin"; e != a {
err = fmt.Errorf("Expected to be called with plugin name %q, got called with %q", e, a)
t.Error(err)
}
done <- err
})

fpg := &fakePluginGetter{done: done, errors: test.errors}

// Run
k.registerCSIPluginHandler(fpg.get, time.Duration(1))

// Assert
require.NoError(t, <-done, "Expected no error to occur")
require.Equal(t, test.expectedCalls, fpg.count, "Expected number of calls to the plugin handler getter")
require.EqualValues(t, test.expectedEvents, getEvents(t, events))
})
}
}

func getEvents(t *testing.T, ch <-chan string) []string {
t.Helper()

var events []string

for {
select {
case event, ok := <-ch:
if ok {
events = append(events, event)
} else {
t.Errorf("Unexpected channel closure detected")
return events
}
default: // nothing ready to read (anymore)
return events
}
}
}

type fakePluginGetter struct {
sync.Mutex
count int
errors []error
done chan error
}

func (g *fakePluginGetter) get() (cache.PluginHandler, error) {
g.Lock()
defer g.Unlock()

if g.count >= len(g.errors) {
err := fmt.Errorf("no more fake returns")
g.done <- err
return nil, err
}

e := g.errors[g.count]

g.count++
return nil, e
}
3 changes: 2 additions & 1 deletion pkg/kubelet/kubelet_getters.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
kubelettypes "k8s.io/kubernetes/pkg/kubelet/types"
utilnode "k8s.io/kubernetes/pkg/util/node"
"k8s.io/kubernetes/pkg/volume/csi"
csiconsts "k8s.io/kubernetes/pkg/volume/csi/constants"
)

// getRootDir returns the full path to the directory under which kubelet can
Expand Down Expand Up @@ -315,7 +316,7 @@ func (kl *Kubelet) getPodVolumePathListFromDisk(podUID types.UID) ([]string, err
}
unescapePluginName := utilstrings.UnescapeQualifiedName(volumePluginName)

if unescapePluginName != csi.CSIPluginName {
if unescapePluginName != csiconsts.CSIPluginName {
for _, volumeDir := range volumeDirs {
volumes = append(volumes, filepath.Join(volumePluginPath, volumeDir))
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/kubelet/kubelet_node_status_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ func TestUpdateNewNodeStatus(t *testing.T) {
}
inputImageList, expectedImageList := generateTestingImageLists(numTestImages, int(tc.nodeStatusMaxImages))
testKubelet := newTestKubeletWithImageList(
t, inputImageList, false /* controllerAttachDetachEnabled */, true /*initFakeVolumePlugin*/)
t, inputImageList, false /* controllerAttachDetachEnabled */, fakeVolumePlugins())
defer testKubelet.Cleanup()
kubelet := testKubelet.kubelet
kubelet.nodeStatusMaxImages = tc.nodeStatusMaxImages
Expand Down Expand Up @@ -1325,7 +1325,7 @@ func TestUpdateNewNodeStatusTooLargeReservation(t *testing.T) {
// generate one more in inputImageList than we configure the Kubelet to report
inputImageList, _ := generateTestingImageLists(nodeStatusMaxImages+1, nodeStatusMaxImages)
testKubelet := newTestKubeletWithImageList(
t, inputImageList, false /* controllerAttachDetachEnabled */, true /* initFakeVolumePlugin */)
t, inputImageList, false /* controllerAttachDetachEnabled */, fakeVolumePlugins())
defer testKubelet.Cleanup()
kubelet := testKubelet.kubelet
kubelet.nodeStatusMaxImages = nodeStatusMaxImages
Expand Down
53 changes: 39 additions & 14 deletions pkg/kubelet/kubelet_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,12 +72,15 @@ import (
"k8s.io/kubernetes/pkg/volume"
"k8s.io/kubernetes/pkg/volume/awsebs"
"k8s.io/kubernetes/pkg/volume/azure_dd"
"k8s.io/kubernetes/pkg/volume/csi"
csiconsts "k8s.io/kubernetes/pkg/volume/csi/constants"
"k8s.io/kubernetes/pkg/volume/gcepd"
_ "k8s.io/kubernetes/pkg/volume/hostpath"
volumetest "k8s.io/kubernetes/pkg/volume/testing"
"k8s.io/kubernetes/pkg/volume/util"
"k8s.io/kubernetes/pkg/volume/util/hostutil"
"k8s.io/kubernetes/pkg/volume/util/subpath"
"k8s.io/kubernetes/pkg/volume/volumefakes"
)

func init() {
Expand Down Expand Up @@ -135,14 +138,14 @@ func newTestKubelet(t *testing.T, controllerAttachDetachEnabled bool) *TestKubel
Size: 456,
},
}
return newTestKubeletWithImageList(t, imageList, controllerAttachDetachEnabled, true /*initFakeVolumePlugin*/)
return newTestKubeletWithImageList(t, imageList, controllerAttachDetachEnabled, fakeVolumePlugins())
}

func newTestKubeletWithImageList(
t *testing.T,
imageList []kubecontainer.Image,
controllerAttachDetachEnabled bool,
initFakeVolumePlugin bool) *TestKubelet {
volumePlugins testPluginList) *TestKubelet {
fakeRuntime := &containertest.FakeRuntime{}
fakeRuntime.RuntimeType = "test"
fakeRuntime.VersionInfo = "1.5.0"
Expand Down Expand Up @@ -302,19 +305,9 @@ func newTestKubeletWithImageList(
// Add this as cleanup predicate pod admitter
kubelet.admitHandlers.AddPodAdmitHandler(lifecycle.NewPredicateAdmitHandler(kubelet.getNodeAnyWay, lifecycle.NewAdmissionFailureHandlerStub(), kubelet.containerManager.UpdatePluginResources))

allPlugins := []volume.VolumePlugin{}
plug := &volumetest.FakeVolumePlugin{PluginName: "fake", Host: nil}
if initFakeVolumePlugin {
allPlugins = append(allPlugins, plug)
} else {
allPlugins = append(allPlugins, awsebs.ProbeVolumePlugins()...)
allPlugins = append(allPlugins, gcepd.ProbeVolumePlugins()...)
allPlugins = append(allPlugins, azure_dd.ProbeVolumePlugins()...)
}

var prober volume.DynamicPluginProber // TODO (#51147) inject mock
kubelet.volumePluginMgr, err =
NewInitializedVolumePluginMgr(kubelet, kubelet.secretManager, kubelet.configMapManager, token.NewManager(kubelet.kubeClient), allPlugins, prober)
NewInitializedVolumePluginMgr(kubelet, kubelet.secretManager, kubelet.configMapManager, token.NewManager(kubelet.kubeClient), volumePlugins, prober)
require.NoError(t, err, "Failed to initialize VolumePluginMgr")

kubelet.volumeManager = kubeletvolume.NewVolumeManager(
Expand Down Expand Up @@ -345,7 +338,8 @@ func newTestKubeletWithImageList(

kubelet.AddPodSyncLoopHandler(activeDeadlineHandler)
kubelet.AddPodSyncHandler(activeDeadlineHandler)
return &TestKubelet{kubelet, fakeRuntime, fakeKubeClient, fakeMirrorClient, fakeClock, nil, plug}

return &TestKubelet{kubelet, fakeRuntime, fakeKubeClient, fakeMirrorClient, fakeClock, nil, volumePlugins.fakePlugin()}
}

func newTestPods(count int) []*v1.Pod {
Expand All @@ -364,6 +358,37 @@ func newTestPods(count int) []*v1.Pod {
return pods
}

type testPluginList []volume.VolumePlugin

func (pl testPluginList) fakePlugin() *volumetest.FakeVolumePlugin {
for _, p := range pl {
if fp, ok := p.(*volumetest.FakeVolumePlugin); ok {
return fp
}
}

return nil
}

func fakeVolumePlugins() testPluginList {
fakeCSIPlugin := &volumefakes.FakeKubeletWatchableVolumePlugin{}
fakeCSIPlugin.GetPluginNameReturns(csiconsts.CSIPluginName)

return []volume.VolumePlugin{
fakeCSIPlugin,
&volumetest.FakeVolumePlugin{PluginName: "fake", Host: nil},
}
}

func defaultVolumePlugins() testPluginList {
p := csi.ProbeVolumePlugins()
p = append(p, awsebs.ProbeVolumePlugins()...)
p = append(p, gcepd.ProbeVolumePlugins()...)
p = append(p, azure_dd.ProbeVolumePlugins()...)

return p
}

func TestSyncLoopAbort(t *testing.T) {
testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */)
defer testKubelet.Cleanup()
Expand Down