Add a flag to disable force detach behavior in kube-controller-manager
This is an abbreviated version of kubernetes#120344

It changes the boolean plumbing to use a global package variable to
avoid conflicts.

The unit test is added in an isolated file whose name starts with z_, so it
runs after the other OSS unit tests, which are not resilient to metrics
starting at any value other than 0.

Bug: b/272460654
Change-Id: I5437f3d2dc73f4f1a3782ba3afe5fdc8f93287e4
rohitssingh authored and GCB Sync Bot committed Mar 12, 2024
1 parent 32e0926 commit 453209c
Showing 4 changed files with 224 additions and 9 deletions.
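
As the commit message notes, the flag is plumbed through a package-level variable in the reconciler package: the options package binds --disable-force-detach-on-timeout directly to reconciler.DisableForceDetachOnTimeout instead of adding a new field to the controller configuration. A minimal standalone sketch of that pattern follows; it uses pflag, but the flag set and variable below are illustrative, not the actual kube-controller-manager wiring.

// Minimal sketch: a CLI flag bound directly to a package-level variable,
// mirroring how the options package binds --disable-force-detach-on-timeout
// to reconciler.DisableForceDetachOnTimeout. Names here are illustrative.
package main

import (
	"fmt"

	"github.com/spf13/pflag"
)

// In the real change this variable lives in the reconciler package; it is
// declared in main only to keep the sketch self-contained.
var disableForceDetachOnTimeout bool

func main() {
	fs := pflag.NewFlagSet("kube-controller-manager", pflag.ExitOnError)
	fs.BoolVar(&disableForceDetachOnTimeout, "disable-force-detach-on-timeout", false,
		"Prevent force detaching volumes based on maximum unmount time and node status.")
	// Parsing writes straight into the package-level variable, so the
	// reconciler can read it without threading a new field through the
	// controller's config structs and constructor.
	_ = fs.Parse([]string{"--disable-force-detach-on-timeout=true"})
	fmt.Println(disableForceDetachOnTimeout) // true
}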
@@ -22,6 +22,8 @@ import (
"time"

attachdetachconfig "k8s.io/kubernetes/pkg/controller/volume/attachdetach/config"

reconciler "k8s.io/kubernetes/pkg/controller/volume/attachdetach/reconciler"
)

// AttachDetachControllerOptions holds the AttachDetachController options.
@@ -37,6 +39,8 @@ func (o *AttachDetachControllerOptions) AddFlags(fs *pflag.FlagSet) {

fs.BoolVar(&o.DisableAttachDetachReconcilerSync, "disable-attach-detach-reconcile-sync", false, "Disable volume attach detach reconciler sync. Disabling this may cause volumes to be mismatched with pods. Use wisely.")
fs.DurationVar(&o.ReconcilerSyncLoopPeriod.Duration, "attach-detach-reconcile-sync-period", o.ReconcilerSyncLoopPeriod.Duration, "The reconciler sync wait time between volume attach detach. This duration must be larger than one second, and increasing this value from the default may allow for volumes to be mismatched with pods.")

fs.BoolVar(&reconciler.DisableForceDetachOnTimeout, "disable-force-detach-on-timeout", false, "Prevent force detaching volumes based on maximum unmount time and node status. If this flag is set to true, the non-graceful node shutdown feature must be used to recover from node failure. See https://k8s.io/docs/storage-disable-force-detach-on-timeout/.")
}

// ApplyTo fills up AttachDetachController config with options.
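With this change in place, an operator can opt out of the force-detach-on-timeout behavior by passing --disable-force-detach-on-timeout=true to kube-controller-manager; the flag defaults to false, so existing behavior is preserved unless it is set explicitly. As the help text above notes, the non-graceful node shutdown feature is then needed to recover from node failures.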
19 changes: 19 additions & 0 deletions pkg/controller/volume/attachdetach/reconciler/gke_reconciler.go
@@ -0,0 +1,19 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package reconciler

// DisableForceDetachOnTimeout, set by the --disable-force-detach-on-timeout flag,
// prevents the reconciler from force detaching volumes once the maximum unmount
// duration has expired on an unhealthy node.
var DisableForceDetachOnTimeout bool
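
Because DisableForceDetachOnTimeout is package-level mutable state rather than reconciler configuration, any test that flips it must restore the previous value afterwards; that is what the t.Cleanup block at the top of the new z_ test below does.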
23 changes: 14 additions & 9 deletions pkg/controller/volume/attachdetach/reconciler/reconciler.go
@@ -207,24 +207,29 @@ func (rc *reconciler) reconcile(ctx context.Context) {
logger.Error(err, "Cannot trigger detach because it fails to set detach request time with error")
continue
}
// Check whether timeout has reached the maximum waiting time
timeout := elapsedTime > rc.maxWaitForUnmountDuration
// Check whether the unmount drain timer expired
maxWaitForUnmountDurationExpired := elapsedTime > rc.maxWaitForUnmountDuration

isHealthy, err := rc.nodeIsHealthy(attachedVolume.NodeName)
if err != nil {
logger.Error(err, "Failed to get health of node", "node", klog.KRef("", string(attachedVolume.NodeName)))
}

// Force detach volumes from unhealthy nodes after maxWaitForUnmountDuration.
forceDetach := !isHealthy && timeout
// Force detach volumes from unhealthy nodes after maxWaitForUnmountDuration if force detach is enabled
// Ensure that the timeout condition checks this correctly so that the correct metric is updated below
forceDetachTimeoutExpired := maxWaitForUnmountDurationExpired && !DisableForceDetachOnTimeout
if maxWaitForUnmountDurationExpired && DisableForceDetachOnTimeout {
logger.V(5).Info("Drain timeout expired for volume but disableForceDetachOnTimeout was set", "node", klog.KRef("", string(attachedVolume.NodeName)), "volumeName", attachedVolume.VolumeName)
}
forceDetach := !isHealthy && forceDetachTimeoutExpired

hasOutOfServiceTaint, err := rc.hasOutOfServiceTaint(attachedVolume.NodeName)
if err != nil {
logger.Error(err, "Failed to get taint specs for node", "node", klog.KRef("", string(attachedVolume.NodeName)))
}

// Check whether volume is still mounted. Skip detach if it is still mounted unless force detach timeout
// or the node has `node.kubernetes.io/out-of-service` taint.
// Check whether volume is still mounted. Skip detach if it is still mounted unless we have
// decided to force detach or the node has `node.kubernetes.io/out-of-service` taint.
if attachedVolume.MountedByNode && !forceDetach && !hasOutOfServiceTaint {
logger.V(5).Info("Cannot detach volume because it is still mounted", "node", klog.KRef("", string(attachedVolume.NodeName)), "volumeName", attachedVolume.VolumeName)
continue
@@ -254,19 +259,19 @@ func (rc *reconciler) reconcile(ctx context.Context) {
}

// Trigger detach volume which requires verifying safe to detach step
// If timeout is true, skip verifySafeToDetach check
// If forceDetachTimeoutExpired is true, skip verifySafeToDetach check
// If the node has node.kubernetes.io/out-of-service taint with NoExecute effect, skip verifySafeToDetach check
logger.V(5).Info("Starting attacherDetacher.DetachVolume", "node", klog.KRef("", string(attachedVolume.NodeName)), "volumeName", attachedVolume.VolumeName)
if hasOutOfServiceTaint {
logger.V(4).Info("node has out-of-service taint", "node", klog.KRef("", string(attachedVolume.NodeName)))
}
verifySafeToDetach := !(timeout || hasOutOfServiceTaint)
verifySafeToDetach := !(forceDetachTimeoutExpired || hasOutOfServiceTaint)
err = rc.attacherDetacher.DetachVolume(logger, attachedVolume.AttachedVolume, verifySafeToDetach, rc.actualStateOfWorld)
if err == nil {
if verifySafeToDetach { // normal detach
logger.Info("attacherDetacher.DetachVolume started", "node", klog.KRef("", string(attachedVolume.NodeName)), "volumeName", attachedVolume.VolumeName)
} else { // force detach
if timeout {
if forceDetachTimeoutExpired {
metrics.RecordForcedDetachMetric(metrics.ForceDetachReasonTimeout)
logger.Info("attacherDetacher.DetachVolume started: this volume is not safe to detach, but maxWaitForUnmountDuration expired, force detaching",
"duration", rc.maxWaitForUnmountDuration,
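
The net effect of the reconciler change above can be summarized by the following sketch. shouldDetach and its parameters are hypothetical names used only for illustration; the real loop also sets detach request times, records force-detach metrics, and logs.

// Hypothetical helper summarizing the detach decision introduced by this
// change; not the controller's actual code.
package main

import "fmt"

var disableForceDetachOnTimeout bool // stands in for reconciler.DisableForceDetachOnTimeout

func shouldDetach(mountedByNode, nodeHealthy, drainTimerExpired, hasOutOfServiceTaint bool) bool {
	// Force detach only when the unmount drain timer has expired AND the
	// new flag has not disabled force detach on timeout.
	forceDetachTimeoutExpired := drainTimerExpired && !disableForceDetachOnTimeout
	forceDetach := !nodeHealthy && forceDetachTimeoutExpired
	// A volume that is still mounted is skipped unless we force detach or
	// the node carries the node.kubernetes.io/out-of-service taint.
	if mountedByNode && !forceDetach && !hasOutOfServiceTaint {
		return false
	}
	return true
}

func main() {
	disableForceDetachOnTimeout = true
	// Unhealthy node, drain timer expired, flag set: no force detach.
	fmt.Println(shouldDetach(true, false, true, false)) // false
	// The out-of-service taint still allows the detach to proceed.
	fmt.Println(shouldDetach(true, false, true, true)) // true
}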
187 changes: 187 additions & 0 deletions pkg/controller/volume/attachdetach/reconciler/z_gke_reconciler_test.go
@@ -0,0 +1,187 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package reconciler

import (
"context"
"testing"
"time"

v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
k8stypes "k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/informers"
"k8s.io/client-go/tools/record"
"k8s.io/component-base/metrics/legacyregistry"
metricstestutil "k8s.io/component-base/metrics/testutil"
"k8s.io/klog/v2/ktesting"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/controller/volume/attachdetach/cache"
"k8s.io/kubernetes/pkg/controller/volume/attachdetach/metrics"
"k8s.io/kubernetes/pkg/controller/volume/attachdetach/statusupdater"
controllervolumetesting "k8s.io/kubernetes/pkg/controller/volume/attachdetach/testing"
volumetesting "k8s.io/kubernetes/pkg/volume/testing"
"k8s.io/kubernetes/pkg/volume/util/operationexecutor"
"k8s.io/kubernetes/pkg/volume/util/types"
)

// Populates desiredStateOfWorld cache with one node/volume/pod tuple.
// The node starts as healthy.
//
// Calls Run()
// Verifies there is one attach call and no detach calls.
// Deletes the pod from desiredStateOfWorld cache without first marking the node/volume as unmounted.
// Verifies that the volume is NOT detached after maxWaitForUnmountDuration.
// Marks the node as unhealthy.
Sets DisableForceDetachOnTimeout to true (done at the start of the test, before Run()).
// Verifies that the volume is not detached after maxWaitForUnmountDuration.
//
// Then applies the node.kubernetes.io/out-of-service taint.
// Verifies that there is still just one attach call.
// Verifies there is now one detach call.
func Test_Run_OneVolumeDetachOnUnhealthyNodeWithForceDetachOnUnmountDisabled(t *testing.T) {
originalValue := DisableForceDetachOnTimeout
DisableForceDetachOnTimeout = true // change the option we're testing
t.Cleanup(func() {
DisableForceDetachOnTimeout = originalValue
})

registerMetrics.Do(func() {
legacyregistry.MustRegister(metrics.ForceDetachMetricCounter)
})
// NOTE: This value is being pulled from a global variable, so it won't necessarily be 0 at the start of the test
// For example, if Test_Run_OneVolumeDetachOnOutOfServiceTaintedNode runs before this test, then it will be 1
initialForceDetachCount, err := metricstestutil.GetCounterMetricValue(metrics.ForceDetachMetricCounter.WithLabelValues(metrics.ForceDetachReasonOutOfService))
if err != nil {
t.Errorf("Error getting initialForceDetachCount")
}

// Arrange
volumePluginMgr, fakePlugin := volumetesting.GetTestVolumePluginMgr(t)
dsw := cache.NewDesiredStateOfWorld(volumePluginMgr)
asw := cache.NewActualStateOfWorld(volumePluginMgr)
fakeKubeClient := controllervolumetesting.CreateTestClient()
fakeRecorder := &record.FakeRecorder{}
fakeHandler := volumetesting.NewBlockVolumePathHandler()
ad := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(
fakeKubeClient,
volumePluginMgr,
fakeRecorder,
fakeHandler))
informerFactory := informers.NewSharedInformerFactory(fakeKubeClient, controller.NoResyncPeriodFunc())
nsu := statusupdater.NewFakeNodeStatusUpdater(false /* returnError */)
nodeLister := informerFactory.Core().V1().Nodes().Lister()
reconciler := NewReconciler(
reconcilerLoopPeriod, maxWaitForUnmountDuration, syncLoopPeriod, false, dsw, asw, ad,
nsu, nodeLister, fakeRecorder)
podName1 := "pod-uid1"
volumeName1 := v1.UniqueVolumeName("volume-name1")
volumeSpec1 := controllervolumetesting.GetTestVolumeSpec(string(volumeName1), volumeName1)
nodeName1 := k8stypes.NodeName("worker-0")
node1 := &v1.Node{
ObjectMeta: metav1.ObjectMeta{Name: string(nodeName1)},
Status: v1.NodeStatus{
Conditions: []v1.NodeCondition{
{
Type: v1.NodeReady,
Status: v1.ConditionTrue,
},
},
},
}
addErr := informerFactory.Core().V1().Nodes().Informer().GetStore().Add(node1)
if addErr != nil {
t.Fatalf("Add node failed. Expected: <no error> Actual: <%v>", addErr)
}
dsw.AddNode(nodeName1, false /*keepTerminatedPodVolumes*/)
volumeExists := dsw.VolumeExists(volumeName1, nodeName1)
if volumeExists {
t.Fatalf(
"Volume %q/node %q should not exist, but it does.",
volumeName1,
nodeName1)
}

generatedVolumeName, podErr := dsw.AddPod(types.UniquePodName(podName1), controllervolumetesting.NewPod(podName1,
podName1), volumeSpec1, nodeName1)
if podErr != nil {
t.Fatalf("AddPod failed. Expected: <no error> Actual: <%v>", podErr)
}

// Act
_, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
go reconciler.Run(ctx)

// Assert
waitForNewAttacherCallCount(t, 1 /* expectedCallCount */, fakePlugin)
verifyNewAttacherCallCount(t, false /* expectZeroNewAttacherCallCount */, fakePlugin)
waitForAttachCallCount(t, 1 /* expectedAttachCallCount */, fakePlugin)
verifyNewDetacherCallCount(t, true /* expectZeroNewDetacherCallCount */, fakePlugin)
waitForDetachCallCount(t, 0 /* expectedDetachCallCount */, fakePlugin)

// Act
// Delete the pod. The volume will not be detached even after maxWaitForUnmountDuration expires, because it is
// still mounted (it was never marked unmounted) and the node is healthy.
dsw.DeletePod(types.UniquePodName(podName1), generatedVolumeName, nodeName1)
time.Sleep(maxWaitForUnmountDuration * 5)
// Assert
waitForNewDetacherCallCount(t, 0 /* expectedCallCount */, fakePlugin)
verifyNewAttacherCallCount(t, false /* expectZeroNewAttacherCallCount */, fakePlugin)
waitForAttachCallCount(t, 1 /* expectedAttachCallCount */, fakePlugin)
verifyNewDetacherCallCount(t, true /* expectZeroNewDetacherCallCount */, fakePlugin)
waitForDetachCallCount(t, 0 /* expectedDetachCallCount */, fakePlugin)

// Act
// Mark the node unhealthy
node2 := node1.DeepCopy()
node2.Status.Conditions[0].Status = v1.ConditionFalse
updateErr := informerFactory.Core().V1().Nodes().Informer().GetStore().Update(node2)
if updateErr != nil {
t.Fatalf("Update node failed. Expected: <no error> Actual: <%v>", updateErr)
}
// Assert -- Detach was not triggered after maxWaitForUnmountDuration
waitForNewDetacherCallCount(t, 0 /* expectedCallCount */, fakePlugin)
verifyNewAttacherCallCount(t, false /* expectZeroNewAttacherCallCount */, fakePlugin)
waitForAttachCallCount(t, 1 /* expectedAttachCallCount */, fakePlugin)
verifyNewDetacherCallCount(t, true /* expectZeroNewDetacherCallCount */, fakePlugin)
waitForDetachCallCount(t, 0 /* expectedDetachCallCount */, fakePlugin)

// Force detach metric due to out-of-service taint
// We shouldn't see any additional force detaches, so only consider the initial count
testForceDetachMetric(t, int(initialForceDetachCount), metrics.ForceDetachReasonOutOfService)

// Act
// Taint the node
node3 := node2.DeepCopy()
node3.Spec.Taints = append(node3.Spec.Taints, v1.Taint{Key: v1.TaintNodeOutOfService, Effect: v1.TaintEffectNoExecute})
updateErr = informerFactory.Core().V1().Nodes().Informer().GetStore().Update(node3)
if updateErr != nil {
t.Fatalf("Update node failed. Expected: <no error> Actual: <%v>", updateErr)
}
// Assert -- Detach was triggered by the out-of-service taint
waitForNewDetacherCallCount(t, 1 /* expectedCallCount */, fakePlugin)
verifyNewAttacherCallCount(t, false /* expectZeroNewAttacherCallCount */, fakePlugin)
waitForAttachCallCount(t, 1 /* expectedAttachCallCount */, fakePlugin)
verifyNewDetacherCallCount(t, false /* expectZeroNewDetacherCallCount */, fakePlugin)
waitForDetachCallCount(t, 1 /* expectedDetachCallCount */, fakePlugin)

// Force detach metric due to out-of-service taint
// We should see one more force detach, so consider the initial count + 1
testForceDetachMetric(t, int(initialForceDetachCount)+1, metrics.ForceDetachReasonOutOfService)
}
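
Note that when the detach finally happens in this test it is attributed to the out-of-service taint: with DisableForceDetachOnTimeout set, forceDetachTimeoutExpired stays false, so the ForceDetachReasonTimeout metric path in the reconciler is never taken, and the test expects only the out-of-service counter to increase by one.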
