Skip to content

Commit

Permalink
[internal/exp/metrics] Add a new internal package for handling metric…
Browse files Browse the repository at this point in the history
… staleness

It's a glorified wrapper over a Map type, which allows values to be expired based on a pre-supplied interval.
  • Loading branch information
RichieSams committed Feb 6, 2024
1 parent 4eaf245 commit ce07908
Show file tree
Hide file tree
Showing 10 changed files with 405 additions and 7 deletions.
10 changes: 7 additions & 3 deletions internal/exp/metrics/go.mod
Original file line number Diff line number Diff line change
@@ -1,26 +1,30 @@
module github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics

replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil => ../../../pkg/pdatautil
go 1.20

require (
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil v0.0.0-00010101000000-000000000000
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil v1.93.0
github.com/stretchr/testify v1.8.4
go.opentelemetry.io/collector/pdata v1.0.2-0.20240125183026-3cacd40b27e8
)

require (
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/net v0.18.0 // indirect
golang.org/x/sys v0.14.0 // indirect
golang.org/x/text v0.14.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20231002182017-d307bd883b97 // indirect
google.golang.org/grpc v1.60.1 // indirect
google.golang.org/protobuf v1.32.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)

go 1.20
replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil => ../../../pkg/pdatautil
7 changes: 7 additions & 0 deletions internal/exp/metrics/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion internal/exp/metrics/identity/resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,9 @@ import (
"hash"
"hash/fnv"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil"
"go.opentelemetry.io/collector/pdata/pcommon"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil"
)

