Topology aware volume scheduler and PV controller changes #55039

Merged 14 commits on Nov 22, 2017
1 change: 1 addition & 0 deletions pkg/controller/volume/events/event.go
@@ -29,4 +29,5 @@ const (
ProvisioningFailed = "ProvisioningFailed"
ProvisioningCleanupFailed = "ProvisioningCleanupFailed"
ProvisioningSucceeded = "ProvisioningSucceeded"
WaitForFirstConsumer = "WaitForFirstConsumer"
)
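
For context, a minimal sketch of how a controller could surface this new event reason through a record.EventRecorder; the helper name, the recorder and claim variables, and the message text are illustrative assumptions, not part of this diff:

package persistentvolume

import (
	"k8s.io/api/core/v1"
	"k8s.io/client-go/tools/record"
	"k8s.io/kubernetes/pkg/controller/volume/events"
)

// emitWaitForFirstConsumer is a hypothetical helper: it records a Normal
// event telling the user that binding is intentionally delayed until a pod
// using the claim is scheduled.
func emitWaitForFirstConsumer(recorder record.EventRecorder, claim *v1.PersistentVolumeClaim) {
	recorder.Event(claim, v1.EventTypeNormal, events.WaitForFirstConsumer,
		"waiting for first consumer to be created before binding")
}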
8 changes: 8 additions & 0 deletions pkg/controller/volume/persistentvolume/BUILD
@@ -12,6 +12,10 @@ go_library(
"index.go",
"pv_controller.go",
"pv_controller_base.go",
"scheduler_assume_cache.go",
"scheduler_binder.go",
"scheduler_binder_cache.go",
"scheduler_binder_fake.go",
"volume_host.go",
],
importpath = "k8s.io/kubernetes/pkg/controller/volume/persistentvolume",
@@ -63,12 +67,16 @@ go_test(
"provision_test.go",
"pv_controller_test.go",
"recycle_test.go",
"scheduler_assume_cache_test.go",
"scheduler_binder_cache_test.go",
"scheduler_binder_test.go",
],
importpath = "k8s.io/kubernetes/pkg/controller/volume/persistentvolume",
library = ":go_default_library",
deps = [
"//pkg/api/testapi:go_default_library",
"//pkg/apis/core:go_default_library",
"//pkg/apis/core/v1/helper:go_default_library",
"//pkg/controller:go_default_library",
"//pkg/volume:go_default_library",
"//vendor/github.com/golang/glog:go_default_library",
30 changes: 29 additions & 1 deletion pkg/controller/volume/persistentvolume/binder_test.go
@@ -21,6 +21,7 @@ import (

"k8s.io/api/core/v1"
storage "k8s.io/api/storage/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
utilfeature "k8s.io/apiserver/pkg/util/feature"
)

@@ -178,6 +179,25 @@ func TestSync(t *testing.T) {
[]string{"Normal FailedBinding"},
noerrors, testSyncClaim,
},
{
// syncClaim does not do anything when binding is delayed
"1-13 - delayed binding",
newVolumeArray("volume1-1", "1Gi", "", "", v1.VolumePending, v1.PersistentVolumeReclaimRetain, classWait),
newVolumeArray("volume1-1", "1Gi", "", "", v1.VolumePending, v1.PersistentVolumeReclaimRetain, classWait),
newClaimArray("claim1-1", "uid1-1", "1Gi", "", v1.ClaimPending, &classWait),
newClaimArray("claim1-1", "uid1-1", "1Gi", "", v1.ClaimPending, &classWait),
[]string{"Normal WaitForFirstConsumer"},
noerrors, testSyncClaim,
},
{
// syncClaim binds when binding is delayed but PV is prebound to PVC
"1-14 - successful prebound PV",
newVolumeArray("volume1-1", "1Gi", "", "claim1-1", v1.VolumePending, v1.PersistentVolumeReclaimRetain, classWait),
newVolumeArray("volume1-1", "1Gi", "uid1-1", "claim1-1", v1.VolumeBound, v1.PersistentVolumeReclaimRetain, classWait),
newClaimArray("claim1-1", "uid1-1", "1Gi", "", v1.ClaimPending, &classWait),
newClaimArray("claim1-1", "uid1-1", "1Gi", "volume1-1", v1.ClaimBound, &classWait, annBoundByController, annBindCompleted),
noevents, noerrors, testSyncClaim,
},

// [Unit test set 2] User asked for a specific PV.
// Test the binding when pv.ClaimRef is already set by controller or
@@ -570,7 +590,15 @@
},
}

runSyncTests(t, tests, []*storage.StorageClass{})
utilfeature.DefaultFeatureGate.Set("VolumeScheduling=true")
defer utilfeature.DefaultFeatureGate.Set("VolumeScheduling=false")

runSyncTests(t, tests, []*storage.StorageClass{
{
ObjectMeta: metav1.ObjectMeta{Name: classWait},
VolumeBindingMode: &modeWait,
},
})
}

func TestSyncAlphaBlockVolume(t *testing.T) {
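
The classWait fixture used by the new test cases refers to a StorageClass whose VolumeBindingMode is WaitForFirstConsumer. A minimal sketch of such an object, mirroring the fields the tests set (the function name and provisioner string are illustrative):

package persistentvolume

import (
	storage "k8s.io/api/storage/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// newWaitClass builds a StorageClass whose claims stay pending until a
// consuming pod is scheduled, instead of binding immediately.
func newWaitClass() *storage.StorageClass {
	mode := storage.VolumeBindingWaitForFirstConsumer
	return &storage.StorageClass{
		ObjectMeta:        metav1.ObjectMeta{Name: "wait"},
		Provisioner:       "kubernetes.io/no-provisioner", // illustrative
		VolumeBindingMode: &mode,
	}
}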
14 changes: 14 additions & 0 deletions pkg/controller/volume/persistentvolume/framework_test.go
@@ -196,6 +196,8 @@ func (r *volumeReactor) React(action core.Action) (handled bool, ret runtime.Obj
if storedVer != requestedVer {
return true, obj, versionConflictError
}
// Don't modify the existing object
volume = volume.DeepCopy()
volume.ResourceVersion = strconv.Itoa(storedVer + 1)
} else {
return true, nil, fmt.Errorf("Cannot update volume %s: volume not found", volume.Name)
@@ -220,6 +222,8 @@ func (r *volumeReactor) React(action core.Action) (handled bool, ret runtime.Obj
if storedVer != requestedVer {
return true, obj, versionConflictError
}
// Don't modify the existing object
claim = claim.DeepCopy()
claim.ResourceVersion = strconv.Itoa(storedVer + 1)
} else {
return true, nil, fmt.Errorf("Cannot update claim %s: claim not found", claim.Name)
@@ -301,7 +305,12 @@ func (r *volumeReactor) checkVolumes(expectedVolumes []*v1.PersistentVolume) err
gotMap := make(map[string]*v1.PersistentVolume)
// Clear any ResourceVersion from both sets
for _, v := range expectedVolumes {
// Don't modify the existing object
v := v.DeepCopy()
v.ResourceVersion = ""
if v.Spec.ClaimRef != nil {
v.Spec.ClaimRef.ResourceVersion = ""
}
expectedMap[v.Name] = v
}
for _, v := range r.volumes {
@@ -331,6 +340,8 @@ func (r *volumeReactor) checkClaims(expectedClaims []*v1.PersistentVolumeClaim)
expectedMap := make(map[string]*v1.PersistentVolumeClaim)
gotMap := make(map[string]*v1.PersistentVolumeClaim)
for _, c := range expectedClaims {
// Don't modify the existing object
c = c.DeepCopy()
c.ResourceVersion = ""
expectedMap[c.Name] = c
}
@@ -822,6 +833,9 @@ var (
classUnknownInternal string = "unknown-internal"
classUnsupportedMountOptions string = "unsupported-mountoptions"
classLarge string = "large"
classWait string = "wait"

modeWait = storage.VolumeBindingWaitForFirstConsumer
)

// wrapTestWithPluginCalls returns a testCall that:
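
The DeepCopy additions above all guard the same invariant: objects handed out by the fake reactor are shared with the test's expected and stored sets, so they must be copied before mutation. A small sketch of the pattern in isolation (the function is illustrative, not from the diff):

package persistentvolume

import "k8s.io/api/core/v1"

// bumpVersion returns a copy of the volume with a new ResourceVersion.
// Mutating the argument directly would also change the instance still
// held by the reactor's store and corrupt later comparisons.
func bumpVersion(in *v1.PersistentVolume, version string) *v1.PersistentVolume {
	out := in.DeepCopy() // private copy; the shared object stays untouched
	out.ResourceVersion = version
	return out
}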
175 changes: 127 additions & 48 deletions pkg/controller/volume/persistentvolume/index.go
@@ -29,6 +29,7 @@ import (
v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/volume"
volumeutil "k8s.io/kubernetes/pkg/volume/util"
)

// persistentVolumeOrderedIndex is a cache.Store that keeps persistent volumes
@@ -74,7 +75,7 @@ func (pvIndex *persistentVolumeOrderedIndex) listByAccessModes(modes []v1.Persis
}

// find returns the nearest PV from the ordered list or nil if a match is not found
func (pvIndex *persistentVolumeOrderedIndex) findByClaim(claim *v1.PersistentVolumeClaim) (*v1.PersistentVolume, error) {
func (pvIndex *persistentVolumeOrderedIndex) findByClaim(claim *v1.PersistentVolumeClaim, delayBinding bool) (*v1.PersistentVolume, error) {
// PVs are indexed by their access modes to allow easier searching. Each
// index is the string representation of a set of access modes. There is a
// finite number of possible sets and PVs will only be indexed in one of
@@ -90,6 +91,45 @@ func (pvIndex *persistentVolumeOrderedIndex) findByClaim(claim *v1.PersistentVol
// example above).
allPossibleModes := pvIndex.allPossibleMatchingAccessModes(claim.Spec.AccessModes)

for _, modes := range allPossibleModes {
volumes, err := pvIndex.listByAccessModes(modes)
if err != nil {
return nil, err
}

bestVol, err := findMatchingVolume(claim, volumes, nil /* node for topology binding */, nil /* exclusion map */, delayBinding)
if err != nil {
return nil, err
}

if bestVol != nil {
return bestVol, nil
}
}
return nil, nil
}

// findMatchingVolume goes through the list of volumes to find the best matching volume
// for the claim.
//
// This function is used by both the PV controller and scheduler.
//
// delayBinding is true only in the PV controller path. When set, prebound PVs are still returned
// as a match for the claim, but unbound PVs are skipped.
//
// node is set only in the scheduler path. When set, the PV node affinity is checked against
// the node's labels.
//
// excludedVolumes is only used in the scheduler path, and is needed for evaluating multiple
// unbound PVCs for a single Pod at one time. As each PVC finds a matching PV, the chosen
// PV needs to be excluded from future matching.
func findMatchingVolume(
claim *v1.PersistentVolumeClaim,
volumes []*v1.PersistentVolume,
node *v1.Node,
excludedVolumes map[string]*v1.PersistentVolume,
delayBinding bool) (*v1.PersistentVolume, error) {

var smallestVolume *v1.PersistentVolume
var smallestVolumeQty resource.Quantity
requestedQty := claim.Spec.Resources.Requests[v1.ResourceName(v1.ResourceStorage)]
@@ -105,67 +145,90 @@ func (pvIndex *persistentVolumeOrderedIndex) findByClaim(claim *v1.PersistentVol
selector = internalSelector
}

for _, modes := range allPossibleModes {
volumes, err := pvIndex.listByAccessModes(modes)
// Go through all available volumes with two goals:
// - find a volume that is either pre-bound by user or dynamically
// provisioned for this claim. Because of this we need to loop through
// all volumes.
// - find the smallest matching one if there is no volume pre-bound to
// the claim.
for _, volume := range volumes {
if _, ok := excludedVolumes[volume.Name]; ok {
// Skip volumes in the excluded list
continue
}

volumeQty := volume.Spec.Capacity[v1.ResourceStorage]

// check if volumeModes do not match (Alpha and feature gate protected)
isMisMatch, err := checkVolumeModeMisMatches(&claim.Spec, &volume.Spec)
if err != nil {
return nil, err
return nil, fmt.Errorf("error checking if volumeMode was a mismatch: %v", err)
}
// filter out mismatching volumeModes
if isMisMatch {
continue
}

// Go through all available volumes with two goals:
// - find a volume that is either pre-bound by user or dynamically
// provisioned for this claim. Because of this we need to loop through
// all volumes.
// - find the smallest matching one if there is no volume pre-bound to
// the claim.
for _, volume := range volumes {
// check if volumeModes do not match (Alpha and feature gate protected)
isMisMatch, err := checkVolumeModeMisMatches(&claim.Spec, &volume.Spec)
if node != nil {
// Scheduler path, check that the PV NodeAffinity
// is satisfied by the node
err := volumeutil.CheckNodeAffinity(volume, node.Labels)
if err != nil {
return nil, fmt.Errorf("error checking if volumeMode was a mismatch: %v", err)
}
// filter out mismatching volumeModes
if isMisMatch {
continue
}
}

if isVolumeBoundToClaim(volume, claim) {
// this claim and volume are pre-bound; return
// the volume if the size request is satisfied,
// otherwise continue searching for a match
volumeQty := volume.Spec.Capacity[v1.ResourceStorage]
if volumeQty.Cmp(requestedQty) < 0 {
continue
}
return volume, nil
}

// filter out:
// - volumes bound to another claim
// - volumes whose labels don't match the claim's selector, if specified
// - volumes in Class that is not requested
if volume.Spec.ClaimRef != nil {
continue
} else if selector != nil && !selector.Matches(labels.Set(volume.Labels)) {
if isVolumeBoundToClaim(volume, claim) {
// this claim and volume are pre-bound; return
// the volume if the size request is satisfied,
// otherwise continue searching for a match
if volumeQty.Cmp(requestedQty) < 0 {
continue
}
if v1helper.GetPersistentVolumeClass(volume) != requestedClass {
return volume, nil
}

if node == nil && delayBinding {
// PV controller does not bind this claim.
// Scheduler will handle binding unbound volumes
// Scheduler path will have node != nil
continue
}

// filter out:
// - volumes bound to another claim
// - volumes whose labels don't match the claim's selector, if specified
// - volumes in Class that is not requested
if volume.Spec.ClaimRef != nil {
continue
} else if selector != nil && !selector.Matches(labels.Set(volume.Labels)) {
continue
}
if v1helper.GetPersistentVolumeClass(volume) != requestedClass {
continue
}

if node != nil {
// Scheduler path
// Check that the access modes match
if !checkAccessModes(claim, volume) {
continue
}
}

volumeQty := volume.Spec.Capacity[v1.ResourceStorage]
if volumeQty.Cmp(requestedQty) >= 0 {
if smallestVolume == nil || smallestVolumeQty.Cmp(volumeQty) > 0 {
smallestVolume = volume
smallestVolumeQty = volumeQty
}
if volumeQty.Cmp(requestedQty) >= 0 {
if smallestVolume == nil || smallestVolumeQty.Cmp(volumeQty) > 0 {
smallestVolume = volume
smallestVolumeQty = volumeQty
}
}
}

if smallestVolume != nil {
// Found a matching volume
return smallestVolume, nil
}
if smallestVolume != nil {
// Found a matching volume
return smallestVolume, nil
}

return nil, nil
}

@@ -191,8 +254,8 @@ func checkVolumeModeMisMatches(pvcSpec *v1.PersistentVolumeClaimSpec, pvSpec *v1
}

// findBestMatchForClaim is a convenience method that finds a volume by the claim's AccessModes and requests for Storage
func (pvIndex *persistentVolumeOrderedIndex) findBestMatchForClaim(claim *v1.PersistentVolumeClaim) (*v1.PersistentVolume, error) {
return pvIndex.findByClaim(claim)
func (pvIndex *persistentVolumeOrderedIndex) findBestMatchForClaim(claim *v1.PersistentVolumeClaim, delayBinding bool) (*v1.PersistentVolume, error) {
return pvIndex.findByClaim(claim, delayBinding)
}

// allPossibleMatchingAccessModes returns an array of AccessMode arrays that
@@ -274,3 +337,19 @@ func claimToClaimKey(claim *v1.PersistentVolumeClaim) string {
func claimrefToClaimKey(claimref *v1.ObjectReference) string {
return fmt.Sprintf("%s/%s", claimref.Namespace, claimref.Name)
}

// Returns true if PV satisfies all the PVC's requested AccessModes
func checkAccessModes(claim *v1.PersistentVolumeClaim, volume *v1.PersistentVolume) bool {
pvModesMap := map[v1.PersistentVolumeAccessMode]bool{}
for _, mode := range volume.Spec.AccessModes {
pvModesMap[mode] = true
}

for _, mode := range claim.Spec.AccessModes {
_, ok := pvModesMap[mode]
if !ok {
return false
}
}
return true
}
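
Taken together, findMatchingVolume now serves two callers with different argument shapes. A hedged sketch of the two call paths inside this package (the wrapper function and variable names are illustrative):

package persistentvolume

import "k8s.io/api/core/v1"

// exampleCallPaths is illustrative only; it shows how the PV controller
// and the scheduler are expected to invoke findMatchingVolume.
func exampleCallPaths(
	claim *v1.PersistentVolumeClaim,
	volumes []*v1.PersistentVolume,
	node *v1.Node,
	chosenPVs map[string]*v1.PersistentVolume,
	delayBinding bool,
) {
	// PV controller path: no node and no exclusions. With delayBinding set,
	// unbound PVs are skipped so the scheduler can bind them later; prebound
	// PVs still match.
	pv, err := findMatchingVolume(claim, volumes, nil, nil, delayBinding)
	_, _ = pv, err

	// Scheduler path: the node's labels are checked against PV node affinity,
	// and PVs already chosen for the pod's other claims are excluded.
	// delayBinding only matters when node is nil, so false is passed here.
	pv, err = findMatchingVolume(claim, volumes, node, chosenPVs, false)
	_, _ = pv, err
}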