Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 27 additions & 5 deletions core/services/llo/observation/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,17 @@ var (
},
[]string{"streamID", "reason"},
)
promCacheHitEntryAgeMs = promauto.NewHistogramVec(prometheus.HistogramOpts{
Namespace: "llo",
Subsystem: "datasource",
Name: "cache_hit_entry_age_ms",
Help: "Wall time since the cache entry was written when a plugin read hits the cache (staleness proxy)",
Buckets: []float64{
0.5, 1, 2, 5, 10, 25, 50, 100, 250, 500, 1000,
},
},
[]string{"streamID"},
)
)

// StreamValueCache is used by dataSource to decouple the read/write paths for stream values.
Expand Down Expand Up @@ -60,6 +71,7 @@ type Cache struct {
type item struct {
value llo.StreamValue
expiresAt time.Time
writtenAt time.Time // wall clock at Add/AddMany; used for cache_hit_entry_age_ms
}

type cacheOutcome string
Expand All @@ -73,6 +85,7 @@ const (
type metricEvent struct {
id llotypes.StreamID
cacheOutcome cacheOutcome
ageMs float64 // valid when cacheOutcomeHit and writtenAt was set on the item
}

// NewCache creates a new cache.
Expand Down Expand Up @@ -112,24 +125,26 @@ func NewCache(cleanupInterval time.Duration) *Cache {

// Add adds a stream value to the cache.
func (c *Cache) Add(id llotypes.StreamID, value llo.StreamValue, ttl time.Duration) {
now := time.Now()
var expiresAt time.Time
if ttl > 0 {
expiresAt = time.Now().Add(ttl)
expiresAt = now.Add(ttl)
}
c.mu.Lock()
defer c.mu.Unlock()
c.values[id] = item{value: value, expiresAt: expiresAt}
c.values[id] = item{value: value, expiresAt: expiresAt, writtenAt: now}
}

func (c *Cache) AddMany(values map[llotypes.StreamID]llo.StreamValue, ttl time.Duration) {
now := time.Now()
var expiresAt time.Time
if ttl > 0 {
expiresAt = time.Now().Add(ttl)
expiresAt = now.Add(ttl)
}
c.mu.Lock()
defer c.mu.Unlock()
for id, value := range values {
c.values[id] = item{value: value, expiresAt: expiresAt}
c.values[id] = item{value: value, expiresAt: expiresAt, writtenAt: now}
}
}

Expand All @@ -152,7 +167,11 @@ func (c *Cache) UpdateStreamValues(streamValues llo.StreamValues) {
streamValues[id] = nil
continue
}
events = append(events, metricEvent{id: id, cacheOutcome: cacheOutcomeHit})
ageMs := -1.0
if !itm.writtenAt.IsZero() {
ageMs = float64(now.Sub(itm.writtenAt).Milliseconds())
}
events = append(events, metricEvent{id: id, cacheOutcome: cacheOutcomeHit, ageMs: ageMs})
streamValues[id] = itm.value
}
c.mu.RUnlock()
Expand Down Expand Up @@ -193,6 +212,9 @@ func (c *Cache) updateMetrics() {
idStr := strconv.FormatUint(uint64(e.id), 10)
if e.cacheOutcome == cacheOutcomeHit {
promCacheHitCount.WithLabelValues(idStr).Inc()
if e.ageMs >= 0 {
promCacheHitEntryAgeMs.WithLabelValues(idStr).Observe(e.ageMs)
}
} else {
promCacheMissCount.WithLabelValues(idStr, string(e.cacheOutcome)).Inc()
}
Expand Down
26 changes: 26 additions & 0 deletions core/services/llo/observation/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"testing"
"time"

"github.com/prometheus/client_golang/prometheus"
io_prometheus_client "github.com/prometheus/client_model/go"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

Expand Down Expand Up @@ -188,6 +190,30 @@ func TestCache_UpdateStreamValues(t *testing.T) {
})
}

func TestCache_UpdateStreamValues_RecordsHitEntryAge(t *testing.T) {
promCacheHitEntryAgeMs.Reset()
promCacheHitCount.Reset()

cache := NewCache(0)
defer cache.Close()
cache.AddMany(map[llotypes.StreamID]llo.StreamValue{
1: &mockStreamValue{value: []byte{1}},
}, time.Hour)

streamValues := llo.StreamValues{1: nil}
cache.UpdateStreamValues(streamValues)

var m io_prometheus_client.Metric
require.Eventually(t, func() bool {
hist := promCacheHitEntryAgeMs.WithLabelValues("1").(prometheus.Metric)
if err := hist.Write(&m); err != nil {
return false
}
return m.GetHistogram().GetSampleCount() >= 1
}, time.Second, 5*time.Millisecond)
assert.GreaterOrEqual(t, m.GetHistogram().GetSampleSum(), 0.0)
}

func TestCache_Add_Get(t *testing.T) {
tests := []struct {
name string
Expand Down
Loading
Loading