Skip to content

Commit

Permalink
Merge pull request #16 from tarent/cache-metrics
Browse files Browse the repository at this point in the history
Cache metrics
  • Loading branch information
domano committed Jul 11, 2016
2 parents 456a5da + acbc2d3 commit b3000cb
Show file tree
Hide file tree
Showing 2 changed files with 117 additions and 17 deletions.
97 changes: 81 additions & 16 deletions cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"github.com/hashicorp/golang-lru/simplelru"
"github.com/tarent/lib-compose/logging"
"sync"
"sync/atomic"
"time"
)

Expand All @@ -18,8 +17,11 @@ type Cache struct {
lock sync.RWMutex
lruBackend *simplelru.LRU
maxAge time.Duration
maxSizeBytes int32
currentSizeBytes int32
maxSizeBytes int
currentSizeBytes int
hits int
misses int
stats map[string]interface{}
}

type CacheEntry struct {
Expand All @@ -36,7 +38,7 @@ func NewCache(name string, maxEntries int, maxSizeMB int, maxAge time.Duration)
c := &Cache{
name: name,
maxAge: maxAge,
maxSizeBytes: int32(maxSizeMB) * 1024 * 1024,
maxSizeBytes: maxSizeMB * 1024 * 1024,
}

var err error
Expand All @@ -56,19 +58,37 @@ func (c *Cache) logEvery(d time.Duration) {
for {
select {
case <-time.After(d):
logging.Logger.WithFields(logrus.Fields{
"type": "metric",
"metric_name": "cachestatus",
"cache_entries": c.Len(),
"cache_size_bytes": c.SizeByte(),
}).Infof("cache status #%v, %vbytes", c.Len(), c.SizeByte())
c.PurgeOldEntries()
c.calculateStats(d)
}
}
}

func (c *Cache) onEvicted(key, value interface{}) {
entry := value.(*CacheEntry)
atomic.AddInt32(&c.currentSizeBytes, -1*int32(entry.size))
func (c *Cache) calculateStats(reportingDuration time.Duration) {
c.lock.Lock()
defer c.lock.Unlock()

ratio := 100
if c.hits+c.misses != 0 {
ratio = 100 * c.hits / (c.hits + c.misses)
}

c.stats = map[string]interface{}{
"type": "metric",
"metric_name": "cachestatus",
"cache_entries": c.lruBackend.Len(),
"cache_size_bytes": c.currentSizeBytes,
"cache_reporting_duration": reportingDuration,
"cache_hits": c.hits,
"cache_misses": c.misses,
"cache_hit_ratio": ratio,
}

c.hits = 0
c.misses = 0
logging.Logger.
WithFields(logrus.Fields(c.stats)).
Infof("cache status #%v, %vbytes, %v%% hits", c.lruBackend.Len(), c.currentSizeBytes, ratio)
}

func (c *Cache) Get(key string) (interface{}, bool) {
Expand All @@ -79,9 +99,11 @@ func (c *Cache) Get(key string) (interface{}, bool) {
entry := e.(*CacheEntry)
if time.Since(entry.fetchTime) < c.maxAge {
entry.hits++
c.hits++
return entry.cacheObject, true
}
}
c.misses++
return nil, false
}

Expand All @@ -95,14 +117,55 @@ func (c *Cache) Set(key string, label string, sizeBytes int, cacheObject interfa
}
c.lock.Lock()
defer c.lock.Unlock()
atomic.AddInt32(&c.currentSizeBytes, int32(sizeBytes))

// first remove, to have correct size counting
c.lruBackend.Remove(key)

c.currentSizeBytes += sizeBytes
c.lruBackend.Add(key, entry)

for atomic.LoadInt32(&c.currentSizeBytes) > int32(c.maxSizeBytes) {
for c.currentSizeBytes > c.maxSizeBytes {
c.lruBackend.RemoveOldest()
}
}

// called by the cache api, if items are removed,
// because of an overfilled cache.
// Attention: This method does not locking, because it
// will be triggered as a subcall of Set()
func (c *Cache) onEvicted(key, value interface{}) {
entry := value.(*CacheEntry)
c.currentSizeBytes -= entry.size
}

// PurgeOldEntries removes all entries which are out of their ttl
func (c *Cache) PurgeOldEntries() {
c.lock.RLock()
keys := c.lruBackend.Keys()
c.lock.RUnlock()

purged := 0
for _, key := range keys {
c.lock.RLock()
e, found := c.lruBackend.Peek(key)
c.lock.RUnlock()

if found {
entry := e.(*CacheEntry)
if time.Since(entry.fetchTime) > c.maxAge {
c.lock.Lock()
c.lruBackend.Remove(key)
c.lock.Unlock()
purged++
}
}
}
logging.Logger.
WithFields(logrus.Fields(c.stats)).
Infof("purged %v out of %v cache entries", purged, len(keys))

}

func (c *Cache) Invalidate() {
c.lock.Lock()
defer c.lock.Unlock()
Expand All @@ -113,7 +176,9 @@ func (c *Cache) Invalidate() {

// SizeByte returns the total memory consumption of the cache
func (c *Cache) SizeByte() int {
return int(atomic.LoadInt32(&c.currentSizeBytes))
c.lock.RLock()
defer c.lock.RUnlock()
return c.currentSizeBytes
}

// Len returns the total number of entries in the cache
Expand Down
37 changes: 36 additions & 1 deletion cache/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,8 @@ func Test_Cache_MaxBytes(t *testing.T) {

// given a cache with max 1 mega byte, filled with 8 bytes
c := NewCache("my-cache", 100, 1, time.Hour)
c.Set("a", "", 400*1024, "a")
c.Set("a", "", 42*1024, "a")
c.Set("a", "", 400*1024, "a") // the same item only shoud count once, with the lastest bytes
c.Set("b", "", 400*1024, "b")
a.Equal(800*1024, c.SizeByte())
a.Equal(2, c.Len())
Expand All @@ -98,6 +99,40 @@ func Test_Cache_MaxBytes(t *testing.T) {
a.True(found)
}

func Test_Cache_Stats(t *testing.T) {
a := assert.New(t)

c := NewCache("my-cache", 3, 100, time.Hour)
c.Set("a", "", 42, "a")
c.Get("a")
c.Get("a")
c.Get("b")

c.calculateStats(time.Hour)
a.Equal(1, c.stats["cache_entries"])
a.Equal(42, c.stats["cache_size_bytes"])
a.Equal(2, c.stats["cache_hits"])
a.Equal(1, c.stats["cache_misses"])
a.Equal(66, c.stats["cache_hit_ratio"])
}

func Test_Cache_PurgeOldEntries(t *testing.T) {
a := assert.New(t)

c := NewCache("my-cache", 100, 100, time.Millisecond)
c.Set("a", "", 1, "a")
c.Set("b", "", 1, "a")
c.Set("c", "", 1, "a")
time.Sleep(time.Millisecond)
c.Set("d", "", 42, "a")
c.Set("e", "", 42, "a")

c.PurgeOldEntries()

a.Equal(2, c.Len())
a.Equal(84, c.SizeByte())
}

func Test_Cache_Invalidation(t *testing.T) {
a := assert.New(t)

Expand Down

0 comments on commit b3000cb

Please sign in to comment.