Skip to content

Commit

Permalink
Move Staleness map to be internal
Browse files Browse the repository at this point in the history
So users are forced to use the correct methods.

Also adds lots of documentation
  • Loading branch information
RichieSams committed Feb 20, 2024
1 parent 63bdfb6 commit e12c0ff
Show file tree
Hide file tree
Showing 4 changed files with 100 additions and 78 deletions.
51 changes: 51 additions & 0 deletions internal/exp/metrics/staleness/map.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package staleness

import (
"time"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/identity"
)

// Map is an abstraction over a map
type Map[T any] interface {
// Load the value at key. If it does not exist, the boolean will be false and the value returned will be the zero value
Load(key identity.Stream) (T, bool)
// Store the given key value pair in the map
Store(key identity.Stream, value T)
// Remove the value at key from the map
Delete(key identity.Stream)
// Items returns an iterator function that in future go version can be used with range
// See: https://go.dev/wiki/RangefuncExperiment
Items() func(yield func(identity.Stream, T) bool) bool
}

// RawMap implementation

var _ Map[time.Time] = (*RawMap[identity.Stream, time.Time])(nil)

// RawMap is an implementation of the Map interface using a standard golang map
type RawMap[K comparable, V any] map[K]V

func (rm *RawMap[K, V]) Load(key K) (V, bool) {
value, ok := (*rm)[key]
return value, ok
}

func (rm *RawMap[K, V]) Store(key K, value V) {
(*rm)[key] = value
}

func (rm *RawMap[K, V]) Delete(key K) {
delete(*rm, key)
}

func (rm *RawMap[K, V]) Items() func(yield func(K, V) bool) bool {
return func(yield func(K, V) bool) bool {
for k, v := range *rm {
if !yield(k, v) {
break
}
}
return false
}
}
36 changes: 21 additions & 15 deletions internal/exp/metrics/staleness/priority_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,21 @@ import (
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/identity"
)

// PriorityQueue represents a way to store entries sorted by their priority.
// Pop() will return the oldest entry of the set.
type PriorityQueue interface {
// Update will add or update an entry, and reshuffle the queue internally as needed to keep it sorted
Update(id identity.Stream, newPrio time.Time)
// Peek will return the entry at the HEAD of the queue *without* removing it from the queue
Peek() (identity.Stream, time.Time)
// Pop will remove the entry at the HEAD of the queue and return it
Pop() (identity.Stream, time.Time)
// Len will return the number of entries in the queue
Len() int
}

// heapQueue implements heap.Interface.
// We use it as the inner implementation of a heap-based sorted queue
type heapQueue []*queueItem

