Skip to content

Commit

Permalink
Merge pull request #72731 from skarthiksrinivas/vsphere_volume_zone
Browse files Browse the repository at this point in the history
Provision vsphere volume as per zone
  • Loading branch information
k8s-ci-robot committed Feb 18, 2019
2 parents 92e0c23 + a309d8a commit 701f914
Show file tree
Hide file tree
Showing 8 changed files with 238 additions and 23 deletions.
1 change: 1 addition & 0 deletions pkg/cloudprovider/providers/vsphere/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ go_library(
"//staging/src/k8s.io/cloud-provider/node/helpers:go_default_library",
"//staging/src/k8s.io/cloud-provider/volume/helpers:go_default_library",
"//vendor/github.com/vmware/govmomi/object:go_default_library",
"//vendor/github.com/vmware/govmomi/property:go_default_library",
"//vendor/github.com/vmware/govmomi/vapi/rest:go_default_library",
"//vendor/github.com/vmware/govmomi/vapi/tags:go_default_library",
"//vendor/github.com/vmware/govmomi/vim25:go_default_library",
Expand Down
38 changes: 36 additions & 2 deletions pkg/cloudprovider/providers/vsphere/nodemanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,10 @@ import (
"strings"
"sync"

"github.com/vmware/govmomi/object"
"k8s.io/api/core/v1"
k8stypes "k8s.io/apimachinery/pkg/types"
cloudprovider "k8s.io/cloud-provider"
"k8s.io/klog"
"k8s.io/kubernetes/pkg/cloudprovider/providers/vsphere/vclib"
)
Expand All @@ -34,6 +36,7 @@ type NodeInfo struct {
vm *vclib.VirtualMachine
vcServer string
vmUUID string
zone *cloudprovider.Zone
}

type NodeManager struct {
Expand All @@ -58,6 +61,7 @@ type NodeDetails struct {
NodeName string
vm *vclib.VirtualMachine
VMUUID string
Zone *cloudprovider.Zone
}

// TODO: Make it configurable in vsphere.conf
Expand Down Expand Up @@ -190,7 +194,11 @@ func (nm *NodeManager) DiscoverNode(node *v1.Node) error {
klog.V(4).Infof("Found node %s as vm=%+v in vc=%s and datacenter=%s",
node.Name, vm, res.vc, res.datacenter.Name())

nodeInfo := &NodeInfo{dataCenter: res.datacenter, vm: vm, vcServer: res.vc, vmUUID: nodeUUID}
// Get the node zone information
nodeFd := node.ObjectMeta.Labels[v1.LabelZoneFailureDomain]
nodeRegion := node.ObjectMeta.Labels[v1.LabelZoneRegion]
nodeZone := &cloudprovider.Zone{FailureDomain: nodeFd, Region: nodeRegion}
nodeInfo := &NodeInfo{dataCenter: res.datacenter, vm: vm, vcServer: res.vc, vmUUID: nodeUUID, zone: nodeZone}
nm.addNodeInfo(node.ObjectMeta.Name, nodeInfo)
for range queueChannel {
}
Expand Down Expand Up @@ -309,7 +317,7 @@ func (nm *NodeManager) GetNodeDetails() ([]NodeDetails, error) {
return nil, err
}
klog.V(4).Infof("Updated NodeInfo %v for node %q.", nodeInfo, nodeName)
nodeDetails = append(nodeDetails, NodeDetails{nodeName, nodeInfo.vm, nodeInfo.vmUUID})
nodeDetails = append(nodeDetails, NodeDetails{nodeName, nodeInfo.vm, nodeInfo.vmUUID, nodeInfo.zone})
}
return nodeDetails, nil
}
Expand Down Expand Up @@ -355,6 +363,7 @@ func (nm *NodeManager) renewNodeInfo(nodeInfo *NodeInfo, reconnect bool) (*NodeI
dataCenter: vm.Datacenter,
vcServer: nodeInfo.vcServer,
vmUUID: nodeInfo.vmUUID,
zone: nodeInfo.zone,
}, nil
}

Expand Down Expand Up @@ -442,3 +451,28 @@ func (nm *NodeManager) UpdateCredentialManager(credentialManager *SecretCredenti
defer nm.credentialManagerLock.Unlock()
nm.credentialManager = credentialManager
}

