Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixed race condition in pv binder #16432

Merged
merged 1 commit into from
Nov 25, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"k8s.io/kubernetes/pkg/client/cache"
client "k8s.io/kubernetes/pkg/client/unversioned"
"k8s.io/kubernetes/pkg/controller/framework"
"k8s.io/kubernetes/pkg/conversion"
"k8s.io/kubernetes/pkg/fields"
"k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/runtime"
Expand Down Expand Up @@ -158,44 +159,53 @@ func syncVolume(volumeIndex *persistentVolumeOrderedIndex, binderClient binderCl
currentPhase := volume.Status.Phase
nextPhase := currentPhase

_, exists, err := volumeIndex.Get(volume)
if err != nil {
return err
}
if !exists {
volumeIndex.Add(volume)
}

switch currentPhase {
// pending volumes are available only after indexing in order to be matched to claims.
case api.VolumePending:

// 3 possible states:
// 1. ClaimRef != nil and Claim exists: Prebound to claim. Make volume available for binding (it will match PVC).
// 2. ClaimRef != nil and Claim !exists: Recently recycled. Remove bind. Make volume available for new claim.
// 3. ClaimRef == nil: Neither recycled nor prebound. Make volume available for binding.
nextPhase = api.VolumeAvailable

if volume.Spec.ClaimRef != nil {
// Pending volumes that have a ClaimRef were recently recycled. The Recycler set the phase to VolumePending
// to start the volume again at the beginning of this lifecycle.
// ClaimRef is the last bind between persistent volume and claim.
// The claim has already been deleted by the user at this point
oldClaimRef := volume.Spec.ClaimRef
volume.Spec.ClaimRef = nil
_, err = binderClient.UpdatePersistentVolume(volume)
if err != nil {
// rollback on error, keep the ClaimRef until we can successfully update the volume
volume.Spec.ClaimRef = oldClaimRef
return fmt.Errorf("Unexpected error saving PersistentVolume: %+v", err)
}
}
_, err := binderClient.GetPersistentVolumeClaim(volume.Spec.ClaimRef.Namespace, volume.Spec.ClaimRef.Name)
if errors.IsNotFound(err) {
// Pending volumes that have a ClaimRef where the claim is missing were recently recycled.
// The Recycler set the phase to VolumePending to start the volume at the beginning of this lifecycle.
// removing ClaimRef unbinds the volume
clone, err := conversion.NewCloner().DeepCopy(volume)
if err != nil {
return fmt.Errorf("Error cloning pv: %v", err)
}
volumeClone, ok := clone.(*api.PersistentVolume)
if !ok {
return fmt.Errorf("Unexpected pv cast error : %v\n", volumeClone)
}
volumeClone.Spec.ClaimRef = nil

_, exists, err := volumeIndex.Get(volume)
if err != nil {
return err
}
if !exists {
volumeIndex.Add(volume)
if updatedVolume, err := binderClient.UpdatePersistentVolume(volumeClone); err != nil {
return fmt.Errorf("Unexpected error saving PersistentVolume: %+v", err)
} else {
volume = updatedVolume
volumeIndex.Update(volume)
}
} else if err != nil {
return fmt.Errorf("Error getting PersistentVolumeClaim[%s/%s]: %v", volume.Spec.ClaimRef.Namespace, volume.Spec.ClaimRef.Name, err)
}
}
glog.V(5).Infof("PersistentVolume[%s] is now available\n", volume.Name)
nextPhase = api.VolumeAvailable
glog.V(5).Infof("PersistentVolume[%s] is available\n", volume.Name)

// available volumes await a claim
case api.VolumeAvailable:
// TODO: remove api.VolumePending phase altogether
_, exists, err := volumeIndex.Get(volume)
if err != nil {
return err
}
if !exists {
volumeIndex.Add(volume)
}
if volume.Spec.ClaimRef != nil {
_, err := binderClient.GetPersistentVolumeClaim(volume.Spec.ClaimRef.Namespace, volume.Spec.ClaimRef.Name)
if err == nil {
Expand Down Expand Up @@ -264,79 +274,71 @@ func syncVolume(volumeIndex *persistentVolumeOrderedIndex, binderClient binderCl
// syncClaim processes one PersistentVolumeClaim according to claim.Status.Phase,
// using volumeIndex to look up a matching volume and binderClient to persist
// changes to the volume and to the claim. It returns a non-nil error when a
// lookup, clone, or update fails, or when the phase is not one of the handled cases.
//
// NOTE(review): this span reads like a rendered diff whose removed and added
// lines are shown together without +/- markers (two switch headers, two
// consecutive `volume, err :=` lookups, two `case api.ClaimBound:` arms, and a
// trailing currentPhase/nextPhase update next to direct Status writes).
// Confirm which lines are live before relying on the comments below.
func syncClaim(volumeIndex *persistentVolumeOrderedIndex, binderClient binderClient, claim *api.PersistentVolumeClaim) (err error) {
glog.V(5).Infof("Synchronizing PersistentVolumeClaim[%s]\n", claim.Name)

// claims can be in one of the following states:
//
// ClaimPending -- default value -- not yet bound to a volume. A volume that matches the claim may not exist.
// ClaimBound -- bound to a volume. claim.Status.VolumeRef != nil
// NOTE(review): the code below records the bind in claim.Spec.VolumeName; VolumeRef is not referenced here -- confirm.
currentPhase := claim.Status.Phase
nextPhase := currentPhase

switch currentPhase {
// pending claims await a matching volume
switch claim.Status.Phase {
case api.ClaimPending:
// ask the local volume index for a volume that matches this claim
volume, err := volumeIndex.FindBestMatchForClaim(claim)
volume, err := volumeIndex.findBestMatchForClaim(claim)
if err != nil {
return err
}
// no matching volume was returned by the index
if volume == nil {
return fmt.Errorf("A volume match does not exist for persistent claim: %s", claim.Name)
glog.V(5).Infof("A volume match does not exist for persistent claim: %s", claim.Name)
return nil
}

// make a binding reference to the claim.
// triggers update of the claim in this controller, which builds claim status
claim.Spec.VolumeName = volume.Name
// TODO: make this similar to Pod's binding both with BindingREST subresource and GuaranteedUpdate helper in etcd.go
claim, err = binderClient.UpdatePersistentVolumeClaim(claim)
if err == nil {
nextPhase = api.ClaimBound
glog.V(5).Infof("PersistentVolumeClaim[%s] is bound\n", claim.Name)
} else {
// Rollback by clearing VolumeName on the claim pointer so the failed bind is not kept locally.
// NOTE(review): presumably the volume stays unbound in the index and can be matched again -- confirm.
claim.Spec.VolumeName = ""
// Rollback by restoring original phase to claim pointer
nextPhase = api.ClaimPending
return fmt.Errorf("Error updating volume: %+v\n", err)
// create a reference to the claim and assign it to the volume being bound.
// the volume is a pointer, and assigning the reference closes a race in which another
// claim could match this volume before the claimRef has been persisted.
claimRef, err := api.GetReference(claim)
if err != nil {
return fmt.Errorf("Unexpected error getting claim reference: %v\n", err)
}

case api.ClaimBound:
volume, err := binderClient.GetPersistentVolume(claim.Spec.VolumeName)
// make a binding reference to the claim and ensure to update the local index to prevent dupe bindings
clone, err := conversion.NewCloner().DeepCopy(volume)
if err != nil {
return fmt.Errorf("Unexpected error getting persistent volume: %v\n", err)
return fmt.Errorf("Error cloning pv: %v", err)
}
// the deep copy is type-asserted back to *api.PersistentVolume before it is modified
volumeClone, ok := clone.(*api.PersistentVolume)
if !ok {
return fmt.Errorf("Unexpected pv cast error : %v\n", volumeClone)
}
// the ClaimRef is set on the clone, not on the volume obtained from the lookup
volumeClone.Spec.ClaimRef = claimRef
if updatedVolume, err := binderClient.UpdatePersistentVolume(volumeClone); err != nil {
return fmt.Errorf("Unexpected error saving PersistentVolume.Status: %+v", err)
} else {
// continue with the object returned by the update and refresh its entry in the local index
volume = updatedVolume
volumeIndex.Update(updatedVolume)
}

if volume.Spec.ClaimRef == nil {
glog.V(5).Infof("Rebuilding bind on pv.Spec.ClaimRef\n")
claimRef, err := api.GetReference(claim)
// the bind is persisted on the volume above and will always match the claim in a search.
// claim would remain Pending if the update fails, so processing this state is idempotent.
// this only needs to be processed once.
if claim.Spec.VolumeName != volume.Name {
claim.Spec.VolumeName = volume.Name
claim, err = binderClient.UpdatePersistentVolumeClaim(claim)
if err != nil {
return fmt.Errorf("Unexpected error getting claim reference: %v\n", err)
}
volume.Spec.ClaimRef = claimRef
_, err = binderClient.UpdatePersistentVolume(volume)
if err != nil {
return fmt.Errorf("Unexpected error saving PersistentVolume.Status: %+v", err)
return fmt.Errorf("Error updating claim with VolumeName %s: %+v\n", volume.Name, err)
}
}

// all "actuals" are transferred from PV to PVC so the user knows what
// type of volume they actually got for their claim.
// Volumes cannot have zero AccessModes, so checking that a claim has access modes
// is sufficient to tell us if these values have already been set.
if len(claim.Status.AccessModes) == 0 {
claim.Status.Phase = api.ClaimBound
claim.Status.AccessModes = volume.Spec.AccessModes
claim.Status.Capacity = volume.Spec.Capacity
_, err := binderClient.UpdatePersistentVolumeClaimStatus(claim)
if err != nil {
return fmt.Errorf("Unexpected error saving claim status: %+v", err)
}
claim.Status.Phase = api.ClaimBound
claim.Status.AccessModes = volume.Spec.AccessModes
claim.Status.Capacity = volume.Spec.Capacity
_, err = binderClient.UpdatePersistentVolumeClaimStatus(claim)
if err != nil {
return fmt.Errorf("Unexpected error saving claim status: %+v", err)
}
}

// persist the phase only when it changed during this pass.
// NOTE(review): the error returned by this status update is not checked here -- confirm that is intended.
if currentPhase != nextPhase {
claim.Status.Phase = nextPhase
binderClient.UpdatePersistentVolumeClaimStatus(claim)
case api.ClaimBound:
// no-op. Claim is bound, values from PV are set. PVCs are technically mutable in the API server
// and we don't want to handle those changes at this time.

default:
return fmt.Errorf("Unknown state for PVC: %#v", claim)

}

glog.V(5).Infof("PersistentVolumeClaim[%s] is bound\n", claim.Name)
return nil
}

Expand Down