Skip to content

Commit

Permalink
detach the volume when pod is terminated
Browse files Browse the repository at this point in the history
When pods are terminated we should detach the volume. Add a
flag to disable this behaviour if needed.
  • Loading branch information
gnufied committed May 4, 2017
1 parent df8551a commit 31c874a
Show file tree
Hide file tree
Showing 9 changed files with 146 additions and 31 deletions.
1 change: 1 addition & 0 deletions cmd/kube-controller-manager/app/controllermanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -567,6 +567,7 @@ func StartControllers(controllers map[string]InitFunc, s *options.CMServer, root
ProbeAttachableVolumePlugins(s.VolumeConfiguration),
s.DisableAttachDetachReconcilerSync,
s.ReconcilerSyncLoopPeriod.Duration,
s.KeepTerminatedPodVolumes,
)
if attachDetachControllerErr != nil {
return fmt.Errorf("failed to start attach/detach controller: %v", attachDetachControllerErr)
Expand Down
2 changes: 2 additions & 0 deletions cmd/kube-controller-manager/app/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ func NewCMServer() *CMServer {
ClusterSigningCertFile: "/etc/kubernetes/ca/ca.pem",
ClusterSigningKeyFile: "/etc/kubernetes/ca/ca.key",
ReconcilerSyncLoopPeriod: metav1.Duration{Duration: 60 * time.Second},
KeepTerminatedPodVolumes: false,
EnableTaintManager: true,
HorizontalPodAutoscalerUseRESTClients: false,
},
Expand Down Expand Up @@ -211,6 +212,7 @@ func (s *CMServer) AddFlags(fs *pflag.FlagSet, allControllers []string, disabled
fs.Int32Var(&s.LargeClusterSizeThreshold, "large-cluster-size-threshold", 50, "Number of nodes from which NodeController treats the cluster as large for the eviction logic purposes. --secondary-node-eviction-rate is implicitly overridden to 0 for clusters this size or smaller.")
fs.Float32Var(&s.UnhealthyZoneThreshold, "unhealthy-zone-threshold", 0.55, "Fraction of Nodes in a zone which needs to be not Ready (minimum 3) for zone to be treated as unhealthy. ")
fs.BoolVar(&s.DisableAttachDetachReconcilerSync, "disable-attach-detach-reconcile-sync", false, "Disable volume attach detach reconciler sync. Disabling this may cause volumes to be mismatched with pods. Use wisely.")
fs.BoolVar(&s.KeepTerminatedPodVolumes, "keep-terminated-pod-volumes", false, "Keep terminated pod volumes attached to the node after the pod terminates. Can be useful for debugging volume related issues.")
fs.DurationVar(&s.ReconcilerSyncLoopPeriod.Duration, "attach-detach-reconcile-sync-period", s.ReconcilerSyncLoopPeriod.Duration, "The reconciler sync wait time between volume attach detach. This duration must be larger than one second, and increasing this value from the default may allow for volumes to be mismatched with pods.")
fs.BoolVar(&s.EnableTaintManager, "enable-taint-manager", s.EnableTaintManager, "WARNING: Beta feature. If set to true enables NoExecute Taints and will evict all not-tolerating Pod running on Nodes tainted with this kind of Taints.")
fs.BoolVar(&s.HorizontalPodAutoscalerUseRESTClients, "horizontal-pod-autoscaler-use-rest-clients", s.HorizontalPodAutoscalerUseRESTClients, "WARNING: alpha feature. If set to true, causes the horizontal pod autoscaler controller to use REST clients through the kube-aggregator, instead of using the legacy metrics client through the API server proxy. This is required for custom metrics support in the horizonal pod autoscaler.")
Expand Down
2 changes: 1 addition & 1 deletion cmd/kubelet/app/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,7 @@ func (c *kubeletConfiguration) addFlags(fs *pflag.FlagSet) {
fs.BoolVar(&c.ExperimentalKernelMemcgNotification, "experimental-kernel-memcg-notification", c.ExperimentalKernelMemcgNotification, "If enabled, the kubelet will integrate with the kernel memcg notification to determine if memory eviction thresholds are crossed rather than polling.")
fs.Int32Var(&c.PodsPerCore, "pods-per-core", c.PodsPerCore, "Number of Pods per core that can run on this Kubelet. The total number of Pods on this Kubelet cannot exceed max-pods, so max-pods will be used if this calculation results in a larger number of Pods allowed on the Kubelet. A value of 0 disables this limit.")
fs.BoolVar(&c.ProtectKernelDefaults, "protect-kernel-defaults", c.ProtectKernelDefaults, "Default kubelet behaviour for kernel tuning. If set, kubelet errors if any of kernel tunables is different than kubelet defaults.")
fs.BoolVar(&c.KeepTerminatedPodVolumes, "keep-terminated-pod-volumes", c.KeepTerminatedPodVolumes, "Keep terminated pod volumes mounted to the node after the pod terminates. Can be useful for debugging volume related issues.")
fs.BoolVar(&c.KeepTerminatedPodVolumes, "keep-terminated-pod-volumes", c.KeepTerminatedPodVolumes, "Keep terminated pod volumes mounted to the node after the pod terminates. Can be useful for debugging volume related issues. If setting this value to true - make sure keep-terminated-pod-volumes options in controller is also set to true to avoid detaching mounted volumes.")

// CRI flags.
// TODO: Remove experimental-cri in kubernetes 1.7.
Expand Down
4 changes: 3 additions & 1 deletion hack/local-up-cluster.sh
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,7 @@ CPU_CFS_QUOTA=${CPU_CFS_QUOTA:-true}
ENABLE_HOSTPATH_PROVISIONER=${ENABLE_HOSTPATH_PROVISIONER:-"false"}
CLAIM_BINDER_SYNC_PERIOD=${CLAIM_BINDER_SYNC_PERIOD:-"15s"} # current k8s default
ENABLE_CONTROLLER_ATTACH_DETACH=${ENABLE_CONTROLLER_ATTACH_DETACH:-"true"} # current default
KEEP_TERMINATED_POD_VOLUMES=${KEEP_TERMINATED_POD_VOLUMES:-"true"}
# This is the default dir and filename where the apiserver will generate a self-signed cert
# which should be able to be used as the CA to verify itself
CERT_DIR=${CERT_DIR:-"/var/run/kubernetes"}
Expand Down Expand Up @@ -565,6 +566,7 @@ function start_controller_manager {
--cloud-config="${CLOUD_CONFIG}" \
--kubeconfig "$CERT_DIR"/controller.kubeconfig \
--use-service-account-credentials \
--keep-terminated-pod-volumes=${KEEP_TERMINATED_POD_VOLUMES} \
--master="https://${API_HOST}:${API_SECURE_PORT}" >"${CTLRMGR_LOG}" 2>&1 &
CTLRMGR_PID=$!
}
Expand Down Expand Up @@ -639,7 +641,7 @@ function start_kubelet {
--enable-controller-attach-detach="${ENABLE_CONTROLLER_ATTACH_DETACH}" \
--cgroups-per-qos=${CGROUPS_PER_QOS} \
--cgroup-driver=${CGROUP_DRIVER} \
--keep-terminated-pod-volumes=true \
--keep-terminated-pod-volumes=${KEEP_TERMINATED_POD_VOLUMES} \
--eviction-hard=${EVICTION_HARD} \
--eviction-soft=${EVICTION_SOFT} \
--eviction-pressure-transition-period=${EVICTION_PRESSURE_TRANSITION_PERIOD} \
Expand Down
4 changes: 4 additions & 0 deletions pkg/apis/componentconfig/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -896,6 +896,10 @@ type KubeControllerManagerConfiguration struct {
// through the kube-aggregator when enabled, instead of using the legacy metrics client
// through the API server proxy.
HorizontalPodAutoscalerUseRESTClients bool

// KeepTerminatedPodVolumes causes the volumes of terminated pods to remain attached to the node after the pod terminates.
// Can be useful for debugging volume related issues.
KeepTerminatedPodVolumes bool
}

// VolumeConfiguration contains *all* enumerated flags meant to configure all volume
Expand Down
38 changes: 32 additions & 6 deletions pkg/controller/volume/attachdetach/attach_detach_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,8 @@ func NewAttachDetachController(
cloud cloudprovider.Interface,
plugins []volume.VolumePlugin,
disableReconciliationSync bool,
reconcilerSyncDuration time.Duration) (AttachDetachController, error) {
reconcilerSyncDuration time.Duration,
keepTerminatedPodVolumes bool) (AttachDetachController, error) {
// TODO: The default resyncPeriod for shared informers is 12 hours, this is
// unacceptable for the attach/detach controller. For example, if a pod is
// skipped because the node it is scheduled to didn't set its annotation in
Expand All @@ -113,6 +114,7 @@ func NewAttachDetachController(
nodeLister: nodeInformer.Lister(),
nodesSynced: nodeInformer.Informer().HasSynced,
cloud: cloud,
keepTerminatedPodVolumes: keepTerminatedPodVolumes,
}

if err := adc.volumePluginMgr.InitPlugins(plugins, adc); err != nil {
Expand Down Expand Up @@ -153,7 +155,8 @@ func NewAttachDetachController(
adc.desiredStateOfWorld,
&adc.volumePluginMgr,
pvcInformer.Lister(),
pvInformer.Lister())
pvInformer.Lister(),
keepTerminatedPodVolumes)

podInformer.Informer().AddEventHandler(kcache.ResourceEventHandlerFuncs{
AddFunc: adc.podAdd,
Expand Down Expand Up @@ -233,6 +236,9 @@ type attachDetachController struct {

// recorder is used to record events in the API server
recorder record.EventRecorder

// whether to keep pod volumes attached for terminated pods
keepTerminatedPodVolumes bool
}

func (adc *attachDetachController) Run(stopCh <-chan struct{}) {
Expand Down Expand Up @@ -385,8 +391,13 @@ func (adc *attachDetachController) podAdd(obj interface{}) {
return
}

util.ProcessPodVolumes(pod, true, /* addVolumes */
adc.desiredStateOfWorld, &adc.volumePluginMgr, adc.pvcLister, adc.pvLister)
if volumehelper.IsPodTerminated(pod, pod.Status) && !adc.keepTerminatedPodVolumes {
util.ProcessPodVolumes(pod, false, /* addVolumes */
adc.desiredStateOfWorld, &adc.volumePluginMgr, adc.pvcLister, adc.pvLister)
} else {
util.ProcessPodVolumes(pod, true, /* addVolumes */
adc.desiredStateOfWorld, &adc.volumePluginMgr, adc.pvcLister, adc.pvLister)
}
}

// GetDesiredStateOfWorld returns desired state of world associated with controller
Expand All @@ -395,8 +406,23 @@ func (adc *attachDetachController) GetDesiredStateOfWorld() cache.DesiredStateOf
}

// podUpdate handles a pod update notification. Only newObj is inspected;
// oldObj is ignored.
//
// Objects that are not a non-nil *v1.Pod, and pods that have no NodeName, are
// skipped. For every other pod the volumes are handed to
// util.ProcessPodVolumes with addVolumes=true, unless the pod is terminated
// and keepTerminatedPodVolumes is false, in which case addVolumes=false is
// passed (presumably removing the pod's volumes from the desired state of the
// world so they can be detached — confirm against util.ProcessPodVolumes).
//
// The handler deliberately does not delegate to podAdd before doing this:
// calling podAdd and then processing the pod again here would evaluate the
// same pod twice for every update event.
func (adc *attachDetachController) podUpdate(oldObj, newObj interface{}) {
	pod, ok := newObj.(*v1.Pod)
	if pod == nil || !ok {
		return
	}
	if pod.Spec.NodeName == "" {
		// Ignore pods without NodeName, indicating they are not scheduled.
		return
	}

	// Volumes stay in the desired state unless the pod has terminated and the
	// controller was not asked to keep terminated pod volumes.
	addPodFlag := true
	if volumehelper.IsPodTerminated(pod, pod.Status) && !adc.keepTerminatedPodVolumes {
		addPodFlag = false
	}

	util.ProcessPodVolumes(pod, addPodFlag, /* addVolumes */
		adc.desiredStateOfWorld, &adc.volumePluginMgr, adc.pvcLister, adc.pvLister)
}

func (adc *attachDetachController) podDelete(obj interface{}) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ func Test_NewAttachDetachController_Positive(t *testing.T) {
nil, /* cloud */
nil, /* plugins */
false,
time.Second*5)
time.Second*5,
false /*keepTerminatedPodVolumes */)

// Assert
if err != nil {
Expand Down Expand Up @@ -74,6 +75,7 @@ func Test_AttachDetachControllerStateOfWolrdPopulators_Positive(t *testing.T) {
nodeLister: nodeInformer.Lister(),
nodesSynced: nodeInformer.Informer().HasSynced,
cloud: nil,
keepTerminatedPodVolumes: false,
}

// Act
Expand Down Expand Up @@ -212,7 +214,8 @@ func attachDetachRecoveryTestCase(t *testing.T, extraPods1 []*v1.Pod, extraPods2
nil, /* cloud */
plugins,
false,
time.Second*1)
time.Second*1,
false /*keepTerminatedPodVolumes */)
if err != nil {
t.Fatalf("Run failed with error. Expected: <no error> Actual: <%v>", err)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,27 +58,30 @@ func NewDesiredStateOfWorldPopulator(
desiredStateOfWorld cache.DesiredStateOfWorld,
volumePluginMgr *volume.VolumePluginMgr,
pvcLister corelisters.PersistentVolumeClaimLister,
pvLister corelisters.PersistentVolumeLister) DesiredStateOfWorldPopulator {
pvLister corelisters.PersistentVolumeLister,
keepTerminatedPodVolumes bool) DesiredStateOfWorldPopulator {
return &desiredStateOfWorldPopulator{
loopSleepDuration: loopSleepDuration,
listPodsRetryDuration: listPodsRetryDuration,
podLister: podLister,
desiredStateOfWorld: desiredStateOfWorld,
volumePluginMgr: volumePluginMgr,
pvcLister: pvcLister,
pvLister: pvLister,
loopSleepDuration: loopSleepDuration,
listPodsRetryDuration: listPodsRetryDuration,
podLister: podLister,
desiredStateOfWorld: desiredStateOfWorld,
volumePluginMgr: volumePluginMgr,
pvcLister: pvcLister,
pvLister: pvLister,
keepTerminatedPodVolumes: keepTerminatedPodVolumes,
}
}

// desiredStateOfWorldPopulator bundles the listers, caches and timing settings
// that NewDesiredStateOfWorldPopulator stores for the populator's methods.
// Each field is declared exactly once; Go rejects duplicate field names.
type desiredStateOfWorldPopulator struct {
	loopSleepDuration     time.Duration
	podLister             corelisters.PodLister
	desiredStateOfWorld   cache.DesiredStateOfWorld
	volumePluginMgr       *volume.VolumePluginMgr
	pvcLister             corelisters.PersistentVolumeClaimLister
	pvLister              corelisters.PersistentVolumeLister
	listPodsRetryDuration time.Duration
	timeOfLastListPods    time.Time

	// keepTerminatedPodVolumes is consulted by findAndRemoveDeletedPods: when
	// it is false, a terminated pod that is still present in the pod informer
	// is not treated as "verified" and falls through to the code that follows
	// the existence check (presumably the removal path — confirm in that
	// method).
	keepTerminatedPodVolumes bool
}

func (dswp *desiredStateOfWorldPopulator) Run(stopCh <-chan struct{}) {
Expand Down Expand Up @@ -127,11 +130,13 @@ func (dswp *desiredStateOfWorldPopulator) findAndRemoveDeletedPods() {
glog.Errorf("podLister Get failed for pod %q (UID %q) with %v", dswPodKey, dswPodUID, err)
continue
default:
informerPodUID := volumehelper.GetUniquePodName(informerPod)
// Check whether the unique identifier of the pod from dsw matches the one retrieved from pod informer
if informerPodUID == dswPodUID {
glog.V(10).Infof("Verified pod %q (UID %q) from dsw exists in pod informer.", dswPodKey, dswPodUID)
continue
if dswp.keepTerminatedPodVolumes || !volumehelper.IsPodTerminated(informerPod, informerPod.Status) {
informerPodUID := volumehelper.GetUniquePodName(informerPod)
// Check whether the unique identifier of the pod from dsw matches the one retrieved from pod informer
if informerPodUID == dswPodUID {
glog.V(10).Infof("Verified pod %q (UID %q) from dsw exists in pod informer.", dswPodKey, dswPodUID)
continue
}
}
}

Expand Down
72 changes: 72 additions & 0 deletions test/integration/volume/attach_detach_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,77 @@ func TestPodDeletionWithDswp(t *testing.T) {
close(stopCh)
}

// TestPodUpdateWithWithADC checks that once a pod's status is updated to
// PodSucceeded, the attach/detach controller (created with
// keepTerminatedPodVolumes=false by createAdClients) drops the pod from its
// desired state of the world.
//
// Flow: create a node and a pod with a volume, start the informers and the
// controller, wait until the pod shows up in the desired state of the world,
// mark the pod as succeeded, then poll until GetPodToAdd reports no pods.
func TestPodUpdateWithWithADC(t *testing.T) {
	const (
		// removalTimeout bounds how long the controller gets to react to the
		// status update; removalPollInterval is the delay between checks.
		removalTimeout      = 20 * time.Second
		removalPollInterval = 100 * time.Millisecond
	)

	_, server := framework.RunAMaster(nil)
	defer server.Close()
	namespaceName := "test-pod-update"

	node := &v1.Node{
		ObjectMeta: metav1.ObjectMeta{
			Name: "node-sandbox",
			Annotations: map[string]string{
				volumehelper.ControllerManagedAttachAnnotation: "true",
			},
		},
	}

	ns := framework.CreateTestingNamespace(namespaceName, server, t)
	defer framework.DeleteTestingNamespace(ns, server, t)

	testClient, ctrl, informers := createAdClients(ns, t, server, defaultSyncPeriod)

	pod := fakePodWithVol(namespaceName)

	// Close both stop channels in a defer so the informer and controller
	// goroutines are stopped even when the test bails out through t.Fatalf.
	podStopCh := make(chan struct{})
	stopCh := make(chan struct{})
	defer func() {
		close(podStopCh)
		close(stopCh)
	}()

	if _, err := testClient.Core().Nodes().Create(node); err != nil {
		t.Fatalf("Failed to create node : %v", err)
	}

	go informers.Core().V1().Nodes().Informer().Run(podStopCh)

	// Nothing below can succeed without the pod, so abort immediately.
	if _, err := testClient.Core().Pods(ns.Name).Create(pod); err != nil {
		t.Fatalf("Failed to create pod : %v", err)
	}

	podInformer := informers.Core().V1().Pods().Informer()
	go podInformer.Run(podStopCh)

	// start controller loop
	go informers.Core().V1().PersistentVolumeClaims().Informer().Run(stopCh)
	go informers.Core().V1().PersistentVolumes().Informer().Run(stopCh)
	go ctrl.Run(stopCh)

	waitToObservePods(t, podInformer, 1)
	podKey, err := cache.MetaNamespaceKeyFunc(pod)
	if err != nil {
		t.Fatalf("MetaNamespaceKeyFunc failed with : %v", err)
	}

	// GetByKey reports a missing key through its second return value, not
	// through err, so both have to be checked.
	_, exists, err := podInformer.GetStore().GetByKey(podKey)
	if err != nil {
		t.Fatalf("Pod Informer cache lookup failed : %v", err)
	}
	if !exists {
		t.Fatalf("Pod %q not found in Pod Informer cache", podKey)
	}

	waitForPodsInDSWP(t, ctrl.GetDesiredStateOfWorld())

	pod.Status.Phase = v1.PodSucceeded

	// Without a successful status update the controller never sees the pod as
	// terminated, so waiting for the removal would be pointless.
	if _, err := testClient.Core().Pods(ns.Name).UpdateStatus(pod); err != nil {
		t.Fatalf("Failed to update pod : %v", err)
	}

	// Poll instead of sleeping for the full timeout: the test finishes as soon
	// as the controller has reacted, and still fails after removalTimeout.
	deadline := time.Now().Add(removalTimeout)
	for len(ctrl.GetDesiredStateOfWorld().GetPodToAdd()) != 0 {
		if time.Now().After(deadline) {
			t.Fatalf("All pods should have been removed")
		}
		time.Sleep(removalPollInterval)
	}
}

// wait for the podInformer to observe the pods. Call this function before
// running the RC manager to prevent the rc manager from creating new pods
// rather than adopting the existing ones.
Expand Down Expand Up @@ -214,6 +285,7 @@ func createAdClients(ns *v1.Namespace, t *testing.T, server *httptest.Server, sy
plugins,
false,
time.Second*5,
false, /* keepTerminatedPodVolumes */
)
if err != nil {
t.Fatalf("Error creating AttachDetach : %v", err)
Expand Down

0 comments on commit 31c874a

Please sign in to comment.