type Resource struct {
Expand Down
3 changes: 2 additions & 1 deletion internal/exp/metrics/identity/scope.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,9 @@ package identity
import (
"hash"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil"
"go.opentelemetry.io/collector/pdata/pcommon"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil"
)

type Scope struct {
Expand Down
3 changes: 2 additions & 1 deletion internal/exp/metrics/identity/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,10 @@ package identity
import (
"hash"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/pmetric"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil"
)

type Stream struct {
Expand Down
2 changes: 1 addition & 1 deletion internal/exp/metrics/metadata.yaml
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
status:
codeowners: [sh0rez]
codeowners: [sh0rez, RichieSams]
97 changes: 97 additions & 0 deletions internal/exp/metrics/staleness/priority_queue.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package staleness

import (
"container/heap"
"time"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/identity"
)

type priorityQueueImpl []*queueItem

type queueItem struct {
key identity.Stream
prio time.Time
index int
}

func (pq priorityQueueImpl) Len() int { return len(pq) }

func (pq priorityQueueImpl) Less(i, j int) bool {
// We want Pop to give us the lowest priority
return pq[i].prio.Before(pq[j].prio)
}

func (pq priorityQueueImpl) Swap(i, j int) {
pq[i], pq[j] = pq[j], pq[i]
pq[i].index = i
pq[j].index = j
}

func (pq *priorityQueueImpl) Push(x any) {
n := len(*pq)
item := x.(*queueItem)
item.index = n
*pq = append(*pq, item)
}

func (pq *priorityQueueImpl) Pop() any {
old := *pq
n := len(old)
item := old[n-1]
old[n-1] = nil // avoid memory leak
item.index = -1 // for safety
*pq = old[0 : n-1]
return item
}

func (pq *priorityQueueImpl) Update(item *queueItem, newPrio time.Time) {
item.prio = newPrio
heap.Fix(pq, item.index)
}

type PriorityQueue struct {
inner priorityQueueImpl
itemLookup map[identity.Stream]*queueItem
}

func NewPriorityQueue() *PriorityQueue {
pq := &PriorityQueue{
inner: priorityQueueImpl{},
itemLookup: map[identity.Stream]*queueItem{},
}
heap.Init(&pq.inner)

return pq
}

func (pq *PriorityQueue) Update(id identity.Stream, newPrio time.Time) {
item, ok := pq.itemLookup[id]
if !ok {
item = &queueItem{
key: id,
prio: newPrio,
}
heap.Push(&pq.inner, item)
pq.itemLookup[id] = item
} else {
pq.inner.Update(item, newPrio)
}
}

func (pq *PriorityQueue) Peek() (identity.Stream, time.Time) {
val := pq.inner[0]
return val.key, val.prio
}

func (pq *PriorityQueue) Pop() (identity.Stream, time.Time) {
val := heap.Pop(&pq.inner).(*queueItem)
return val.key, val.prio
}

func (pq *PriorityQueue) Len() int {
return pq.inner.Len()
}
111 changes: 111 additions & 0 deletions internal/exp/metrics/staleness/priority_queue_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package staleness

import (
"testing"
"time"

"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/pmetric"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/identity"
)

func TestPriorityQueueImpl(t *testing.T) {
t.Parallel()

pq := NewPriorityQueue()

idA := generateStreamID(t, map[string]any{
"aaa": "123",
})
idB := generateStreamID(t, map[string]any{
"bbb": "456",
})
idC := generateStreamID(t, map[string]any{
"ccc": "789",
})

initialTime := time.Time{}
prioA := initialTime.Add(2 * time.Second)
prioB := initialTime.Add(1 * time.Second)
prioC := initialTime.Add(3 * time.Second)

pq.Update(idA, prioA)
pq.Update(idB, prioB)
pq.Update(idC, prioC)

// The first item should be B
id, prio := pq.Peek()
require.Equal(t, idB, id)
require.Equal(t, prioB, prio)

// If we peek again, nothing should change
id, prio = pq.Peek()
require.Equal(t, idB, id)
require.Equal(t, prioB, prio)

// Pop should return the same thing
id, prio = pq.Pop()
require.Equal(t, idB, id)
require.Equal(t, prioB, prio)

// Now if we peek again, it should be the next item
id, prio = pq.Peek()
require.Equal(t, idA, id)
require.Equal(t, prioA, prio)

// Pop should return the same thing
id, prio = pq.Pop()
require.Equal(t, idA, id)
require.Equal(t, prioA, prio)

// One last time
id, prio = pq.Peek()
require.Equal(t, idC, id)
require.Equal(t, prioC, prio)

// Pop should return the same thing
id, prio = pq.Pop()
require.Equal(t, idC, id)
require.Equal(t, prioC, prio)

// The queue should now be empty
require.Equal(t, 0, pq.Len())
}

func generateStreamID(t *testing.T, attributes map[string]any) identity.Stream {
res := pcommon.NewResource()
err := res.Attributes().FromRaw(map[string]any{
"foo": "bar",
"asdf": "qwer",
})
require.NoError(t, err)

scope := pcommon.NewInstrumentationScope()
scope.SetName("TestScope")
scope.SetVersion("v1.2.3")
err = scope.Attributes().FromRaw(map[string]any{
"aaa": "bbb",
"ccc": "ddd",
})
require.NoError(t, err)

metric := pmetric.NewMetric()

sum := metric.SetEmptySum()
sum.SetIsMonotonic(true)
sum.SetAggregationTemporality(pmetric.AggregationTemporalityDelta)

dp := sum.DataPoints().AppendEmpty()
dp.SetStartTimestamp(678)
dp.SetTimestamp(789)
dp.SetDoubleValue(123.456)
err = dp.Attributes().FromRaw(attributes)
require.NoError(t, err)

return identity.OfStream(identity.OfResourceMetric(res, scope, metric), dp)
}
55 changes: 55 additions & 0 deletions internal/exp/metrics/staleness/staleness.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package staleness

import (
"time"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/identity"
)

// We override how Now() is returned, so we can have deterministic tests
var nowFunc = time.Now

type Map[T any] interface {
Load(key identity.Stream) (T, bool)
Store(key identity.Stream, value T)
Delete(key identity.Stream)
// Range calls f sequentially for each key and value present in the map.
// If f returns false, range stops the iteration.
Range(f func(key identity.Stream, value T) bool)
}

type Staleness[T any] struct {
max time.Duration

Map[T]
pq *PriorityQueue
}

func NewStaleness[T any](max time.Duration, newMap Map[T]) *Staleness[T] {
return &Staleness[T]{
max: max,
Map: newMap,
pq: NewPriorityQueue(),
}
}

func (s *Staleness[T]) Store(id identity.Stream, value T) {
s.pq.Update(id, nowFunc())
s.Map.Store(id, value)
}

func (s *Staleness[T]) ExpireOldEntries() {
now := nowFunc()

for {
_, ts := s.pq.Peek()
if now.Sub(ts) < s.max {
break
}
id, _ := s.pq.Pop()
s.Map.Delete(id)
}
}
Loading

0 comments on commit ce07908

Please sign in to comment.