Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce cache for speeding up APIServices lookup #79999

Merged
merged 1 commit into from Aug 13, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -46,6 +46,7 @@ go_test(
"//staging/src/k8s.io/client-go/listers/core/v1:go_default_library",
"//staging/src/k8s.io/client-go/testing:go_default_library",
"//staging/src/k8s.io/client-go/tools/cache:go_default_library",
"//staging/src/k8s.io/client-go/util/workqueue:go_default_library",
"//staging/src/k8s.io/kube-aggregator/pkg/apis/apiregistration/v1:go_default_library",
"//staging/src/k8s.io/kube-aggregator/pkg/client/clientset_generated/clientset/fake:go_default_library",
"//staging/src/k8s.io/kube-aggregator/pkg/client/clientset_generated/clientset/typed/apiregistration/v1:go_default_library",
Expand Down
Expand Up @@ -20,6 +20,8 @@ import (
"fmt"
"net/http"
"net/url"
"reflect"
"sync"
"time"

v1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -72,6 +74,10 @@ type AvailableConditionController struct {
syncFn func(key string) error

queue workqueue.RateLimitingInterface
// map from service-namespace -> service-name -> apiservice names
cache map[string]map[string][]string
tedyu marked this conversation as resolved.
Show resolved Hide resolved
tedyu marked this conversation as resolved.
Show resolved Hide resolved
// this lock protects operations on the above cache
cacheLock sync.RWMutex
}

// NewAvailableConditionController returns a new AvailableConditionController.
Expand Down Expand Up @@ -413,26 +419,23 @@ func (c *AvailableConditionController) processNextWorkItem() bool {
return true
}

func (c *AvailableConditionController) enqueue(obj *apiregistrationv1.APIService) {
key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
if err != nil {
klog.Errorf("Couldn't get key for object %#v: %v", obj, err)
return
}

c.queue.Add(key)
}

func (c *AvailableConditionController) addAPIService(obj interface{}) {
castObj := obj.(*apiregistrationv1.APIService)
klog.V(4).Infof("Adding %s", castObj.Name)
c.enqueue(castObj)
if castObj.Spec.Service != nil {
c.rebuildAPIServiceCache()
tedyu marked this conversation as resolved.
Show resolved Hide resolved
}
c.queue.Add(castObj.Name)
}

func (c *AvailableConditionController) updateAPIService(obj, _ interface{}) {
castObj := obj.(*apiregistrationv1.APIService)
klog.V(4).Infof("Updating %s", castObj.Name)
c.enqueue(castObj)
func (c *AvailableConditionController) updateAPIService(oldObj, newObj interface{}) {
castObj := newObj.(*apiregistrationv1.APIService)
oldCastObj := oldObj.(*apiregistrationv1.APIService)
klog.V(4).Infof("Updating %s", oldCastObj.Name)
if !reflect.DeepEqual(castObj.Spec.Service, oldCastObj.Spec.Service) {
c.rebuildAPIServiceCache()
}
tedyu marked this conversation as resolved.
Show resolved Hide resolved
c.queue.Add(oldCastObj.Name)
}

func (c *AvailableConditionController) deleteAPIService(obj interface{}) {
Expand All @@ -450,42 +453,55 @@ func (c *AvailableConditionController) deleteAPIService(obj interface{}) {
}
}
klog.V(4).Infof("Deleting %q", castObj.Name)
c.enqueue(castObj)
if castObj.Spec.Service != nil {
c.rebuildAPIServiceCache()
}
c.queue.Add(castObj.Name)
}

// there aren't very many apiservices, just check them all.
func (c *AvailableConditionController) getAPIServicesFor(obj runtime.Object) []*apiregistrationv1.APIService {
func (c *AvailableConditionController) getAPIServicesFor(obj runtime.Object) []string {
tedyu marked this conversation as resolved.
Show resolved Hide resolved
metadata, err := meta.Accessor(obj)
if err != nil {
utilruntime.HandleError(err)
return nil
}
c.cacheLock.RLock()
defer c.cacheLock.RUnlock()
return c.cache[metadata.GetNamespace()][metadata.GetName()]
}

var ret []*apiregistrationv1.APIService
// if the service/endpoint handler wins the race against the cache rebuilding, it may queue a no-longer-relevant apiservice
// (which will get processed an extra time - this doesn't matter),
// and miss a newly relevant apiservice (which will get queued by the apiservice handler)
func (c *AvailableConditionController) rebuildAPIServiceCache() {
tedyu marked this conversation as resolved.
Show resolved Hide resolved
liggitt marked this conversation as resolved.
Show resolved Hide resolved
apiServiceList, _ := c.apiServiceLister.List(labels.Everything())
newCache := map[string]map[string][]string{}
for _, apiService := range apiServiceList {
if apiService.Spec.Service == nil {
continue
tedyu marked this conversation as resolved.
Show resolved Hide resolved
}
if apiService.Spec.Service.Namespace == metadata.GetNamespace() && apiService.Spec.Service.Name == metadata.GetName() {
ret = append(ret, apiService)
if newCache[apiService.Spec.Service.Namespace] == nil {
newCache[apiService.Spec.Service.Namespace] = map[string][]string{}
}
newCache[apiService.Spec.Service.Namespace][apiService.Spec.Service.Name] = append(newCache[apiService.Spec.Service.Namespace][apiService.Spec.Service.Name], apiService.Name)
}

return ret
c.cacheLock.Lock()
defer c.cacheLock.Unlock()
c.cache = newCache
}

// TODO, think of a way to avoid checking on every service manipulation

func (c *AvailableConditionController) addService(obj interface{}) {
for _, apiService := range c.getAPIServicesFor(obj.(*v1.Service)) {
c.enqueue(apiService)
c.queue.Add(apiService)
}
}

func (c *AvailableConditionController) updateService(obj, _ interface{}) {
for _, apiService := range c.getAPIServicesFor(obj.(*v1.Service)) {
c.enqueue(apiService)
c.queue.Add(apiService)
}
}

Expand All @@ -504,19 +520,19 @@ func (c *AvailableConditionController) deleteService(obj interface{}) {
}
}
for _, apiService := range c.getAPIServicesFor(castObj) {
c.enqueue(apiService)
c.queue.Add(apiService)
}
}

func (c *AvailableConditionController) addEndpoints(obj interface{}) {
for _, apiService := range c.getAPIServicesFor(obj.(*v1.Endpoints)) {
c.enqueue(apiService)
c.queue.Add(apiService)
}
}

func (c *AvailableConditionController) updateEndpoints(obj, _ interface{}) {
for _, apiService := range c.getAPIServicesFor(obj.(*v1.Endpoints)) {
c.enqueue(apiService)
c.queue.Add(apiService)
}
}

Expand All @@ -535,6 +551,6 @@ func (c *AvailableConditionController) deleteEndpoints(obj interface{}) {
}
}
for _, apiService := range c.getAPIServicesFor(castObj) {
c.enqueue(apiService)
c.queue.Add(apiService)
}
}
Expand Up @@ -24,6 +24,7 @@ import (
"net/url"
"strings"
"testing"
"time"

"github.com/davecgh/go-spew/spew"

Expand All @@ -32,6 +33,7 @@ import (
v1listers "k8s.io/client-go/listers/core/v1"
clienttesting "k8s.io/client-go/testing"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
apiregistration "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1"
"k8s.io/kube-aggregator/pkg/client/clientset_generated/clientset/fake"
apiregistrationclient "k8s.io/kube-aggregator/pkg/client/clientset_generated/clientset/typed/apiregistration/v1"
Expand Down Expand Up @@ -103,6 +105,103 @@ func newRemoteAPIService(name string) *apiregistration.APIService {
}
}

func setupAPIServices(apiServices []*apiregistration.APIService) (*AvailableConditionController, *fake.Clientset) {
fakeClient := fake.NewSimpleClientset()
apiServiceIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
serviceIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
endpointsIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})

testServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
}))
defer testServer.Close()

for _, o := range apiServices {
apiServiceIndexer.Add(o)
}

c := AvailableConditionController{
apiServiceClient: fakeClient.ApiregistrationV1(),
apiServiceLister: listers.NewAPIServiceLister(apiServiceIndexer),
serviceLister: v1listers.NewServiceLister(serviceIndexer),
endpointsLister: v1listers.NewEndpointsLister(endpointsIndexer),
discoveryClient: testServer.Client(),
serviceResolver: &fakeServiceResolver{url: testServer.URL},
queue: workqueue.NewNamedRateLimitingQueue(
// We want a fairly tight requeue time. The controller listens to the API, but because it relies on the routability of the
// service network, it is possible for an external, non-watchable factor to affect availability. This keeps
// the maximum disruption time to a minimum, but it does prevent hot loops.
workqueue.NewItemExponentialFailureRateLimiter(5*time.Millisecond, 30*time.Second),
"AvailableConditionController"),
}
for _, svc := range apiServices {
c.addAPIService(svc)
}
return &c, fakeClient
}

func BenchmarkBuildCache(b *testing.B) {
apiServiceName := "remote.group"
// model 1 APIService pointing at a given service, and 30 pointing at local group/versions
apiServices := []*apiregistration.APIService{newRemoteAPIService(apiServiceName)}
for i := 0; i < 30; i++ {
apiServices = append(apiServices, newLocalAPIService(fmt.Sprintf("local.group%d", i)))
}
// model one service backing an API service, and 100 unrelated services
services := []*v1.Service{newService("foo", "bar", testServicePort, testServicePortName)}
for i := 0; i < 100; i++ {
services = append(services, newService("foo", fmt.Sprintf("bar%d", i), testServicePort, testServicePortName))
}
c, _ := setupAPIServices(apiServices)
b.ReportAllocs()
b.ResetTimer()
for n := 1; n <= b.N; n++ {
for _, svc := range services {
c.addService(svc)
}
for _, svc := range services {
c.updateService(svc, svc)
}
for _, svc := range services {
c.deleteService(svc)
}
}
}

func TestBuildCache(t *testing.T) {
tests := []struct {
name string

apiServiceName string
apiServices []*apiregistration.APIService
services []*v1.Service
endpoints []*v1.Endpoints

expectedAvailability apiregistration.APIServiceCondition
}{
{
name: "api service",
apiServiceName: "remote.group",
apiServices: []*apiregistration.APIService{newRemoteAPIService("remote.group")},
services: []*v1.Service{newService("foo", "bar", testServicePort, testServicePortName)},
},
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
c, fakeClient := setupAPIServices(tc.apiServices)
for _, svc := range tc.services {
c.addService(svc)
}

c.sync(tc.apiServiceName)

// ought to have one action writing status
if e, a := 1, len(fakeClient.Actions()); e != a {
t.Fatalf("%v expected %v, got %v", tc.name, e, fakeClient.Actions())
}
})
}
}
func TestSync(t *testing.T) {
tests := []struct {
name string
Expand Down