Add cache with multiple shards to decrease lock contention
fgrzadkowski committed May 11, 2015
1 parent 12de230 commit a93518f
Showing 2 changed files with 101 additions and 31 deletions.
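
Why sharding helps: every access to the old cache went through one global RWMutex, so busy goroutines serialized on a single lock even when touching unrelated keys. Splitting the map into 32 independently locked shards lets operations on different shards proceed in parallel. A minimal, self-contained sketch of the idea (shardedMap and the workload in main are illustrative names, not part of this commit):

	package main

	import (
		"fmt"
		"sync"
		"time"
	)

	const shards = 32

	// shardedMap splits keys across independently locked maps so that
	// goroutines touching different shards do not serialize on one lock.
	type shardedMap struct {
		locks [shards]sync.RWMutex
		maps  [shards]map[uint64]string
	}

	func newShardedMap() *shardedMap {
		m := &shardedMap{}
		for i := range m.maps {
			m.maps[i] = make(map[uint64]string)
		}
		return m
	}

	func (m *shardedMap) put(k uint64, v string) {
		i := k % shards
		m.locks[i].Lock()
		defer m.locks[i].Unlock()
		m.maps[i][k] = v
	}

	func (m *shardedMap) get(k uint64) (string, bool) {
		i := k % shards
		m.locks[i].RLock()
		defer m.locks[i].RUnlock()
		v, ok := m.maps[i][k]
		return v, ok
	}

	func main() {
		m := newShardedMap()
		start := time.Now()
		var wg sync.WaitGroup
		for w := 0; w < 8; w++ {
			wg.Add(1)
			go func(w int) {
				defer wg.Done()
				for k := uint64(w); k < 100000; k += 8 {
					m.put(k, "v")
					m.get(k)
				}
			}(w)
		}
		wg.Wait()
		fmt.Printf("8 workers, 100k keys: %v\n", time.Since(start))
	}

Keys map to shards by index % shards, the same scheme the new util.Cache below uses.
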
68 changes: 37 additions & 31 deletions pkg/tools/etcd_helper.go
@@ -25,11 +25,11 @@ import (
 	"path"
 	"reflect"
 	"strings"
-	"sync"
 	"time"
 
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/conversion"
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
+	"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
 	"github.com/coreos/go-etcd/etcd"
 	"github.com/prometheus/client_golang/prometheus"
 
@@ -56,6 +56,18 @@ var (
 			"because two concurrent threads can miss the cache and generate the same entry twice.",
 		},
 	)
+	cacheGetLatency = prometheus.NewSummary(
+		prometheus.SummaryOpts{
+			Name: "etcd_request_cache_get_latencies_summary",
+			Help: "Latency in microseconds of getting an object from etcd cache",
+		},
+	)
+	cacheAddLatency = prometheus.NewSummary(
+		prometheus.SummaryOpts{
+			Name: "etcd_request_cache_add_latencies_summary",
+			Help: "Latency in microseconds of adding an object to etcd cache",
+		},
+	)
 	etcdRequestLatenciesSummary = prometheus.NewSummaryVec(
 		prometheus.SummaryOpts{
 			Name: "etcd_request_latencies_summary",
@@ -65,21 +77,25 @@
 	)
 )
 
-func getTypeName(obj interface{}) string {
-	return reflect.TypeOf(obj).String()
-}
-
-func recordEtcdRequestLatency(verb, resource string, startTime time.Time) {
-	etcdRequestLatenciesSummary.WithLabelValues(verb, resource).Observe(float64(time.Since(startTime) / time.Microsecond))
-}
+const maxEtcdCacheEntries int = 50000
 
 func init() {
 	prometheus.MustRegister(cacheHitCounter)
 	prometheus.MustRegister(cacheMissCounter)
 	prometheus.MustRegister(cacheEntryCounter)
+	prometheus.MustRegister(cacheAddLatency)
+	prometheus.MustRegister(cacheGetLatency)
 	prometheus.MustRegister(etcdRequestLatenciesSummary)
 }
 
+func getTypeName(obj interface{}) string {
+	return reflect.TypeOf(obj).String()
+}
+
+func recordEtcdRequestLatency(verb, resource string, startTime time.Time) {
+	etcdRequestLatenciesSummary.WithLabelValues(verb, resource).Observe(float64(time.Since(startTime) / time.Microsecond))
+}
+
 // EtcdHelper offers common object marshalling/unmarshalling operations on an etcd client.
 type EtcdHelper struct {
 	Client EtcdGetSet
@@ -96,8 +112,7 @@ type EtcdHelper struct {
 	// support multi-object transaction that will result in many objects with the same index.
 	// Number of entries stored in the cache is controlled by maxEtcdCacheEntries constant.
 	// TODO: Measure how much this cache helps after the conversion code is optimized.
-	cache map[uint64]runtime.Object
-	mutex *sync.RWMutex
+	cache util.Cache
 }
 
 // NewEtcdHelper creates a helper that works against objects that use the internal
@@ -108,8 +123,7 @@ func NewEtcdHelper(client EtcdGetSet, codec runtime.Codec, prefix string) EtcdHelper {
 		Codec:      codec,
 		Versioner:  APIObjectVersioner{},
 		PathPrefix: prefix,
-		cache:      make(map[uint64]runtime.Object),
-		mutex:      new(sync.RWMutex),
+		cache:      util.NewCache(maxEtcdCacheEntries),
 	}
 }
 
@@ -207,16 +221,13 @@ type etcdCache interface {
 	addToCache(index uint64, obj runtime.Object)
 }
 
-const maxEtcdCacheEntries int = 50000
-
 func (h *EtcdHelper) getFromCache(index uint64) (runtime.Object, bool) {
-	var obj runtime.Object
-	func() {
-		h.mutex.RLock()
-		defer h.mutex.RUnlock()
-		obj = h.cache[index]
+	startTime := time.Now()
+	defer func() {
+		cacheGetLatency.Observe(float64(time.Since(startTime) / time.Microsecond))
 	}()
-	if obj != nil {
+	obj, found := h.cache.Get(index)
+	if found {
 		// We should not return the object itself to avoid polluting the cache if someone
 		// modifies returned values.
 		objCopy, err := conversion.DeepCopy(obj)
@@ -232,24 +243,19 @@ func (h *EtcdHelper) getFromCache(index uint64) (runtime.Object, bool) {
 }
 
 func (h *EtcdHelper) addToCache(index uint64, obj runtime.Object) {
+	startTime := time.Now()
+	defer func() {
+		cacheAddLatency.Observe(float64(time.Since(startTime) / time.Microsecond))
+	}()
 	objCopy, err := conversion.DeepCopy(obj)
 	if err != nil {
 		glog.Errorf("Error during DeepCopy of cached object: %q", err)
 		return
 	}
-	h.mutex.Lock()
-	defer h.mutex.Unlock()
-	if _, found := h.cache[index]; !found {
+	isOverwrite := h.cache.Add(index, objCopy)
+	if !isOverwrite {
 		cacheEntryCounter.Inc()
 	}
-	h.cache[index] = objCopy.(runtime.Object)
-	if len(h.cache) > maxEtcdCacheEntries {
-		var randomKey uint64
-		for randomKey = range h.cache {
-			break
-		}
-		delete(h.cache, randomKey)
-	}
 }
 
 // ExtractToList works on a *List api object (an object that satisfies the runtime.IsList
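
Both cache paths above now time themselves with the same start-time/deferred-Observe pattern already used by recordEtcdRequestLatency. A standalone sketch of that instrumentation pattern (the metric name example_op_latencies_summary and the function timedOp are illustrative, not from this commit):

	package main

	import (
		"time"

		"github.com/prometheus/client_golang/prometheus"
	)

	var opLatency = prometheus.NewSummary(prometheus.SummaryOpts{
		Name: "example_op_latencies_summary",
		Help: "Latency in microseconds of an example operation",
	})

	func init() {
		prometheus.MustRegister(opLatency)
	}

	func timedOp() {
		startTime := time.Now()
		// The deferred closure runs after the function body completes,
		// capturing the full duration and recording it in microseconds,
		// the same unit the etcd helper metrics use.
		defer func() {
			opLatency.Observe(float64(time.Since(startTime) / time.Microsecond))
		}()
		time.Sleep(2 * time.Millisecond) // stand-in for real work
	}

	func main() {
		timedOp()
	}
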
64 changes: 64 additions & 0 deletions pkg/util/cache.go
@@ -0,0 +1,64 @@
package util

import (
"sync"
)

const (
shardsCount int = 32
)

// Cache is a fixed-size cache mapping uint64 indices to objects. It is split
// into shards, each guarded by its own lock, to reduce lock contention.
type Cache []*cacheShard

func NewCache(maxSize int) Cache {
cache := make(Cache, shardsCount)
for i := 0; i < shardsCount; i++ {
cache[i] = &cacheShard{
items: make(map[uint64]interface{}),
maxSize: maxSize / shardsCount,
}
}
return cache
}

func (c Cache) getShard(index uint64) *cacheShard {
return c[index%uint64(shardsCount)]
}

// Add stores obj under index and returns true if an object with that index
// already existed, false otherwise.
func (c *Cache) Add(index uint64, obj interface{}) bool {
	return c.getShard(index).add(index, obj)
}

// Get returns the object stored under index and whether it was present.
func (c *Cache) Get(index uint64) (obj interface{}, found bool) {
	return c.getShard(index).get(index)
}

type cacheShard struct {
items map[uint64]interface{}
sync.RWMutex
maxSize int
}

// add returns true if an object with this index already existed, false otherwise.
func (s *cacheShard) add(index uint64, obj interface{}) bool {
	s.Lock()
	defer s.Unlock()
	_, isOverwrite := s.items[index]
	s.items[index] = obj
	if len(s.items) > s.maxSize {
		// Evict an arbitrary entry; Go's map iteration order is unspecified,
		// so the first key returned is effectively a random victim.
		var randomKey uint64
		for randomKey = range s.items {
			break
		}
		delete(s.items, randomKey)
	}
	return isOverwrite
}

func (s *cacheShard) get(index uint64) (obj interface{}, found bool) {
s.RLock()
defer s.RUnlock()
obj, found = s.items[index]
return
}
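
A short usage sketch for the new cache (assuming this tree's github.com/GoogleCloudPlatform/kubernetes/pkg/util import path): Add reports whether an entry already existed, which is exactly what addToCache uses to keep cacheEntryCounter accurate, and Get returns a (value, found) pair.

	package main

	import (
		"fmt"

		"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
	)

	func main() {
		c := util.NewCache(50000) // capacity is split evenly across the 32 shards

		// Add returns true only when an entry for the index already existed.
		overwrote := c.Add(12345, "object-at-etcd-index-12345")
		fmt.Println(overwrote) // false: first insert

		if obj, found := c.Get(12345); found {
			fmt.Println(obj.(string))
		}
	}

Note that each shard's capacity is maxSize / shardsCount, so eviction decisions are made locally within a shard without any global coordination.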
