Add an expiring cache for the caching token authenticator #84424

Merged 2 commits on Nov 15, 2019
5 changes: 3 additions & 2 deletions pkg/kubeapiserver/authenticator/config.go
@@ -17,6 +17,7 @@ limitations under the License.
package authenticator

import (
"context"
"time"

"github.com/go-openapi/spec"
@@ -192,7 +193,7 @@ func (config Config) New() (authenticator.Request, *spec.SecurityDefinitions, er
tokenAuth := tokenunion.New(tokenAuthenticators...)
// Optionally cache authentication results
if config.TokenSuccessCacheTTL > 0 || config.TokenFailureCacheTTL > 0 {
tokenAuth = tokencache.New(tokenAuth, true, config.TokenSuccessCacheTTL, config.TokenFailureCacheTTL)
tokenAuth = tokencache.New(context.TODO(), tokenAuth, true, config.TokenSuccessCacheTTL, config.TokenFailureCacheTTL)
}
authenticators = append(authenticators, bearertoken.New(tokenAuth), websocket.NewProtocolAuthenticator(tokenAuth))
securityDefinitions["BearerToken"] = &spec.SecurityScheme{
@@ -312,5 +313,5 @@ func newWebhookTokenAuthenticator(webhookConfigFile string, version string, ttl
return nil, err
}

return tokencache.New(webhookTokenAuthenticator, false, ttl, ttl), nil
return tokencache.New(context.TODO(), webhookTokenAuthenticator, false, ttl, ttl), nil
}
16 changes: 13 additions & 3 deletions staging/src/k8s.io/apimachinery/pkg/util/cache/BUILD
@@ -8,20 +8,30 @@ load(

go_test(
name = "go_default_test",
srcs = ["lruexpirecache_test.go"],
srcs = [
"expiring_test.go",
"lruexpirecache_test.go",
],
embed = [":go_default_library"],
deps = [
"//staging/src/k8s.io/apimachinery/pkg/util/clock:go_default_library",
"//vendor/github.com/golang/groupcache/lru:go_default_library",
"//vendor/github.com/google/uuid:go_default_library",
],
)

go_library(
name = "go_default_library",
srcs = ["lruexpirecache.go"],
srcs = [
"expiring.go",
"lruexpirecache.go",
],
importmap = "k8s.io/kubernetes/vendor/k8s.io/apimachinery/pkg/util/cache",
importpath = "k8s.io/apimachinery/pkg/util/cache",
deps = ["//vendor/github.com/hashicorp/golang-lru:go_default_library"],
deps = [
"//staging/src/k8s.io/apimachinery/pkg/util/clock:go_default_library",
"//vendor/github.com/hashicorp/golang-lru:go_default_library",
],
)

filegroup(
208 changes: 208 additions & 0 deletions staging/src/k8s.io/apimachinery/pkg/util/cache/expiring.go
@@ -0,0 +1,208 @@
/*
Copyright 2019 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package cache

import (
"container/heap"
"context"
"sync"
"time"

utilclock "k8s.io/apimachinery/pkg/util/clock"
)

// NewExpiring returns an initialized expiring cache. Users must call
// (*Expiring).Run() to begin the GC goroutine.
func NewExpiring() *Expiring {
return NewExpiringWithClock(utilclock.RealClock{})
}

// NewExpiringWithClock is like NewExpiring but allows passing in a custom
// clock for testing.
func NewExpiringWithClock(clock utilclock.Clock) *Expiring {
return &Expiring{
clock: clock,
cache: make(map[interface{}]entry),
}
}

// Expiring is a map whose entries expire after a per-entry timeout.
type Expiring struct {
clock utilclock.Clock

// mu protects the below fields
mu sync.RWMutex
// cache is the internal map that backs the cache.
cache map[interface{}]entry
// generation is used as a cheap resource version for cache entries. Cleanups
// are scheduled with a key and generation. When the cleanup runs, it first
// compares its generation with the current generation of the entry. It
// deletes the entry iff the generation matches. This prevents cleanups
// scheduled for earlier versions of an entry from deleting later versions of
// an entry when Set() is called multiple times with the same key.
//
// The integer value of the generation of an entry is meaningless.
generation uint64

heap expiringHeap
}

type entry struct {
val interface{}
expiry time.Time
generation uint64
}

// Get looks up an entry in the cache.
func (c *Expiring) Get(key interface{}) (val interface{}, ok bool) {
c.mu.RLock()
defer c.mu.RUnlock()
e, ok := c.cache[key]
if !ok || c.clock.Now().After(e.expiry) {
return nil, false
}
return e.val, true
}

// Set sets a key/value entry in the map, overwriting any previous entry with
// the same key. The entry expires after the given TTL, which may be lengthened
// or shortened by additional calls to Set().
func (c *Expiring) Set(key interface{}, val interface{}, ttl time.Duration) {
expiry := c.clock.Now().Add(ttl)

c.mu.Lock()
defer c.mu.Unlock()

c.generation++
Member:

nit: I like to capture this sort of thing in a function to make sure an increment is always paired with a use.

Member Author:

Can you explain a bit more?

Member:

E.g.

func (c *Expiring) nextGenerationLocked() uint64 {
  g := c.generation
  c.generation++
  return g
}

In this case it's optional since a) you're actually tracking the last value given rather than the next value to be used (which I hadn't realized when I wrote that comment) and b) this is the only place that needs to consume a generation.


c.cache[key] = entry{
val: val,
expiry: expiry,
generation: c.generation,
}

heap.Push(&c.heap, &expiringHeapEntry{
Member:

Repeated sets for the same key will make a lot of entries in this list. We should document that callers shouldn't do that often, or we should fix it (by e.g. having the entry store a pointer to the heap entry).

Member Author:

This is not so easy with a heap since we need an index to modify then fix. I'd be happy with just documenting for now.

Member:

Yeah, that's true. You'd have to do this differently. Hm.

key: key,
generation: c.generation,
expiry: expiry,
})
}
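As the thread above notes, repeated Set calls on one key leave stale entries in the heap; the generation check is what makes those stale cleanups harmless. A minimal sketch of that behavior (importing the package at the importpath declared in the BUILD file above; the timing values are illustrative only):

package main

import (
  "context"
  "fmt"
  "time"

  "k8s.io/apimachinery/pkg/util/cache"
)

func main() {
  c := cache.NewExpiring()
  ctx, cancel := context.WithCancel(context.Background())
  defer cancel()
  go c.Run(ctx) // GC goroutine; exits when ctx is cancelled

  c.Set("k", "v1", time.Millisecond) // heap records this key at generation 1
  c.Set("k", "v2", time.Hour)        // map entry overwritten at generation 2
  time.Sleep(100 * time.Millisecond) // let the generation-1 heap entry expire

  // gc pops the stale generation-1 heap entry and calls del("k", 1); the
  // generations no longer match, so "v2" survives.
  fmt.Println(c.Get("k")) // v2 true
}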

// Delete deletes an entry in the map.
func (c *Expiring) Delete(key interface{}) {
c.mu.Lock()
defer c.mu.Unlock()
c.del(key, 0)
}

// del deletes the entry for the given key. The generation argument is the
// generation of the entry that should be deleted. If the generation has been
// changed (e.g. if a set has occurred on an existing element but the old
// cleanup still runs), this is a noop. If the generation argument is 0, the
// entry's generation is ignored and the entry is deleted.
//
// del must be called under the write lock.
func (c *Expiring) del(key interface{}, generation uint64) {
e, ok := c.cache[key]
if !ok {
return
}
if generation != 0 && generation != e.generation {
return
}
delete(c.cache, key)
}

// Len returns the number of items in the cache.
func (c *Expiring) Len() int {
c.mu.RLock()
defer c.mu.RUnlock()
return len(c.cache)
}

const gcInterval = 50 * time.Millisecond
Member:

Seems a bit often?

Member Author:

It needs to be quick enough to push back on cache growth. It's pretty inexpensive from what I can tell from the benchmarks. What are your recommendations?

Member:

Doing it more frequently should reduce how much needs to be cleaned.

The worst aspect of this is that new requests are blocked during this. I guess that's why sharding helps (ideally the shards wouldn't all GC at the same exact time).

If we e.g. produced a copy of the map each time and stored it with something like sync.Value (?), I think we could permit reads to proceed 100% of the time.

Member Author:

That's how sync.Map is implemented but it generates a lot of garbage. You are pushing the work to the (go) GC. The critical improvement to this cache strategy is reduction of cache misses that we saw from insufficient LRU cache size and scanning access patterns. We haven't identified contention as a problem. Benchmarks also show that a GC iteration is cheap.

Member:

In my head I was thinking something like time.Second but without actual measurements it is hard for me to know what kind of value makes sense here.

Member:

I think we can start with 50ms; we might want to have a metric for how long the gc periods take. I'm open to any other way of measuring.

Something else that could be considered is to just run a GC iteration whenever there's a set; we've already taken the lock at that point, so we might as well do a bit of work. That would amortize the cost and keep the average size lower when there's a lot of churn. Then we could potentially reduce this frequency by a bunch; when the requests are infrequent, there won't be too much garbage to clean up.
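To make that last suggestion concrete: a hypothetical variant (not part of this PR) could evict a bounded number of expired entries from inside Set, where the write lock is already held. Roughly:

// Hypothetical helper, not in this PR: evict up to max expired entries.
// Must be called with c.mu held for writing.
func (c *Expiring) gcLocked(now time.Time, max int) {
  for i := 0; i < max; i++ {
    if len(c.heap) == 0 || now.Before(c.heap[0].expiry) {
      return
    }
    cleanup := heap.Pop(&c.heap).(*expiringHeapEntry)
    c.del(cleanup.key, cleanup.generation)
  }
}

Calling something like gcLocked(c.clock.Now(), 2) at the end of Set would amortize cleanup across writes and allow a longer ticker interval.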


// Run runs the GC goroutine. The goroutine exits when the passed in context is
// cancelled.
func (c *Expiring) Run(ctx context.Context) {
t := c.clock.NewTicker(gcInterval)
defer t.Stop()
for {
select {
case <-t.C():
c.gc()
case <-ctx.Done():
return
}
}
}
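The injected clock makes expiry deterministic in tests. A hedged sketch, assuming the FakeClock in k8s.io/apimachinery/pkg/util/clock (note that Get already treats expired entries as absent, even before the GC goroutine removes them):

package cache_test

import (
  "testing"
  "time"

  "k8s.io/apimachinery/pkg/util/cache"
  utilclock "k8s.io/apimachinery/pkg/util/clock"
)

func TestExpiry(t *testing.T) {
  fc := utilclock.NewFakeClock(time.Now())
  c := cache.NewExpiringWithClock(fc)

  c.Set("k", "v", time.Second)
  if _, ok := c.Get("k"); !ok {
    t.Fatal("expected entry before expiry")
  }

  fc.Step(2 * time.Second) // advance the fake clock past the TTL
  if _, ok := c.Get("k"); ok {
    t.Fatal("expected entry to read as expired")
  }
}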

func (c *Expiring) gc() {
now := c.clock.Now()

c.mu.Lock()
defer c.mu.Unlock()
for {
// Return from gc if the heap is empty or the next element is not yet
// expired.
//
// heap[0] is a peek at the next element in the heap, which is not obvious
// from looking at the (*expiringHeap).Pop() implementation below.
// heap.Pop() swaps the first entry with the last entry of the heap, then
// calls (*expiringHeap).Pop() which returns the last element.
if len(c.heap) == 0 || now.Before(c.heap[0].expiry) {
return
}
cleanup := heap.Pop(&c.heap).(*expiringHeapEntry)
c.del(cleanup.key, cleanup.generation)
}
}

type expiringHeapEntry struct {
key interface{}
generation uint64
expiry time.Time
}

// expiringHeap is a min-heap ordered by the expiration time of its entries.
// The expiring cache uses it as a priority queue to efficiently organize
// entries for garbage collection once they expire.
type expiringHeap []*expiringHeapEntry

var _ heap.Interface = &expiringHeap{}

func (cq expiringHeap) Len() int {
return len(cq)
}

func (cq expiringHeap) Less(i, j int) bool {
return cq[i].expiry.Before(cq[j].expiry)
}

func (cq expiringHeap) Swap(i, j int) {
cq[i], cq[j] = cq[j], cq[i]
}

func (cq *expiringHeap) Push(c interface{}) {
*cq = append(*cq, c.(*expiringHeapEntry))
}

func (cq *expiringHeap) Pop() interface{} {
c := (*cq)[cq.Len()-1]
*cq = (*cq)[:cq.Len()-1]
Member:

This never shrinks the underlying array, right? If so, that could be something to handle in the gc method.

Member Author:

Given that these are pointers (so 8 bytes), I'm not particularly concerned. We can store a million in 8MB and the scheme is simple and once it warms up, we don't need to do any allocations here. WDYT?

Member:

I think we'll have bigger problems if this cache gets big enough for this to be a problem.

Member:

Ah yeah I had not considered the fact that the pointers do not take up much space. I think we can ignore the size of the underlying array.

Do we need to explicitly do a (*cq)[cq.Len()-1] = nil to make sure the value can be gc'd?

return c
}
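On that final question: as written, the popped entry's pointer stays reachable through the slice's underlying array until a later Push overwrites the slot. If that ever matters, a variant that clears the slot (a sketch, not what this PR merged) would be:

func (cq *expiringHeap) Pop() interface{} {
  c := (*cq)[cq.Len()-1]
  (*cq)[cq.Len()-1] = nil // drop the reference so the entry can be GC'd
  *cq = (*cq)[:cq.Len()-1]
  return c
}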