// GetHostsInZone returns the ESXi host systems backing the registered node VMs
// whose zone failure domain matches zoneFailureDomain. A host that backs more
// than one node VM in the zone is returned only once. Nodes whose host cannot
// be resolved are skipped (best effort) rather than failing the whole call.
func (nm *NodeManager) GetHostsInZone(ctx context.Context, zoneFailureDomain string) ([]*object.HostSystem, error) {
	klog.V(9).Infof("GetHostsInZone called with registeredNodes: %v", nm.registeredNodes)
	nodeDetails, err := nm.GetNodeDetails()
	if err != nil {
		return nil, err
	}
	klog.V(4).Infof("Node Details: %v", nodeDetails)
	// Return those hosts that are in the given zone, de-duplicated by managed
	// object reference: several node VMs may run on the same host.
	seen := make(map[string]bool)
	hosts := make([]*object.HostSystem, 0)
	for _, n := range nodeDetails {
		// Match the provided zone failure domain with the node.
		klog.V(9).Infof("Matching provided zone %s with node %s zone %s", zoneFailureDomain, n.NodeName, n.Zone.FailureDomain)
		if zoneFailureDomain != n.Zone.FailureDomain {
			continue
		}
		host, err := n.vm.HostSystem(ctx)
		if err != nil {
			klog.Errorf("Failed to get host system for VM %s. err: %+v", n.vm, err)
			continue
		}
		if ref := host.Reference().String(); !seen[ref] {
			seen[ref] = true
			hosts = append(hosts, host)
		}
	}
	klog.V(4).Infof("GetHostsInZone %v returning: %v", zoneFailureDomain, hosts)
	return hosts, nil
}
1 change: 1 addition & 0 deletions pkg/cloudprovider/providers/vsphere/vclib/volumeoptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ type VolumeOptions struct {
StoragePolicyName string
StoragePolicyID string
SCSIControllerType string
Zone []string
}

