Provision vSphere volume as per selectedNode #79931

Merged
13 changes: 10 additions & 3 deletions pkg/volume/vsphere_volume/vsphere_volume.go
@@ -171,7 +171,7 @@ func (plugin *vsphereVolumePlugin) ConstructVolumeSpec(volumeName, mountPath str
// Abstract interface to disk operations.
type vdManager interface {
// Creates a volume
CreateVolume(provisioner *vsphereVolumeProvisioner, selectedZone []string) (volSpec *VolumeSpec, err error)
CreateVolume(provisioner *vsphereVolumeProvisioner, selectedNode *v1.Node, selectedZone []string) (volSpec *VolumeSpec, err error)
// Deletes a volume
DeleteVolume(deleter *vsphereVolumeDeleter) error
}
@@ -368,14 +368,14 @@ func (v *vsphereVolumeProvisioner) Provision(selectedNode *v1.Node, allowedTopol
if !util.AccessModesContainedInAll(v.plugin.GetAccessModes(), v.options.PVC.Spec.AccessModes) {
return nil, fmt.Errorf("invalid AccessModes %v: only AccessModes %v are supported", v.options.PVC.Spec.AccessModes, v.plugin.GetAccessModes())
}
klog.V(1).Infof("Provision with allowedTopologies : %s", allowedTopologies)
klog.V(1).Infof("Provision with selectedNode: %s and allowedTopologies : %s", getNodeName(selectedNode), allowedTopologies)
selectedZones, err := volumehelpers.ZonesFromAllowedTopologies(allowedTopologies)
if err != nil {
return nil, err
}

klog.V(4).Infof("Selected zones for volume : %s", selectedZones)
volSpec, err := v.manager.CreateVolume(v, selectedZones.List())
volSpec, err := v.manager.CreateVolume(v, selectedNode, selectedZones.List())
if err != nil {
return nil, err
}
@@ -465,3 +465,10 @@ func getVolumeSource(

return nil, false, fmt.Errorf("Spec does not reference a VSphere volume type")
}

func getNodeName(node *v1.Node) string {
if node == nil {
return ""
}
return node.Name
}
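
A minimal standalone sketch (not part of this diff) of why the nil guard in getNodeName matters: under Immediate volume binding the provisioner is invoked with selectedNode == nil, so logging node.Name directly would panic.

```go
package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// getNodeName mirrors the helper added above: it tolerates a nil node,
// which is what Provision receives under Immediate volume binding.
func getNodeName(node *v1.Node) string {
	if node == nil {
		return ""
	}
	return node.Name
}

func main() {
	var immediate *v1.Node // nil: no node has been selected yet
	wffc := &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "node-1"}}
	fmt.Printf("immediate=%q wffc=%q\n", getNodeName(immediate), getNodeName(wffc))
}
```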
2 changes: 1 addition & 1 deletion pkg/volume/vsphere_volume/vsphere_volume_test.go
Expand Up @@ -62,7 +62,7 @@ func TestCanSupport(t *testing.T) {
type fakePDManager struct {
}

func (fake *fakePDManager) CreateVolume(v *vsphereVolumeProvisioner, selectedZone []string) (volSpec *VolumeSpec, err error) {
func (fake *fakePDManager) CreateVolume(v *vsphereVolumeProvisioner, selectedNode *v1.Node, selectedZone []string) (volSpec *VolumeSpec, err error) {
volSpec = &VolumeSpec{
Path: "[local] test-volume-name.vmdk",
Size: 100,
3 changes: 2 additions & 1 deletion pkg/volume/vsphere_volume/vsphere_volume_util.go
@@ -86,7 +86,7 @@ func verifyDevicePath(path string) (string, error) {
}

// CreateVolume creates a vSphere volume.
func (util *VsphereDiskUtil) CreateVolume(v *vsphereVolumeProvisioner, selectedZone []string) (volSpec *VolumeSpec, err error) {
func (util *VsphereDiskUtil) CreateVolume(v *vsphereVolumeProvisioner, selectedNode *v1.Node, selectedZone []string) (volSpec *VolumeSpec, err error) {
var fstype string
cloud, err := getCloudProvider(v.plugin.host.GetCloudProvider())
if err != nil {
@@ -108,6 +108,7 @@ func (util *VsphereDiskUtil) CreateVolume(v *vsphereVolumeProvisioner, selectedZ
}

volumeOptions.Zone = selectedZone
volumeOptions.SelectedNode = selectedNode
// Apply Parameters (case-insensitive). We leave validation of
// the values to the cloud provider.
for parameter, value := range v.options.Parameters {
11 changes: 8 additions & 3 deletions staging/src/k8s.io/legacy-cloud-providers/vsphere/nodemanager.go
@@ -464,8 +464,8 @@ func (nm *NodeManager) GetHostsInZone(ctx context.Context, zoneFailureDomain str
return nil, err
}
klog.V(4).Infof("Node Details: %v", nodeDetails)
// Return those hosts that are in the given zone.
hosts := make([]*object.HostSystem, 0)
// Build a map of Host moRef to HostSystem
hostMap := make(map[string]*object.HostSystem)
for _, n := range nodeDetails {
// Match the provided zone failure domain with the node.
klog.V(9).Infof("Matching provided zone %s with node %s zone %s", zoneFailureDomain, n.NodeName, n.Zone.FailureDomain)
@@ -475,9 +475,14 @@
klog.Errorf("Failed to get host system for VM %s. err: %+v", n.vm, err)
continue
}
hosts = append(hosts, host)
hostMap[host.Reference().Value] = host
}
}
// Build the unique list of hosts.
hosts := make([]*object.HostSystem, 0)
for _, value := range hostMap {
hosts = append(hosts, value)
}
klog.V(4).Infof("GetHostsInZone %v returning: %v", zoneFailureDomain, hosts)
return hosts, nil
}
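
The change above replaces the flat hosts slice with a map keyed by the host's managed-object reference, since several node VMs in a zone can run on the same ESX host. A self-contained sketch of the same deduplication pattern, with plain strings standing in for *object.HostSystem:

```go
package main

import "fmt"

// uniqueHosts collects host references into a set and returns the unique
// list, mirroring the hostMap logic in GetHostsInZone above.
func uniqueHosts(hostRefs []string) []string {
	seen := make(map[string]struct{}, len(hostRefs))
	hosts := make([]string, 0, len(hostRefs))
	for _, ref := range hostRefs {
		if _, ok := seen[ref]; ok {
			continue // another node VM running on the same ESX host
		}
		seen[ref] = struct{}{}
		hosts = append(hosts, ref)
	}
	return hosts
}

func main() {
	fmt.Println(uniqueHosts([]string{"host-12", "host-30", "host-12"})) // [host-12 host-30]
}
```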
@@ -25,6 +25,7 @@ go_library(
importmap = "k8s.io/kubernetes/vendor/k8s.io/legacy-cloud-providers/vsphere/vclib",
importpath = "k8s.io/legacy-cloud-providers/vsphere/vclib",
deps = [
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/client-go/pkg/version:go_default_library",
"//vendor/github.com/prometheus/client_golang/prometheus:go_default_library",
"//vendor/github.com/vmware/govmomi/find:go_default_library",
@@ -19,6 +19,7 @@ package vclib
import (
"strings"

"k8s.io/api/core/v1"
"k8s.io/klog"
)

@@ -34,6 +35,7 @@ type VolumeOptions struct {
StoragePolicyID string
SCSIControllerType string
Zone []string
SelectedNode *v1.Node
}

var (
69 changes: 44 additions & 25 deletions staging/src/k8s.io/legacy-cloud-providers/vsphere/vsphere.go
@@ -1193,12 +1193,37 @@ func (vs *VSphere) CreateVolume(volumeOptions *vclib.VolumeOptions) (canonicalVo
}

var vmOptions *vclib.VMOptions
var zonesToSearch []string

if volumeOptions.SelectedNode != nil {
if len(volumeOptions.Zone) > 1 {
// In WaitForFirstConsumer mode, if more than one allowed topology is specified, the volume must satisfy all of them.
zonesToSearch = volumeOptions.Zone
} else {
// Pick the selectedNode's zone, if available.
nodeInfo, err := vs.nodeManager.GetNodeInfoWithNodeObject(volumeOptions.SelectedNode)
if err != nil {
klog.Errorf("Unable to get node information for %s. err: %+v", volumeOptions.SelectedNode.Name, err)
return "", err
}
klog.V(4).Infof("selectedNode info : %s", nodeInfo)
if nodeInfo.zone != nil && nodeInfo.zone.FailureDomain != "" {
zonesToSearch = append(zonesToSearch, nodeInfo.zone.FailureDomain)
}
}
} else {
// If no selectedNode, pick allowedTopologies, if provided.
zonesToSearch = volumeOptions.Zone
}
klog.V(1).Infof("Volume topology : %s", zonesToSearch)

if volumeOptions.VSANStorageProfileData != "" || volumeOptions.StoragePolicyName != "" {
// If datastore and zone are specified, first validate if the datastore is in the provided zone.
if len(volumeOptions.Zone) != 0 && volumeOptions.Datastore != "" {
klog.V(4).Infof("Specified zone : %s, datastore : %s", volumeOptions.Zone, volumeOptions.Datastore)
dsList, err = getDatastoresForZone(ctx, vs.nodeManager, volumeOptions.Zone)
if len(zonesToSearch) != 0 && volumeOptions.Datastore != "" {
klog.V(4).Infof("Specified zone : %s, datastore : %s", zonesToSearch, volumeOptions.Datastore)
dsList, err = getDatastoresForZone(ctx, vs.nodeManager, zonesToSearch)
if err != nil {
klog.Errorf("Failed to find a shared datastore matching zone %s. err: %+v", zonesToSearch, err)
return "", err
}

@@ -1210,7 +1235,7 @@ }
}
}
if !found {
err := fmt.Errorf("The specified datastore %s does not match the provided zones : %s", volumeOptions.Datastore, volumeOptions.Zone)
err := fmt.Errorf("The specified datastore %s does not match the provided zones : %s", volumeOptions.Datastore, zonesToSearch)
klog.Error(err)
return "", err
}
@@ -1229,25 +1254,19 @@ func (vs *VSphere) CreateVolume(volumeOptions *vclib.VolumeOptions) (canonicalVo
cleanUpRoutineInitLock.Unlock()
}
if volumeOptions.StoragePolicyName != "" && volumeOptions.Datastore == "" {
if len(volumeOptions.Zone) == 0 {
if len(zonesToSearch) == 0 {
klog.V(4).Infof("Selecting a shared datastore as per the storage policy %s", volumeOptions.StoragePolicyName)
datastoreInfo, err = getPbmCompatibleDatastore(ctx, vsi.conn.Client, volumeOptions.StoragePolicyName, vs.nodeManager)
} else {
// If zone is specified, first get the datastores in the zone.
dsList, err = getDatastoresForZone(ctx, vs.nodeManager, volumeOptions.Zone)
dsList, err = getDatastoresForZone(ctx, vs.nodeManager, zonesToSearch)

if err != nil {
klog.Errorf("Failed to find a shared datastore matching zone %s. err: %+v", volumeOptions.Zone, err)
return "", err
}
// If unable to get any datastore, fail the operation.
if len(dsList) == 0 {
err := fmt.Errorf("Failed to find a shared datastore matching zone %s", volumeOptions.Zone)
klog.Error(err)
klog.Errorf("Failed to find a shared datastore matching zone %s. err: %+v", zonesToSearch, err)
return "", err
}

klog.V(4).Infof("Specified zone : %s. Picking a datastore as per the storage policy %s among the zoned datastores : %s", volumeOptions.Zone,
klog.V(4).Infof("Specified zone : %s. Picking a datastore as per the storage policy %s among the zoned datastores : %s", zonesToSearch,
volumeOptions.StoragePolicyName, dsList)
// Among the compatible datastores, select the one based on the maximum free space.
datastoreInfo, err = getPbmCompatibleZonedDatastore(ctx, vsi.conn.Client, volumeOptions.StoragePolicyName, dsList)
@@ -1259,17 +1278,17 @@ func (vs *VSphere) CreateVolume(volumeOptions *vclib.VolumeOptions) (canonicalVo
klog.V(1).Infof("Datastore selected as per policy : %s", datastoreInfo.Info.Name)
} else {
// If zone is specified, pick the datastore in the zone with maximum free space within the zone.
if volumeOptions.Datastore == "" && len(volumeOptions.Zone) != 0 {
klog.V(4).Infof("Specified zone : %s", volumeOptions.Zone)
dsList, err = getDatastoresForZone(ctx, vs.nodeManager, volumeOptions.Zone)
if volumeOptions.Datastore == "" && len(zonesToSearch) != 0 {
klog.V(4).Infof("Specified zone : %s", zonesToSearch)
dsList, err = getDatastoresForZone(ctx, vs.nodeManager, zonesToSearch)

if err != nil {
klog.Errorf("Failed to find a shared datastore matching zone %s. err: %+v", volumeOptions.Zone, err)
klog.Errorf("Failed to find a shared datastore matching zone %s. err: %+v", zonesToSearch, err)
return "", err
}
// If unable to get any datastore, fail the operation
if len(dsList) == 0 {
err := fmt.Errorf("Failed to find a shared datastore matching zone %s", volumeOptions.Zone)
err := fmt.Errorf("Failed to find a shared datastore matching zone %s", zonesToSearch)
klog.Error(err)
return "", err
}
@@ -1279,11 +1298,11 @@ func (vs *VSphere) CreateVolume(volumeOptions *vclib.VolumeOptions) (canonicalVo
klog.Errorf("Failed to get shared datastore: %+v", err)
return "", err
}
klog.V(1).Infof("Specified zone : %s. Selected datastore : %s", volumeOptions.Zone, datastoreInfo.Info.Name)
klog.V(1).Infof("Specified zone : %s. Selected datastore : %s", zonesToSearch, datastoreInfo.Info.Name)
} else {
var sharedDsList []*vclib.DatastoreInfo
var err error
if len(volumeOptions.Zone) == 0 {
if len(zonesToSearch) == 0 {
// If zone is not provided, get the shared datastore across all node VMs.
klog.V(4).Infof("Validating if datastore %s is shared across all node VMs", datastoreName)
sharedDsList, err = getSharedDatastoresInK8SCluster(ctx, vs.nodeManager)
@@ -1295,14 +1314,14 @@ func (vs *VSphere) CreateVolume(volumeOptions *vclib.VolumeOptions) (canonicalVo
err = fmt.Errorf("The specified datastore %s is not a shared datastore across node VMs", datastoreName)
} else {
// If zone is provided, get the shared datastores in that zone.
klog.V(4).Infof("Validating if datastore %s is in zone %s ", datastoreName, volumeOptions.Zone)
sharedDsList, err = getDatastoresForZone(ctx, vs.nodeManager, volumeOptions.Zone)
klog.V(4).Infof("Validating if datastore %s is in zone %s ", datastoreName, zonesToSearch)
sharedDsList, err = getDatastoresForZone(ctx, vs.nodeManager, zonesToSearch)
if err != nil {
klog.Errorf("Failed to find a shared datastore matching zone %s. err: %+v", volumeOptions.Zone, err)
klog.Errorf("Failed to find a shared datastore matching zone %s. err: %+v", zonesToSearch, err)
return "", err
}
// Prepare error msg to be used later, if required.
err = fmt.Errorf("The specified datastore %s does not match the provided zones : %s", datastoreName, volumeOptions.Zone)
err = fmt.Errorf("The specified datastore %s does not match the provided zones : %s", datastoreName, zonesToSearch)
}
found := false
// Check if the selected datastore belongs to the list of shared datastores computed.
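
The zone-selection branching introduced at the top of CreateVolume reduces to a small policy. A simplified, self-contained sketch (hypothetical helper, not PR code) with plain values in place of node and topology objects:

```go
package main

import "fmt"

// pickZones mirrors the branching above: no selected node -> use the allowed
// topologies as-is; a selected node with more than one allowed topology ->
// the volume must satisfy all of them; otherwise prefer the node's own zone.
func pickZones(selectedNodeZone string, haveSelectedNode bool, allowedZones []string) []string {
	if !haveSelectedNode {
		return allowedZones // Immediate binding: allowedTopologies, if any
	}
	if len(allowedZones) > 1 {
		return allowedZones // WaitForFirstConsumer with multiple topologies
	}
	if selectedNodeZone != "" {
		return []string{selectedNodeZone} // the node's failure domain wins
	}
	return nil // no zone constraint
}

func main() {
	fmt.Println(pickZones("zone-a", true, nil))                  // [zone-a]
	fmt.Println(pickZones("zone-a", true, []string{"z1", "z2"})) // [z1 z2]
	fmt.Println(pickZones("", false, []string{"z1"}))            // [z1]
}
```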
@@ -306,6 +306,10 @@ func getDatastoresForZone(ctx context.Context, nodeManager *NodeManager, selecte
sharedDatastoresPerZone = dsObjList
} else {
sharedDatastoresPerZone = intersect(sharedDatastoresPerZone, dsObjList)
if len(sharedDatastoresPerZone) == 0 {
klog.V(4).Infof("No shared datastores found among hosts %s", hosts)
return nil, fmt.Errorf("No matching datastores found in the kubernetes cluster for zone %s", zone)
}
}
klog.V(9).Infof("Shared datastore list after processing host %s : %s", host, sharedDatastoresPerZone)
}
@@ -314,6 +318,9 @@
sharedDatastores = sharedDatastoresPerZone
} else {
sharedDatastores = intersect(sharedDatastores, sharedDatastoresPerZone)
if len(sharedDatastores) == 0 {
return nil, fmt.Errorf("No matching datastores found in the kubernetes cluster across zones %s", selectedZones)
}
}
}
klog.V(1).Infof("Returning selected datastores : %s", sharedDatastores)
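
The two early-exit checks added to getDatastoresForZone rely on the fact that an intersection can only shrink: once it is empty, no later host or zone can repopulate it. A self-contained sketch of the pattern (simplified signature assumed; the real code intersects []*vclib.DatastoreInfo):

```go
package main

import (
	"errors"
	"fmt"
)

// intersect keeps only the items present in both slices, in a's order.
func intersect(a, b []string) []string {
	inB := make(map[string]struct{}, len(b))
	for _, x := range b {
		inB[x] = struct{}{}
	}
	var out []string
	for _, x := range a {
		if _, ok := inB[x]; ok {
			out = append(out, x)
		}
	}
	return out
}

// sharedAcross folds intersect over per-host datastore lists and fails fast
// once the running intersection is empty, as in the change above.
func sharedAcross(perHost [][]string) ([]string, error) {
	var shared []string
	for i, ds := range perHost {
		if i == 0 {
			shared = ds
			continue
		}
		shared = intersect(shared, ds)
		if len(shared) == 0 {
			return nil, errors.New("no datastore shared by all hosts")
		}
	}
	return shared, nil
}

func main() {
	fmt.Println(sharedAcross([][]string{{"ds1", "ds2"}, {"ds2", "ds3"}})) // [ds2] <nil>
}
```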
1 change: 1 addition & 0 deletions test/e2e/storage/vsphere/BUILD
@@ -39,6 +39,7 @@ go_library(
],
importpath = "k8s.io/kubernetes/test/e2e/storage/vsphere",
deps = [
"//pkg/controller/volume/events:go_default_library",
"//staging/src/k8s.io/api/apps/v1:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/api/storage/v1:go_default_library",
2 changes: 1 addition & 1 deletion test/e2e/storage/vsphere/vsphere_scale.go
@@ -129,7 +129,7 @@ var _ = utils.SIGDescribe("vcp at scale [Feature:vsphere] ", func() {
case storageclass4:
scParams[Datastore] = datastoreName
}
sc, err = client.StorageV1().StorageClasses().Create(getVSphereStorageClassSpec(scname, scParams, nil))
sc, err = client.StorageV1().StorageClasses().Create(getVSphereStorageClassSpec(scname, scParams, nil, ""))
gomega.Expect(sc).NotTo(gomega.BeNil(), "Storage class is empty")
framework.ExpectNoError(err, "Failed to create storage class")
defer client.StorageV1().StorageClasses().Delete(scname, nil)
2 changes: 1 addition & 1 deletion test/e2e/storage/vsphere/vsphere_statefulsets.go
@@ -74,7 +74,7 @@ var _ = utils.SIGDescribe("vsphere statefulset", func() {
ginkgo.By("Creating StorageClass for Statefulset")
scParameters := make(map[string]string)
scParameters["diskformat"] = "thin"
scSpec := getVSphereStorageClassSpec(storageclassname, scParameters, nil)
scSpec := getVSphereStorageClassSpec(storageclassname, scParameters, nil, "")
sc, err := client.StorageV1().StorageClasses().Create(scSpec)
framework.ExpectNoError(err)
defer client.StorageV1().StorageClasses().Delete(sc.Name, nil)
8 changes: 4 additions & 4 deletions test/e2e/storage/vsphere/vsphere_stress.go
@@ -85,22 +85,22 @@ var _ = utils.SIGDescribe("vsphere cloud provider stress [Feature:vsphere]", fun
var err error
switch scname {
case storageclass1:
sc, err = client.StorageV1().StorageClasses().Create(getVSphereStorageClassSpec(storageclass1, nil, nil))
sc, err = client.StorageV1().StorageClasses().Create(getVSphereStorageClassSpec(storageclass1, nil, nil, ""))
case storageclass2:
var scVSanParameters map[string]string
scVSanParameters = make(map[string]string)
scVSanParameters[Policy_HostFailuresToTolerate] = "1"
sc, err = client.StorageV1().StorageClasses().Create(getVSphereStorageClassSpec(storageclass2, scVSanParameters, nil))
sc, err = client.StorageV1().StorageClasses().Create(getVSphereStorageClassSpec(storageclass2, scVSanParameters, nil, ""))
case storageclass3:
var scSPBMPolicyParameters map[string]string
scSPBMPolicyParameters = make(map[string]string)
scSPBMPolicyParameters[SpbmStoragePolicy] = policyName
sc, err = client.StorageV1().StorageClasses().Create(getVSphereStorageClassSpec(storageclass3, scSPBMPolicyParameters, nil))
sc, err = client.StorageV1().StorageClasses().Create(getVSphereStorageClassSpec(storageclass3, scSPBMPolicyParameters, nil, ""))
case storageclass4:
var scWithDSParameters map[string]string
scWithDSParameters = make(map[string]string)
scWithDSParameters[Datastore] = datastoreName
scWithDatastoreSpec := getVSphereStorageClassSpec(storageclass4, scWithDSParameters, nil)
scWithDatastoreSpec := getVSphereStorageClassSpec(storageclass4, scWithDSParameters, nil, "")
sc, err = client.StorageV1().StorageClasses().Create(scWithDatastoreSpec)
}
gomega.Expect(sc).NotTo(gomega.BeNil())
6 changes: 5 additions & 1 deletion test/e2e/storage/vsphere/vsphere_utils.go
@@ -212,7 +212,7 @@ func verifyContentOfVSpherePV(client clientset.Interface, pvc *v1.PersistentVolu
e2elog.Logf("Successfully verified content of the volume")
}

func getVSphereStorageClassSpec(name string, scParameters map[string]string, zones []string) *storagev1.StorageClass {
func getVSphereStorageClassSpec(name string, scParameters map[string]string, zones []string, volumeBindingMode storagev1.VolumeBindingMode) *storagev1.StorageClass {
var sc *storagev1.StorageClass

sc = &storagev1.StorageClass{
@@ -238,6 +238,10 @@ func getVSphereStorageClassSpec(name string, scParameters map[string]string, zon
}
sc.AllowedTopologies = append(sc.AllowedTopologies, term)
}
if volumeBindingMode != "" {
mode := storagev1.VolumeBindingMode(string(volumeBindingMode))
sc.VolumeBindingMode = &mode
}
return sc
}

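
A usage sketch (not from this diff) of the extended helper: passing storagev1.VolumeBindingWaitForFirstConsumer yields a StorageClass whose provisioning waits for the first consuming pod, which is what drives selectedNode into the provisioner, while the existing call sites pass "" and leave VolumeBindingMode unset. The StorageClass name and parameters below are illustrative.

```go
// Sketch only: helper and client names match the e2e code above; the
// StorageClass name and parameters are illustrative, not from this PR.
func createWFFCStorageClass(client clientset.Interface) (*storagev1.StorageClass, error) {
	spec := getVSphereStorageClassSpec(
		"wffc-sc", // illustrative name
		map[string]string{"diskformat": "thin"},
		nil, // no allowedTopologies: placement follows the pod's node
		storagev1.VolumeBindingWaitForFirstConsumer,
	)
	return client.StorageV1().StorageClasses().Create(spec)
}
```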
4 changes: 2 additions & 2 deletions test/e2e/storage/vsphere/vsphere_volume_datastore.go
@@ -70,7 +70,7 @@ var _ = utils.SIGDescribe("Volume Provisioning on Datastore [Feature:vsphere]",
scParameters[DiskFormat] = ThinDisk
err := invokeInvalidDatastoreTestNeg(client, namespace, scParameters)
framework.ExpectError(err)
errorMsg := `Failed to provision volume with StorageClass \"` + DatastoreSCName + `\": Datastore ` + InvalidDatastore + ` not found`
errorMsg := `Failed to provision volume with StorageClass \"` + DatastoreSCName + `\": Datastore '` + InvalidDatastore + `' not found`
if !strings.Contains(err.Error(), errorMsg) {
framework.ExpectNoError(err, errorMsg)
}
@@ -79,7 +79,7 @@

func invokeInvalidDatastoreTestNeg(client clientset.Interface, namespace string, scParameters map[string]string) error {
ginkgo.By("Creating Storage Class With Invalid Datastore")
storageclass, err := client.StorageV1().StorageClasses().Create(getVSphereStorageClassSpec(DatastoreSCName, scParameters, nil))
storageclass, err := client.StorageV1().StorageClasses().Create(getVSphereStorageClassSpec(DatastoreSCName, scParameters, nil, ""))
framework.ExpectNoError(err, fmt.Sprintf("Failed to create storage class with err: %v", err))
defer client.StorageV1().StorageClasses().Delete(storageclass.Name, nil)

2 changes: 1 addition & 1 deletion test/e2e/storage/vsphere/vsphere_volume_diskformat.go
@@ -108,7 +108,7 @@ func invokeTest(f *framework.Framework, client clientset.Interface, namespace st
scParameters["diskformat"] = diskFormat

ginkgo.By("Creating Storage Class With DiskFormat")
storageClassSpec := getVSphereStorageClassSpec("thinsc", scParameters, nil)
storageClassSpec := getVSphereStorageClassSpec("thinsc", scParameters, nil, "")
storageclass, err := client.StorageV1().StorageClasses().Create(storageClassSpec)
framework.ExpectNoError(err)

2 changes: 1 addition & 1 deletion test/e2e/storage/vsphere/vsphere_volume_disksize.go
@@ -66,7 +66,7 @@ var _ = utils.SIGDescribe("Volume Disk Size [Feature:vsphere]", func() {
expectedDiskSize := "1Mi"

ginkgo.By("Creating Storage Class")
storageclass, err := client.StorageV1().StorageClasses().Create(getVSphereStorageClassSpec(DiskSizeSCName, scParameters, nil))
storageclass, err := client.StorageV1().StorageClasses().Create(getVSphereStorageClassSpec(DiskSizeSCName, scParameters, nil, ""))
framework.ExpectNoError(err)
defer client.StorageV1().StorageClasses().Delete(storageclass.Name, nil)
