Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[internal/exp/metrics] Add a new internal package for handling metric staleness #31089

Merged
merged 9 commits into from
Feb 27, 2024
10 changes: 6 additions & 4 deletions internal/exp/metrics/go.mod
Original file line number Diff line number Diff line change
@@ -1,28 +1,30 @@
module github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics

replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil => ../../../pkg/pdatautil
go 1.21

require (
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil v0.95.0
github.com/stretchr/testify v1.8.4
go.opentelemetry.io/collector/pdata v1.2.0
)

require (
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/net v0.18.0 // indirect
golang.org/x/sys v0.14.0 // indirect
golang.org/x/text v0.14.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20231106174013-bbf56f31fb17 // indirect
google.golang.org/grpc v1.61.0 // indirect
google.golang.org/protobuf v1.32.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)

go 1.21

toolchain go1.21.1
replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil => ../../../pkg/pdatautil
9 changes: 9 additions & 0 deletions internal/exp/metrics/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

54 changes: 54 additions & 0 deletions internal/exp/metrics/staleness/map.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package staleness // import "github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/staleness"

import (
"time"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/identity"
)

// Map is an abstraction over a map
type Map[T any] interface {
// Load the value at key. If it does not exist, the boolean will be false and the value returned will be the zero value
Load(key identity.Stream) (T, bool)
// Store the given key value pair in the map
Store(key identity.Stream, value T)
// Remove the value at key from the map
Delete(key identity.Stream)
// Items returns an iterator function that in future go version can be used with range
// See: https://go.dev/wiki/RangefuncExperiment
Items() func(yield func(identity.Stream, T) bool) bool
}

// RawMap implementation

var _ Map[time.Time] = (*RawMap[identity.Stream, time.Time])(nil)

// RawMap is an implementation of the Map interface using a standard golang map
type RawMap[K comparable, V any] map[K]V
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this type doesn't creeps a bit beyond the scope of this package.
I like that this package defines the Map[T] interface as the behavior it expects to fulfill its purpose.
A general stream map would feel more fitting as a streams.HashMap or similar.

I'd be okay with this pkg not defining a map implementation at all, leaving that to consumers.
No hard opinion however, if you prefer we can leave it in here for now, as we'll likely iterate on these packages for a bit anyways.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd be okay with this pkg not defining a map implementation at all, leaving that to consumers.

That was my initial plan. I had implemented RawMap just in the tests. But then I realized that I would need to re-implement the same code in the interval processor (and perhaps you as well in deltatocumulative). So I moved it to be in the package proper as a convenience "reference" implementation.

But I'm kind of the same. I don't feel strongly either way

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are you ok with leaving it in for now? And iterating as the various processors utilize it?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bump


func (rm *RawMap[K, V]) Load(key K) (V, bool) {
value, ok := (*rm)[key]
return value, ok
}

func (rm *RawMap[K, V]) Store(key K, value V) {
(*rm)[key] = value
}

func (rm *RawMap[K, V]) Delete(key K) {
delete(*rm, key)
}

func (rm *RawMap[K, V]) Items() func(yield func(K, V) bool) bool {
return func(yield func(K, V) bool) bool {
for k, v := range *rm {
if !yield(k, v) {
break
}
}
return false
}
}
111 changes: 111 additions & 0 deletions internal/exp/metrics/staleness/priority_queue.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package staleness // import "github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/staleness"

import (
"container/heap"
"time"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/identity"
)

// PriorityQueue represents a way to store entries sorted by their priority.
// Pop() will return the oldest entry of the set.
type PriorityQueue interface {
// Update will add or update an entry, and reshuffle the queue internally as needed to keep it sorted
Update(id identity.Stream, newPrio time.Time)
// Peek will return the entry at the HEAD of the queue *without* removing it from the queue
Peek() (identity.Stream, time.Time)
// Pop will remove the entry at the HEAD of the queue and return it
Pop() (identity.Stream, time.Time)
// Len will return the number of entries in the queue
Len() int
}

// heapQueue implements heap.Interface.
// We use it as the inner implementation of a heap-based sorted queue
type heapQueue []*queueItem

type queueItem struct {
key identity.Stream
prio time.Time
index int
}

func (pq heapQueue) Len() int { return len(pq) }

func (pq heapQueue) Less(i, j int) bool {
// We want Pop to give us the lowest priority
return pq[i].prio.Before(pq[j].prio)
}

func (pq heapQueue) Swap(i, j int) {
pq[i], pq[j] = pq[j], pq[i]
pq[i].index = i
pq[j].index = j
}

func (pq *heapQueue) Push(x any) {
n := len(*pq)
item := x.(*queueItem)
item.index = n
*pq = append(*pq, item)
}