var (
Expand Down
116 changes: 99 additions & 17 deletions pkg/cloudprovider/providers/vsphere/vsphere.go
Original file line number Diff line number Diff line change
Expand Up @@ -1164,6 +1164,7 @@ func (vs *VSphere) CreateVolume(volumeOptions *vclib.VolumeOptions) (canonicalVo
klog.V(1).Infof("Starting to create a vSphere volume with volumeOptions: %+v", volumeOptions)
createVolumeInternal := func(volumeOptions *vclib.VolumeOptions) (canonicalVolumePath string, err error) {
var datastore string
var dsList []*vclib.DatastoreInfo
// If datastore not specified, then use default datastore
if volumeOptions.Datastore == "" {
datastore = vs.cfg.Workspace.DefaultDatastore
Expand All @@ -1184,6 +1185,28 @@ func (vs *VSphere) CreateVolume(volumeOptions *vclib.VolumeOptions) (canonicalVo
}
var vmOptions *vclib.VMOptions
if volumeOptions.VSANStorageProfileData != "" || volumeOptions.StoragePolicyName != "" {
// If datastore and zone are specified, first validate if the datastore is in the provided zone.
if len(volumeOptions.Zone) != 0 && volumeOptions.Datastore != "" {
klog.V(4).Infof("Specified zone : %s, datastore : %s", volumeOptions.Zone, volumeOptions.Datastore)
dsList, err = getDatastoresForZone(ctx, dc, vs.nodeManager, volumeOptions.Zone)
if err != nil {
return "", err
}

// Validate if the datastore provided belongs to the zone. If not, fail the operation.
found := false
for _, ds := range dsList {
if ds.Info.Name == volumeOptions.Datastore {
found = true
break
}
}
if !found {
err := fmt.Errorf("The specified datastore %s does not match the provided zones : %s", volumeOptions.Datastore, volumeOptions.Zone)
klog.Error(err)
return "", err
}
}
// Acquire a read lock to ensure multiple PVC requests can be processed simultaneously.
cleanUpDummyVMLock.RLock()
defer cleanUpDummyVMLock.RUnlock()
Expand All @@ -1203,29 +1226,88 @@ func (vs *VSphere) CreateVolume(volumeOptions *vclib.VolumeOptions) (canonicalVo
}
}
if volumeOptions.StoragePolicyName != "" && volumeOptions.Datastore == "" {
datastore, err = getPbmCompatibleDatastore(ctx, dc, volumeOptions.StoragePolicyName, vs.nodeManager)
if len(volumeOptions.Zone) == 0 {
klog.V(4).Infof("Selecting a shared datastore as per the storage policy %s", volumeOptions.StoragePolicyName)
datastore, err = getPbmCompatibleDatastore(ctx, dc, volumeOptions.StoragePolicyName, vs.nodeManager)
} else {
// If zone is specified, first get the datastores in the zone.
dsList, err = getDatastoresForZone(ctx, dc, vs.nodeManager, volumeOptions.Zone)

// If unable to get any datastore, fail the operation.
if len(dsList) == 0 {
err := fmt.Errorf("Failed to find a shared datastore matching zone %s", volumeOptions.Zone)
klog.Error(err)
return "", err
}

klog.V(4).Infof("Specified zone : %s. Picking a datastore as per the storage policy %s among the zoned datastores : %s", volumeOptions.Zone,
volumeOptions.StoragePolicyName, dsList)
// Among the compatible datastores, select the one based on the maximum free space.
datastore, err = getPbmCompatibleZonedDatastore(ctx, dc, volumeOptions.StoragePolicyName, dsList)
}
klog.V(1).Infof("Datastore selected as per policy : %s", datastore)
if err != nil {
klog.Errorf("Failed to get pbm compatible datastore with storagePolicy: %s. err: %+v", volumeOptions.StoragePolicyName, err)
return "", err
}
} else {
// Since no storage policy is specified but datastore is specified, check
// if the given datastore is a shared datastore across all node VMs.
sharedDsList, err := getSharedDatastoresInK8SCluster(ctx, dc, vs.nodeManager)
if err != nil {
klog.Errorf("Failed to get shared datastore: %+v", err)
return "", err
}
found := false
for _, sharedDs := range sharedDsList {
if datastore == sharedDs.Info.Name {
found = true
break
// If zone is specified, pick the datastore in the zone with maximum free space within the zone.
if volumeOptions.Datastore == "" && len(volumeOptions.Zone) != 0 {
klog.V(4).Infof("Specified zone : %s", volumeOptions.Zone)
dsList, err = getDatastoresForZone(ctx, dc, vs.nodeManager, volumeOptions.Zone)

// If unable to get any datastore, fail the operation
if len(dsList) == 0 {
err := fmt.Errorf("Failed to find a shared datastore matching zone %s", volumeOptions.Zone)
klog.Error(err)
return "", err
}

if err != nil {
return "", err
}
datastore, err = getMostFreeDatastoreName(ctx, nil, dsList)
if err != nil {
klog.Errorf("Failed to get shared datastore: %+v", err)
return "", err
}
klog.V(1).Infof("Specified zone : %s. Selected datastore : %s", volumeOptions.StoragePolicyName, datastore)
} else {
var sharedDsList []*vclib.DatastoreInfo
var err error
if len(volumeOptions.Zone) == 0 {
// If zone is not provided, get the shared datastore across all node VMs.
klog.V(4).Infof("Validating if datastore %s is shared across all node VMs", datastore)
sharedDsList, err = getSharedDatastoresInK8SCluster(ctx, dc, vs.nodeManager)
if err != nil {
klog.Errorf("Failed to get shared datastore: %+v", err)
return "", err
}
// Prepare error msg to be used later, if required.
err = fmt.Errorf("The specified datastore %s is not a shared datastore across node VMs", datastore)
} else {
// If zone is provided, get the shared datastores in that zone.
klog.V(4).Infof("Validating if datastore %s is in zone %s ", datastore, volumeOptions.Zone)
sharedDsList, err = getDatastoresForZone(ctx, dc, vs.nodeManager, volumeOptions.Zone)
if err != nil {
return "", err
}
// Prepare error msg to be used later, if required.
err = fmt.Errorf("The specified datastore %s does not match the provided zones : %s", datastore, volumeOptions.Zone)
}
found := false
// Check if the selected datastore belongs to the list of shared datastores computed.
for _, sharedDs := range sharedDsList {
if datastore == sharedDs.Info.Name {
klog.V(4).Infof("Datastore validation succeeded")
found = true
break
}
}
if !found {
klog.Error(err)
return "", err
}
}
if !found {
msg := fmt.Sprintf("The specified datastore %s is not a shared datastore across node VMs", datastore)
return "", errors.New(msg)
}
}
ds, err := dc.GetDatastoreByName(ctx, datastore)
Expand Down
90 changes: 90 additions & 0 deletions pkg/cloudprovider/providers/vsphere/vsphere_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ import (
"strings"
"time"

"github.com/vmware/govmomi/object"
"github.com/vmware/govmomi/property"
"github.com/vmware/govmomi/vim25"
"github.com/vmware/govmomi/vim25/mo"
"github.com/vmware/govmomi/vim25/soap"
Expand Down Expand Up @@ -248,6 +250,94 @@ func getPbmCompatibleDatastore(ctx context.Context, dc *vclib.Datacenter, storag
return datastore, err
}

// getDatastoresForZone returns the datastores reachable from every host that
// backs a node VM in each of the selected zones: for each zone it intersects
// the per-host datastore lists, then intersects the per-zone results across
// all selected zones. An empty result means no datastore is shared by all
// relevant hosts.
func getDatastoresForZone(ctx context.Context, dc *vclib.Datacenter, nodeManager *NodeManager, selectedZones []string) ([]*vclib.DatastoreInfo, error) {

	var sharedDatastores []*vclib.DatastoreInfo

	for _, zone := range selectedZones {
		var sharedDatastoresPerZone []*vclib.DatastoreInfo
		hosts, err := nodeManager.GetHostsInZone(ctx, zone)
		if err != nil {
			return nil, err
		}
		klog.V(4).Infof("Hosts in zone %s : %s", zone, hosts)

		for _, host := range hosts {
			var hostSystemMo mo.HostSystem
			// Fetch the "datastore" property; an ignored failure here would
			// leave hostSystemMo zero-valued and silently empty the
			// intersection below, so propagate the error instead.
			err = host.Properties(ctx, host.Reference(), []string{"datastore"}, &hostSystemMo)
			if err != nil {
				klog.Errorf("Failed to get datastore property for host %s. err: %+v", host, err)
				return nil, err
			}

			klog.V(4).Infof("Datastores mounted on host %s : %s", host, hostSystemMo.Datastore)
			dsRefList := make([]types.ManagedObjectReference, 0, len(hostSystemMo.Datastore))
			dsRefList = append(dsRefList, hostSystemMo.Datastore...)

			var dsMoList []mo.Datastore
			pc := property.DefaultCollector(host.Client())
			properties := []string{DatastoreInfoProperty}
			err = pc.Retrieve(ctx, dsRefList, properties, &dsMoList)
			if err != nil {
				klog.Errorf("Failed to get Datastore managed objects from datastore objects."+
					" dsObjList: %+v, properties: %+v, err: %v", dsRefList, properties, err)
				return nil, err
			}
			klog.V(9).Infof("Datastore mo details: %+v", dsMoList)

			// Wrap the managed objects as DatastoreInfo values. Datacenter is
			// left nil here; callers only rely on the reference and Info.
			var dsObjList []*vclib.DatastoreInfo
			for _, dsMo := range dsMoList {
				dsObjList = append(dsObjList,
					&vclib.DatastoreInfo{
						Datastore: &vclib.Datastore{Datastore: object.NewDatastore(host.Client(), dsMo.Reference()),
							Datacenter: nil},
						Info: dsMo.Info.GetDatastoreInfo()})
			}

			klog.V(9).Infof("DatastoreInfo details : %s", dsObjList)

			// Intersect with the datastores visible from the hosts processed
			// so far in this zone.
			if len(sharedDatastoresPerZone) == 0 {
				sharedDatastoresPerZone = dsObjList
			} else {
				sharedDatastoresPerZone = intersect(sharedDatastoresPerZone, dsObjList)
			}
			klog.V(9).Infof("Shared datastore list after processing host %s : %s", host, sharedDatastoresPerZone)
		}
		klog.V(4).Infof("Shared datastore per zone %s is %s", zone, sharedDatastoresPerZone)
		// Intersect across zones.
		if len(sharedDatastores) == 0 {
			sharedDatastores = sharedDatastoresPerZone
		} else {
			sharedDatastores = intersect(sharedDatastores, sharedDatastoresPerZone)
		}
	}
	klog.V(1).Infof("Returning selected datastores : %s", sharedDatastores)
	return sharedDatastores, nil
}

// getPbmCompatibleZonedDatastore selects, among the supplied zoned datastores,
// the one with the most free space that is compatible with the named storage
// policy, and returns its name.
func getPbmCompatibleZonedDatastore(ctx context.Context, dc *vclib.Datacenter, storagePolicyName string, zonedDatastores []*vclib.DatastoreInfo) (string, error) {
	pbmClient, err := vclib.NewPbmClient(ctx, dc.Client())
	if err != nil {
		return "", err
	}
	// Resolve the policy name to its profile ID for the compatibility query.
	profileID, err := pbmClient.ProfileIDByName(ctx, storagePolicyName)
	if err != nil {
		klog.Errorf("Failed to get Profile ID by name: %s. err: %+v", storagePolicyName, err)
		return "", err
	}
	// Filter the zoned datastores down to those compatible with the policy.
	compatible, _, err := pbmClient.GetCompatibleDatastores(ctx, dc, profileID, zonedDatastores)
	if err != nil {
		klog.Errorf("Failed to get compatible datastores from datastores : %+v with storagePolicy: %s. err: %+v",
			zonedDatastores, profileID, err)
		return "", err
	}
	klog.V(9).Infof("compatibleDatastores : %+v", compatible)
	// Of the compatible candidates, prefer the one with the most free space.
	selected, err := getMostFreeDatastoreName(ctx, dc.Client(), compatible)
	if err != nil {
		klog.Errorf("Failed to get most free datastore from compatible datastores: %+v. err: %+v", compatible, err)
		return "", err
	}
	klog.V(4).Infof("Most free datastore : %+s", selected)
	return selected, err
}

func (vs *VSphere) setVMOptions(ctx context.Context, dc *vclib.Datacenter, resourcePoolPath string) (*vclib.VMOptions, error) {
var vmOptions vclib.VMOptions
resourcePool, err := dc.GetResourcePool(ctx, resourcePoolPath)
Expand Down
10 changes: 8 additions & 2 deletions pkg/volume/vsphere_volume/vsphere_volume.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ func (plugin *vsphereVolumePlugin) ConstructVolumeSpec(volumeName, mountPath str
// Abstract interface to disk operations.
type vdManager interface {
// Creates a volume
CreateVolume(provisioner *vsphereVolumeProvisioner) (volSpec *VolumeSpec, err error)
CreateVolume(provisioner *vsphereVolumeProvisioner, selectedZone []string) (volSpec *VolumeSpec, err error)
// Deletes a volume
DeleteVolume(deleter *vsphereVolumeDeleter) error
}
Expand Down Expand Up @@ -363,8 +363,14 @@ func (v *vsphereVolumeProvisioner) Provision(selectedNode *v1.Node, allowedTopol
if !util.AccessModesContainedInAll(v.plugin.GetAccessModes(), v.options.PVC.Spec.AccessModes) {
return nil, fmt.Errorf("invalid AccessModes %v: only AccessModes %v are supported", v.options.PVC.Spec.AccessModes, v.plugin.GetAccessModes())
}
klog.V(1).Infof("Provision with allowedTopologies : %s", allowedTopologies)
selectedZones, err := volumehelpers.ZonesFromAllowedTopologies(allowedTopologies)
if err != nil {
return nil, err
}

volSpec, err := v.manager.CreateVolume(v)
klog.V(4).Infof("Selected zones for volume : %s", selectedZones)
volSpec, err := v.manager.CreateVolume(v, selectedZones.List())
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/volume/vsphere_volume/vsphere_volume_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func getFakeDeviceName(host volume.VolumeHost, volPath string) string {
return path.Join(host.GetPluginDir(vsphereVolumePluginName), "device", volPath)
}

func (fake *fakePDManager) CreateVolume(v *vsphereVolumeProvisioner) (volSpec *VolumeSpec, err error) {
func (fake *fakePDManager) CreateVolume(v *vsphereVolumeProvisioner, selectedZone []string) (volSpec *VolumeSpec, err error) {
volSpec = &VolumeSpec{
Path: "[local] test-volume-name.vmdk",
Size: 100,
Expand Down
3 changes: 2 additions & 1 deletion pkg/volume/vsphere_volume/vsphere_volume_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func verifyDevicePath(path string) (string, error) {
}

// CreateVolume creates a vSphere volume.
func (util *VsphereDiskUtil) CreateVolume(v *vsphereVolumeProvisioner) (volSpec *VolumeSpec, err error) {
func (util *VsphereDiskUtil) CreateVolume(v *vsphereVolumeProvisioner, selectedZone []string) (volSpec *VolumeSpec, err error) {
var fstype string
cloud, err := getCloudProvider(v.plugin.host.GetCloudProvider())
if err != nil {
Expand All @@ -106,6 +106,7 @@ func (util *VsphereDiskUtil) CreateVolume(v *vsphereVolumeProvisioner) (volSpec
Name: name,
}

volumeOptions.Zone = selectedZone
// Apply Parameters (case-insensitive). We leave validation of
// the values to the cloud provider.
for parameter, value := range v.options.Parameters {
Expand Down

0 comments on commit 701f914

Please sign in to comment.