type queueItem struct {
Expand Down Expand Up @@ -48,18 +63,6 @@ func (pq *heapQueue) Pop() any {
return item
}

func (pq *heapQueue) Update(item *queueItem, newPrio time.Time) {
item.prio = newPrio
heap.Fix(pq, item.index)
}

type PriorityQueue interface {
Update(id identity.Stream, newPrio time.Time)
Peek() (identity.Stream, time.Time)
Pop() (identity.Stream, time.Time)
Len() int
}

type heapPriorityQueue struct {
inner heapQueue
itemLookup map[identity.Stream]*queueItem
Expand All @@ -76,16 +79,19 @@ func NewPriorityQueue() PriorityQueue {
}

func (pq *heapPriorityQueue) Update(id identity.Stream, newPrio time.Time) {
// Check if the entry already exists in the queue
item, ok := pq.itemLookup[id]
if !ok {
if ok {
// If so, we can update it in place
item.prio = newPrio
heap.Fix(&pq.inner, item.index)
} else {
item = &queueItem{
key: id,
prio: newPrio,
}
heap.Push(&pq.inner, item)
pq.itemLookup[id] = item
} else {
pq.inner.Update(item, newPrio)
}
}

Expand Down
50 changes: 28 additions & 22 deletions internal/exp/metrics/staleness/staleness.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,41 +12,47 @@ import (
// We override how Now() is returned, so we can have deterministic tests
var nowFunc = time.Now

type Map[T any] interface {
// Load the value at key. If it does not exist, the boolean will be false and the value returned will be the zero value
Load(key identity.Stream) (T, bool)
// Store the given key value pair in the map
Store(key identity.Stream, value T)
// LoadOrStore will either load the value from the map and return it and the boolean `true`
// or if it doesn't exist in the Map yet, the value passed in will be stored and then returned with the boolean `false`
LoadOrStore(key identity.Stream, value T) (T, bool)
// Remove the value at key from the map
Delete(key identity.Stream)
// Items returns an iterator function that in future go version can be used with range
// See: https://go.dev/wiki/RangefuncExperiment
Items() func(yield func(identity.Stream, T) bool) bool
}

// Staleness a a wrapper over a map that adds an additional "staleness" value to each entry. Users can
// call ExpireOldEntries() to automatically remove all entries from the map whole staleness value is
// older than the `max`
//
// NOTE: Staleness methods are *not* thread-safe. If the user needs to use Staleness in a multi-threaded
// environment, then it is the user's responsibility to properly serialize calls to Staleness methods
type Staleness[T any] struct {
max time.Duration

Map[T]
pq PriorityQueue
items Map[T]
pq PriorityQueue
}

func NewStaleness[T any](max time.Duration, newMap Map[T]) *Staleness[T] {
return &Staleness[T]{
max: max,
Map: newMap,
pq: NewPriorityQueue(),
max: max,
items: newMap,
pq: NewPriorityQueue(),
}
}

// Load the value at key. If it does not exist, the boolean will be false and the value returned will be the zero value
func (s *Staleness[T]) Load(key identity.Stream) (T, bool) {
return s.items.Load(key)
}

// Store the given key value pair in the map, and update the pair's staleness value to "now"
func (s *Staleness[T]) Store(id identity.Stream, value T) {
s.pq.Update(id, nowFunc())
s.Map.Store(id, value)
s.items.Store(id, value)
}

// Items returns an iterator function that in future go version can be used with range
// See: https://go.dev/wiki/RangefuncExperiment
func (s *Staleness[T]) Items() func(yield func(identity.Stream, T) bool) bool {
return s.items.Items()
}

// ExpireOldEntries will remove all entries whose staleness value is older than `now() - max`
// For example, if an entry has a staleness value of two hours ago, and max == 1 hour, then the entry would
// be removed. But if an entry had a stalness value of 30 minutes, then it *wouldn't* be removed.
func (s *Staleness[T]) ExpireOldEntries() {
now := nowFunc()

Expand All @@ -56,6 +62,6 @@ func (s *Staleness[T]) ExpireOldEntries() {
break
}
id, _ := s.pq.Pop()
s.Map.Delete(id)
s.items.Delete(id)
}
}
41 changes: 0 additions & 41 deletions internal/exp/metrics/staleness/staleness_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,47 +9,6 @@ import (
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/identity"
)

// RawMap
var _ Map[time.Time] = (*RawMap[identity.Stream, time.Time])(nil)

type RawMap[K comparable, V any] map[K]V

func (rm *RawMap[K, V]) Load(key K) (V, bool) {
value, ok := (*rm)[key]
return value, ok
}

func (rm *RawMap[K, V]) LoadOrStore(key K, value V) (V, bool) {
returnedVal, ok := (*rm)[key]
if !ok {
(*rm)[key] = value
returnedVal = value
}

return returnedVal, ok
}

func (rm *RawMap[K, V]) Store(key K, value V) {
(*rm)[key] = value
}

func (rm *RawMap[K, V]) Delete(key K) {
delete(*rm, key)
}

func (rm *RawMap[K, V]) Items() func(yield func(K, V) bool) bool {
return func(yield func(K, V) bool) bool {
for k, v := range *rm {
if !yield(k, v) {
break
}
}
return false
}
}

// Tests

func TestStaleness(t *testing.T) {
t.Parallel()

Expand Down

0 comments on commit e12c0ff

Please sign in to comment.