Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

volume controller: Convert PersistentVolumes from Kubernetes 1.2 #25865

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
76 changes: 65 additions & 11 deletions pkg/controller/persistentvolume/controller_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,11 @@ func (ctrl *PersistentVolumeController) initializeCaches(volumeSource, claimSour
return
}
for _, volume := range volumeList.Items {
storeObjectUpdate(ctrl.volumes.store, volume, "volume")
// Ignore template volumes from kubernetes 1.2
deleted := ctrl.upgradeVolumeFrom1_2(volume.(*api.PersistentVolume))
if !deleted {
storeObjectUpdate(ctrl.volumes.store, volume, "volume")
}
}

claimListObj, err := claimSource.List(api.ListOptions{})
Expand All @@ -164,6 +168,17 @@ func (ctrl *PersistentVolumeController) initializeCaches(volumeSource, claimSour
// addVolume is callback from framework.Controller watching PersistentVolume
// events.
func (ctrl *PersistentVolumeController) addVolume(obj interface{}) {
pv, ok := obj.(*api.PersistentVolume)
if !ok {
glog.Errorf("expected PersistentVolume but handler received %+v", obj)
return
}

if ctrl.upgradeVolumeFrom1_2(pv) {
// volume deleted
return
}

// Store the new volume version in the cache and do not process it if this
// is an old version.
new, err := storeObjectUpdate(ctrl.volumes.store, obj, "volume")
Expand All @@ -174,11 +189,6 @@ func (ctrl *PersistentVolumeController) addVolume(obj interface{}) {
return
}

pv, ok := obj.(*api.PersistentVolume)
if !ok {
glog.Errorf("expected PersistentVolume but handler received %+v", obj)
return
}
if err := ctrl.syncVolume(pv); err != nil {
if errors.IsConflict(err) {
// Version conflict error happens quite often and the controller
Expand All @@ -193,6 +203,17 @@ func (ctrl *PersistentVolumeController) addVolume(obj interface{}) {
// updateVolume is callback from framework.Controller watching PersistentVolume
// events.
func (ctrl *PersistentVolumeController) updateVolume(oldObj, newObj interface{}) {
newVolume, ok := newObj.(*api.PersistentVolume)
if !ok {
glog.Errorf("Expected PersistentVolume but handler received %+v", newObj)
return
}

if ctrl.upgradeVolumeFrom1_2(newVolume) {
// volume deleted
return
}

// Store the new volume version in the cache and do not process it if this
// is an old version.
new, err := storeObjectUpdate(ctrl.volumes.store, newObj, "volume")
Expand All @@ -203,11 +224,6 @@ func (ctrl *PersistentVolumeController) updateVolume(oldObj, newObj interface{})
return
}

newVolume, ok := newObj.(*api.PersistentVolume)
if !ok {
glog.Errorf("Expected PersistentVolume but handler received %+v", newObj)
return
}
if err := ctrl.syncVolume(newVolume); err != nil {
if errors.IsConflict(err) {
// Version conflict error happens quite often and the controller
Expand Down Expand Up @@ -400,6 +416,44 @@ func (ctrl *PersistentVolumeController) Stop() {
close(ctrl.claimControllerStopCh)
}

const (
// these pair of constants are used by the provisioner in Kubernetes 1.2.
pvProvisioningRequiredAnnotationKey = "volume.experimental.kubernetes.io/provisioning-required"
pvProvisioningCompletedAnnotationValue = "volume.experimental.kubernetes.io/provisioning-completed"
)

// upgradeVolumeFrom1_2 updates PV from Kubernetes 1.2 to 1.3 and newer. In 1.2,
// we used template PersistentVolume instances for dynamic provisioning. In 1.3
// and later, these template (and not provisioned) instances must be removed to
// make the controller to provision a new PV.
// It returns true if the volume was deleted.
// TODO: remove this function when upgrade from 1.2 becomes unsupported.
func (ctrl *PersistentVolumeController) upgradeVolumeFrom1_2(volume *api.PersistentVolume) bool {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's worth thinking about how future versions are going to deal with this.

annValue, found := volume.Annotations[pvProvisioningRequiredAnnotationKey]
if !found {
// The volume is not template
return false
}
if annValue == pvProvisioningCompletedAnnotationValue {
// The volume is already fully provisioned. The new controller will
// ignore this annotation and it will obey its ReclaimPolicy, which is
// likely to delete the volume when appropriate claim is deleted.
return false
}
glog.V(2).Infof("deleting unprovisioned template volume %q from Kubernetes 1.2.", volume.Name)
err := ctrl.kubeClient.Core().PersistentVolumes().Delete(volume.Name, nil)
if err != nil {
glog.Errorf("cannot delete unprovisioned template volume %q: %v", volume.Name, err)
}
// Remove from local cache
err = ctrl.volumes.store.Delete(volume)
if err != nil {
glog.Errorf("cannot remove volume %q from local cache: %v", volume.Name, err)
}

return true
}

// Stateless functions

func hasAnnotation(obj api.ObjectMeta, ann string) bool {
Expand Down
87 changes: 83 additions & 4 deletions pkg/controller/persistentvolume/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import (
// can't reliably simulate periodic sync of volumes/claims - it would be
// either very timing-sensitive or slow to wait for real periodic sync.
func TestControllerSync(t *testing.T) {
expectedChanges := []int{4, 1, 1}
expectedChanges := []int{4, 1, 1, 2, 1, 1, 1}
tests := []controllerTest{
// [Unit test set 5] - controller tests.
// We test the controller as if
Expand Down Expand Up @@ -87,6 +87,74 @@ func TestControllerSync(t *testing.T) {
return nil
},
},
{
// addVolume with provisioned volume from Kubernetes 1.2. No "action"
// is expected - it should stay bound.
"5-5 - add bound volume from 1.2",
novolumes,
[]*api.PersistentVolume{addVolumeAnnotation(newVolume("volume5-5", "10Gi", "uid5-5", "claim5-5", api.VolumeBound, api.PersistentVolumeReclaimDelete), pvProvisioningRequiredAnnotationKey, pvProvisioningCompletedAnnotationValue)},
newClaimArray("claim5-5", "uid5-5", "1Gi", "", api.ClaimPending),
newClaimArray("claim5-5", "uid5-5", "1Gi", "volume5-5", api.ClaimBound, annBindCompleted, annBoundByController),
noevents, noerrors,
// Custom test function that generates a add event
func(ctrl *PersistentVolumeController, reactor *volumeReactor, test controllerTest) error {
volume := newVolume("volume5-5", "10Gi", "uid5-5", "claim5-5", api.VolumeBound, api.PersistentVolumeReclaimDelete)
volume = addVolumeAnnotation(volume, pvProvisioningRequiredAnnotationKey, pvProvisioningCompletedAnnotationValue)
reactor.addVolumeEvent(volume)
return nil
},
},
{
// updateVolume with provisioned volume from Kubernetes 1.2. No
// "action" is expected - it should stay bound.
"5-6 - update bound volume from 1.2",
[]*api.PersistentVolume{addVolumeAnnotation(newVolume("volume5-6", "10Gi", "uid5-6", "claim5-6", api.VolumeBound, api.PersistentVolumeReclaimDelete), pvProvisioningRequiredAnnotationKey, pvProvisioningCompletedAnnotationValue)},
[]*api.PersistentVolume{addVolumeAnnotation(newVolume("volume5-6", "10Gi", "uid5-6", "claim5-6", api.VolumeBound, api.PersistentVolumeReclaimDelete), pvProvisioningRequiredAnnotationKey, pvProvisioningCompletedAnnotationValue)},
newClaimArray("claim5-6", "uid5-6", "1Gi", "volume5-6", api.ClaimBound),
newClaimArray("claim5-6", "uid5-6", "1Gi", "volume5-6", api.ClaimBound, annBindCompleted),
noevents, noerrors,
// Custom test function that generates a add event
func(ctrl *PersistentVolumeController, reactor *volumeReactor, test controllerTest) error {
volume := newVolume("volume5-6", "10Gi", "uid5-6", "claim5-6", api.VolumeBound, api.PersistentVolumeReclaimDelete)
volume = addVolumeAnnotation(volume, pvProvisioningRequiredAnnotationKey, pvProvisioningCompletedAnnotationValue)
reactor.modifyVolumeEvent(volume)
return nil
},
},
{
// addVolume with unprovisioned volume from Kubernetes 1.2. The
// volume should be deleted.
"5-7 - add unprovisioned volume from 1.2",
novolumes,
novolumes,
newClaimArray("claim5-7", "uid5-7", "1Gi", "", api.ClaimPending),
newClaimArray("claim5-7", "uid5-7", "1Gi", "", api.ClaimPending),
noevents, noerrors,
// Custom test function that generates a add event
func(ctrl *PersistentVolumeController, reactor *volumeReactor, test controllerTest) error {
volume := newVolume("volume5-7", "10Gi", "uid5-7", "claim5-7", api.VolumeBound, api.PersistentVolumeReclaimDelete)
volume = addVolumeAnnotation(volume, pvProvisioningRequiredAnnotationKey, "yes")
reactor.addVolumeEvent(volume)
return nil
},
},
{
// updateVolume with unprovisioned volume from Kubernetes 1.2. The
// volume should be deleted.
"5-8 - update bound volume from 1.2",
novolumes,
novolumes,
newClaimArray("claim5-8", "uid5-8", "1Gi", "", api.ClaimPending),
newClaimArray("claim5-8", "uid5-8", "1Gi", "", api.ClaimPending),
noevents, noerrors,
// Custom test function that generates a add event
func(ctrl *PersistentVolumeController, reactor *volumeReactor, test controllerTest) error {
volume := newVolume("volume5-8", "10Gi", "uid5-8", "claim5-8", api.VolumeBound, api.PersistentVolumeReclaimDelete)
volume = addVolumeAnnotation(volume, pvProvisioningRequiredAnnotationKey, "yes")
reactor.modifyVolumeEvent(volume)
return nil
},
},
}

for ix, test := range tests {
Expand All @@ -108,7 +176,7 @@ func TestControllerSync(t *testing.T) {
}

// Start the controller
defer ctrl.Stop()
count := reactor.getChangeCount()
go ctrl.Run()

// Wait for the controller to pass initial sync.
Expand All @@ -117,8 +185,6 @@ func TestControllerSync(t *testing.T) {
}
glog.V(4).Infof("controller synced, starting test")

count := reactor.getChangeCount()

// Call the tested function
err := test.test(ctrl, reactor, test)
if err != nil {
Expand All @@ -129,10 +195,15 @@ func TestControllerSync(t *testing.T) {
ctrl.claims.Resync()
ctrl.volumes.store.Resync()

// Wait at least once, just in case expectedChanges[ix] == 0
reactor.waitTest()
// Wait for expected number of operations.
for reactor.getChangeCount() < count+expectedChanges[ix] {
reactor.waitTest()
}

ctrl.Stop()

evaluateTestResults(ctrl, reactor, test, t)
}
}
Expand Down Expand Up @@ -204,3 +275,11 @@ func TestControllerCacheParsingError(t *testing.T) {
t.Errorf("Expected parsing error, got nil instead")
}
}

func addVolumeAnnotation(volume *api.PersistentVolume, annName, annValue string) *api.PersistentVolume {
if volume.Annotations == nil {
volume.Annotations = make(map[string]string)
}
volume.Annotations[annName] = annValue
return volume
}
16 changes: 16 additions & 0 deletions pkg/controller/persistentvolume/framework_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,7 @@ func (r *volumeReactor) React(action core.Action) (handled bool, ret runtime.Obj
_, found := r.volumes[name]
if found {
delete(r.volumes, name)
r.changedSinceLastSync++
return true, nil, nil
} else {
return true, nil, fmt.Errorf("Cannot delete volume %s: not found", name)
Expand All @@ -264,6 +265,7 @@ func (r *volumeReactor) React(action core.Action) (handled bool, ret runtime.Obj
_, found := r.volumes[name]
if found {
delete(r.claims, name)
r.changedSinceLastSync++
return true, nil, nil
} else {
return true, nil, fmt.Errorf("Cannot delete claim %s: not found", name)
Expand Down Expand Up @@ -519,6 +521,20 @@ func (r *volumeReactor) addVolumeEvent(volume *api.PersistentVolume) {
r.volumeSource.Add(volume)
}

// modifyVolumeEvent simulates that a volume has been modified in etcd and the
// controller receives 'volume modified' event.
func (r *volumeReactor) modifyVolumeEvent(volume *api.PersistentVolume) {
r.lock.Lock()
defer r.lock.Unlock()

r.volumes[volume.Name] = volume
// Generate deletion event. Cloned volume is needed to prevent races (and we
// would get a clone from etcd too).
clone, _ := conversion.NewCloner().DeepCopy(volume)
volumeClone := clone.(*api.PersistentVolume)
r.volumeSource.Modify(volumeClone)
}

// addClaimEvent simulates that a claim has been deleted in etcd and the
// controller receives 'claim added' event.
func (r *volumeReactor) addClaimEvent(claim *api.PersistentVolumeClaim) {
Expand Down