From 87c58833373b7e85175a25caee12d9bb4a35eaa7 Mon Sep 17 00:00:00 2001 From: Pengfei Ni Date: Mon, 23 Jul 2018 14:56:17 +0800 Subject: [PATCH 1/5] Implement GetLabelsForVolume for AzureDisk --- pkg/cloudprovider/providers/azure/azure.go | 3 + .../azure/azure_managedDiskController.go | 60 +++++++++++++++++++ 2 files changed, 63 insertions(+) diff --git a/pkg/cloudprovider/providers/azure/azure.go b/pkg/cloudprovider/providers/azure/azure.go index f06faf7d323b..80a54f406ded 100644 --- a/pkg/cloudprovider/providers/azure/azure.go +++ b/pkg/cloudprovider/providers/azure/azure.go @@ -59,6 +59,9 @@ var ( defaultExcludeMasterFromStandardLB = true ) +// Azure implements PVLabeler. +var _ cloudprovider.PVLabeler = (*Cloud)(nil) + // Config holds the configuration parsed from the --cloud-config flag // All fields are required unless otherwise specified type Config struct { diff --git a/pkg/cloudprovider/providers/azure/azure_managedDiskController.go b/pkg/cloudprovider/providers/azure/azure_managedDiskController.go index 61c6515ef916..22d3d3bff303 100644 --- a/pkg/cloudprovider/providers/azure/azure_managedDiskController.go +++ b/pkg/cloudprovider/providers/azure/azure_managedDiskController.go @@ -17,16 +17,21 @@ limitations under the License. package azure import ( + "context" "fmt" "path" + "strconv" "strings" "github.com/Azure/azure-sdk-for-go/services/compute/mgmt/2018-04-01/compute" "github.com/Azure/azure-sdk-for-go/services/storage/mgmt/2017-10-01/storage" "github.com/golang/glog" + "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" kwait "k8s.io/apimachinery/pkg/util/wait" + kubeletapis "k8s.io/kubernetes/pkg/kubelet/apis" + "k8s.io/kubernetes/pkg/volume" "k8s.io/kubernetes/pkg/volume/util" ) @@ -201,3 +206,58 @@ func getResourceGroupFromDiskURI(diskURI string) (string, error) { } return fields[4], nil } + +// GetLabelsForVolume implements PVLabeler.GetLabelsForVolume +func (c *Cloud) GetLabelsForVolume(ctx context.Context, pv *v1.PersistentVolume) (map[string]string, error) { + // Ignore if not AzureDisk. + if pv.Spec.AzureDisk == nil { + return nil, nil + } + + // Ignore any volumes that are being provisioned + if pv.Spec.AzureDisk.DiskName == volume.ProvisionedVolumeName { + return nil, nil + } + + return c.GetAzureDiskLabels(pv.Spec.AzureDisk.DataDiskURI) +} + +// GetAzureDiskLabels gets availability zone labels for Azuredisk. +func (c *Cloud) GetAzureDiskLabels(diskURI string) (map[string]string, error) { + // Get disk's resource group. + diskName := path.Base(diskURI) + resourceGroup, err := getResourceGroupFromDiskURI(diskURI) + if err != nil { + glog.Errorf("Failed to get resource group for AzureDisk %q: %v", diskName, err) + return nil, err + } + + // Get information of the disk. + ctx, cancel := getContextWithCancel() + defer cancel() + disk, err := c.DisksClient.Get(ctx, resourceGroup, diskName) + if err != nil { + glog.Errorf("Failed to get information for AzureDisk %q: %v", diskName, err) + return nil, err + } + + // Check whether availability zone is specified. + if disk.Zones == nil || len(*disk.Zones) == 0 { + glog.V(4).Infof("Azure disk %q is not zoned", diskName) + return nil, nil + } + + zones := *disk.Zones + zoneID, err := strconv.Atoi(zones[0]) + if err != nil { + return nil, fmt.Errorf("failed to parse zone %v for AzureDisk %v: %v", zones, diskName, err) + } + + zone := c.makeZone(zoneID) + glog.V(4).Infof("Get zone %q for Azure disk %q", zone, diskName) + labels := map[string]string{ + kubeletapis.LabelZoneRegion: c.Location, + kubeletapis.LabelZoneFailureDomain: zone, + } + return labels, nil +} From b258bbad6abb2b7aa273bea6ff9a8eb6b9397ebc Mon Sep 17 00:00:00 2001 From: Pengfei Ni Date: Mon, 23 Jul 2018 15:08:21 +0800 Subject: [PATCH 2/5] Implement PersistentVolumeLabel admission controller for AzureDisk --- .../persistentvolume/label/admission.go | 53 ++++++++++++++++++- 1 file changed, 52 insertions(+), 1 deletion(-) diff --git a/plugin/pkg/admission/storage/persistentvolume/label/admission.go b/plugin/pkg/admission/storage/persistentvolume/label/admission.go index 09c70df88828..64c48d68b7a3 100644 --- a/plugin/pkg/admission/storage/persistentvolume/label/admission.go +++ b/plugin/pkg/admission/storage/persistentvolume/label/admission.go @@ -28,6 +28,7 @@ import ( api "k8s.io/kubernetes/pkg/apis/core" "k8s.io/kubernetes/pkg/cloudprovider" "k8s.io/kubernetes/pkg/cloudprovider/providers/aws" + "k8s.io/kubernetes/pkg/cloudprovider/providers/azure" "k8s.io/kubernetes/pkg/cloudprovider/providers/gce" "k8s.io/kubernetes/pkg/features" kubeapiserveradmission "k8s.io/kubernetes/pkg/kubeapiserver/admission" @@ -58,6 +59,7 @@ type persistentVolumeLabel struct { ebsVolumes aws.Volumes cloudConfig []byte gceCloudProvider *gce.GCECloud + azureProvider *azure.Cloud } var _ admission.MutationInterface = &persistentVolumeLabel{} @@ -69,7 +71,7 @@ var _ kubeapiserveradmission.WantsCloudConfig = &persistentVolumeLabel{} // As a side effect, the cloud provider may block invalid or non-existent volumes. func newPersistentVolumeLabel() *persistentVolumeLabel { // DEPRECATED: cloud-controller-manager will now start NewPersistentVolumeLabelController - // which does exactly what this admission controller used to do. So once GCE and AWS can + // which does exactly what this admission controller used to do. So once GCE, AWS and AZURE can // run externally, we can remove this admission controller. glog.Warning("PersistentVolumeLabel admission controller is deprecated. " + "Please remove this controller from your configuration files and scripts.") @@ -123,6 +125,13 @@ func (l *persistentVolumeLabel) Admit(a admission.Attributes) (err error) { } volumeLabels = labels } + if volume.Spec.AzureDisk != nil { + labels, err := l.findAzureDiskLabels(volume) + if err != nil { + return admission.NewForbidden(a, fmt.Errorf("error querying AzureDisk volume %s: %v", volume.Spec.AzureDisk.DiskName, err)) + } + volumeLabels = labels + } requirements := make([]api.NodeSelectorRequirement, 0) if len(volumeLabels) != 0 { @@ -272,3 +281,45 @@ func (l *persistentVolumeLabel) getGCECloudProvider() (*gce.GCECloud, error) { } return l.gceCloudProvider, nil } + +// getAzureCloudProvider returns the Azure cloud provider, for use for querying volume labels +func (l *persistentVolumeLabel) getAzureCloudProvider() (*azure.Cloud, error) { + l.mutex.Lock() + defer l.mutex.Unlock() + + if l.azureProvider == nil { + var cloudConfigReader io.Reader + if len(l.cloudConfig) > 0 { + cloudConfigReader = bytes.NewReader(l.cloudConfig) + } + cloudProvider, err := cloudprovider.GetCloudProvider("azure", cloudConfigReader) + if err != nil || cloudProvider == nil { + return nil, err + } + azureProvider, ok := cloudProvider.(*azure.Cloud) + if !ok { + // GetCloudProvider has gone very wrong + return nil, fmt.Errorf("error retrieving GCE cloud provider") + } + l.azureProvider = azureProvider + } + + return l.azureProvider, nil +} + +func (l *persistentVolumeLabel) findAzureDiskLabels(volume *api.PersistentVolume) (map[string]string, error) { + // Ignore any volumes that are being provisioned + if volume.Spec.AzureDisk.DiskName == vol.ProvisionedVolumeName { + return nil, nil + } + + provider, err := l.getAzureCloudProvider() + if err != nil { + return nil, err + } + if provider == nil { + return nil, fmt.Errorf("unable to build Azure cloud provider for AzureDisk") + } + + return provider.GetAzureDiskLabels(volume.Spec.AzureDisk.DataDiskURI) +} From 74813d0d26c5253681f893a2ee3edf73dc0ce057 Mon Sep 17 00:00:00 2001 From: Pengfei Ni Date: Mon, 23 Jul 2018 16:38:47 +0800 Subject: [PATCH 3/5] Add availability zone support for dynamic provisioning Azure managed disks --- .../azure/azure_blobDiskController.go | 1 + .../azure/azure_managedDiskController.go | 70 ++++++++++++++---- .../providers/azure/azure_zones.go | 14 ++++ pkg/volume/azure_dd/azure_dd.go | 5 +- pkg/volume/azure_dd/azure_provision.go | 74 ++++++++++++++++++- 5 files changed, 144 insertions(+), 20 deletions(-) diff --git a/pkg/cloudprovider/providers/azure/azure_blobDiskController.go b/pkg/cloudprovider/providers/azure/azure_blobDiskController.go index d0365936da54..2343d76230b3 100644 --- a/pkg/cloudprovider/providers/azure/azure_blobDiskController.go +++ b/pkg/cloudprovider/providers/azure/azure_blobDiskController.go @@ -32,6 +32,7 @@ import ( "github.com/Azure/go-autorest/autorest/to" "github.com/golang/glog" "github.com/rubiojr/go-vhd/vhd" + kwait "k8s.io/apimachinery/pkg/util/wait" "k8s.io/kubernetes/pkg/volume" ) diff --git a/pkg/cloudprovider/providers/azure/azure_managedDiskController.go b/pkg/cloudprovider/providers/azure/azure_managedDiskController.go index 22d3d3bff303..8fa4515c4931 100644 --- a/pkg/cloudprovider/providers/azure/azure_managedDiskController.go +++ b/pkg/cloudprovider/providers/azure/azure_managedDiskController.go @@ -40,22 +40,55 @@ type ManagedDiskController struct { common *controllerCommon } +// ManagedDiskOptions specifies the options of managed disks. +type ManagedDiskOptions struct { + DiskName string + SizeGB int + PVCName string + ResourceGroup string + Zoned bool + ZonePresent bool + ZonesPresent bool + AvailabilityZone string + AvailabilityZones string + Tags map[string]string + StorageAccountType storage.SkuName +} + func newManagedDiskController(common *controllerCommon) (*ManagedDiskController, error) { return &ManagedDiskController{common: common}, nil } //CreateManagedDisk : create managed disk -func (c *ManagedDiskController) CreateManagedDisk(diskName string, storageAccountType storage.SkuName, resourceGroup string, - sizeGB int, tags map[string]string) (string, error) { - glog.V(4).Infof("azureDisk - creating new managed Name:%s StorageAccountType:%s Size:%v", diskName, storageAccountType, sizeGB) +func (c *ManagedDiskController) CreateManagedDisk(options *ManagedDiskOptions) (string, error) { + glog.V(4).Infof("azureDisk - creating new managed Name:%s StorageAccountType:%s Size:%v", options.DiskName, options.StorageAccountType, options.SizeGB) + + // Validate and choose availability zone for creating disk. + var createAZ string + if options.Zoned && !options.ZonePresent && !options.ZonesPresent { + // TODO: get zones from active zones that with running nodes. + } + if !options.ZonePresent && options.ZonesPresent { + // Choose zone from specified zones. + if adminSetOfZones, err := util.ZonesToSet(options.AvailabilityZones); err != nil { + return "", err + } else { + createAZ = util.ChooseZoneForVolume(adminSetOfZones, options.PVCName) + } + } + if options.ZonePresent && !options.ZonesPresent { + if err := util.ValidateZone(options.AvailabilityZone); err != nil { + return "", err + } + createAZ = options.AvailabilityZone + } + // insert original tags to newTags newTags := make(map[string]*string) azureDDTag := "kubernetes-azure-dd" newTags["created-by"] = &azureDDTag - - // insert original tags to newTags - if tags != nil { - for k, v := range tags { + if options.Tags != nil { + for k, v := range options.Tags { // Azure won't allow / (forward slash) in tags newKey := strings.Replace(k, "/", "-", -1) newValue := strings.Replace(v, "/", "-", -1) @@ -63,25 +96,30 @@ func (c *ManagedDiskController) CreateManagedDisk(diskName string, storageAccoun } } - diskSizeGB := int32(sizeGB) + diskSizeGB := int32(options.SizeGB) model := compute.Disk{ Location: &c.common.location, Tags: newTags, Sku: &compute.DiskSku{ - Name: compute.StorageAccountTypes(storageAccountType), + Name: compute.StorageAccountTypes(options.StorageAccountType), }, DiskProperties: &compute.DiskProperties{ DiskSizeGB: &diskSizeGB, CreationData: &compute.CreationData{CreateOption: compute.Empty}, - }} + }, + } - if resourceGroup == "" { - resourceGroup = c.common.resourceGroup + if options.ResourceGroup == "" { + options.ResourceGroup = c.common.resourceGroup + } + if createAZ != "" { + createZones := []string{createAZ} + model.Zones = &createZones } ctx, cancel := getContextWithCancel() defer cancel() - _, err := c.common.cloud.DisksClient.CreateOrUpdate(ctx, resourceGroup, diskName, model) + _, err := c.common.cloud.DisksClient.CreateOrUpdate(ctx, options.ResourceGroup, options.DiskName, model) if err != nil { return "", err } @@ -89,7 +127,7 @@ func (c *ManagedDiskController) CreateManagedDisk(diskName string, storageAccoun diskID := "" err = kwait.ExponentialBackoff(defaultBackOff, func() (bool, error) { - provisionState, id, err := c.getDisk(resourceGroup, diskName) + provisionState, id, err := c.getDisk(options.ResourceGroup, options.DiskName) diskID = id // We are waiting for provisioningState==Succeeded // We don't want to hand-off managed disks to k8s while they are @@ -104,9 +142,9 @@ func (c *ManagedDiskController) CreateManagedDisk(diskName string, storageAccoun }) if err != nil { - glog.V(2).Infof("azureDisk - created new MD Name:%s StorageAccountType:%s Size:%v but was unable to confirm provisioningState in poll process", diskName, storageAccountType, sizeGB) + glog.V(2).Infof("azureDisk - created new MD Name:%s StorageAccountType:%s Size:%v but was unable to confirm provisioningState in poll process", options.DiskName, options.StorageAccountType, options.SizeGB) } else { - glog.V(2).Infof("azureDisk - created new MD Name:%s StorageAccountType:%s Size:%v", diskName, storageAccountType, sizeGB) + glog.V(2).Infof("azureDisk - created new MD Name:%s StorageAccountType:%s Size:%v", options.DiskName, options.StorageAccountType, options.SizeGB) } return diskID, nil diff --git a/pkg/cloudprovider/providers/azure/azure_zones.go b/pkg/cloudprovider/providers/azure/azure_zones.go index 29a152519659..e255bf8bae94 100644 --- a/pkg/cloudprovider/providers/azure/azure_zones.go +++ b/pkg/cloudprovider/providers/azure/azure_zones.go @@ -41,6 +41,20 @@ func (az *Cloud) makeZone(zoneID int) string { return fmt.Sprintf("%s-%d", strings.ToLower(az.Location), zoneID) } +// isAvailabilityZone returns true if the zone is in format of -. +func (az *Cloud) isAvailabilityZone(zone string) bool { + return strings.HasPrefix(zone, fmt.Sprintf("%s-", az.Location)) +} + +// GetZoneID returns the ID of zone from node's zone label. +func (az *Cloud) GetZoneID(zoneLabel string) string { + if !az.isAvailabilityZone(zoneLabel) { + return "" + } + + return strings.TrimPrefix(zoneLabel, fmt.Sprintf("%s-", az.Location)) +} + // GetZone returns the Zone containing the current availability zone and locality region that the program is running in. // If the node is not running with availability zones, then it will fall back to fault domain. func (az *Cloud) GetZone(ctx context.Context) (cloudprovider.Zone, error) { diff --git a/pkg/volume/azure_dd/azure_dd.go b/pkg/volume/azure_dd/azure_dd.go index 05e2f4fc46ff..16669961b1c1 100644 --- a/pkg/volume/azure_dd/azure_dd.go +++ b/pkg/volume/azure_dd/azure_dd.go @@ -35,7 +35,7 @@ type DiskController interface { CreateBlobDisk(dataDiskName string, storageAccountType storage.SkuName, sizeGB int) (string, error) DeleteBlobDisk(diskUri string) error - CreateManagedDisk(diskName string, storageAccountType storage.SkuName, resourceGroup string, sizeGB int, tags map[string]string) (string, error) + CreateManagedDisk(options *azure.ManagedDiskOptions) (string, error) DeleteManagedDisk(diskURI string) error // Attaches the disk to the host machine. @@ -58,6 +58,9 @@ type DiskController interface { // Expand the disk to new size ResizeDisk(diskURI string, oldSize resource.Quantity, newSize resource.Quantity) (resource.Quantity, error) + + // GetAzureDiskLabels gets availability zone labels for Azuredisk. + GetAzureDiskLabels(diskURI string) (map[string]string, error) } type azureDataDiskPlugin struct { diff --git a/pkg/volume/azure_dd/azure_provision.go b/pkg/volume/azure_dd/azure_provision.go index bca3e0365b4e..9d1f8db3888a 100644 --- a/pkg/volume/azure_dd/azure_provision.go +++ b/pkg/volume/azure_dd/azure_provision.go @@ -19,12 +19,14 @@ package azure_dd import ( "errors" "fmt" + "strconv" "strings" "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" utilfeature "k8s.io/apiserver/pkg/util/feature" + "k8s.io/kubernetes/pkg/cloudprovider/providers/azure" "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/volume" "k8s.io/kubernetes/pkg/volume/util" @@ -68,6 +70,21 @@ func (d *azureDiskDeleter) Delete() error { return diskController.DeleteBlobDisk(volumeSource.DataDiskURI) } +// parseZoned parsed 'zoned' for storage class. If zoned is not specified (empty string), +// then it defaults to true for managed disks. +func parseZoned(zonedString string, kind v1.AzureDataDiskKind) (bool, error) { + if zonedString == "" { + return kind == v1.AzureManagedDisk, nil + } + + zoned, err := strconv.ParseBool(zonedString) + if err != nil { + return false, fmt.Errorf("failed to parse 'zoned': %v", err) + } + + return zoned, nil +} + func (p *azureDiskProvisioner) Provision(selectedNode *v1.Node, allowedTopologies []v1.TopologySelectorTerm) (*v1.PersistentVolume, error) { if !util.AccessModesContainedInAll(p.plugin.GetAccessModes(), p.options.PVC.Spec.AccessModes) { return nil, fmt.Errorf("invalid AccessModes %v: only AccessModes %v are supported", p.options.PVC.Spec.AccessModes, p.plugin.GetAccessModes()) @@ -85,7 +102,7 @@ func (p *azureDiskProvisioner) Provision(selectedNode *v1.Node, allowedTopologie if len(p.options.PVC.Spec.AccessModes) == 1 { if p.options.PVC.Spec.AccessModes[0] != supportedModes[0] { - return nil, fmt.Errorf("AzureDisk - mode %s is not supporetd by AzureDisk plugin supported mode is %s", p.options.PVC.Spec.AccessModes[0], supportedModes) + return nil, fmt.Errorf("AzureDisk - mode %s is not supported by AzureDisk plugin (supported mode is %s)", p.options.PVC.Spec.AccessModes[0], supportedModes) } } @@ -96,6 +113,13 @@ func (p *azureDiskProvisioner) Provision(selectedNode *v1.Node, allowedTopologie strKind string err error resourceGroup string + + zoned bool + zonePresent bool + zonesPresent bool + strZoned string + availabilityZone string + availabilityZones string ) // maxLength = 79 - (4 for ".vhd") = 75 name := util.GenerateVolumeName(p.options.ClusterName, p.options.PVName, 75) @@ -123,6 +147,14 @@ func (p *azureDiskProvisioner) Provision(selectedNode *v1.Node, allowedTopologie fsType = strings.ToLower(v) case "resourcegroup": resourceGroup = v + case "zone": + zonePresent = true + availabilityZone = v + case "zones": + zonesPresent = true + availabilityZones = v + case "zoned": + strZoned = v default: return nil, fmt.Errorf("AzureDisk - invalid option %s in storage class", k) } @@ -139,6 +171,19 @@ func (p *azureDiskProvisioner) Provision(selectedNode *v1.Node, allowedTopologie return nil, err } + zoned, err = parseZoned(strZoned, kind) + if err != nil { + return nil, err + } + + if !zoned && (zonePresent || zonesPresent) { + return nil, fmt.Errorf("zone or zones StorageClass parameters must be used together with zoned parameter") + } + + if zonePresent && zonesPresent { + return nil, fmt.Errorf("both zone and zones StorageClass parameters must not be used at the same time") + } + if cachingMode, err = normalizeCachingMode(cachingMode); err != nil { return nil, err } @@ -154,16 +199,39 @@ func (p *azureDiskProvisioner) Provision(selectedNode *v1.Node, allowedTopologie // create disk diskURI := "" + labels := map[string]string{} if kind == v1.AzureManagedDisk { tags := make(map[string]string) if p.options.CloudTags != nil { tags = *(p.options.CloudTags) } - diskURI, err = diskController.CreateManagedDisk(name, skuName, resourceGroup, requestGiB, tags) + + volumeOptions := &azure.ManagedDiskOptions{ + DiskName: name, + StorageAccountType: skuName, + ResourceGroup: resourceGroup, + PVCName: p.options.PVC.Name, + SizeGB: requestGiB, + Tags: tags, + Zoned: zoned, + ZonePresent: zonePresent, + ZonesPresent: zonesPresent, + AvailabilityZone: availabilityZone, + AvailabilityZones: availabilityZones, + } + diskURI, err = diskController.CreateManagedDisk(volumeOptions) + if err != nil { + return nil, err + } + labels, err = diskController.GetAzureDiskLabels(diskURI) if err != nil { return nil, err } } else { + if zoned { + return nil, errors.New("zoned parameter is only supported for managed disks") + } + if kind == v1.AzureDedicatedBlobDisk { _, diskURI, _, err = diskController.CreateVolume(name, account, storageAccountType, location, requestGiB) if err != nil { @@ -189,7 +257,7 @@ func (p *azureDiskProvisioner) Provision(selectedNode *v1.Node, allowedTopologie pv := &v1.PersistentVolume{ ObjectMeta: metav1.ObjectMeta{ Name: p.options.PVName, - Labels: map[string]string{}, + Labels: labels, Annotations: map[string]string{ "volumehelper.VolumeDynamicallyCreatedByKey": "azure-disk-dynamic-provisioner", }, From 811e831b0a8bd20b847bd01dbe1d3d32654aed8d Mon Sep 17 00:00:00 2001 From: Pengfei Ni Date: Mon, 23 Jul 2018 17:21:31 +0800 Subject: [PATCH 4/5] Chose availability zones from active nodes --- pkg/cloudprovider/providers/azure/azure.go | 98 +++++++++++++++++++ .../azure/azure_managedDiskController.go | 41 +++++--- 2 files changed, 126 insertions(+), 13 deletions(-) diff --git a/pkg/cloudprovider/providers/azure/azure.go b/pkg/cloudprovider/providers/azure/azure.go index 80a54f406ded..04786b2d3a0f 100644 --- a/pkg/cloudprovider/providers/azure/azure.go +++ b/pkg/cloudprovider/providers/azure/azure.go @@ -21,13 +21,19 @@ import ( "io" "io/ioutil" "strings" + "sync" "time" + "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/informers" + "k8s.io/client-go/tools/cache" "k8s.io/client-go/util/flowcontrol" "k8s.io/kubernetes/pkg/cloudprovider" "k8s.io/kubernetes/pkg/cloudprovider/providers/azure/auth" "k8s.io/kubernetes/pkg/controller" + kubeletapis "k8s.io/kubernetes/pkg/kubelet/apis" "k8s.io/kubernetes/pkg/version" "github.com/Azure/go-autorest/autorest" @@ -150,6 +156,13 @@ type Cloud struct { metadata *InstanceMetadata vmSet VMSet + // Lock for access to nodeZones + nodeZonesLock sync.Mutex + // nodeZones is a mapping from Zone to a sets.String of Node's names in the Zone + // it is updated by the nodeInformer + nodeZones map[string]sets.String + nodeInformerSynced cache.InformerSynced + // Clients for vmss. VirtualMachineScaleSetsClient VirtualMachineScaleSetsClient VirtualMachineScaleSetVMsClient VirtualMachineScaleSetVMsClient @@ -243,6 +256,7 @@ func NewCloud(configReader io.Reader) (cloudprovider.Interface, error) { az := Cloud{ Config: *config, Environment: *env, + nodeZones: map[string]sets.String{}, DisksClient: newAzDisksClient(azClientConfig), RoutesClient: newAzRoutesClient(azClientConfig), @@ -427,3 +441,87 @@ func initDiskControllers(az *Cloud) error { return nil } + +// SetInformers sets informers for Azure cloud provider. +func (az *Cloud) SetInformers(informerFactory informers.SharedInformerFactory) { + glog.Infof("Setting up informers for Azure cloud provider") + nodeInformer := informerFactory.Core().V1().Nodes().Informer() + nodeInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + node := obj.(*v1.Node) + az.updateNodeZones(nil, node) + }, + UpdateFunc: func(prev, obj interface{}) { + prevNode := prev.(*v1.Node) + newNode := obj.(*v1.Node) + if newNode.Labels[kubeletapis.LabelZoneFailureDomain] == + prevNode.Labels[kubeletapis.LabelZoneFailureDomain] { + return + } + az.updateNodeZones(prevNode, newNode) + }, + DeleteFunc: func(obj interface{}) { + node, isNode := obj.(*v1.Node) + // We can get DeletedFinalStateUnknown instead of *v1.Node here + // and we need to handle that correctly. + if !isNode { + deletedState, ok := obj.(cache.DeletedFinalStateUnknown) + if !ok { + glog.Errorf("Received unexpected object: %v", obj) + return + } + node, ok = deletedState.Obj.(*v1.Node) + if !ok { + glog.Errorf("DeletedFinalStateUnknown contained non-Node object: %v", deletedState.Obj) + return + } + } + az.updateNodeZones(node, nil) + }, + }) + az.nodeInformerSynced = nodeInformer.HasSynced +} + +func (az *Cloud) updateNodeZones(prevNode, newNode *v1.Node) { + az.nodeZonesLock.Lock() + defer az.nodeZonesLock.Unlock() + if prevNode != nil { + prevZone, ok := prevNode.ObjectMeta.Labels[kubeletapis.LabelZoneFailureDomain] + if ok && az.isAvailabilityZone(prevZone) { + az.nodeZones[prevZone].Delete(prevNode.ObjectMeta.Name) + if az.nodeZones[prevZone].Len() == 0 { + az.nodeZones[prevZone] = nil + } + } + } + if newNode != nil { + newZone, ok := newNode.ObjectMeta.Labels[kubeletapis.LabelZoneFailureDomain] + if ok && az.isAvailabilityZone(newZone) { + if az.nodeZones[newZone] == nil { + az.nodeZones[newZone] = sets.NewString() + } + az.nodeZones[newZone].Insert(newNode.ObjectMeta.Name) + } + } +} + +// GetActiveZones returns all the zones in which k8s nodes are currently running. +func (az *Cloud) GetActiveZones() (sets.String, error) { + if az.nodeInformerSynced == nil { + return nil, fmt.Errorf("Azure cloud provider doesn't have informers set") + } + + az.nodeZonesLock.Lock() + defer az.nodeZonesLock.Unlock() + if !az.nodeInformerSynced() { + return nil, fmt.Errorf("node informer is not synced when trying to GetActiveZones") + } + + zones := sets.NewString() + for zone, nodes := range az.nodeZones { + if len(nodes) > 0 { + zones.Insert(zone) + } + } + return zones, nil +} diff --git a/pkg/cloudprovider/providers/azure/azure_managedDiskController.go b/pkg/cloudprovider/providers/azure/azure_managedDiskController.go index 8fa4515c4931..fd780ffece12 100644 --- a/pkg/cloudprovider/providers/azure/azure_managedDiskController.go +++ b/pkg/cloudprovider/providers/azure/azure_managedDiskController.go @@ -29,6 +29,7 @@ import ( "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" + "k8s.io/apimachinery/pkg/util/sets" kwait "k8s.io/apimachinery/pkg/util/wait" kubeletapis "k8s.io/kubernetes/pkg/kubelet/apis" "k8s.io/kubernetes/pkg/volume" @@ -61,26 +62,33 @@ func newManagedDiskController(common *controllerCommon) (*ManagedDiskController, //CreateManagedDisk : create managed disk func (c *ManagedDiskController) CreateManagedDisk(options *ManagedDiskOptions) (string, error) { + var zones sets.String + var activeZones sets.String + var err error glog.V(4).Infof("azureDisk - creating new managed Name:%s StorageAccountType:%s Size:%v", options.DiskName, options.StorageAccountType, options.SizeGB) + // Get active zones which have nodes running on. + activeZones, err = c.common.cloud.GetActiveZones() + if err != nil { + return "", fmt.Errorf("error querying active zones: %v", err) + } + // Validate and choose availability zone for creating disk. - var createAZ string if options.Zoned && !options.ZonePresent && !options.ZonesPresent { - // TODO: get zones from active zones that with running nodes. - } - if !options.ZonePresent && options.ZonesPresent { + // Neither "zone" or "zones" specified. Pick a zone randomly selected + // from all active zones where Kubernetes cluster has a node. + zones = activeZones + } else if !options.ZonePresent && options.ZonesPresent { // Choose zone from specified zones. - if adminSetOfZones, err := util.ZonesToSet(options.AvailabilityZones); err != nil { + if zones, err = util.ZonesToSet(options.AvailabilityZones); err != nil { return "", err - } else { - createAZ = util.ChooseZoneForVolume(adminSetOfZones, options.PVCName) } - } - if options.ZonePresent && !options.ZonesPresent { + } else if options.ZonePresent && !options.ZonesPresent { if err := util.ValidateZone(options.AvailabilityZone); err != nil { return "", err } - createAZ = options.AvailabilityZone + zones = make(sets.String) + zones.Insert(options.AvailabilityZone) } // insert original tags to newTags @@ -112,14 +120,21 @@ func (c *ManagedDiskController) CreateManagedDisk(options *ManagedDiskOptions) ( if options.ResourceGroup == "" { options.ResourceGroup = c.common.resourceGroup } - if createAZ != "" { - createZones := []string{createAZ} + if len(zones.List()) > 0 { + createAZ := util.ChooseZoneForVolume(zones, options.PVCName) + // Do not allow creation of disks in zones that are do not have nodes. Such disks + // are not currently usable. + if !activeZones.Has(createAZ) { + return "", fmt.Errorf("kubernetes does not have a node in zone %q", createAZ) + } + + createZones := []string{c.common.cloud.GetZoneID(createAZ)} model.Zones = &createZones } ctx, cancel := getContextWithCancel() defer cancel() - _, err := c.common.cloud.DisksClient.CreateOrUpdate(ctx, options.ResourceGroup, options.DiskName, model) + _, err = c.common.cloud.DisksClient.CreateOrUpdate(ctx, options.ResourceGroup, options.DiskName, model) if err != nil { return "", err } From 6bfd2be2eaee5f1841dbca2242125ae35bd29450 Mon Sep 17 00:00:00 2001 From: Pengfei Ni Date: Tue, 24 Jul 2018 14:24:27 +0800 Subject: [PATCH 5/5] Add documentation and unit tests --- pkg/cloudprovider/providers/azure/BUILD | 4 + .../azure/azure_managedDiskController.go | 57 ++++++----- .../providers/azure/azure_test.go | 3 + .../providers/azure/azure_zones_test.go | 73 ++++++++++++++ pkg/volume/azure_dd/BUILD | 2 + pkg/volume/azure_dd/azure_provision.go | 4 + pkg/volume/azure_dd/azure_provision_test.go | 96 +++++++++++++++++++ .../storage/persistentvolume/label/BUILD | 1 + .../persistentvolume/label/admission.go | 2 +- 9 files changed, 219 insertions(+), 23 deletions(-) create mode 100644 pkg/cloudprovider/providers/azure/azure_zones_test.go create mode 100644 pkg/volume/azure_dd/azure_provision_test.go diff --git a/pkg/cloudprovider/providers/azure/BUILD b/pkg/cloudprovider/providers/azure/BUILD index c2e97654b2d0..1b9b3d44825d 100644 --- a/pkg/cloudprovider/providers/azure/BUILD +++ b/pkg/cloudprovider/providers/azure/BUILD @@ -40,6 +40,7 @@ go_library( "//pkg/cloudprovider:go_default_library", "//pkg/cloudprovider/providers/azure/auth:go_default_library", "//pkg/controller:go_default_library", + "//pkg/kubelet/apis:go_default_library", "//pkg/version:go_default_library", "//pkg/volume:go_default_library", "//pkg/volume/util:go_default_library", @@ -50,6 +51,7 @@ go_library( "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/uuid:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library", + "//staging/src/k8s.io/client-go/informers:go_default_library", "//staging/src/k8s.io/client-go/tools/cache:go_default_library", "//staging/src/k8s.io/client-go/util/flowcontrol:go_default_library", "//vendor/github.com/Azure/azure-sdk-for-go/services/compute/mgmt/2018-04-01/compute:go_default_library", @@ -82,6 +84,7 @@ go_test( "azure_vmss_cache_test.go", "azure_vmss_test.go", "azure_wrap_test.go", + "azure_zones_test.go", ], embed = [":go_default_library"], deps = [ @@ -92,6 +95,7 @@ go_test( "//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library", "//vendor/github.com/Azure/azure-sdk-for-go/services/compute/mgmt/2018-04-01/compute:go_default_library", "//vendor/github.com/Azure/azure-sdk-for-go/services/network/mgmt/2017-09-01/network:go_default_library", "//vendor/github.com/Azure/azure-sdk-for-go/services/storage/mgmt/2017-10-01/storage:go_default_library", diff --git a/pkg/cloudprovider/providers/azure/azure_managedDiskController.go b/pkg/cloudprovider/providers/azure/azure_managedDiskController.go index fd780ffece12..d521d9e750dc 100644 --- a/pkg/cloudprovider/providers/azure/azure_managedDiskController.go +++ b/pkg/cloudprovider/providers/azure/azure_managedDiskController.go @@ -43,16 +43,27 @@ type ManagedDiskController struct { // ManagedDiskOptions specifies the options of managed disks. type ManagedDiskOptions struct { - DiskName string - SizeGB int - PVCName string - ResourceGroup string - Zoned bool - ZonePresent bool - ZonesPresent bool - AvailabilityZone string - AvailabilityZones string - Tags map[string]string + // The name of the disk. + DiskName string + // The size in GB. + SizeGB int + // The name of PVC. + PVCName string + // The name of resource group. + ResourceGroup string + // Wether the disk is zoned. + Zoned bool + // Wether AvailabilityZone is set. + ZonePresent bool + // Wether AvailabilityZones is set. + ZonesPresent bool + // The AvailabilityZone to create the disk. + AvailabilityZone string + // List of AvailabilityZone to create the disk. + AvailabilityZones string + // The tags of the disk. + Tags map[string]string + // The SKU of storage account. StorageAccountType storage.SkuName } @@ -90,6 +101,18 @@ func (c *ManagedDiskController) CreateManagedDisk(options *ManagedDiskOptions) ( zones = make(sets.String) zones.Insert(options.AvailabilityZone) } + var createZones *[]string + if len(zones.List()) > 0 { + createAZ := util.ChooseZoneForVolume(zones, options.PVCName) + // Do not allow creation of disks in zones that are do not have nodes. Such disks + // are not currently usable. + if !activeZones.Has(createAZ) { + return "", fmt.Errorf("kubernetes does not have a node in zone %q", createAZ) + } + + zoneList := []string{c.common.cloud.GetZoneID(createAZ)} + createZones = &zoneList + } // insert original tags to newTags newTags := make(map[string]*string) @@ -108,6 +131,7 @@ func (c *ManagedDiskController) CreateManagedDisk(options *ManagedDiskOptions) ( model := compute.Disk{ Location: &c.common.location, Tags: newTags, + Zones: createZones, Sku: &compute.DiskSku{ Name: compute.StorageAccountTypes(options.StorageAccountType), }, @@ -120,17 +144,6 @@ func (c *ManagedDiskController) CreateManagedDisk(options *ManagedDiskOptions) ( if options.ResourceGroup == "" { options.ResourceGroup = c.common.resourceGroup } - if len(zones.List()) > 0 { - createAZ := util.ChooseZoneForVolume(zones, options.PVCName) - // Do not allow creation of disks in zones that are do not have nodes. Such disks - // are not currently usable. - if !activeZones.Has(createAZ) { - return "", fmt.Errorf("kubernetes does not have a node in zone %q", createAZ) - } - - createZones := []string{c.common.cloud.GetZoneID(createAZ)} - model.Zones = &createZones - } ctx, cancel := getContextWithCancel() defer cancel() @@ -307,7 +320,7 @@ func (c *Cloud) GetAzureDiskLabels(diskURI string) (map[string]string, error) { } zone := c.makeZone(zoneID) - glog.V(4).Infof("Get zone %q for Azure disk %q", zone, diskName) + glog.V(4).Infof("Got zone %q for Azure disk %q", zone, diskName) labels := map[string]string{ kubeletapis.LabelZoneRegion: c.Location, kubeletapis.LabelZoneFailureDomain: zone, diff --git a/pkg/cloudprovider/providers/azure/azure_test.go b/pkg/cloudprovider/providers/azure/azure_test.go index d1267b9b1977..45ff6ab5b265 100644 --- a/pkg/cloudprovider/providers/azure/azure_test.go +++ b/pkg/cloudprovider/providers/azure/azure_test.go @@ -32,6 +32,7 @@ import ( "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/sets" serviceapi "k8s.io/kubernetes/pkg/api/v1/service" "k8s.io/kubernetes/pkg/cloudprovider/providers/azure/auth" kubeletapis "k8s.io/kubernetes/pkg/kubelet/apis" @@ -953,6 +954,8 @@ func getTestCloud() (az *Cloud) { MaximumLoadBalancerRuleCount: 250, VMType: vmTypeStandard, }, + nodeZones: map[string]sets.String{}, + nodeInformerSynced: func() bool { return true }, } az.DisksClient = newFakeDisksClient() az.InterfacesClient = newFakeAzureInterfacesClient() diff --git a/pkg/cloudprovider/providers/azure/azure_zones_test.go b/pkg/cloudprovider/providers/azure/azure_zones_test.go new file mode 100644 index 000000000000..92ac43d93b31 --- /dev/null +++ b/pkg/cloudprovider/providers/azure/azure_zones_test.go @@ -0,0 +1,73 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package azure + +import ( + "testing" +) + +func TestIsAvailabilityZone(t *testing.T) { + location := "eastus" + az := &Cloud{ + Config: Config{ + Location: location, + }, + } + tests := []struct { + desc string + zone string + expected bool + }{ + {"empty string should return false", "", false}, + {"wrong farmat should return false", "123", false}, + {"wrong location should return false", "chinanorth-1", false}, + {"correct zone should return true", "eastus-1", true}, + } + + for _, test := range tests { + actual := az.isAvailabilityZone(test.zone) + if actual != test.expected { + t.Errorf("test [%q] get unexpected result: %v != %v", test.desc, actual, test.expected) + } + } +} + +func TestGetZoneID(t *testing.T) { + location := "eastus" + az := &Cloud{ + Config: Config{ + Location: location, + }, + } + tests := []struct { + desc string + zone string + expected string + }{ + {"empty string should return empty string", "", ""}, + {"wrong farmat should return empty string", "123", ""}, + {"wrong location should return empty string", "chinanorth-1", ""}, + {"correct zone should return zone ID", "eastus-1", "1"}, + } + + for _, test := range tests { + actual := az.GetZoneID(test.zone) + if actual != test.expected { + t.Errorf("test [%q] get unexpected result: %q != %q", test.desc, actual, test.expected) + } + } +} diff --git a/pkg/volume/azure_dd/BUILD b/pkg/volume/azure_dd/BUILD index dd0fff9b994e..f14aee0349fc 100644 --- a/pkg/volume/azure_dd/BUILD +++ b/pkg/volume/azure_dd/BUILD @@ -63,6 +63,7 @@ go_test( "azure_common_test.go", "azure_dd_block_test.go", "azure_dd_test.go", + "azure_provision_test.go", ], embed = [":go_default_library"], deps = [ @@ -73,5 +74,6 @@ go_test( "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", "//staging/src/k8s.io/client-go/util/testing:go_default_library", + "//vendor/github.com/stretchr/testify/assert:go_default_library", ], ) diff --git a/pkg/volume/azure_dd/azure_provision.go b/pkg/volume/azure_dd/azure_provision.go index 9d1f8db3888a..42ec17e8fa1f 100644 --- a/pkg/volume/azure_dd/azure_provision.go +++ b/pkg/volume/azure_dd/azure_provision.go @@ -82,6 +82,10 @@ func parseZoned(zonedString string, kind v1.AzureDataDiskKind) (bool, error) { return false, fmt.Errorf("failed to parse 'zoned': %v", err) } + if zoned && kind != v1.AzureManagedDisk { + return false, fmt.Errorf("zoned is only supported by managed disks") + } + return zoned, nil } diff --git a/pkg/volume/azure_dd/azure_provision_test.go b/pkg/volume/azure_dd/azure_provision_test.go new file mode 100644 index 000000000000..0854c497ac26 --- /dev/null +++ b/pkg/volume/azure_dd/azure_provision_test.go @@ -0,0 +1,96 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package azure_dd + +import ( + "fmt" + "testing" + + "github.com/stretchr/testify/assert" + "k8s.io/api/core/v1" +) + +func TestParseZoned(t *testing.T) { + tests := []struct { + msg string + zoneString string + diskKind v1.AzureDataDiskKind + expected bool + expectError bool + }{ + { + msg: "managed disk should default to zoned", + diskKind: v1.AzureManagedDisk, + expected: true, + }, + { + msg: "shared blob disk should default to un-zoned", + diskKind: v1.AzureSharedBlobDisk, + expected: false, + }, + { + msg: "shared dedicated disk should default to un-zoned", + diskKind: v1.AzureDedicatedBlobDisk, + expected: false, + }, + { + msg: "managed disk should support zoned=true", + diskKind: v1.AzureManagedDisk, + zoneString: "true", + expected: true, + }, + { + msg: "managed disk should support zoned=false", + diskKind: v1.AzureManagedDisk, + zoneString: "false", + expected: false, + }, + { + msg: "shared blob disk should support zoned=false", + diskKind: v1.AzureSharedBlobDisk, + zoneString: "false", + expected: false, + }, + { + msg: "shared blob disk shouldn't support zoned=true", + diskKind: v1.AzureSharedBlobDisk, + zoneString: "true", + expectError: true, + }, + { + msg: "shared dedicated disk should support zoned=false", + diskKind: v1.AzureDedicatedBlobDisk, + zoneString: "false", + expected: false, + }, + { + msg: "dedicated blob disk shouldn't support zoned=true", + diskKind: v1.AzureDedicatedBlobDisk, + zoneString: "true", + expectError: true, + }, + } + + for i, test := range tests { + real, err := parseZoned(test.zoneString, test.diskKind) + if test.expectError { + assert.Error(t, err, fmt.Sprintf("TestCase[%d]: %s", i, test.msg)) + } else { + assert.Equal(t, test.expected, real, fmt.Sprintf("TestCase[%d]: %s", i, test.msg)) + } + } +} diff --git a/plugin/pkg/admission/storage/persistentvolume/label/BUILD b/plugin/pkg/admission/storage/persistentvolume/label/BUILD index 2674d29ebd06..cffd35726ad8 100644 --- a/plugin/pkg/admission/storage/persistentvolume/label/BUILD +++ b/plugin/pkg/admission/storage/persistentvolume/label/BUILD @@ -17,6 +17,7 @@ go_library( "//pkg/apis/core:go_default_library", "//pkg/cloudprovider:go_default_library", "//pkg/cloudprovider/providers/aws:go_default_library", + "//pkg/cloudprovider/providers/azure:go_default_library", "//pkg/cloudprovider/providers/gce:go_default_library", "//pkg/features:go_default_library", "//pkg/kubeapiserver/admission:go_default_library", diff --git a/plugin/pkg/admission/storage/persistentvolume/label/admission.go b/plugin/pkg/admission/storage/persistentvolume/label/admission.go index 64c48d68b7a3..4bbd16fee99e 100644 --- a/plugin/pkg/admission/storage/persistentvolume/label/admission.go +++ b/plugin/pkg/admission/storage/persistentvolume/label/admission.go @@ -299,7 +299,7 @@ func (l *persistentVolumeLabel) getAzureCloudProvider() (*azure.Cloud, error) { azureProvider, ok := cloudProvider.(*azure.Cloud) if !ok { // GetCloudProvider has gone very wrong - return nil, fmt.Errorf("error retrieving GCE cloud provider") + return nil, fmt.Errorf("error retrieving Azure cloud provider") } l.azureProvider = azureProvider }