Skip to content

Commit

Permalink
[internal/exp/metrics] Add a new internal package for handling metric…
Browse files Browse the repository at this point in the history
… staleness (#31089)

**Description:**
It's a glorified wrapper over a Map type, which allows values to be
expired based on a pre-supplied interval.

**Link to tracking Issue:**

#31016

**Testing:**
I added some basic tests of the PriorityQueue implementation as well as
the expiry behaviour of Staleness

**Documentation:**

All the new structs are documented
  • Loading branch information
RichieSams committed Feb 27, 2024
1 parent 0ed8bbb commit 184d094
Show file tree
Hide file tree
Showing 7 changed files with 456 additions and 4 deletions.
10 changes: 6 additions & 4 deletions internal/exp/metrics/go.mod
Original file line number Diff line number Diff line change
@@ -1,28 +1,30 @@
module github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics

replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil => ../../../pkg/pdatautil
go 1.21

require (
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil v0.95.0
github.com/stretchr/testify v1.8.4
go.opentelemetry.io/collector/pdata v1.2.0
)

require (
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/net v0.18.0 // indirect
golang.org/x/sys v0.14.0 // indirect
golang.org/x/text v0.14.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20231106174013-bbf56f31fb17 // indirect
google.golang.org/grpc v1.61.0 // indirect
google.golang.org/protobuf v1.32.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)

go 1.21

toolchain go1.21.1
replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil => ../../../pkg/pdatautil
9 changes: 9 additions & 0 deletions internal/exp/metrics/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

54 changes: 54 additions & 0 deletions internal/exp/metrics/staleness/map.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package staleness // import "github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/staleness"

import (
"time"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/identity"
)

// Map is an abstraction over a map
type Map[T any] interface {
// Load the value at key. If it does not exist, the boolean will be false and the value returned will be the zero value
Load(key identity.Stream) (T, bool)
// Store the given key value pair in the map
Store(key identity.Stream, value T)
// Remove the value at key from the map
Delete(key identity.Stream)
// Items returns an iterator function that in future go version can be used with range
// See: https://go.dev/wiki/RangefuncExperiment
Items() func(yield func(identity.Stream, T) bool) bool
}

// RawMap implementation

var _ Map[time.Time] = (*RawMap[identity.Stream, time.Time])(nil)

// RawMap is an implementation of the Map interface using a standard golang map
type RawMap[K comparable, V any] map[K]V

func (rm *RawMap[K, V]) Load(key K) (V, bool) {
value, ok := (*rm)[key]
return value, ok
}

func (rm *RawMap[K, V]) Store(key K, value V) {
(*rm)[key] = value
}

func (rm *RawMap[K, V]) Delete(key K) {
delete(*rm, key)
}

func (rm *RawMap[K, V]) Items() func(yield func(K, V) bool) bool {
return func(yield func(K, V) bool) bool {
for k, v := range *rm {
if !yield(k, v) {
break
}
}
return false
}
}
111 changes: 111 additions & 0 deletions internal/exp/metrics/staleness/priority_queue.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package staleness // import "github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/staleness"

import (
"container/heap"
"time"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/identity"
)

// PriorityQueue represents a way to store entries sorted by their priority.
// Pop() will return the oldest entry of the set.
type PriorityQueue interface {
// Update will add or update an entry, and reshuffle the queue internally as needed to keep it sorted
Update(id identity.Stream, newPrio time.Time)
// Peek will return the entry at the HEAD of the queue *without* removing it from the queue
Peek() (identity.Stream, time.Time)
// Pop will remove the entry at the HEAD of the queue and return it
Pop() (identity.Stream, time.Time)
// Len will return the number of entries in the queue
Len() int
}

// heapQueue implements heap.Interface.
// We use it as the inner implementation of a heap-based sorted queue
type heapQueue []*queueItem

type queueItem struct {
key identity.Stream
prio time.Time
index int
}

func (pq heapQueue) Len() int { return len(pq) }

func (pq heapQueue) Less(i, j int) bool {
// We want Pop to give us the lowest priority
return pq[i].prio.Before(pq[j].prio)
}

func (pq heapQueue) Swap(i, j int) {
pq[i], pq[j] = pq[j], pq[i]
pq[i].index = i
pq[j].index = j
}

func (pq *heapQueue) Push(x any) {
n := len(*pq)
item := x.(*queueItem)
item.index = n
*pq = append(*pq, item)
}

func (pq *heapQueue) Pop() any {
old := *pq
n := len(old)
item := old[n-1]
old[n-1] = nil // avoid memory leak
item.index = -1 // for safety
*pq = old[0 : n-1]
return item
}

type heapPriorityQueue struct {
inner heapQueue
itemLookup map[identity.Stream]*queueItem
}

func NewPriorityQueue() PriorityQueue {
pq := &heapPriorityQueue{
inner: heapQueue{},
itemLookup: map[identity.Stream]*queueItem{},
}
heap.Init(&pq.inner)

return pq
}

func (pq *heapPriorityQueue) Update(id identity.Stream, newPrio time.Time) {
// Check if the entry already exists in the queue
item, ok := pq.itemLookup[id]
if ok {
// If so, we can update it in place
item.prio = newPrio
heap.Fix(&pq.inner, item.index)
} else {
item = &queueItem{
key: id,
prio: newPrio,
}
heap.Push(&pq.inner, item)
pq.itemLookup[id] = item
}
}

func (pq *heapPriorityQueue) Peek() (identity.Stream, time.Time) {
val := pq.inner[0]
return val.key, val.prio
}

func (pq *heapPriorityQueue) Pop() (identity.Stream, time.Time) {
val := heap.Pop(&pq.inner).(*queueItem)
delete(pq.itemLookup, val.key)
return val.key, val.prio
}

func (pq *heapPriorityQueue) Len() int {
return pq.inner.Len()
}
116 changes: 116 additions & 0 deletions internal/exp/metrics/staleness/priority_queue_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package staleness

import (
"testing"
"time"

"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/pmetric"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/identity"
)

func TestPriorityQueueImpl(t *testing.T) {
t.Parallel()

pq := NewPriorityQueue()

idA := generateStreamID(t, map[string]any{
"aaa": "123",
})
idB := generateStreamID(t, map[string]any{
"bbb": "456",
})
idC := generateStreamID(t, map[string]any{
"ccc": "789",
})

initialTime := time.Time{}
prioA := initialTime.Add(2 * time.Second)
prioB := initialTime.Add(1 * time.Second)
prioC := initialTime.Add(3 * time.Second)

pq.Update(idA, prioA)
pq.Update(idB, prioB)
pq.Update(idC, prioC)

// The first item should be B
id, prio := pq.Peek()
require.Equal(t, idB, id)
require.Equal(t, prioB, prio)

// If we peek again, nothing should change
id, prio = pq.Peek()
require.Equal(t, idB, id)
require.Equal(t, prioB, prio)

// Pop should return the same thing
id, prio = pq.Pop()
require.Equal(t, idB, id)
require.Equal(t, prioB, prio)

// Now if we peek again, it should be the next item
id, prio = pq.Peek()
require.Equal(t, idA, id)
require.Equal(t, prioA, prio)

// Pop should return the same thing
id, prio = pq.Pop()
require.Equal(t, idA, id)
require.Equal(t, prioA, prio)

// One last time
id, prio = pq.Peek()
require.Equal(t, idC, id)
require.Equal(t, prioC, prio)

// Pop should return the same thing
id, prio = pq.Pop()
require.Equal(t, idC, id)
require.Equal(t, prioC, prio)

// The queue should now be empty
require.Equal(t, 0, pq.Len())

// And the inner lookup map should also be empty
require.IsType(t, &heapPriorityQueue{}, pq)
heapQueue := pq.(*heapPriorityQueue)
require.Len(t, heapQueue.itemLookup, 0)
}

func generateStreamID(t *testing.T, attributes map[string]any) identity.Stream {
res := pcommon.NewResource()
err := res.Attributes().FromRaw(map[string]any{
"foo": "bar",
"asdf": "qwer",
})
require.NoError(t, err)

scope := pcommon.NewInstrumentationScope()
scope.SetName("TestScope")
scope.SetVersion("v1.2.3")
err = scope.Attributes().FromRaw(map[string]any{
"aaa": "bbb",
"ccc": "ddd",
})
require.NoError(t, err)

metric := pmetric.NewMetric()

sum := metric.SetEmptySum()
sum.SetIsMonotonic(true)
sum.SetAggregationTemporality(pmetric.AggregationTemporalityDelta)

dp := sum.DataPoints().AppendEmpty()
dp.SetStartTimestamp(678)
dp.SetTimestamp(789)
dp.SetDoubleValue(123.456)
err = dp.Attributes().FromRaw(attributes)
require.NoError(t, err)

return identity.OfStream(identity.OfResourceMetric(res, scope, metric), dp)
}

0 comments on commit 184d094

Please sign in to comment.