Skip to content

Commit

Permalink
use node informer to check volumes attachment status before backoff
Browse files Browse the repository at this point in the history
fix unit tests
  • Loading branch information
gnufied committed Jan 6, 2022
1 parent f4ba875 commit f61c4b1
Show file tree
Hide file tree
Showing 13 changed files with 230 additions and 19 deletions.
Expand Up @@ -858,6 +858,10 @@ func (adc *attachDetachController) GetNodeAllocatable() (v1.ResourceList, error)
return v1.ResourceList{}, nil
}

func (adc *attachDetachController) GetAttachedVolumesFromNodeStatus() (map[v1.UniqueVolumeName]string, error) {
return map[v1.UniqueVolumeName]string{}, nil
}

func (adc *attachDetachController) GetSecretFunc() func(namespace, name string) (*v1.Secret, error) {
return func(_, _ string) (*v1.Secret, error) {
return nil, fmt.Errorf("GetSecret unsupported in attachDetachController")
Expand Down
4 changes: 4 additions & 0 deletions pkg/controller/volume/expand/expand_controller.go
Expand Up @@ -456,6 +456,10 @@ func (expc *expandController) GetConfigMapFunc() func(namespace, name string) (*
}
}

func (expc *expandController) GetAttachedVolumesFromNodeStatus() (map[v1.UniqueVolumeName]string, error) {
return map[v1.UniqueVolumeName]string{}, nil
}

func (expc *expandController) GetServiceAccountTokenFunc() func(_, _ string, _ *authenticationv1.TokenRequest) (*authenticationv1.TokenRequest, error) {
return func(_, _ string, _ *authenticationv1.TokenRequest) (*authenticationv1.TokenRequest, error) {
return nil, fmt.Errorf("GetServiceAccountToken unsupported in expandController")
Expand Down
4 changes: 4 additions & 0 deletions pkg/controller/volume/persistentvolume/volume_host.go
Expand Up @@ -95,6 +95,10 @@ func (ctrl *PersistentVolumeController) GetNodeAllocatable() (v1.ResourceList, e
return v1.ResourceList{}, nil
}

func (ctrl *PersistentVolumeController) GetAttachedVolumesFromNodeStatus() (map[v1.UniqueVolumeName]string, error) {
return map[v1.UniqueVolumeName]string{}, nil
}

func (ctrl *PersistentVolumeController) GetSecretFunc() func(namespace, name string) (*v1.Secret, error) {
return func(_, _ string) (*v1.Secret, error) {
return nil, fmt.Errorf("GetSecret unsupported in PersistentVolumeController")
Expand Down
6 changes: 6 additions & 0 deletions pkg/kubelet/kubelet_test.go
Expand Up @@ -214,6 +214,12 @@ func newTestKubeletWithImageList(
Address: testKubeletHostIP,
},
},
VolumesAttached: []v1.AttachedVolume{
{
Name: "fake/fake-device",
DevicePath: "fake/path",
},
},
},
},
},
Expand Down
14 changes: 14 additions & 0 deletions pkg/kubelet/volume_host.go
Expand Up @@ -270,6 +270,20 @@ func (kvh *kubeletVolumeHost) GetNodeLabels() (map[string]string, error) {
return node.Labels, nil
}

func (kvh *kubeletVolumeHost) GetAttachedVolumesFromNodeStatus() (map[v1.UniqueVolumeName]string, error) {
node, err := kvh.kubelet.GetNode()
if err != nil {
return nil, fmt.Errorf("error retrieving node: %v", err)
}
attachedVolumes := node.Status.VolumesAttached
result := map[v1.UniqueVolumeName]string{}
for i := range attachedVolumes {
attachedVolume := attachedVolumes[i]
result[attachedVolume.Name] = attachedVolume.DevicePath
}
return result, nil
}

func (kvh *kubeletVolumeHost) GetNodeName() types.NodeName {
return kvh.kubelet.nodeName
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/kubelet/volumemanager/reconciler/reconciler.go
Expand Up @@ -709,5 +709,5 @@ func getVolumesFromPodDir(podDir string) ([]podVolume, error) {

// ignore nestedpendingoperations.IsAlreadyExists and exponentialbackoff.IsExponentialBackoff errors, they are expected.
func isExpectedError(err error) bool {
return nestedpendingoperations.IsAlreadyExists(err) || exponentialbackoff.IsExponentialBackoff(err)
return nestedpendingoperations.IsAlreadyExists(err) || exponentialbackoff.IsExponentialBackoff(err) || operationexecutor.IsMountFailedPreconditionError(err)
}
124 changes: 116 additions & 8 deletions pkg/kubelet/volumemanager/reconciler/reconciler_test.go
Expand Up @@ -189,7 +189,20 @@ func Test_Run_Positive_VolumeAttachAndMount(t *testing.T) {
// Verifies there are no attach/detach calls.
func Test_Run_Positive_VolumeMountControllerAttachEnabled(t *testing.T) {
// Arrange
volumePluginMgr, fakePlugin := volumetesting.GetTestKubeletVolumePluginMgr(t)
node := &v1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: string(nodeName),
},
Status: v1.NodeStatus{
VolumesAttached: []v1.AttachedVolume{
{
Name: "fake-plugin/fake-device1",
DevicePath: "fake/path",
},
},
},
}
volumePluginMgr, fakePlugin := volumetesting.GetTestKubeletVolumePluginMgrWithNode(t, node)
dsw := cache.NewDesiredStateOfWorld(volumePluginMgr)
asw := cache.NewActualStateOfWorld(nodeName, volumePluginMgr)
kubeClient := createTestClient()
Expand Down Expand Up @@ -438,7 +451,20 @@ func Test_Run_Positive_VolumeAttachMountUnmountDetach(t *testing.T) {
// Verifies there are no attach/detach calls made.
func Test_Run_Positive_VolumeUnmountControllerAttachEnabled(t *testing.T) {
// Arrange
volumePluginMgr, fakePlugin := volumetesting.GetTestKubeletVolumePluginMgr(t)
node := &v1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: string(nodeName),
},
Status: v1.NodeStatus{
VolumesAttached: []v1.AttachedVolume{
{
Name: "fake-plugin/fake-device1",
DevicePath: "fake/path",
},
},
},
}
volumePluginMgr, fakePlugin := volumetesting.GetTestKubeletVolumePluginMgrWithNode(t, node)
dsw := cache.NewDesiredStateOfWorld(volumePluginMgr)
asw := cache.NewActualStateOfWorld(nodeName, volumePluginMgr)
kubeClient := createTestClient()
Expand Down Expand Up @@ -660,9 +686,22 @@ func Test_Run_Positive_BlockVolumeMapControllerAttachEnabled(t *testing.T) {
volumeSpec := &volume.Spec{
PersistentVolume: gcepv,
}
node := &v1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: string(nodeName),
},
Status: v1.NodeStatus{
VolumesAttached: []v1.AttachedVolume{
{
Name: "fake-plugin/fake-device1",
DevicePath: "fake/path",
},
},
},
}

// Arrange
volumePluginMgr, fakePlugin := volumetesting.GetTestKubeletVolumePluginMgr(t)
volumePluginMgr, fakePlugin := volumetesting.GetTestKubeletVolumePluginMgrWithNode(t, node)
dsw := cache.NewDesiredStateOfWorld(volumePluginMgr)
asw := cache.NewActualStateOfWorld(nodeName, volumePluginMgr)
kubeClient := createtestClientWithPVPVC(gcepv, gcepvc, v1.AttachedVolume{
Expand Down Expand Up @@ -870,8 +909,22 @@ func Test_Run_Positive_VolumeUnmapControllerAttachEnabled(t *testing.T) {
PersistentVolume: gcepv,
}

node := &v1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: string(nodeName),
},
Status: v1.NodeStatus{
VolumesAttached: []v1.AttachedVolume{
{
Name: "fake-plugin/fake-device1",
DevicePath: "/fake/path",
},
},
},
}

// Arrange
volumePluginMgr, fakePlugin := volumetesting.GetTestKubeletVolumePluginMgr(t)
volumePluginMgr, fakePlugin := volumetesting.GetTestKubeletVolumePluginMgrWithNode(t, node)
dsw := cache.NewDesiredStateOfWorld(volumePluginMgr)
asw := cache.NewActualStateOfWorld(nodeName, volumePluginMgr)
kubeClient := createtestClientWithPVPVC(gcepv, gcepvc, v1.AttachedVolume{
Expand Down Expand Up @@ -1179,7 +1232,21 @@ func Test_Run_Positive_VolumeFSResizeControllerAttachEnabled(t *testing.T) {

// deep copy before reconciler runs to avoid data race.
pvWithSize := pv.DeepCopy()
volumePluginMgr, fakePlugin := volumetesting.GetTestKubeletVolumePluginMgr(t)
node := &v1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: string(nodeName),
},
Spec: v1.NodeSpec{},
Status: v1.NodeStatus{
VolumesAttached: []v1.AttachedVolume{
{
Name: v1.UniqueVolumeName(fmt.Sprintf("fake-plugin/%s", tc.pvName)),
DevicePath: "fake/path",
},
},
},
}
volumePluginMgr, fakePlugin := volumetesting.GetTestKubeletVolumePluginMgrWithNode(t, node)
dsw := cache.NewDesiredStateOfWorld(volumePluginMgr)
asw := cache.NewActualStateOfWorld(nodeName, volumePluginMgr)
kubeClient := createtestClientWithPVPVC(pv, pvc, v1.AttachedVolume{
Expand Down Expand Up @@ -1354,7 +1421,21 @@ func Test_UncertainDeviceGlobalMounts(t *testing.T) {
},
}

volumePluginMgr, fakePlugin := volumetesting.GetTestKubeletVolumePluginMgr(t)
node := &v1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: string(nodeName),
},
Spec: v1.NodeSpec{},
Status: v1.NodeStatus{
VolumesAttached: []v1.AttachedVolume{
{
Name: v1.UniqueVolumeName(fmt.Sprintf("fake-plugin/%s", tc.volumeName)),
DevicePath: "fake/path",
},
},
},
}
volumePluginMgr, fakePlugin := volumetesting.GetTestKubeletVolumePluginMgrWithNode(t, node)
fakePlugin.SupportsRemount = tc.supportRemount

dsw := cache.NewDesiredStateOfWorld(volumePluginMgr)
Expand Down Expand Up @@ -1564,7 +1645,21 @@ func Test_UncertainVolumeMountState(t *testing.T) {
},
}

volumePluginMgr, fakePlugin := volumetesting.GetTestKubeletVolumePluginMgr(t)
node := &v1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: string(nodeName),
},
Status: v1.NodeStatus{
VolumesAttached: []v1.AttachedVolume{
{
Name: v1.UniqueVolumeName(fmt.Sprintf("fake-plugin/%s", tc.volumeName)),
DevicePath: "fake/path",
},
},
},
}

