Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

recheck pod volumes before marking pod as processed #44412

Merged
merged 2 commits into from
Jul 12, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,8 @@ func (dswp *desiredStateOfWorldPopulator) processPodVolumes(pod *v1.Pod) {
return
}

allVolumesAdded := true

// Process volume spec for each volume defined in pod
for _, podVolume := range pod.Spec.Volumes {
volumeSpec, volumeGidValue, err :=
Expand All @@ -270,6 +272,7 @@ func (dswp *desiredStateOfWorldPopulator) processPodVolumes(pod *v1.Pod) {
podVolume.Name,
format.Pod(pod),
err)
allVolumesAdded = false
continue
}

Expand All @@ -283,6 +286,7 @@ func (dswp *desiredStateOfWorldPopulator) processPodVolumes(pod *v1.Pod) {
volumeSpec.Name(),
uniquePodName,
err)
allVolumesAdded = false
}

glog.V(10).Infof(
Expand All @@ -292,7 +296,11 @@ func (dswp *desiredStateOfWorldPopulator) processPodVolumes(pod *v1.Pod) {
uniquePodName)
}

dswp.markPodProcessed(uniquePodName)
// some of the volume additions may have failed, should not mark this pod as fully processed
if allVolumesAdded {
dswp.markPodProcessed(uniquePodName)
}

}

// podPreviouslyProcessed returns true if the volumes for this pod have already
Expand Down Expand Up @@ -327,6 +335,7 @@ func (dswp *desiredStateOfWorldPopulator) deleteProcessedPod(

// createVolumeSpec creates and returns a mutatable volume.Spec object for the
// specified volume. It dereference any PVC to get PV objects, if needed.
// Returns an error if unable to obtain the volume at this time.
func (dswp *desiredStateOfWorldPopulator) createVolumeSpec(
podVolume v1.Volume, podNamespace string) (*volume.Spec, string, error) {
if pvcSource :=
Expand Down Expand Up @@ -409,6 +418,7 @@ func (dswp *desiredStateOfWorldPopulator) getPVCExtractPV(
}

if pvc.Status.Phase != v1.ClaimBound || pvc.Spec.VolumeName == "" {

return "", "", fmt.Errorf(
"PVC %s/%s has non-bound phase (%q) or empty pvc.Spec.VolumeName (%q)",
namespace,
Expand Down
52 changes: 52 additions & 0 deletions pkg/kubelet/volumemanager/volume_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,44 @@ func TestGetMountedVolumesForPodAndGetVolumesInUse(t *testing.T) {
}
}

func TestInitialPendingVolumesForPodAndGetVolumesInUse(t *testing.T) {
tmpDir, err := utiltesting.MkTmpdir("volumeManagerTest")
if err != nil {
t.Fatalf("can't make a temp dir: %v", err)
}
defer os.RemoveAll(tmpDir)
podManager := kubepod.NewBasicPodManager(podtest.NewFakeMirrorClient(), secret.NewFakeManager(), configmap.NewFakeManager())

node, pod, pv, claim := createObjects()
claim.Status = v1.PersistentVolumeClaimStatus{
Phase: v1.ClaimPending,
}

kubeClient := fake.NewSimpleClientset(node, pod, pv, claim)

manager := newTestVolumeManager(tmpDir, podManager, kubeClient)

stopCh := runVolumeManager(manager)
defer close(stopCh)

podManager.SetPods([]*v1.Pod{pod})

// Fake node status update
go simulateVolumeInUseUpdate(
v1.UniqueVolumeName(node.Status.VolumesAttached[0].Name),
stopCh,
manager)

// delayed claim binding
go delayClaimBecomesBound(kubeClient, claim.GetNamespace(), claim.ObjectMeta.Name)

err = manager.WaitForAttachAndMount(pod)
if err != nil {
t.Errorf("Expected success: %v", err)
}

}

func TestGetExtraSupplementalGroupsForPod(t *testing.T) {
tmpDir, err := utiltesting.MkTmpdir("volumeManagerTest")
if err != nil {
Expand Down Expand Up @@ -279,6 +317,20 @@ func simulateVolumeInUseUpdate(volumeName v1.UniqueVolumeName, stopCh <-chan str
}
}

func delayClaimBecomesBound(
kubeClient clientset.Interface,
namespace, claimName string,
) {
time.Sleep(500 * time.Millisecond)
volumeClaim, _ :=
kubeClient.Core().PersistentVolumeClaims(namespace).Get(claimName, metav1.GetOptions{})
volumeClaim.Status = v1.PersistentVolumeClaimStatus{
Phase: v1.ClaimBound,
}
kubeClient.Core().PersistentVolumeClaims(namespace).Update(volumeClaim)
return
}

func runVolumeManager(manager VolumeManager) chan struct{} {
stopCh := make(chan struct{})
//readyCh := make(chan bool, 1)
Expand Down