Skip to content

Commit

Permalink
Merge pull request #57432 from karataliu/azure_vmget_cache
Browse files Browse the repository at this point in the history
Automatic merge from submit-queue (batch tested with PRs 49856, 56257, 57027, 57695, 57432). If you want to cherry-pick this change to another branch, please follow the instructions <a href="https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md">here</a>.

Add cache for VirtualMachinesClient.Get in azure cloud provider

**What this PR does / why we need it**:
Add a timed cache for 'VirtualMachinesClient.Get'

Currently cloud provider will send several get calls to same URL in short period, which is not necessary.

**Which issue(s) this PR fixes**:
Fixes #57031

**Special notes for your reviewer**:

**Release note**:
NONE
  • Loading branch information
Kubernetes Submit Queue committed Jan 2, 2018
2 parents ff9b211 + a8cdeb4 commit f918d18
Show file tree
Hide file tree
Showing 6 changed files with 245 additions and 15 deletions.
3 changes: 3 additions & 0 deletions pkg/cloudprovider/providers/azure/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ go_library(
"azure_storage.go",
"azure_storageaccount.go",
"azure_util.go",
"azure_util_cache.go",
"azure_util_vmss.go",
"azure_vmsets.go",
"azure_wrap.go",
Expand Down Expand Up @@ -52,6 +53,7 @@ go_library(
"//vendor/k8s.io/apimachinery/pkg/util/errors:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//vendor/k8s.io/client-go/tools/cache:go_default_library",
"//vendor/k8s.io/client-go/util/flowcontrol:go_default_library",
],
)
Expand All @@ -61,6 +63,7 @@ go_test(
srcs = [
"azure_loadbalancer_test.go",
"azure_test.go",
"azure_util_cache_test.go",
"azure_util_test.go",
"azure_wrap_test.go",
],
Expand Down
4 changes: 4 additions & 0 deletions pkg/cloudprovider/providers/azure/azure_controllerCommon.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,8 @@ func (c *controllerCommon) AttachDisk(isManagedDisk bool, diskName, diskURI stri
}
} else {
glog.V(4).Infof("azureDisk - azure attach succeeded")
// Invalidate the cache right after updating
vmCache.Delete(vmName)
}
return err
}
Expand Down Expand Up @@ -192,6 +194,8 @@ func (c *controllerCommon) DetachDiskByName(diskName, diskURI string, nodeName t
glog.Errorf("azureDisk - azure disk detach failed, err: %v", err)
} else {
glog.V(4).Infof("azureDisk - azure disk detach succeeded")
// Invalidate the cache right after updating
vmCache.Delete(vmName)
}
return err
}
Expand Down
6 changes: 5 additions & 1 deletion pkg/cloudprovider/providers/azure/azure_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -435,11 +435,15 @@ func (as *availabilitySet) GetInstanceTypeByNodeName(name string) (string, error

// GetZoneByNodeName gets zone from instance view.
func (as *availabilitySet) GetZoneByNodeName(name string) (cloudprovider.Zone, error) {
vm, err := as.VirtualMachinesClient.Get(as.ResourceGroup, name, compute.InstanceView)
vm, exists, err := as.getVirtualMachine(types.NodeName(name))
if err != nil {
return cloudprovider.Zone{}, err
}

if !exists {
return cloudprovider.Zone{}, cloudprovider.InstanceNotFound
}

failureDomain := strconv.Itoa(int(*vm.VirtualMachineProperties.InstanceView.PlatformFaultDomain))
zone := cloudprovider.Zone{
FailureDomain: failureDomain,
Expand Down
81 changes: 81 additions & 0 deletions pkg/cloudprovider/providers/azure/azure_util_cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package azure

import (
"sync"
"time"

"k8s.io/client-go/tools/cache"
)

type timedcacheEntry struct {
key string
data interface{}
}

type timedcache struct {
store cache.Store
lock sync.Mutex
}

// ttl time.Duration
func newTimedcache(ttl time.Duration) timedcache {
return timedcache{
store: cache.NewTTLStore(cacheKeyFunc, ttl),
}
}

func cacheKeyFunc(obj interface{}) (string, error) {
return obj.(*timedcacheEntry).key, nil
}

func (t *timedcache) GetOrCreate(key string, createFunc func() interface{}) (interface{}, error) {
entry, exists, err := t.store.GetByKey(key)
if err != nil {
return nil, err
}
if exists {
return (entry.(*timedcacheEntry)).data, nil
}

t.lock.Lock()
defer t.lock.Unlock()
entry, exists, err = t.store.GetByKey(key)
if err != nil {
return nil, err
}
if exists {
return (entry.(*timedcacheEntry)).data, nil
}

if createFunc == nil {
return nil, nil
}
created := createFunc()
t.store.Add(&timedcacheEntry{
key: key,
data: created,
})
return created, nil
}

func (t *timedcache) Delete(key string) {
_ = t.store.Delete(&timedcacheEntry{
key: key,
})
}
96 changes: 96 additions & 0 deletions pkg/cloudprovider/providers/azure/azure_util_cache_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package azure

import (
"sync/atomic"
"testing"
"time"
)

func TestCacheReturnsSameObject(t *testing.T) {
type cacheTestingStruct struct{}
c := newTimedcache(1 * time.Minute)
o1 := cacheTestingStruct{}
get1, _ := c.GetOrCreate("b1", func() interface{} {
return o1
})
o2 := cacheTestingStruct{}
get2, _ := c.GetOrCreate("b1", func() interface{} {
return o2
})
if get1 != get2 {
t.Error("Get not equal")
}
}

func TestCacheCallsCreateFuncOnce(t *testing.T) {
var callsCount uint32
f1 := func() interface{} {
atomic.AddUint32(&callsCount, 1)
return 1
}
c := newTimedcache(500 * time.Millisecond)
for index := 0; index < 20; index++ {
_, _ = c.GetOrCreate("b1", f1)
}

if callsCount != 1 {
t.Error("Count not match")
}
time.Sleep(500 * time.Millisecond)
c.GetOrCreate("b1", f1)
if callsCount != 2 {
t.Error("Count not match")
}
}

func TestCacheExpires(t *testing.T) {
f1 := func() interface{} {
return 1
}
c := newTimedcache(500 * time.Millisecond)
get1, _ := c.GetOrCreate("b1", f1)
if get1 != 1 {
t.Error("Value not equal")
}
time.Sleep(500 * time.Millisecond)
get1, _ = c.GetOrCreate("b1", nil)
if get1 != nil {
t.Error("value not expired")
}
}

func TestCacheDelete(t *testing.T) {
f1 := func() interface{} {
return 1
}
c := newTimedcache(500 * time.Millisecond)
get1, _ := c.GetOrCreate("b1", f1)
if get1 != 1 {
t.Error("Value not equal")
}
get1, _ = c.GetOrCreate("b1", nil)
if get1 != 1 {
t.Error("Value not equal")
}
c.Delete("b1")
get1, _ = c.GetOrCreate("b1", nil)
if get1 != nil {
t.Error("value not deleted")
}
}
70 changes: 56 additions & 14 deletions pkg/cloudprovider/providers/azure/azure_wrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ package azure

import (
"net/http"
"sync"
"time"

"github.com/Azure/azure-sdk-for-go/arm/compute"
"github.com/Azure/azure-sdk-for-go/arm/network"
Expand Down Expand Up @@ -57,25 +59,65 @@ func ignoreStatusNotFoundFromError(err error) error {
return err
}

// cache used by getVirtualMachine
// 15s for expiration duration
var vmCache = newTimedcache(15 * time.Second)

type vmRequest struct {
lock *sync.Mutex
vm *compute.VirtualMachine
}

/// getVirtualMachine calls 'VirtualMachinesClient.Get' with a timed cache
/// The service side has throttling control that delays responses if there're multiple requests onto certain vm
/// resource request in short period.
func (az *Cloud) getVirtualMachine(nodeName types.NodeName) (vm compute.VirtualMachine, exists bool, err error) {
var realErr error

vmName := string(nodeName)
az.operationPollRateLimiter.Accept()
glog.V(10).Infof("VirtualMachinesClient.Get(%s): start", vmName)
vm, err = az.VirtualMachinesClient.Get(az.ResourceGroup, vmName, "")
glog.V(10).Infof("VirtualMachinesClient.Get(%s): end", vmName)

exists, realErr = checkResourceExistsFromError(err)
if realErr != nil {
return vm, false, realErr
}

if !exists {
return vm, false, nil
}

return vm, exists, err
cachedRequest, err := vmCache.GetOrCreate(vmName, func() interface{} {
return &vmRequest{
lock: &sync.Mutex{},
vm: nil,
}
})
if err != nil {
return compute.VirtualMachine{}, false, err
}
request := cachedRequest.(*vmRequest)

if request.vm == nil {
request.lock.Lock()
defer request.lock.Unlock()
if request.vm == nil {
// Currently InstanceView request are used by azure_zones, while the calls come after non-InstanceView
// request. If we first send an InstanceView request and then a non InstanceView request, the second
// request will still hit throttling. This is what happens now for cloud controller manager: In this
// case we do get instance view every time to fulfill the azure_zones requirement without hitting
// throttling.
// Consider adding separate parameter for controlling 'InstanceView' once node update issue #56276 is fixed
az.operationPollRateLimiter.Accept()
glog.V(10).Infof("VirtualMachinesClient.Get(%s): start", vmName)
vm, err = az.VirtualMachinesClient.Get(az.ResourceGroup, vmName, compute.InstanceView)
glog.V(10).Infof("VirtualMachinesClient.Get(%s): end", vmName)

exists, realErr = checkResourceExistsFromError(err)
if realErr != nil {
return vm, false, realErr
}

if !exists {
return vm, false, nil
}

request.vm = &vm
}
return vm, exists, err
}

glog.V(6).Infof("getVirtualMachine hits cache for(%s)", vmName)
return *request.vm, true, nil
}

func (az *Cloud) getRouteTable() (routeTable network.RouteTable, exists bool, err error) {
Expand Down

0 comments on commit f918d18

Please sign in to comment.