func (pq *heapQueue) Pop() any {
old := *pq
n := len(old)
item := old[n-1]
old[n-1] = nil // avoid memory leak
item.index = -1 // for safety
*pq = old[0 : n-1]
return item
}

type heapPriorityQueue struct {
inner heapQueue
itemLookup map[identity.Stream]*queueItem
}

func NewPriorityQueue() PriorityQueue {
pq := &heapPriorityQueue{
inner: heapQueue{},
itemLookup: map[identity.Stream]*queueItem{},
}
heap.Init(&pq.inner)

return pq
}

func (pq *heapPriorityQueue) Update(id identity.Stream, newPrio time.Time) {
// Check if the entry already exists in the queue
item, ok := pq.itemLookup[id]
if ok {
// If so, we can update it in place
item.prio = newPrio
heap.Fix(&pq.inner, item.index)
} else {
item = &queueItem{
key: id,
prio: newPrio,
}
heap.Push(&pq.inner, item)
pq.itemLookup[id] = item
}
}

func (pq *heapPriorityQueue) Peek() (identity.Stream, time.Time) {
val := pq.inner[0]
return val.key, val.prio
}

func (pq *heapPriorityQueue) Pop() (identity.Stream, time.Time) {
val := heap.Pop(&pq.inner).(*queueItem)
RichieSams marked this conversation as resolved.
Show resolved Hide resolved
delete(pq.itemLookup, val.key)
return val.key, val.prio
}

func (pq *heapPriorityQueue) Len() int {
return pq.inner.Len()
}
116 changes: 116 additions & 0 deletions internal/exp/metrics/staleness/priority_queue_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package staleness

import (
"testing"
"time"

"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/pmetric"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/identity"
)

func TestPriorityQueueImpl(t *testing.T) {
t.Parallel()

pq := NewPriorityQueue()

idA := generateStreamID(t, map[string]any{
"aaa": "123",
})
idB := generateStreamID(t, map[string]any{
"bbb": "456",
})
idC := generateStreamID(t, map[string]any{
"ccc": "789",
})

initialTime := time.Time{}
prioA := initialTime.Add(2 * time.Second)
prioB := initialTime.Add(1 * time.Second)
prioC := initialTime.Add(3 * time.Second)

pq.Update(idA, prioA)
pq.Update(idB, prioB)
pq.Update(idC, prioC)

// The first item should be B
id, prio := pq.Peek()
require.Equal(t, idB, id)
require.Equal(t, prioB, prio)

// If we peek again, nothing should change
id, prio = pq.Peek()
require.Equal(t, idB, id)
require.Equal(t, prioB, prio)

// Pop should return the same thing
id, prio = pq.Pop()
require.Equal(t, idB, id)
require.Equal(t, prioB, prio)

// Now if we peek again, it should be the next item
id, prio = pq.Peek()
require.Equal(t, idA, id)
require.Equal(t, prioA, prio)

// Pop should return the same thing
id, prio = pq.Pop()
require.Equal(t, idA, id)
require.Equal(t, prioA, prio)

// One last time
id, prio = pq.Peek()
require.Equal(t, idC, id)
require.Equal(t, prioC, prio)

// Pop should return the same thing
id, prio = pq.Pop()
require.Equal(t, idC, id)
require.Equal(t, prioC, prio)

// The queue should now be empty
require.Equal(t, 0, pq.Len())

// And the inner lookup map should also be empty
require.IsType(t, &heapPriorityQueue{}, pq)
heapQueue := pq.(*heapPriorityQueue)
require.Len(t, heapQueue.itemLookup, 0)
}

func generateStreamID(t *testing.T, attributes map[string]any) identity.Stream {
res := pcommon.NewResource()
err := res.Attributes().FromRaw(map[string]any{
"foo": "bar",
"asdf": "qwer",
})
require.NoError(t, err)

scope := pcommon.NewInstrumentationScope()
scope.SetName("TestScope")
scope.SetVersion("v1.2.3")
err = scope.Attributes().FromRaw(map[string]any{
"aaa": "bbb",
"ccc": "ddd",
})
require.NoError(t, err)

metric := pmetric.NewMetric()

sum := metric.SetEmptySum()
sum.SetIsMonotonic(true)
sum.SetAggregationTemporality(pmetric.AggregationTemporalityDelta)

dp := sum.DataPoints().AppendEmpty()
dp.SetStartTimestamp(678)
dp.SetTimestamp(789)
dp.SetDoubleValue(123.456)
err = dp.Attributes().FromRaw(attributes)
require.NoError(t, err)

return identity.OfStream(identity.OfResourceMetric(res, scope, metric), dp)
}