Skip to content

Commit

Permalink
Merge pull request kubernetes#98224 from vpnachev/automated-cherry-pi…
Browse files Browse the repository at this point in the history
…ck-of-#96617-upstream-release-1.20

Automated cherry pick of kubernetes#94599: Fixes Attach Detach Controller reconciler race reading kubernetes#96617: Recover CSI volumes from dangling attachments
  • Loading branch information
k8s-ci-robot committed Feb 4, 2021
2 parents 01b9bcd + 85fd25e commit fc3eaf4
Show file tree
Hide file tree
Showing 9 changed files with 660 additions and 144 deletions.
74 changes: 72 additions & 2 deletions pkg/controller/volume/attachdetach/attach_detach_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -391,7 +391,11 @@ func (adc *attachDetachController) populateActualStateOfWorld() error {
adc.addNodeToDswp(node, types.NodeName(node.Name))
}
}
return nil
err = adc.processVolumeAttachments()
if err != nil {
klog.Errorf("Failed to process volume attachments: %v", err)
}
return err
}

func (adc *attachDetachController) getNodeVolumeDevicePath(
Expand Down Expand Up @@ -461,7 +465,12 @@ func (adc *attachDetachController) populateDesiredStateOfWorld() error {
err)
continue
}
if adc.actualStateOfWorld.IsVolumeAttachedToNode(volumeName, nodeName) {
attachState := adc.actualStateOfWorld.GetAttachState(volumeName, nodeName)
if attachState == cache.AttachStateAttached {
klog.V(10).Infof("Volume %q is attached to node %q. Marking as attached in ActualStateOfWorld",
volumeName,
nodeName,
)
devicePath, err := adc.getNodeVolumeDevicePath(volumeName, nodeName)
if err != nil {
klog.Errorf("Failed to find device path: %v", err)
Expand Down Expand Up @@ -679,6 +688,67 @@ func (adc *attachDetachController) processVolumesInUse(
}
}

// processVolumeAttachments walks every VolumeAttachment (VA) object returned
// by the lister and compares it with the actual state of the world (ASW).
//
// It should be called only after the attached volumes have been populated in
// the ASW. A VA whose volume the ASW reports as detached from the VA's node is
// added to the ASW as an "uncertain" attachment. The reconciler is then
// expected to attach the volume when it is present in the desired state of
// the world, or to detach it otherwise (it may be a dangling attachment).
//
// Only a failure to list the VA objects is returned as an error; problems with
// an individual VA are logged and that VA is skipped.
func (adc *attachDetachController) processVolumeAttachments() error {
	attachments, listErr := adc.volumeAttachmentLister.List(labels.Everything())
	if listErr != nil {
		klog.Errorf("failed to list VolumeAttachment objects: %v", listErr)
		return listErr
	}

	for _, attachment := range attachments {
		node := types.NodeName(attachment.Spec.NodeName)

		pvNamePtr := attachment.Spec.Source.PersistentVolumeName
		if pvNamePtr == nil {
			// Currently VA objects are created for CSI volumes only, so a
			// missing PV name is unexpected and only worth a warning.
			klog.Warningf("Skipping the va as its pvName is nil, va.Name: %q, nodeName: %q",
				attachment.Name, node)
			continue
		}
		pvName := *pvNamePtr

		pv, getErr := adc.pvLister.Get(pvName)
		if getErr != nil {
			klog.Errorf("Unable to lookup pv object for: %q, err: %v", pvName, getErr)
			continue
		}

		spec := volume.NewSpecFromPersistentVolume(pv, false)
		attachablePlugin, findErr := adc.volumePluginMgr.FindAttachablePluginBySpec(spec)
		if findErr != nil || attachablePlugin == nil {
			// Currently VA objects are created for CSI volumes only, so a
			// missing attachable plugin is unexpected and only worth a warning.
			klog.Warningf(
				"Skipping processing the volume %q on nodeName: %q, no attacher interface found. err=%v",
				pvName,
				node,
				findErr)
			continue
		}

		uniqueName, nameErr := volumeutil.GetUniqueVolumeNameFromSpec(attachablePlugin, spec)
		if nameErr != nil {
			klog.Errorf(
				"Failed to find unique name for volume:%q, va.Name:%q, nodeName:%q: %v",
				pvName,
				attachment.Name,
				node,
				nameErr)
			continue
		}

		// Anything other than "detached" is already tracked by the ASW and
		// needs no further action here.
		state := adc.actualStateOfWorld.GetAttachState(uniqueName, node)
		if state != cache.AttachStateDetached {
			continue
		}

		klog.V(1).Infof("Marking volume attachment as uncertain as volume:%q (%q) is not attached (%v)",
			uniqueName, node, state)
		if markErr := adc.actualStateOfWorld.MarkVolumeAsUncertain(uniqueName, spec, node); markErr != nil {
			klog.Errorf("MarkVolumeAsUncertain fail to add the volume %q (%q) to ASW. err: %s", uniqueName, node, markErr)
		}
	}
	return nil
}

var _ volume.VolumeHost = &attachDetachController{}
var _ volume.AttachDetachVolumeHost = &attachDetachController{}

Expand Down
234 changes: 222 additions & 12 deletions pkg/controller/volume/attachdetach/attach_detach_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,18 +72,21 @@ func Test_AttachDetachControllerStateOfWolrdPopulators_Positive(t *testing.T) {
nodeInformer := informerFactory.Core().V1().Nodes()
pvcInformer := informerFactory.Core().V1().PersistentVolumeClaims()
pvInformer := informerFactory.Core().V1().PersistentVolumes()
volumeAttachmentInformer := informerFactory.Storage().V1().VolumeAttachments()

adc := &attachDetachController{
kubeClient: fakeKubeClient,
pvcLister: pvcInformer.Lister(),
pvcsSynced: pvcInformer.Informer().HasSynced,
pvLister: pvInformer.Lister(),
pvsSynced: pvInformer.Informer().HasSynced,
podLister: podInformer.Lister(),
podsSynced: podInformer.Informer().HasSynced,
nodeLister: nodeInformer.Lister(),
nodesSynced: nodeInformer.Informer().HasSynced,
cloud: nil,
kubeClient: fakeKubeClient,
pvcLister: pvcInformer.Lister(),
pvcsSynced: pvcInformer.Informer().HasSynced,
pvLister: pvInformer.Lister(),
pvsSynced: pvInformer.Informer().HasSynced,
podLister: podInformer.Lister(),
podsSynced: podInformer.Informer().HasSynced,
nodeLister: nodeInformer.Lister(),
nodesSynced: nodeInformer.Informer().HasSynced,
volumeAttachmentLister: volumeAttachmentInformer.Lister(),
volumeAttachmentSynced: volumeAttachmentInformer.Informer().HasSynced,
cloud: nil,
}

// Act
Expand Down Expand Up @@ -116,8 +119,8 @@ func Test_AttachDetachControllerStateOfWolrdPopulators_Positive(t *testing.T) {
for _, node := range nodes {
nodeName := types.NodeName(node.Name)
for _, attachedVolume := range node.Status.VolumesAttached {
found := adc.actualStateOfWorld.IsVolumeAttachedToNode(attachedVolume.Name, nodeName)
if !found {
attachedState := adc.actualStateOfWorld.GetAttachState(attachedVolume.Name, nodeName)
if attachedState != cache.AttachStateAttached {
t.Fatalf("Run failed with error. Node %s, volume %s not found", nodeName, attachedVolume.Name)
}
}
Expand Down Expand Up @@ -335,3 +338,210 @@ func attachDetachRecoveryTestCase(t *testing.T, extraPods1 []*v1.Pod, extraPods2
}

}

// vaTest describes one VolumeAttachment recovery scenario: the objects to
// create before the controller state is populated and the attach/detach
// operations the test volume plugin is expected to report afterwards.
type vaTest struct {
	// testName is the subtest name passed to t.Run.
	testName string
	// volName is the volume name used when building the test pod and the PV.
	volName string
	// podName is the name of the pod to create; when empty no pod is created.
	podName string
	// podNodeName is the node the created pod is placed on.
	podNodeName string
	// pvName is the name of the PV to create; when empty no PV is created.
	pvName string
	// vaName is the name of the VolumeAttachment; when empty no
	// VolumeAttachment is created.
	vaName string
	// vaNodeName is the intended node of the VolumeAttachment.
	vaNodeName string
	// vaAttachStatus is the intended attached status of the VolumeAttachment.
	vaAttachStatus bool
	// expected_attaches maps a node name to the volume names that must appear
	// in the test plugin's attached volumes for that node.
	expected_attaches map[string][]string
	// expected_detaches maps a node name to the volume names that must appear
	// in the test plugin's detached volumes for that node.
	expected_detaches map[string][]string
}

// Test_ADC_VolumeAttachmentRecovery runs each VolumeAttachment recovery
// scenario as its own named subtest via volumeAttachmentRecoveryTestCase.
func Test_ADC_VolumeAttachmentRecovery(t *testing.T) {
	scenarios := []vaTest{
		{
			// A pod using the volume is scheduled on the VA's node: the
			// volume must be attached and nothing detached.
			testName:          "Scheduled pod",
			volName:           "vol1",
			podName:           "pod1",
			podNodeName:       "mynode-1",
			pvName:            "pv1",
			vaName:            "va1",
			vaNodeName:        "mynode-1",
			vaAttachStatus:    false,
			expected_attaches: map[string][]string{"mynode-1": {"vol1"}},
			expected_detaches: map[string][]string{},
		},
		{
			// No pod exists and the VA status is attached: the dangling
			// volume must be detached.
			testName:          "VA status is attached",
			volName:           "vol1",
			pvName:            "pv1",
			vaName:            "va1",
			vaNodeName:        "mynode-1",
			vaAttachStatus:    true,
			expected_attaches: map[string][]string{},
			expected_detaches: map[string][]string{"mynode-1": {"vol1"}},
		},
		{
			// No pod exists and the VA status is not attached: the dangling
			// volume must still be detached.
			testName:          "VA status is unattached",
			volName:           "vol1",
			pvName:            "pv1",
			vaName:            "va1",
			vaNodeName:        "mynode-1",
			vaAttachStatus:    false,
			expected_attaches: map[string][]string{},
			expected_detaches: map[string][]string{"mynode-1": {"vol1"}},
		},
	}

	for i := range scenarios {
		scenario := scenarios[i]
		t.Run(scenario.testName, func(t *testing.T) {
			volumeAttachmentRecoveryTestCase(t, scenario)
		})
	}
}

// volumeAttachmentRecoveryTestCase executes a single vaTest scenario.
//
// It builds an attach/detach controller on top of a fake client, seeds the
// informer caches with the objects produced by the test client plus the pod,
// PV and VolumeAttachment requested by tc, populates the actual and desired
// states of the world, starts the reconciler and the DSW populator, and then
// polls the test volume plugin until every expected attach and detach has been
// observed. The test fails if an expected operation is still missing on the
// final poll or if the test plugin reports an error.
func volumeAttachmentRecoveryTestCase(t *testing.T, tc vaTest) {
	fakeKubeClient := controllervolumetesting.CreateTestClient()
	informerFactory := informers.NewSharedInformerFactory(fakeKubeClient, time.Second*1)
	plugins := controllervolumetesting.CreateTestPlugin()
	nodeInformer := informerFactory.Core().V1().Nodes().Informer()
	podInformer := informerFactory.Core().V1().Pods().Informer()
	pvInformer := informerFactory.Core().V1().PersistentVolumes().Informer()
	vaInformer := informerFactory.Storage().V1().VolumeAttachments().Informer()

	// Create the controller
	adcObj, err := NewAttachDetachController(
		fakeKubeClient,
		informerFactory.Core().V1().Pods(),
		informerFactory.Core().V1().Nodes(),
		informerFactory.Core().V1().PersistentVolumeClaims(),
		informerFactory.Core().V1().PersistentVolumes(),
		informerFactory.Storage().V1().CSINodes(),
		informerFactory.Storage().V1().CSIDrivers(),
		informerFactory.Storage().V1().VolumeAttachments(),
		nil, /* cloud */
		plugins,
		nil, /* prober */
		false,
		1*time.Second,
		DefaultTimerConfig,
		nil, /* filteredDialOptions */
	)
	if err != nil {
		t.Fatalf("NewAttachDetachController failed with error. Expected: <no error> Actual: <%v>", err)
	}
	adc := adcObj.(*attachDetachController)

	// Add existing objects (created by testplugin) to the respective informers
	pods, err := fakeKubeClient.CoreV1().Pods(v1.NamespaceAll).List(context.TODO(), metav1.ListOptions{})
	if err != nil {
		t.Fatalf("Run failed with error. Expected: <no error> Actual: %v", err)
	}
	for _, pod := range pods.Items {
		// Copy before taking the address so each cache entry is distinct.
		podToAdd := pod
		if err := podInformer.GetIndexer().Add(&podToAdd); err != nil {
			t.Fatalf("Run failed with error. Failed to add pod %q to the informer cache: <%v>", podToAdd.Name, err)
		}
	}
	nodes, err := fakeKubeClient.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{})
	if err != nil {
		t.Fatalf("Run failed with error. Expected: <no error> Actual: %v", err)
	}
	for _, node := range nodes.Items {
		// Copy before taking the address so each cache entry is distinct.
		nodeToAdd := node
		if err := nodeInformer.GetIndexer().Add(&nodeToAdd); err != nil {
			t.Fatalf("Run failed with error. Failed to add node %q to the informer cache: <%v>", nodeToAdd.Name, err)
		}
	}

	// Create and add objects requested by the test
	if tc.podName != "" {
		newPod := controllervolumetesting.NewPodWithVolume(tc.podName, tc.volName, tc.podNodeName)
		_, err = adc.kubeClient.CoreV1().Pods(newPod.ObjectMeta.Namespace).Create(context.TODO(), newPod, metav1.CreateOptions{})
		if err != nil {
			t.Fatalf("Run failed with error. Failed to create a new pod: <%v>", err)
		}
		if err := podInformer.GetIndexer().Add(newPod); err != nil {
			t.Fatalf("Run failed with error. Failed to add the new pod to the informer cache: <%v>", err)
		}
	}
	if tc.pvName != "" {
		newPv := controllervolumetesting.NewPV(tc.pvName, tc.volName)
		_, err = adc.kubeClient.CoreV1().PersistentVolumes().Create(context.TODO(), newPv, metav1.CreateOptions{})
		if err != nil {
			t.Fatalf("Run failed with error. Failed to create a new pv: <%v>", err)
		}
		if err := pvInformer.GetIndexer().Add(newPv); err != nil {
			t.Fatalf("Run failed with error. Failed to add the new pv to the informer cache: <%v>", err)
		}
	}
	if tc.vaName != "" {
		// Build the VolumeAttachment from the test case so that each scenario
		// (in particular its attach status) is really exercised.
		newVa := controllervolumetesting.NewVolumeAttachment(tc.vaName, tc.pvName, tc.vaNodeName, tc.vaAttachStatus)
		_, err = adc.kubeClient.StorageV1().VolumeAttachments().Create(context.TODO(), newVa, metav1.CreateOptions{})
		if err != nil {
			t.Fatalf("Run failed with error. Failed to create a new volumeAttachment: <%v>", err)
		}
		if err := vaInformer.GetIndexer().Add(newVa); err != nil {
			t.Fatalf("Run failed with error. Failed to add the new volumeAttachment to the informer cache: <%v>", err)
		}
	}

	// Make sure the informer cache is synced. The stop channel is closed on
	// every exit path, including early t.Fatalf, so that the informers and the
	// controller loops started below do not outlive the test.
	stopCh := make(chan struct{})
	defer close(stopCh)
	informerFactory.Start(stopCh)

	if !kcache.WaitForNamedCacheSync("attach detach", stopCh,
		informerFactory.Core().V1().Pods().Informer().HasSynced,
		informerFactory.Core().V1().Nodes().Informer().HasSynced,
		informerFactory.Core().V1().PersistentVolumes().Informer().HasSynced,
		informerFactory.Storage().V1().VolumeAttachments().Informer().HasSynced) {
		t.Fatalf("Error waiting for the informer caches to sync")
	}

	// Populate ASW
	err = adc.populateActualStateOfWorld()
	if err != nil {
		t.Fatalf("Run failed with error. Expected: <no error> Actual: <%v>", err)
	}

	// Populate DSW
	err = adc.populateDesiredStateOfWorld()
	if err != nil {
		t.Fatalf("Run failed with error. Expected: <no error> Actual: %v", err)
	}

	// Run reconciler and DSW populator loops
	go adc.reconciler.Run(stopCh)
	go adc.desiredStateOfWorldPopulator.Run(stopCh)

	// Verify if expected attaches and detaches have happened. The operations
	// are asynchronous, so poll a few times before failing the test.
	testPlugin := plugins[0].(*controllervolumetesting.TestPlugin)
	const maxTries = 10
	for tries := 0; tries <= maxTries; tries++ {
		lastTry := tries == maxTries
		// allFound stays true only if every expected (node, volume) pair of
		// both the attach and the detach expectations was observed in this
		// pass; a single missing entry forces another poll.
		allFound := true

		expectedOps := tc.expected_attaches
		actualOps := testPlugin.GetAttachedVolumes()
		opName := "attach"
		for i := 0; i <= 1; i++ { // verify attaches and detaches
			if i == 1 {
				expectedOps = tc.expected_detaches
				actualOps = testPlugin.GetDetachedVolumes()
				opName = "detach"
			}
			// Verify every (node, volume) in expectedOps is in actualOps
			for expectedNode, expectedVolumes := range expectedOps {
				actualVolumes, nodeFound := actualOps[expectedNode]
				if !nodeFound {
					if lastTry {
						t.Fatalf("Expected node not found, node:%v, op: %v, tries: %d",
							expectedNode, opName, tries)
					}
					allFound = false
					continue
				}
				for _, expectedVolume := range expectedVolumes {
					volFound := false
					for _, actualVolume := range actualVolumes {
						if expectedVolume == actualVolume {
							volFound = true
							break
						}
					}
					if volFound {
						continue
					}
					if lastTry {
						t.Fatalf("Expected %v operation not found, node:%v, volume: %v, tries: %d",
							opName, expectedNode, expectedVolume, tries)
					}
					allFound = false
				}
			}
		}
		if allFound {
			break
		}
		time.Sleep(time.Second * 1)
	}

	if testPlugin.GetErrorEncountered() {
		t.Fatalf("Fatal error encountered in the testing volume plugin")
	}
}

0 comments on commit fc3eaf4

Please sign in to comment.