volumePluginMgr, fakePlugin := volumetesting.GetTestKubeletVolumePluginMgrWithNode(t, node)
fakePlugin.SupportsRemount = tc.supportRemount
dsw := cache.NewDesiredStateOfWorld(volumePluginMgr)
asw := cache.NewActualStateOfWorld(nodeName, volumePluginMgr)
Expand Down Expand Up @@ -1866,7 +1961,20 @@ func createtestClientWithPVPVC(pv *v1.PersistentVolume, pvc *v1.PersistentVolume

func Test_Run_Positive_VolumeMountControllerAttachEnabledRace(t *testing.T) {
// Arrange
volumePluginMgr, fakePlugin := volumetesting.GetTestKubeletVolumePluginMgr(t)
node := &v1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: string(nodeName),
},
Status: v1.NodeStatus{
VolumesAttached: []v1.AttachedVolume{
{
Name: "fake-plugin/fake-device1",
DevicePath: "/fake/path",
},
},
},
}
volumePluginMgr, fakePlugin := volumetesting.GetTestKubeletVolumePluginMgrWithNode(t, node)

dsw := cache.NewDesiredStateOfWorld(volumePluginMgr)
asw := cache.NewActualStateOfWorld(nodeName, volumePluginMgr)
Expand Down
13 changes: 8 additions & 5 deletions pkg/kubelet/volumemanager/volume_manager_test.go
Expand Up @@ -99,7 +99,7 @@ func TestGetMountedVolumesForPodAndGetVolumesInUse(t *testing.T) {
node, pod, pv, claim := createObjects(test.pvMode, test.podMode)
kubeClient := fake.NewSimpleClientset(node, pod, pv, claim)

manager := newTestVolumeManager(t, tmpDir, podManager, kubeClient)
manager := newTestVolumeManager(t, tmpDir, podManager, kubeClient, node)

stopCh := runVolumeManager(manager)
defer close(stopCh)
Expand Down Expand Up @@ -161,7 +161,7 @@ func TestInitialPendingVolumesForPodAndGetVolumesInUse(t *testing.T) {

kubeClient := fake.NewSimpleClientset(node, pod, pv, claim)

manager := newTestVolumeManager(t, tmpDir, podManager, kubeClient)
manager := newTestVolumeManager(t, tmpDir, podManager, kubeClient, node)

stopCh := runVolumeManager(manager)
defer close(stopCh)
Expand Down Expand Up @@ -251,7 +251,7 @@ func TestGetExtraSupplementalGroupsForPod(t *testing.T) {
}
kubeClient := fake.NewSimpleClientset(node, pod, pv, claim)

manager := newTestVolumeManager(t, tmpDir, podManager, kubeClient)
manager := newTestVolumeManager(t, tmpDir, podManager, kubeClient, node)

stopCh := runVolumeManager(manager)
defer close(stopCh)
Expand Down Expand Up @@ -292,12 +292,15 @@ func (p *fakePodStateProvider) ShouldPodContainersBeTerminating(uid kubetypes.UI
return ok
}

func newTestVolumeManager(t *testing.T, tmpDir string, podManager kubepod.Manager, kubeClient clientset.Interface) VolumeManager {
func newTestVolumeManager(t *testing.T, tmpDir string, podManager kubepod.Manager, kubeClient clientset.Interface, node *v1.Node) VolumeManager {
plug := &volumetest.FakeVolumePlugin{PluginName: "fake", Host: nil}
fakeRecorder := &record.FakeRecorder{}
plugMgr := &volume.VolumePluginMgr{}
// TODO (#51147) inject mock prober
plugMgr.InitPlugins([]volume.VolumePlugin{plug}, nil /* prober */, volumetest.NewFakeKubeletVolumeHost(t, tmpDir, kubeClient, nil))
fakeVolumeHost := volumetest.NewFakeKubeletVolumeHost(t, tmpDir, kubeClient, nil)
fakeVolumeHost.WithNode(node)

plugMgr.InitPlugins([]volume.VolumePlugin{plug}, nil /* prober */, fakeVolumeHost)
stateProvider := &fakePodStateProvider{}
fakePathHandler := volumetest.NewBlockVolumePathHandler()
vm := NewVolumeManager(
Expand Down
2 changes: 2 additions & 0 deletions pkg/volume/plugins.go
Expand Up @@ -449,6 +449,8 @@ type VolumeHost interface {
// Returns the name of the node
GetNodeName() types.NodeName

GetAttachedVolumesFromNodeStatus() (map[v1.UniqueVolumeName]string, error)

// Returns the event recorder of kubelet.
GetEventRecorder() record.EventRecorder

Expand Down
13 changes: 13 additions & 0 deletions pkg/volume/testing/testing.go
Expand Up @@ -1655,6 +1655,19 @@ func GetTestKubeletVolumePluginMgr(t *testing.T) (*VolumePluginMgr, *FakeVolumeP
return v.GetPluginMgr(), plugins[0].(*FakeVolumePlugin)
}

func GetTestKubeletVolumePluginMgrWithNode(t *testing.T, node *v1.Node) (*VolumePluginMgr, *FakeVolumePlugin) {
plugins := ProbeVolumePlugins(VolumeConfig{})
v := NewFakeKubeletVolumeHost(
t,
"", /* rootDir */
nil, /* kubeClient */
plugins, /* plugins */
)
v.WithNode(node)

return v.GetPluginMgr(), plugins[0].(*FakeVolumePlugin)
}

// CreateTestPVC returns a provisionable PVC for tests
func CreateTestPVC(capacity string, accessModes []v1.PersistentVolumeAccessMode) *v1.PersistentVolumeClaim {
claim := v1.PersistentVolumeClaim{
Expand Down

0 comments on commit f61c4b1

Please sign in to comment.