From a93518f9e2d003a552804ce59271899fb138ebec Mon Sep 17 00:00:00 2001
From: Filip Grzadkowski
Date: Fri, 8 May 2015 09:16:24 +0200
Subject: [PATCH] Add cache with multiple shards to decrease lock contention

---
 pkg/tools/etcd_helper.go | 68 ++++++++++++++++++++++------------
 pkg/util/cache.go        | 64 +++++++++++++++++++++++++++++++++++++
 2 files changed, 101 insertions(+), 31 deletions(-)
 create mode 100644 pkg/util/cache.go

diff --git a/pkg/tools/etcd_helper.go b/pkg/tools/etcd_helper.go
index dda532262b67..03df8e440c8f 100644
--- a/pkg/tools/etcd_helper.go
+++ b/pkg/tools/etcd_helper.go
@@ -25,11 +25,11 @@ import (
 	"path"
 	"reflect"
 	"strings"
-	"sync"
 	"time"
 
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/conversion"
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
+	"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
 
 	"github.com/coreos/go-etcd/etcd"
 	"github.com/prometheus/client_golang/prometheus"
@@ -56,6 +56,18 @@ var (
 				"because two concurrent threads can miss the cache and generate the same entry twice.",
 		},
 	)
+	cacheGetLatency = prometheus.NewSummary(
+		prometheus.SummaryOpts{
+			Name: "etcd_request_cache_get_latencies_summary",
+			Help: "Latency in microseconds of getting an object from etcd cache",
+		},
+	)
+	cacheAddLatency = prometheus.NewSummary(
+		prometheus.SummaryOpts{
+			Name: "etcd_request_cache_add_latencies_summary",
+			Help: "Latency in microseconds of adding an object to etcd cache",
+		},
+	)
 	etcdRequestLatenciesSummary = prometheus.NewSummaryVec(
 		prometheus.SummaryOpts{
 			Name: "etcd_request_latencies_summary",
@@ -65,21 +77,25 @@ var (
 	)
 )
 
-func getTypeName(obj interface{}) string {
-	return reflect.TypeOf(obj).String()
-}
-
-func recordEtcdRequestLatency(verb, resource string, startTime time.Time) {
-	etcdRequestLatenciesSummary.WithLabelValues(verb, resource).Observe(float64(time.Since(startTime) / time.Microsecond))
-}
+const maxEtcdCacheEntries int = 50000
 
 func init() {
 	prometheus.MustRegister(cacheHitCounter)
 	prometheus.MustRegister(cacheMissCounter)
 	prometheus.MustRegister(cacheEntryCounter)
+	prometheus.MustRegister(cacheAddLatency)
+	prometheus.MustRegister(cacheGetLatency)
 	prometheus.MustRegister(etcdRequestLatenciesSummary)
 }
 
+func getTypeName(obj interface{}) string {
+	return reflect.TypeOf(obj).String()
+}
+
+func recordEtcdRequestLatency(verb, resource string, startTime time.Time) {
+	etcdRequestLatenciesSummary.WithLabelValues(verb, resource).Observe(float64(time.Since(startTime) / time.Microsecond))
+}
+
 // EtcdHelper offers common object marshalling/unmarshalling operations on an etcd client.
 type EtcdHelper struct {
 	Client EtcdGetSet
@@ -96,8 +112,7 @@ type EtcdHelper struct {
 	// support multi-object transaction that will result in many objects with the same index.
 	// Number of entries stored in the cache is controlled by maxEtcdCacheEntries constant.
 	// TODO: Measure how much this cache helps after the conversion code is optimized.
-	cache map[uint64]runtime.Object
-	mutex *sync.RWMutex
+	cache util.Cache
 }
 
 // NewEtcdHelper creates a helper that works against objects that use the internal
@@ -108,8 +123,7 @@ func NewEtcdHelper(client EtcdGetSet, codec runtime.Codec, prefix string) EtcdHe
 		Codec:      codec,
 		Versioner:  APIObjectVersioner{},
 		PathPrefix: prefix,
-		cache:      make(map[uint64]runtime.Object),
-		mutex:      new(sync.RWMutex),
+		cache:      util.NewCache(maxEtcdCacheEntries),
 	}
 }
 
@@ -207,16 +221,13 @@ type etcdCache interface {
 	addToCache(index uint64, obj runtime.Object)
 }
 
-const maxEtcdCacheEntries int = 50000
-
 func (h *EtcdHelper) getFromCache(index uint64) (runtime.Object, bool) {
-	var obj runtime.Object
-	func() {
-		h.mutex.RLock()
-		defer h.mutex.RUnlock()
-		obj = h.cache[index]
+	startTime := time.Now()
+	defer func() {
+		cacheGetLatency.Observe(float64(time.Since(startTime) / time.Microsecond))
 	}()
-	if obj != nil {
+	obj, found := h.cache.Get(index)
+	if found {
 		// We should not return the object itself to avoid poluting the cache if someone
 		// modifies returned values.
 		objCopy, err := conversion.DeepCopy(obj)
@@ -232,24 +243,19 @@ func (h *EtcdHelper) getFromCache(index uint64) (runtime.Object, bool) {
 }
 
 func (h *EtcdHelper) addToCache(index uint64, obj runtime.Object) {
+	startTime := time.Now()
+	defer func() {
+		cacheAddLatency.Observe(float64(time.Since(startTime) / time.Microsecond))
+	}()
 	objCopy, err := conversion.DeepCopy(obj)
 	if err != nil {
 		glog.Errorf("Error during DeepCopy of cached object: %q", err)
 		return
 	}
-	h.mutex.Lock()
-	defer h.mutex.Unlock()
-	if _, found := h.cache[index]; !found {
+	isOverwrite := h.cache.Add(index, objCopy)
+	if !isOverwrite {
 		cacheEntryCounter.Inc()
 	}
-	h.cache[index] = objCopy.(runtime.Object)
-	if len(h.cache) > maxEtcdCacheEntries {
-		var randomKey uint64
-		for randomKey = range h.cache {
-			break
-		}
-		delete(h.cache, randomKey)
-	}
 }
 
 // ExtractToList works on a *List api object (an object that satisfies the runtime.IsList
diff --git a/pkg/util/cache.go b/pkg/util/cache.go
new file mode 100644
index 000000000000..158eb3ec2469
--- /dev/null
+++ b/pkg/util/cache.go
@@ -0,0 +1,64 @@
+package util
+
+import (
+	"sync"
+)
+
+const (
+	shardsCount int = 32
+)
+
+type Cache []*cacheShard
+
+func NewCache(maxSize int) Cache {
+	cache := make(Cache, shardsCount)
+	for i := 0; i < shardsCount; i++ {
+		cache[i] = &cacheShard{
+			items:   make(map[uint64]interface{}),
+			maxSize: maxSize / shardsCount,
+		}
+	}
+	return cache
+}
+
+func (c Cache) getShard(index uint64) *cacheShard {
+	return c[index%uint64(shardsCount)]
+}
+
+// Returns true if object already existed, false otherwise.
+func (c *Cache) Add(index uint64, obj interface{}) bool {
+	return c.getShard(index).add(index, obj)
+}
+
+func (c *Cache) Get(index uint64) (obj interface{}, found bool) {
+	return c.getShard(index).get(index)
+}
+
+type cacheShard struct {
+	items map[uint64]interface{}
+	sync.RWMutex
+	maxSize int
+}
+
+// Returns true if object already existed, false otherwise.
+func (s *cacheShard) add(index uint64, obj interface{}) bool {
+	s.Lock()
+	defer s.Unlock()
+	_, isOverwrite := s.items[index]
+	s.items[index] = obj
+	if len(s.items) > s.maxSize {
+		var randomKey uint64
+		for randomKey = range s.items {
+			break
+		}
+		delete(s.items, randomKey)
+	}
+	return isOverwrite
+}
+
+func (s *cacheShard) get(index uint64) (obj interface{}, found bool) {
+	s.RLock()
+	defer s.RUnlock()
+	obj, found = s.items[index]
+	return
+}
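Not part of the patch, just a reviewer's note: a minimal, self-contained sketch of how the new util.Cache is meant to be used. The key 12345 and the cached string below are illustrative stand-ins for the etcd index and the decoded runtime.Object that EtcdHelper actually stores.

package main

import (
	"fmt"

	"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
)

func main() {
	// One logical cache split into 32 shards, each guarded by its own RWMutex
	// and capped at maxSize/32 entries (an arbitrary entry is evicted when a
	// shard overflows), so callers hitting different shards no longer contend
	// on a single lock.
	cache := util.NewCache(50000)

	// Keys are uint64 etcd indexes; values are untyped (interface{}).
	var index uint64 = 12345 // illustrative key
	existed := cache.Add(index, "decoded object would go here")
	fmt.Println("entry already existed:", existed) // false on the first Add

	if obj, found := cache.Get(index); found {
		fmt.Println("cache hit:", obj)
	}
}

Add reports whether an entry with that index already existed, which is what the reworked addToCache relies on to decide whether to increment cacheEntryCounter.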