Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

reduce conflict retries #25091

Merged
merged 1 commit into from
May 25, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
26 changes: 25 additions & 1 deletion pkg/storage/etcd/api_object_versioner.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,4 +71,28 @@ func (a APIObjectVersioner) ObjectResourceVersion(obj runtime.Object) (uint64, e
}

// APIObjectVersioner implements Versioner
var _ storage.Versioner = APIObjectVersioner{}
var Versioner storage.Versioner = APIObjectVersioner{}

// CompareResourceVersion compares etcd resource versions. Outside this API they are all strings,
// but etcd resource versions are special, they're actually ints, so we can easily compare them.
func (a APIObjectVersioner) CompareResourceVersion(lhs, rhs runtime.Object) int {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

test case?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

test case?

added

lhsVersion, err := Versioner.ObjectResourceVersion(lhs)
if err != nil {
// coder error
panic(err)
}
rhsVersion, err := Versioner.ObjectResourceVersion(rhs)
if err != nil {
// coder error
panic(err)
}

if lhsVersion == rhsVersion {
return 0
}
if lhsVersion < rhsVersion {
return -1
}

return 1
}
17 changes: 17 additions & 0 deletions pkg/storage/etcd/api_object_versioner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,3 +39,20 @@ func TestObjectVersioner(t *testing.T) {
t.Errorf("unexpected resource version: %#v", obj)
}
}

func TestCompareResourceVersion(t *testing.T) {
five := &storagetesting.TestResource{ObjectMeta: api.ObjectMeta{ResourceVersion: "5"}}
six := &storagetesting.TestResource{ObjectMeta: api.ObjectMeta{ResourceVersion: "6"}}

versioner := APIObjectVersioner{}

if e, a := -1, versioner.CompareResourceVersion(five, six); e != a {
t.Errorf("expected %v got %v", e, a)
}
if e, a := 1, versioner.CompareResourceVersion(six, five); e != a {
t.Errorf("expected %v got %v", e, a)
}
if e, a := 0, versioner.CompareResourceVersion(six, six); e != a {
t.Errorf("expected %v got %v", e, a)
}
}
46 changes: 44 additions & 2 deletions plugin/pkg/admission/resourcequota/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"k8s.io/kubernetes/pkg/client/cache"
"k8s.io/kubernetes/pkg/quota"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/storage/etcd"
utilruntime "k8s.io/kubernetes/pkg/util/runtime"
"k8s.io/kubernetes/pkg/util/sets"
"k8s.io/kubernetes/pkg/util/wait"
Expand All @@ -52,6 +53,10 @@ type quotaEvaluator struct {
// We track the lookup result here so that for repeated requests, we don't look it up very often.
liveLookupCache *lru.Cache
liveTTL time.Duration
// updatedQuotas holds a cache of quotas that we've updated. This is used to pull the "really latest" during back to
// back quota evaluations that touch the same quota doc. This only works because we can compare etcd resourceVersions
// for the same resource as integers. Before this change: 22 updates with 12 conflicts. after this change: 15 updates with 0 conflicts
updatedQuotas *lru.Cache

// TODO these are used together to bucket items by namespace and then batch them up for processing.
// The technique is valuable for rollup activities to avoid fanout and reduce resource contention.
Expand Down Expand Up @@ -101,6 +106,10 @@ func newQuotaEvaluator(client clientset.Interface, registry quota.Registry) (*qu
if err != nil {
return nil, err
}
updatedCache, err := lru.New(100)
if err != nil {
return nil, err
}
lw := &cache.ListWatch{
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
return client.Core().ResourceQuotas(api.NamespaceAll).List(options)
Expand All @@ -118,6 +127,7 @@ func newQuotaEvaluator(client clientset.Interface, registry quota.Registry) (*qu
registry: registry,
liveLookupCache: liveLookupCache,
liveTTL: time.Duration(30 * time.Second),
updatedQuotas: updatedCache,

queue: workqueue.New(),
work: map[string][]*admissionWaiter{},
Expand Down Expand Up @@ -247,9 +257,14 @@ func (e *quotaEvaluator) checkQuotas(quotas []api.ResourceQuota, admissionAttrib
continue
}

if _, err := e.client.Core().ResourceQuotas(newQuota.Namespace).UpdateStatus(&newQuota); err != nil {
if updatedQuota, err := e.client.Core().ResourceQuotas(newQuota.Namespace).UpdateStatus(&newQuota); err != nil {
updatedFailedQuotas = append(updatedFailedQuotas, newQuota)
lastErr = err

} else {
// update our cache
e.updateCache(updatedQuota)

}
}

Expand Down Expand Up @@ -472,6 +487,31 @@ func (e *quotaEvaluator) getWork() (string, []*admissionWaiter, bool) {
return ns, []*admissionWaiter{}, false
}

func (e *quotaEvaluator) updateCache(quota *api.ResourceQuota) {
key := quota.Namespace + "/" + quota.Name
e.updatedQuotas.Add(key, quota)
}

var etcdVersioner = etcd.APIObjectVersioner{}

// checkCache compares the passed quota against the value in the look-aside cache and returns the newer
// if the cache is out of date, it deletes the stale entry. This only works because of etcd resourceVersions
// being monotonically increasing integers
func (e *quotaEvaluator) checkCache(quota *api.ResourceQuota) *api.ResourceQuota {
key := quota.Namespace + "/" + quota.Name
uncastCachedQuota, ok := e.updatedQuotas.Get(key)
if !ok {
return quota
}
cachedQuota := uncastCachedQuota.(*api.ResourceQuota)

if etcdVersioner.CompareResourceVersion(quota, cachedQuota) >= 0 {
e.updatedQuotas.Remove(key)
return quota
}
return cachedQuota
}

func (e *quotaEvaluator) getQuotas(namespace string) ([]api.ResourceQuota, error) {
// determine if there are any quotas in this namespace
// if there are no quotas, we don't need to do anything
Expand Down Expand Up @@ -508,8 +548,10 @@ func (e *quotaEvaluator) getQuotas(namespace string) ([]api.ResourceQuota, error

resourceQuotas := []api.ResourceQuota{}
for i := range items {
quota := items[i].(*api.ResourceQuota)
quota = e.checkCache(quota)
// always make a copy. We're going to muck around with this and we should never mutate the originals
resourceQuotas = append(resourceQuotas, *items[i].(*api.ResourceQuota))
resourceQuotas = append(resourceQuotas, *quota)
}

return resourceQuotas, nil
Expand Down