Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: move Azure caches to a separate package #88259

Merged
merged 2 commits into from Feb 18, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
5 changes: 3 additions & 2 deletions staging/src/k8s.io/legacy-cloud-providers/azure/BUILD
Expand Up @@ -12,7 +12,6 @@ go_library(
"azure.go",
"azure_backoff.go",
"azure_blobDiskController.go",
"azure_cache.go",
"azure_config.go",
"azure_controller_common.go",
"azure_controller_standard.go",
Expand Down Expand Up @@ -62,6 +61,7 @@ go_library(
"//staging/src/k8s.io/cloud-provider/volume/helpers:go_default_library",
"//staging/src/k8s.io/component-base/featuregate:go_default_library",
"//staging/src/k8s.io/legacy-cloud-providers/azure/auth:go_default_library",
"//staging/src/k8s.io/legacy-cloud-providers/azure/cache:go_default_library",
"//staging/src/k8s.io/legacy-cloud-providers/azure/clients:go_default_library",
"//staging/src/k8s.io/legacy-cloud-providers/azure/clients/diskclient:go_default_library",
"//staging/src/k8s.io/legacy-cloud-providers/azure/clients/interfaceclient:go_default_library",
Expand Down Expand Up @@ -95,7 +95,6 @@ go_library(
go_test(
name = "go_default_test",
srcs = [
"azure_cache_test.go",
"azure_config_test.go",
"azure_controller_common_test.go",
"azure_controller_standard_test.go",
Expand Down Expand Up @@ -125,6 +124,7 @@ go_test(
"//staging/src/k8s.io/cloud-provider:go_default_library",
"//staging/src/k8s.io/cloud-provider/service/helpers:go_default_library",
"//staging/src/k8s.io/legacy-cloud-providers/azure/auth:go_default_library",
"//staging/src/k8s.io/legacy-cloud-providers/azure/cache:go_default_library",
"//staging/src/k8s.io/legacy-cloud-providers/azure/clients:go_default_library",
"//staging/src/k8s.io/legacy-cloud-providers/azure/retry:go_default_library",
"//vendor/github.com/Azure/azure-sdk-for-go/services/compute/mgmt/2019-07-01/compute:go_default_library",
Expand All @@ -149,6 +149,7 @@ filegroup(
srcs = [
":package-srcs",
"//staging/src/k8s.io/legacy-cloud-providers/azure/auth:all-srcs",
"//staging/src/k8s.io/legacy-cloud-providers/azure/cache:all-srcs",
"//staging/src/k8s.io/legacy-cloud-providers/azure/clients:all-srcs",
"//staging/src/k8s.io/legacy-cloud-providers/azure/metrics:all-srcs",
"//staging/src/k8s.io/legacy-cloud-providers/azure/retry:all-srcs",
Expand Down
9 changes: 5 additions & 4 deletions staging/src/k8s.io/legacy-cloud-providers/azure/azure.go
Expand Up @@ -43,6 +43,7 @@ import (
cloudprovider "k8s.io/cloud-provider"
"k8s.io/klog"
"k8s.io/legacy-cloud-providers/azure/auth"
azcache "k8s.io/legacy-cloud-providers/azure/cache"
azclients "k8s.io/legacy-cloud-providers/azure/clients"
"k8s.io/legacy-cloud-providers/azure/clients/diskclient"
"k8s.io/legacy-cloud-providers/azure/clients/interfaceclient"
Expand Down Expand Up @@ -274,10 +275,10 @@ type Cloud struct {
eventRecorder record.EventRecorder
routeUpdater *delayedRouteUpdater

vmCache *timedCache
lbCache *timedCache
nsgCache *timedCache
rtCache *timedCache
vmCache *azcache.TimedCache
lbCache *azcache.TimedCache
nsgCache *azcache.TimedCache
rtCache *azcache.TimedCache

*BlobDiskController
*ManagedDiskController
Expand Down
Expand Up @@ -32,6 +32,7 @@ import (
"k8s.io/apimachinery/pkg/util/wait"
cloudprovider "k8s.io/cloud-provider"
"k8s.io/klog"
azcache "k8s.io/legacy-cloud-providers/azure/cache"
"k8s.io/legacy-cloud-providers/azure/retry"
)

Expand Down Expand Up @@ -65,7 +66,7 @@ func (az *Cloud) Event(obj runtime.Object, eventtype, reason, message string) {
}

// GetVirtualMachineWithRetry invokes az.getVirtualMachine with exponential backoff retry
func (az *Cloud) GetVirtualMachineWithRetry(name types.NodeName, crt cacheReadType) (compute.VirtualMachine, error) {
func (az *Cloud) GetVirtualMachineWithRetry(name types.NodeName, crt azcache.AzureCacheReadType) (compute.VirtualMachine, error) {
var machine compute.VirtualMachine
var retryErr error
err := wait.ExponentialBackoff(az.RequestBackoff(), func() (bool, error) {
Expand Down
Expand Up @@ -34,6 +34,7 @@ import (
cloudprovider "k8s.io/cloud-provider"
volerr "k8s.io/cloud-provider/volume/errors"
"k8s.io/klog"
azcache "k8s.io/legacy-cloud-providers/azure/cache"
"k8s.io/legacy-cloud-providers/azure/retry"
)

Expand Down Expand Up @@ -87,7 +88,7 @@ type controllerCommon struct {
}

// getNodeVMSet gets the VMSet interface based on config.VMType and the real virtual machine type.
func (c *controllerCommon) getNodeVMSet(nodeName types.NodeName, crt cacheReadType) (VMSet, error) {
func (c *controllerCommon) getNodeVMSet(nodeName types.NodeName, crt azcache.AzureCacheReadType) (VMSet, error) {
// 1. vmType is standard, return cloud.vmSet directly.
if c.cloud.VMType == vmTypeStandard {
return c.cloud.vmSet, nil
Expand Down Expand Up @@ -155,7 +156,7 @@ func (c *controllerCommon) AttachDisk(isManagedDisk bool, diskName, diskURI stri
}
}

vmset, err := c.getNodeVMSet(nodeName, cacheReadTypeUnsafe)
vmset, err := c.getNodeVMSet(nodeName, azcache.CacheReadTypeUnsafe)
if err != nil {
return -1, err
}
Expand Down Expand Up @@ -195,7 +196,7 @@ func (c *controllerCommon) DetachDisk(diskName, diskURI string, nodeName types.N
return fmt.Errorf("failed to get azure instance id for node %q (%v)", nodeName, err)
}

vmset, err := c.getNodeVMSet(nodeName, cacheReadTypeUnsafe)
vmset, err := c.getNodeVMSet(nodeName, azcache.CacheReadTypeUnsafe)
if err != nil {
return err
}
Expand Down Expand Up @@ -239,7 +240,7 @@ func (c *controllerCommon) DetachDisk(diskName, diskURI string, nodeName types.N
}

// getNodeDataDisks invokes vmSet interfaces to get data disks for the node.
func (c *controllerCommon) getNodeDataDisks(nodeName types.NodeName, crt cacheReadType) ([]compute.DataDisk, error) {
func (c *controllerCommon) getNodeDataDisks(nodeName types.NodeName, crt azcache.AzureCacheReadType) ([]compute.DataDisk, error) {
vmset, err := c.getNodeVMSet(nodeName, crt)
if err != nil {
return nil, err
Expand All @@ -252,7 +253,7 @@ func (c *controllerCommon) getNodeDataDisks(nodeName types.NodeName, crt cacheRe
func (c *controllerCommon) GetDiskLun(diskName, diskURI string, nodeName types.NodeName) (int32, error) {
// getNodeDataDisks need to fetch the cached data/fresh data if cache expired here
// to ensure we get LUN based on latest entry.
disks, err := c.getNodeDataDisks(nodeName, cacheReadTypeDefault)
disks, err := c.getNodeDataDisks(nodeName, azcache.CacheReadTypeDefault)
if err != nil {
klog.Errorf("error of getting data disks for node %q: %v", nodeName, err)
return -1, err
Expand All @@ -276,7 +277,7 @@ func (c *controllerCommon) GetDiskLun(diskName, diskURI string, nodeName types.N

// GetNextDiskLun searches all vhd attachment on the host and find unused lun. Return -1 if all luns are used.
func (c *controllerCommon) GetNextDiskLun(nodeName types.NodeName) (int32, error) {
disks, err := c.getNodeDataDisks(nodeName, cacheReadTypeDefault)
disks, err := c.getNodeDataDisks(nodeName, azcache.CacheReadTypeDefault)
if err != nil {
klog.Errorf("error of getting data disks for node %q: %v", nodeName, err)
return -1, err
Expand Down Expand Up @@ -307,7 +308,7 @@ func (c *controllerCommon) DisksAreAttached(diskNames []string, nodeName types.N
// for every reconcile call. The cache is invalidated after Attach/Detach
// disk. So the new entry will be fetched and cached the first time reconcile
// loop runs after the Attach/Disk OP which will reflect the latest model.
disks, err := c.getNodeDataDisks(nodeName, cacheReadTypeUnsafe)
disks, err := c.getNodeDataDisks(nodeName, azcache.CacheReadTypeUnsafe)
if err != nil {
if err == cloudprovider.InstanceNotFound {
// if host doesn't exist, no need to detach
Expand Down
Expand Up @@ -26,12 +26,13 @@ import (

"k8s.io/apimachinery/pkg/types"
"k8s.io/klog"
azcache "k8s.io/legacy-cloud-providers/azure/cache"
)

// AttachDisk attaches a vhd to vm
// the vhd must exist, can be identified by diskName, diskURI, and lun.
func (as *availabilitySet) AttachDisk(isManagedDisk bool, diskName, diskURI string, nodeName types.NodeName, lun int32, cachingMode compute.CachingTypes, diskEncryptionSetID string, writeAcceleratorEnabled bool) error {
vm, err := as.getVirtualMachine(nodeName, cacheReadTypeDefault)
vm, err := as.getVirtualMachine(nodeName, azcache.CacheReadTypeDefault)
if err != nil {
return err
}
Expand Down Expand Up @@ -115,7 +116,7 @@ func (as *availabilitySet) AttachDisk(isManagedDisk bool, diskName, diskURI stri
// DetachDisk detaches a disk from host
// the vhd can be identified by diskName or diskURI
func (as *availabilitySet) DetachDisk(diskName, diskURI string, nodeName types.NodeName) error {
vm, err := as.getVirtualMachine(nodeName, cacheReadTypeDefault)
vm, err := as.getVirtualMachine(nodeName, azcache.CacheReadTypeDefault)
if err != nil {
// if host doesn't exist, no need to detach
klog.Warningf("azureDisk - cannot find node %s, skip detaching disk(%s, %s)", nodeName, diskName, diskURI)
Expand Down Expand Up @@ -172,7 +173,7 @@ func (as *availabilitySet) DetachDisk(diskName, diskURI string, nodeName types.N
}

// GetDataDisks gets a list of data disks attached to the node.
func (as *availabilitySet) GetDataDisks(nodeName types.NodeName, crt cacheReadType) ([]compute.DataDisk, error) {
func (as *availabilitySet) GetDataDisks(nodeName types.NodeName, crt azcache.AzureCacheReadType) ([]compute.DataDisk, error) {
vm, err := as.getVirtualMachine(nodeName, crt)
if err != nil {
return nil, err
Expand Down
Expand Up @@ -27,6 +27,11 @@ import (
"github.com/stretchr/testify/assert"

"k8s.io/apimachinery/pkg/types"
azcache "k8s.io/legacy-cloud-providers/azure/cache"
)

var (
fakeCacheTTL = 2 * time.Second
)

func TestStandardAttachDisk(t *testing.T) {
Expand Down Expand Up @@ -100,14 +105,14 @@ func TestGetDataDisks(t *testing.T) {
nodeName types.NodeName
expectedDataDisks []compute.DataDisk
expectedError bool
crt cacheReadType
crt azcache.AzureCacheReadType
}{
{
desc: "an error shall be returned if there's no corresponding vm",
nodeName: "vm2",
expectedDataDisks: nil,
expectedError: true,
crt: cacheReadTypeDefault,
crt: azcache.CacheReadTypeDefault,
},
{
desc: "correct list of data disks shall be returned if everything is good",
Expand All @@ -119,7 +124,7 @@ func TestGetDataDisks(t *testing.T) {
},
},
expectedError: false,
crt: cacheReadTypeDefault,
crt: azcache.CacheReadTypeDefault,
},
{
desc: "correct list of data disks shall be returned if everything is good",
Expand All @@ -131,7 +136,7 @@ func TestGetDataDisks(t *testing.T) {
},
},
expectedError: false,
crt: cacheReadTypeUnsafe,
crt: azcache.CacheReadTypeUnsafe,
},
}
for i, test := range testCases {
Expand All @@ -143,7 +148,7 @@ func TestGetDataDisks(t *testing.T) {
assert.Equal(t, test.expectedDataDisks, dataDisks, "TestCase[%d]: %s", i, test.desc)
assert.Equal(t, test.expectedError, err != nil, "TestCase[%d]: %s", i, test.desc)

if test.crt == cacheReadTypeUnsafe {
if test.crt == azcache.CacheReadTypeUnsafe {
time.Sleep(fakeCacheTTL)
dataDisks, err := vmSet.GetDataDisks(test.nodeName, test.crt)
assert.Equal(t, test.expectedDataDisks, dataDisks, "TestCase[%d]: %s", i, test.desc)
Expand Down
Expand Up @@ -26,13 +26,14 @@ import (

"k8s.io/apimachinery/pkg/types"
"k8s.io/klog"
azcache "k8s.io/legacy-cloud-providers/azure/cache"
)

// AttachDisk attaches a vhd to vm
// the vhd must exist, can be identified by diskName, diskURI, and lun.
func (ss *scaleSet) AttachDisk(isManagedDisk bool, diskName, diskURI string, nodeName types.NodeName, lun int32, cachingMode compute.CachingTypes, diskEncryptionSetID string, writeAcceleratorEnabled bool) error {
vmName := mapNodeNameToVMName(nodeName)
ssName, instanceID, vm, err := ss.getVmssVM(vmName, cacheReadTypeDefault)
ssName, instanceID, vm, err := ss.getVmssVM(vmName, azcache.CacheReadTypeDefault)
if err != nil {
return err
}
Expand Down Expand Up @@ -120,7 +121,7 @@ func (ss *scaleSet) AttachDisk(isManagedDisk bool, diskName, diskURI string, nod
// the vhd can be identified by diskName or diskURI
func (ss *scaleSet) DetachDisk(diskName, diskURI string, nodeName types.NodeName) error {
vmName := mapNodeNameToVMName(nodeName)
ssName, instanceID, vm, err := ss.getVmssVM(vmName, cacheReadTypeDefault)
ssName, instanceID, vm, err := ss.getVmssVM(vmName, azcache.CacheReadTypeDefault)
if err != nil {
return err
}
Expand Down Expand Up @@ -180,7 +181,7 @@ func (ss *scaleSet) DetachDisk(diskName, diskURI string, nodeName types.NodeName
}

// GetDataDisks gets a list of data disks attached to the node.
func (ss *scaleSet) GetDataDisks(nodeName types.NodeName, crt cacheReadType) ([]compute.DataDisk, error) {
func (ss *scaleSet) GetDataDisks(nodeName types.NodeName, crt azcache.AzureCacheReadType) ([]compute.DataDisk, error) {
_, _, vm, err := ss.getVmssVM(string(nodeName), crt)
if err != nil {
return nil, err
Expand Down
Expand Up @@ -35,6 +35,7 @@ import (
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
cloudprovider "k8s.io/cloud-provider"
azcache "k8s.io/legacy-cloud-providers/azure/cache"
"k8s.io/legacy-cloud-providers/azure/retry"
)

Expand Down Expand Up @@ -983,7 +984,7 @@ func (f *fakeVMSet) DetachDisk(diskName, diskURI string, nodeName types.NodeName
return fmt.Errorf("unimplemented")
}

// GetDataDisks implements the VMSet interface for the test fake. It is not
// supported by fakeVMSet and always returns a nil disk list together with an
// "unimplemented" error, regardless of the node name or cache-read type.
func (f *fakeVMSet) GetDataDisks(nodeName types.NodeName, crt azcache.AzureCacheReadType) ([]compute.DataDisk, error) {
	return nil, fmt.Errorf("unimplemented")
}
}

Expand Down
Expand Up @@ -24,6 +24,8 @@ import (
"io/ioutil"
"net/http"
"time"

azcache "k8s.io/legacy-cloud-providers/azure/cache"
)

const (
Expand Down Expand Up @@ -87,7 +89,7 @@ type InstanceMetadata struct {
// InstanceMetadataService knows how to query the Azure instance metadata server.
type InstanceMetadataService struct {
metadataURL string
imsCache *timedCache
imsCache *azcache.TimedCache
}

// NewInstanceMetadataService creates an instance of the InstanceMetadataService accessor object.
Expand All @@ -96,7 +98,7 @@ func NewInstanceMetadataService(metadataURL string) (*InstanceMetadataService, e
metadataURL: metadataURL,
}

imsCache, err := newTimedcache(metadataCacheTTL, ims.getInstanceMetadata)
imsCache, err := azcache.NewTimedcache(metadataCacheTTL, ims.getInstanceMetadata)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -145,7 +147,7 @@ func (ims *InstanceMetadataService) getInstanceMetadata(key string) (interface{}

// GetMetadata gets instance metadata from cache.
// crt determines if we can get data from stalled cache/need fresh if cache expired.
func (ims *InstanceMetadataService) GetMetadata(crt cacheReadType) (*InstanceMetadata, error) {
func (ims *InstanceMetadataService) GetMetadata(crt azcache.AzureCacheReadType) (*InstanceMetadata, error) {
cache, err := ims.imsCache.Get(metadataCacheKey, crt)
if err != nil {
return nil, err
Expand Down
Expand Up @@ -28,6 +28,7 @@ import (
"k8s.io/apimachinery/pkg/types"
cloudprovider "k8s.io/cloud-provider"
"k8s.io/klog"
azcache "k8s.io/legacy-cloud-providers/azure/cache"
)

const (
Expand Down Expand Up @@ -73,7 +74,7 @@ func (az *Cloud) NodeAddresses(ctx context.Context, name types.NodeName) ([]v1.N
}

if az.UseInstanceMetadata {
metadata, err := az.metadata.GetMetadata(cacheReadTypeUnsafe)
metadata, err := az.metadata.GetMetadata(azcache.CacheReadTypeUnsafe)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -259,7 +260,7 @@ func (az *Cloud) InstanceID(ctx context.Context, name types.NodeName) (string, e
}

if az.UseInstanceMetadata {
metadata, err := az.metadata.GetMetadata(cacheReadTypeUnsafe)
metadata, err := az.metadata.GetMetadata(azcache.CacheReadTypeUnsafe)
if err != nil {
return "", err
}
Expand Down Expand Up @@ -346,7 +347,7 @@ func (az *Cloud) InstanceType(ctx context.Context, name types.NodeName) (string,
}

if az.UseInstanceMetadata {
metadata, err := az.metadata.GetMetadata(cacheReadTypeUnsafe)
metadata, err := az.metadata.GetMetadata(azcache.CacheReadTypeUnsafe)
if err != nil {
return "", err
}
Expand Down
Expand Up @@ -35,6 +35,7 @@ import (
cloudprovider "k8s.io/cloud-provider"
servicehelpers "k8s.io/cloud-provider/service/helpers"
"k8s.io/klog"
azcache "k8s.io/legacy-cloud-providers/azure/cache"
utilnet "k8s.io/utils/net"
)

Expand Down Expand Up @@ -961,7 +962,7 @@ func (az *Cloud) reconcileLoadBalancer(clusterName string, service *v1.Service,

if isInternal {
// Refresh updated lb which will be used later in other places.
newLB, exist, err := az.getAzureLoadBalancer(lbName, cacheReadTypeDefault)
newLB, exist, err := az.getAzureLoadBalancer(lbName, azcache.CacheReadTypeDefault)
if err != nil {
klog.V(2).Infof("reconcileLoadBalancer for service(%s): getAzureLoadBalancer(%s) failed: %v", serviceName, lbName, err)
return nil, err
Expand Down Expand Up @@ -1125,7 +1126,7 @@ func (az *Cloud) reconcileSecurityGroup(clusterName string, service *v1.Service,
ports = []v1.ServicePort{}
}

sg, err := az.getSecurityGroup(cacheReadTypeDefault)
sg, err := az.getSecurityGroup(azcache.CacheReadTypeDefault)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -1466,7 +1467,7 @@ func (az *Cloud) reconcilePublicIP(clusterName string, service *v1.Service, lbNa
}

if lbName != "" {
loadBalancer, _, err := az.getAzureLoadBalancer(lbName, cacheReadTypeDefault)
loadBalancer, _, err := az.getAzureLoadBalancer(lbName, azcache.CacheReadTypeDefault)
if err != nil {
return nil, err
}
Expand Down