Merge pull request kubernetes#106052 from jingxu97/automated-cherry-pick-of-#104526-upstream-release-1.22

Automated cherry pick of kubernetes#104526: Fix issue in node status updating VolumeAttached list

k8s-ci-robot committed Nov 12, 2021
2 parents 7a274d6 + b791721 commit bfdfba3
Showing 7 changed files with 188 additions and 14 deletions.
@@ -28,7 +28,7 @@ import (

"k8s.io/klog/v2"

"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/kubernetes/pkg/volume"
"k8s.io/kubernetes/pkg/volume/util"
25 changes: 17 additions & 8 deletions pkg/controller/volume/attachdetach/reconciler/reconciler.go
@@ -151,13 +151,13 @@ func (rc *reconciler) reconcile() {
// The operation key format is different depending on whether the volume
// allows multi attach across different nodes.
if util.IsMultiAttachAllowed(attachedVolume.VolumeSpec) {
if rc.attacherDetacher.IsOperationPending(attachedVolume.VolumeName, "" /* podName */, attachedVolume.NodeName) {
klog.V(10).Infof("Operation for volume %q is already running for node %q. Can't start detach", attachedVolume.VolumeName, attachedVolume.NodeName)
if !rc.attacherDetacher.IsOperationSafeToRetry(attachedVolume.VolumeName, "" /* podName */, attachedVolume.NodeName, operationexecutor.DetachOperationName) {
klog.V(10).Infof("Operation for volume %q is already running or still in exponential backoff for node %q. Can't start detach", attachedVolume.VolumeName, attachedVolume.NodeName)
continue
}
} else {
if rc.attacherDetacher.IsOperationPending(attachedVolume.VolumeName, "" /* podName */, "" /* nodeName */) {
klog.V(10).Infof("Operation for volume %q is already running in the cluster. Can't start detach for %q", attachedVolume.VolumeName, attachedVolume.NodeName)
if !rc.attacherDetacher.IsOperationSafeToRetry(attachedVolume.VolumeName, "" /* podName */, "" /* nodeName */, operationexecutor.DetachOperationName) {
klog.V(10).Infof("Operation for volume %q is already running or still in exponential backoff in the cluster. Can't start detach for %q", attachedVolume.VolumeName, attachedVolume.NodeName)
continue
}
}
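The change above swaps IsOperationPending for IsOperationSafeToRetry, so a detach is only started when no previous operation for the same key is still running or sitting in exponential backoff, and the key differs for multi-attach volumes (per node) versus single-attach volumes (cluster-wide). A minimal sketch of that gating logic follows; attachedVolume, safeToRetryChecker, and shouldStartDetach are simplified stand-ins for illustration, not the Kubernetes types.

package main

import "fmt"

const detachOperationName = "volume_detach"

// attachedVolume is a stripped-down stand-in for the reconciler's attached-volume record.
type attachedVolume struct {
	VolumeName  string
	NodeName    string
	MultiAttach bool
}

// safeToRetryChecker is a hypothetical stand-in for the operation executor's
// IsOperationSafeToRetry(volumeName, podName, nodeName, operationName) method.
type safeToRetryChecker interface {
	IsOperationSafeToRetry(volumeName, podName, nodeName, operationName string) bool
}

// shouldStartDetach mirrors the branch above: multi-attach volumes key the pending
// operation per node, while single-attach volumes use an empty node name so only one
// attach/detach for the volume may run anywhere in the cluster.
func shouldStartDetach(checker safeToRetryChecker, v attachedVolume) bool {
	if v.MultiAttach {
		return checker.IsOperationSafeToRetry(v.VolumeName, "" /* podName */, v.NodeName, detachOperationName)
	}
	return checker.IsOperationSafeToRetry(v.VolumeName, "" /* podName */, "" /* nodeName */, detachOperationName)
}

type alwaysSafe struct{}

func (alwaysSafe) IsOperationSafeToRetry(_, _, _, _ string) bool { return true }

func main() {
	v := attachedVolume{VolumeName: "pv-1", NodeName: "node-1", MultiAttach: false}
	fmt.Println(shouldStartDetach(alwaysSafe{}, v)) // true: nothing pending or in backoff
}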
@@ -193,6 +193,8 @@ func (rc *reconciler) reconcile() {

// Before triggering the volume detach, mark the volume as detached and update the node status.
// If updating the node status fails, skip the volume detach.
// If the volume detach operation fails, the volume needs to be added back to the report-as-attached
// list so that the node status has the correct volume attachment information.
err = rc.actualStateOfWorld.RemoveVolumeFromReportAsAttached(attachedVolume.VolumeName, attachedVolume.NodeName)
if err != nil {
klog.V(5).Infof("RemoveVolumeFromReportAsAttached failed while removing volume %q from node %q with: %v",
@@ -222,10 +224,17 @@ func (rc *reconciler) reconcile() {
klog.Warningf(attachedVolume.GenerateMsgDetailed("attacherDetacher.DetachVolume started", fmt.Sprintf("This volume is not safe to detach, but maxWaitForUnmountDuration %v expired, force detaching", rc.maxWaitForUnmountDuration)))
}
}
if err != nil && !exponentialbackoff.IsExponentialBackoff(err) {
// Ignore exponentialbackoff.IsExponentialBackoff errors, they are expected.
// Log all other errors.
klog.Errorf(attachedVolume.GenerateErrorDetailed("attacherDetacher.DetachVolume failed to start", err).Error())
if err != nil {
// Add the volume back to ReportAsAttached if the DetachVolume call failed, so that the node status updater will add it back to the VolumeAttached list.
// This function is also called while executing the volume detach operation in operation_generator.
// It is needed here too because the DetachVolume call might fail before the actual operation is executed in operation_executor (e.g., the volume plugin cannot be found).
rc.actualStateOfWorld.AddVolumeToReportAsAttached(attachedVolume.VolumeName, attachedVolume.NodeName)

if !exponentialbackoff.IsExponentialBackoff(err) {
// Ignore exponentialbackoff.IsExponentialBackoff errors, they are expected.
// Log all other errors.
klog.Errorf(attachedVolume.GenerateErrorDetailed("attacherDetacher.DetachVolume failed to start", err).Error())
}
}
}
}
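The error handling above removes the volume from the report-as-attached list right before the detach and puts it back when the DetachVolume call fails, so the node status keeps listing the volume as attached until a detach actually succeeds. A hedged sketch of that add-back-on-failure pattern, using a plain in-memory set in place of the real actualStateOfWorld (reportAsAttached, detachWithRollback, and tryDetach are illustrative names):

package main

import (
	"errors"
	"fmt"
)

type key struct{ volume, node string }

// reportAsAttached is a toy stand-in for the "volumes to report as attached"
// bookkeeping in the actual state of world.
type reportAsAttached map[key]bool

func (r reportAsAttached) Remove(volume, node string) { delete(r, key{volume, node}) }
func (r reportAsAttached) Add(volume, node string)    { r[key{volume, node}] = true }

// detachWithRollback mirrors the reconciler change: the volume is taken off the
// report-as-attached list before the detach is attempted, and put back if the
// detach call fails, so the node status still shows it as attached.
func detachWithRollback(r reportAsAttached, volume, node string, tryDetach func() error) error {
	r.Remove(volume, node)
	if err := tryDetach(); err != nil {
		r.Add(volume, node)
		return err
	}
	return nil
}

func main() {
	r := reportAsAttached{{"pv-1", "node-1"}: true}
	err := detachWithRollback(r, "pv-1", "node-1", func() error {
		return errors.New("cannot find volume plugin")
	})
	fmt.Println(err, r[key{"pv-1", "node-1"}]) // cannot find volume plugin true
}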
117 changes: 115 additions & 2 deletions pkg/controller/volume/attachdetach/reconciler/reconciler_test.go
@@ -20,7 +20,7 @@ import (
"testing"
"time"

"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
k8stypes "k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/informers"
@@ -36,7 +36,7 @@ import (
)

const (
reconcilerLoopPeriod time.Duration = 0 * time.Millisecond
reconcilerLoopPeriod time.Duration = 10 * time.Millisecond
syncLoopPeriod time.Duration = 100 * time.Minute
maxWaitForUnmountDuration time.Duration = 50 * time.Millisecond
)
@@ -599,6 +599,103 @@ func Test_Run_OneVolumeAttachAndDetachUncertainNodesWithReadWriteOnce(t *testing

}

func Test_Run_OneVolumeDetachFailNodeWithReadWriteOnce(t *testing.T) {
// Arrange
volumePluginMgr, _ := volumetesting.GetTestVolumePluginMgr(t)
dsw := cache.NewDesiredStateOfWorld(volumePluginMgr)
asw := cache.NewActualStateOfWorld(volumePluginMgr)
fakeKubeClient := controllervolumetesting.CreateTestClient()
fakeRecorder := &record.FakeRecorder{}
fakeHandler := volumetesting.NewBlockVolumePathHandler()
ad := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(
fakeKubeClient,
volumePluginMgr,
fakeRecorder,
false, /* checkNodeCapabilitiesBeforeMount */
fakeHandler))
nsu := statusupdater.NewFakeNodeStatusUpdater(false /* returnError */)
reconciler := NewReconciler(
reconcilerLoopPeriod, maxWaitForUnmountDuration, syncLoopPeriod, false, dsw, asw, ad, nsu, fakeRecorder)
podName1 := "pod-uid1"
podName2 := "pod-uid2"
podName3 := "pod-uid3"
volumeName := v1.UniqueVolumeName("volume-name")
volumeSpec := controllervolumetesting.GetTestVolumeSpec(string(volumeName), volumeName)
volumeSpec.PersistentVolume.Spec.AccessModes = []v1.PersistentVolumeAccessMode{v1.ReadWriteOnce}
nodeName1 := k8stypes.NodeName(volumetesting.FailDetachNode)
nodeName2 := k8stypes.NodeName("node-name2")
dsw.AddNode(nodeName1, false /*keepTerminatedPodVolumes*/)
dsw.AddNode(nodeName2, false /*keepTerminatedPodVolumes*/)

// Act
ch := make(chan struct{})
go reconciler.Run(ch)
defer close(ch)

// Add a pod whose volume should be attached to the FailDetachNode
generatedVolumeName, podAddErr := dsw.AddPod(types.UniquePodName(podName1), controllervolumetesting.NewPod(podName1, podName1), volumeSpec, nodeName1)
if podAddErr != nil {
t.Fatalf("AddPod failed. Expected: <no error> Actual: <%v>", podAddErr)
}

time.Sleep(1000 * time.Millisecond)
// Volume is added to asw, volume should be reported as attached to the node.
waitForVolumeAddedToNode(t, generatedVolumeName, nodeName1, asw)
verifyVolumeAttachedToNode(t, generatedVolumeName, nodeName1, cache.AttachStateAttached, asw)
verifyVolumeReportedAsAttachedToNode(t, generatedVolumeName, nodeName1, true, asw)

// Delete the pod, but detach will fail
dsw.DeletePod(types.UniquePodName(podName1), generatedVolumeName, nodeName1)

// The first detach will be triggered after at least 50ms (maxWaitForUnmountDuration in this test).
// Right before the detach operation is performed, the volume is first removed from being reported
// as attached in the node status (RemoveVolumeFromReportAsAttached). After the detach operation, which is expected to fail,
// the controller adds the volume back as attached.
// Here the test sleeps 100ms so that the detach should already have been triggered at this point.
// verifyVolumeReportedAsAttachedToNode will check that the volume is in the list of attached volumes that needs to be
// updated in the node status. Calling this function (GetVolumesToReportAttached) updates the node status, and the volume
// will not need to be updated again until new changes are applied (detach is triggered again).
time.Sleep(100 * time.Millisecond)
verifyVolumeAttachedToNode(t, generatedVolumeName, nodeName1, cache.AttachStateAttached, asw)
verifyVolumeReportedAsAttachedToNode(t, generatedVolumeName, nodeName1, true, asw)

// After the first detach fails, the reconciler waits for a period of time before retrying the detach.
// The wait time increases exponentially from an initial value of 0.5s (0.5, 1, 2, 4, ...).
// The test waits for 100 milliseconds here to make sure it is inside the exponential backoff period after
// the first detach operation. At this point, the volume status should not be updated.
time.Sleep(100 * time.Millisecond)
verifyVolumeAttachedToNode(t, generatedVolumeName, nodeName1, cache.AttachStateAttached, asw)
verifyVolumeNoStatusUpdateNeeded(t, generatedVolumeName, nodeName1, asw)

// Wait for 600ms to make sure the second detach operation is triggered. Again, the volume will be
// removed from being reported as attached in the node status and then added back as attached.
// The volume will be in the list of attached volumes that need to be updated in the node status.
time.Sleep(600 * time.Millisecond)
verifyVolumeAttachedToNode(t, generatedVolumeName, nodeName1, cache.AttachStateAttached, asw)
verifyVolumeReportedAsAttachedToNode(t, generatedVolumeName, nodeName1, true, asw)

// Add a second pod which tries to attach the volume to the same node.
// After adding the pod to the same node, detach will not be triggered anymore.
generatedVolumeName, podAddErr = dsw.AddPod(types.UniquePodName(podName2), controllervolumetesting.NewPod(podName2, podName2), volumeSpec, nodeName1)
if podAddErr != nil {
t.Fatalf("AddPod failed. Expected: <no error> Actual: <%v>", podAddErr)
}
// Sleep 1s to verify that no detach is triggered after the second pod is added.
time.Sleep(1000 * time.Millisecond)
verifyVolumeAttachedToNode(t, generatedVolumeName, nodeName1, cache.AttachStateAttached, asw)
verifyVolumeNoStatusUpdateNeeded(t, generatedVolumeName, nodeName1, asw)

// Add a third pod which tries to attach the volume to a different node.
// At this point, the volume is still attached to the first node. There is no status update for either node.
generatedVolumeName, podAddErr = dsw.AddPod(types.UniquePodName(podName3), controllervolumetesting.NewPod(podName3, podName3), volumeSpec, nodeName2)
if podAddErr != nil {
t.Fatalf("AddPod failed. Expected: <no error> Actual: <%v>", podAddErr)
}
verifyVolumeAttachedToNode(t, generatedVolumeName, nodeName1, cache.AttachStateAttached, asw)
verifyVolumeNoStatusUpdateNeeded(t, generatedVolumeName, nodeName1, asw)
verifyVolumeNoStatusUpdateNeeded(t, generatedVolumeName, nodeName2, asw)
}
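The sleeps in this test lean on the detach retry backoff starting at roughly 0.5s and doubling after each failure, which is why a 100ms sleep stays inside the first backoff window while a 600ms sleep is long enough to see the second detach attempt. A small sketch of that assumed schedule (backoffDelays is an illustrative helper, not the real exponentialbackoff package):

package main

import (
	"fmt"
	"time"
)

// backoffDelays returns the first n delays of an exponential backoff schedule.
func backoffDelays(initial time.Duration, factor float64, n int) []time.Duration {
	delays := make([]time.Duration, 0, n)
	d := initial
	for i := 0; i < n; i++ {
		delays = append(delays, d)
		d = time.Duration(float64(d) * factor)
	}
	return delays
}

func main() {
	// Prints [500ms 1s 2s 4s]: a 100ms sleep falls inside the first 500ms window,
	// while sleeping 600ms is enough for the first retry to have fired.
	fmt.Println(backoffDelays(500*time.Millisecond, 2, 4))
}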

// Creates a volume with accessMode ReadWriteOnce
// First create a pod which will try to attach the volume to a node named "timeout-node". The attach call for this node will
// fail with a timeout, but the volume will actually be attached to the node after the call.
@@ -1181,6 +1278,22 @@ func verifyVolumeReportedAsAttachedToNode(

}

func verifyVolumeNoStatusUpdateNeeded(
t *testing.T,
volumeName v1.UniqueVolumeName,
nodeName k8stypes.NodeName,
asw cache.ActualStateOfWorld,
) {
volumes := asw.GetVolumesToReportAttached()
for _, volume := range volumes[nodeName] {
if volume.Name == volumeName {
t.Fatalf("Check volume <%v> is reported as need to update status on node <%v>, expected false",
volumeName,
nodeName)
}
}
}

func verifyNewDetacherCallCount(
t *testing.T,
expectZeroNewDetacherCallCount bool,
6 changes: 6 additions & 0 deletions pkg/volume/testing/testing.go
@@ -51,6 +51,8 @@ const (
// The node is marked as uncertain. The attach operation will fail and return a timeout error
// for the first attach call. The following call will return successfully.
UncertainAttachNode = "uncertain-attach-node"
// The detach operation will keep failing on the node.
FailDetachNode = "fail-detach-node"
// The node is marked as timeout. The attach operation will always fail and return a timeout error,
// but the operation actually succeeds.
TimeoutAttachNode = "timeout-attach-node"
@@ -1083,6 +1085,10 @@ func (fv *FakeVolume) Detach(volumeName string, nodeName types.NodeName) error {
return fmt.Errorf("trying to detach volume %q that is not attached to the node %q", volumeName, node)
}

if nodeName == FailDetachNode {
return fmt.Errorf("fail to detach volume %q to node %q", volumeName, nodeName)
}

volumeNodes.Delete(node)
if volumeNodes.Len() == 0 {
delete(fv.VolumesAttached, volumeName)
@@ -28,7 +28,7 @@ import (
"fmt"
"sync"

"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
k8sRuntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/klog/v2"
@@ -106,6 +106,13 @@ type NestedPendingOperations interface {
volumeName v1.UniqueVolumeName,
podName volumetypes.UniquePodName,
nodeName types.NodeName) bool

// IsOperationSafeToRetry returns false if an operation for the given volumeName
// and one of podName or nodeName is pending or in exponential backoff, otherwise it returns true
IsOperationSafeToRetry(
volumeName v1.UniqueVolumeName,
podName volumetypes.UniquePodName,
nodeName types.NodeName, operationName string) bool
}

// NewNestedPendingOperations returns a new instance of NestedPendingOperations.
@@ -185,6 +192,33 @@ func (grm *nestedPendingOperations) Run(

return nil
}
func (grm *nestedPendingOperations) IsOperationSafeToRetry(
volumeName v1.UniqueVolumeName,
podName volumetypes.UniquePodName,
nodeName types.NodeName,
operationName string) bool {

grm.lock.RLock()
defer grm.lock.RUnlock()

opKey := operationKey{volumeName, podName, nodeName}
exist, previousOpIndex := grm.isOperationExists(opKey)
if !exist {
return true
}
previousOp := grm.operations[previousOpIndex]
if previousOp.operationPending {
return false
}
backOffErr := previousOp.expBackoff.SafeToRetry(fmt.Sprintf("%+v", opKey))
if backOffErr != nil {
if previousOp.operationName == operationName {
return false
}
}

return true
}

func (grm *nestedPendingOperations) IsOperationPending(
volumeName v1.UniqueVolumeName,
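Reduced to plain booleans, the IsOperationSafeToRetry implementation above says: no recorded operation means a retry is safe; a still-pending operation means it is not; an operation in exponential backoff blocks the retry only when the retry is for the same operation name. A minimal sketch of that decision, with the lookup and backoff state flattened into arguments for illustration:

package main

import "fmt"

// safeToRetry mirrors the decision in IsOperationSafeToRetry with the operation
// lookup and backoff state passed in as plain values.
func safeToRetry(exists, pending, inBackoff bool, previousOp, requestedOp string) bool {
	if !exists {
		return true
	}
	if pending {
		return false
	}
	if inBackoff && previousOp == requestedOp {
		return false
	}
	return true
}

func main() {
	fmt.Println(safeToRetry(true, false, true, "volume_detach", "volume_detach")) // false: same operation still backing off
	fmt.Println(safeToRetry(true, false, true, "volume_attach", "volume_detach")) // true: backoff belongs to a different operation
}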
11 changes: 11 additions & 0 deletions pkg/volume/util/operationexecutor/operation_executor.go
@@ -141,6 +141,9 @@ type OperationExecutor interface {
// IsOperationPending returns true if an operation for the given volumeName
// and one of podName or nodeName is pending, otherwise it returns false
IsOperationPending(volumeName v1.UniqueVolumeName, podName volumetypes.UniquePodName, nodeName types.NodeName) bool
// IsOperationSafeToRetry returns false if an operation for the given volumeName
// and one of podName or nodeName is pending or in exponential backoff, otherwise it returns true
IsOperationSafeToRetry(volumeName v1.UniqueVolumeName, podName volumetypes.UniquePodName, nodeName types.NodeName, operationName string) bool
// ExpandInUseVolume will resize the volume's file system to the expected size without unmounting the volume.
ExpandInUseVolume(volumeToMount VolumeToMount, actualStateOfWorld ActualStateOfWorldMounterUpdater) error
// ReconstructVolumeOperation constructs a new volumeSpec created by the plugin and returns it
@@ -664,6 +667,14 @@ func (oe *operationExecutor) IsOperationPending(
return oe.pendingOperations.IsOperationPending(volumeName, podName, nodeName)
}

func (oe *operationExecutor) IsOperationSafeToRetry(
volumeName v1.UniqueVolumeName,
podName volumetypes.UniquePodName,
nodeName types.NodeName,
operationName string) bool {
return oe.pendingOperations.IsOperationSafeToRetry(volumeName, podName, nodeName, operationName)
}

func (oe *operationExecutor) AttachVolume(
volumeToAttach VolumeToAttach,
actualStateOfWorld ActualStateOfWorldAttacherUpdater) error {
5 changes: 3 additions & 2 deletions pkg/volume/util/operationexecutor/operation_generator.go
@@ -48,6 +48,7 @@ const (
const (
unknownVolumePlugin string = "UnknownVolumePlugin"
unknownAttachableVolumePlugin string = "UnknownAttachableVolumePlugin"
DetachOperationName string = "volume_detach"
)

// InTreeToCSITranslator contains methods required to check migratable status
@@ -491,9 +492,9 @@ func (og *operationGenerator) GenerateDetachVolumeFunc(
}

return volumetypes.GeneratedOperations{
OperationName: "volume_detach",
OperationName: DetachOperationName,
OperationFunc: detachVolumeFunc,
CompleteFunc: util.OperationCompleteHook(util.GetFullQualifiedPluginNameForVolume(pluginName, volumeToDetach.VolumeSpec), "volume_detach"),
CompleteFunc: util.OperationCompleteHook(util.GetFullQualifiedPluginNameForVolume(pluginName, volumeToDetach.VolumeSpec), DetachOperationName),
EventRecorderFunc: nil, // nil because we do not want to generate event on error
}, nil
}
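Promoting "volume_detach" to the exported DetachOperationName constant lets the reconciler pass the same name to IsOperationSafeToRetry that the operation generator records on the detach operation, so the backoff check only blocks retries of the detach operation itself. A small sketch of that shared-constant pattern with simplified stand-in types:

package main

import "fmt"

const DetachOperationName = "volume_detach"

// generatedOperation is a stand-in for the operation record produced by the generator.
type generatedOperation struct{ OperationName string }

func generateDetachOp() generatedOperation {
	return generatedOperation{OperationName: DetachOperationName}
}

// sameOperation is what a name-keyed backoff check boils down to: it only matches
// when the caller asks about the operation that was actually recorded.
func sameOperation(recorded generatedOperation, requested string) bool {
	return recorded.OperationName == requested
}

func main() {
	op := generateDetachOp()
	fmt.Println(sameOperation(op, DetachOperationName)) // true
	fmt.Println(sameOperation(op, "volume_attach"))     // false
}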
