diff --git a/internal/exp/metrics/go.mod b/internal/exp/metrics/go.mod index f61b7e1e87af7..6e54eb33f2295 100644 --- a/internal/exp/metrics/go.mod +++ b/internal/exp/metrics/go.mod @@ -1,19 +1,22 @@ module github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics -replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil => ../../../pkg/pdatautil +go 1.21 require ( github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil v0.95.0 + github.com/stretchr/testify v1.8.4 go.opentelemetry.io/collector/pdata v1.2.0 ) require ( github.com/cespare/xxhash/v2 v2.2.0 // indirect + github.com/davecgh/go-spew v1.1.1 // indirect github.com/gogo/protobuf v1.3.2 // indirect github.com/golang/protobuf v1.5.3 // indirect github.com/json-iterator/go v1.1.12 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect go.uber.org/multierr v1.11.0 // indirect golang.org/x/net v0.18.0 // indirect golang.org/x/sys v0.14.0 // indirect @@ -21,8 +24,7 @@ require ( google.golang.org/genproto/googleapis/rpc v0.0.0-20231106174013-bbf56f31fb17 // indirect google.golang.org/grpc v1.61.0 // indirect google.golang.org/protobuf v1.32.0 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect ) -go 1.21 - -toolchain go1.21.1 +replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil => ../../../pkg/pdatautil diff --git a/internal/exp/metrics/go.sum b/internal/exp/metrics/go.sum index 6f3b578693b23..9a5008b4d916b 100644 --- a/internal/exp/metrics/go.sum +++ b/internal/exp/metrics/go.sum @@ -16,6 +16,10 @@ github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnr github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= github.com/kisielk/gotool v1.0.0/go.mod 
h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= +github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= +github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg= github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= @@ -23,6 +27,8 @@ github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9G github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ= +github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= @@ -74,5 +80,8 @@ google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp0 google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= google.golang.org/protobuf v1.32.0 h1:pPC6BG5ex8PDFnkbrGU3EixyhKcQ2aDuBS36lqK/C7I= google.golang.org/protobuf v1.32.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod 
h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
+gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
+gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
 gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
 gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
diff --git a/internal/exp/metrics/staleness/map.go b/internal/exp/metrics/staleness/map.go
new file mode 100644
index 0000000000000..77b29f5febd20
--- /dev/null
+++ b/internal/exp/metrics/staleness/map.go
@@ -0,0 +1,54 @@
+// Copyright The OpenTelemetry Authors
+// SPDX-License-Identifier: Apache-2.0
+
+package staleness // import "github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/staleness"
+
+import (
+	"time"
+
+	"github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/identity"
+)
+
+// Map is an abstraction over a map from identity.Stream keys to values of type T
+type Map[T any] interface {
+	// Load the value at key. If it does not exist, the boolean will be false and the value returned will be the zero value
+	Load(key identity.Stream) (T, bool)
+	// Store the given key value pair in the map
+	Store(key identity.Stream, value T)
+	// Delete removes the value at key from the map
+	Delete(key identity.Stream)
+	// Items returns an iterator function that in future Go versions can be used with range
+	// See: https://go.dev/wiki/RangefuncExperiment
+	Items() func(yield func(identity.Stream, T) bool) bool
+}
+
+// RawMap implementation
+
+var _ Map[time.Time] = (*RawMap[identity.Stream, time.Time])(nil) // compile-time check that *RawMap satisfies Map
+
+// RawMap is an implementation of the Map interface using a standard golang map
+type RawMap[K comparable, V any] map[K]V
+
+func (rm *RawMap[K, V]) Load(key K) (V, bool) {
+	value, ok := (*rm)[key] // comma-ok lookup: ok is false and value is the zero V when key is absent
+	return value, ok
+}
+
+func (rm *RawMap[K, V]) Store(key K, value V) {
+	(*rm)[key] = value // overwrites any value already stored at key
+}
+
+func (rm *RawMap[K, V]) Delete(key K) {
+	delete(*rm, key) // deleting a key that is not present is a no-op
+}
+
+func (rm *RawMap[K, V]) Items() func(yield func(K, V) bool) bool {
+	return func(yield func(K, V) bool) bool {
+		for k, v := range *rm { // Go map iteration order is unspecified
+			if !yield(k, v) {
+				break // the consumer asked to stop early
+			}
+		}
+		return false // always false, whether or not iteration was cut short
+	}
+}
diff --git a/internal/exp/metrics/staleness/priority_queue.go b/internal/exp/metrics/staleness/priority_queue.go
new file mode 100644
index 0000000000000..f1b01743f95fe
--- /dev/null
+++ b/internal/exp/metrics/staleness/priority_queue.go
@@ -0,0 +1,111 @@
+// Copyright The OpenTelemetry Authors
+// SPDX-License-Identifier: Apache-2.0
+
+package staleness // import "github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/staleness"
+
+import (
+	"container/heap"
+	"time"
+
+	"github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/identity"
+)
+
+// PriorityQueue represents a way to store entries sorted by their priority.
+// Pop() will return the oldest entry of the set.
+type PriorityQueue interface {
+	// Update will add or update an entry, and reshuffle the queue internally as needed to keep it sorted
+	Update(id identity.Stream, newPrio time.Time)
+	// Peek will return the entry at the HEAD of the queue *without* removing it from the queue. The heap-backed implementation panics on an empty queue, so check Len() first
+	Peek() (identity.Stream, time.Time)
+	// Pop will remove the entry at the HEAD of the queue and return it. The heap-backed implementation panics on an empty queue, so check Len() first
+	Pop() (identity.Stream, time.Time)
+	// Len will return the number of entries in the queue
+	Len() int
+}
+
+// heapQueue implements heap.Interface.
+// We use it as the inner implementation of a heap-based sorted queue
+type heapQueue []*queueItem
+
+type queueItem struct {
+	key   identity.Stream
+	prio  time.Time
+	index int // position of the item inside heapQueue, kept current by Push and Swap; -1 once popped
+}
+
+func (pq heapQueue) Len() int { return len(pq) }
+
+func (pq heapQueue) Less(i, j int) bool {
+	// We want Pop to give us the lowest priority, i.e. the earliest timestamp
+	return pq[i].prio.Before(pq[j].prio)
+}
+
+func (pq heapQueue) Swap(i, j int) {
+	pq[i], pq[j] = pq[j], pq[i]
+	pq[i].index = i // keep the stored positions in sync so Update can pass item.index to heap.Fix
+	pq[j].index = j
+}
+
+func (pq *heapQueue) Push(x any) {
+	n := len(*pq)
+	item := x.(*queueItem)
+	item.index = n // the new item is appended at the end of the slice
+	*pq = append(*pq, item)
+}
+
+func (pq *heapQueue) Pop() any {
+	old := *pq
+	n := len(old)
+	item := old[n-1]
+	old[n-1] = nil  // avoid memory leak
+	item.index = -1 // for safety
+	*pq = old[0 : n-1]
+	return item
+}
+
+type heapPriorityQueue struct {
+	inner      heapQueue                       // heap ordered so that the earliest prio is at index 0
+	itemLookup map[identity.Stream]*queueItem // finds the queued item for a stream so Update can modify it in place
+}
+
+func NewPriorityQueue() PriorityQueue {
+	pq := &heapPriorityQueue{
+		inner:      heapQueue{},
+		itemLookup: map[identity.Stream]*queueItem{},
+	}
+	heap.Init(&pq.inner)
+
+	return pq
+}
+
+func (pq *heapPriorityQueue) Update(id identity.Stream, newPrio time.Time) {
+	// Check if the entry already exists in the queue
+	item, ok := pq.itemLookup[id]
+	if ok {
+		// If so, we can update it in place and let heap.Fix restore the ordering
+		item.prio = newPrio
+		heap.Fix(&pq.inner, item.index)
+	} else {
+		item = &queueItem{
+			key:  id,
+			prio: newPrio,
+		}
+		heap.Push(&pq.inner, item)
+		pq.itemLookup[id] = item
+	}
+}
+
+func (pq *heapPriorityQueue) Peek() (identity.Stream, time.Time) {
+	val := pq.inner[0] // index 0 is the heap root; indexing panics when the queue is empty
+	return val.key, val.prio
+}
+
+func (pq *heapPriorityQueue) Pop() (identity.Stream, time.Time) {
+	val := heap.Pop(&pq.inner).(*queueItem) // heap.Pop panics when the queue is empty
+	delete(pq.itemLookup, val.key)          // keep the lookup map in step with the heap
+	return val.key, val.prio
+}
+
+func (pq *heapPriorityQueue) Len() int {
+	return pq.inner.Len()
+}
diff --git a/internal/exp/metrics/staleness/priority_queue_test.go b/internal/exp/metrics/staleness/priority_queue_test.go
new file mode 100644
index 0000000000000..b58478e7c1db0
--- /dev/null
+++ b/internal/exp/metrics/staleness/priority_queue_test.go
@@ -0,0 +1,116 @@
+// Copyright The OpenTelemetry Authors
+// SPDX-License-Identifier: Apache-2.0
+
+package staleness
+
+import (
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/require"
+	"go.opentelemetry.io/collector/pdata/pcommon"
+	"go.opentelemetry.io/collector/pdata/pmetric"
+
+	"github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/identity"
+)
+
+func TestPriorityQueueImpl(t *testing.T) {
+	t.Parallel()
+
+	pq := NewPriorityQueue()
+
+	idA := generateStreamID(t, map[string]any{
+		"aaa": "123",
+	})
+	idB := generateStreamID(t, map[string]any{
+		"bbb": "456",
+	})
+	idC := generateStreamID(t, map[string]any{
+		"ccc": "789",
+	})
+
+	initialTime := time.Time{}
+	prioA := initialTime.Add(2 * time.Second)
+	prioB := initialTime.Add(1 * time.Second)
+	prioC := initialTime.Add(3 * time.Second)
+
+	pq.Update(idA, prioA)
+	pq.Update(idB, prioB)
+	pq.Update(idC, prioC)
+
+	// The first item should be B, since it has the earliest priority
+	id, prio := pq.Peek()
+	require.Equal(t, idB, id)
+	require.Equal(t, prioB, prio)
+
+	// If we peek again, nothing should change
+	id, prio = pq.Peek()
+	require.Equal(t, idB, id)
+	require.Equal(t, prioB, prio)
+
+	// Pop should return the same thing
+	id, prio = pq.Pop()
+	require.Equal(t, idB, id)
+	require.Equal(t, prioB, prio)
+
+	// Now if we peek again, it should be the next item
+	id, prio = pq.Peek()
+	require.Equal(t, idA, id)
+	require.Equal(t, prioA, prio)
+
+	// Pop should return the same thing
+	id, prio = pq.Pop()
+	require.Equal(t, idA, id)
+	require.Equal(t, prioA, prio)
+
+	// One last time
+	id, prio = pq.Peek()
+	require.Equal(t, idC, id)
+	require.Equal(t, prioC, prio)
+
+	// Pop should return the same thing
+	id, prio = pq.Pop()
+	require.Equal(t, idC, id)
+	require.Equal(t, prioC, prio)
+
+	// The queue should now be empty
+	require.Equal(t, 0, pq.Len())
+
+	// And the inner lookup map should also be empty
+	require.IsType(t, &heapPriorityQueue{}, pq)
+	heapQueue := pq.(*heapPriorityQueue)
+	require.Len(t, heapQueue.itemLookup, 0)
+}
+
+func generateStreamID(t *testing.T, attributes map[string]any) identity.Stream {
+	res := pcommon.NewResource()
+	err := res.Attributes().FromRaw(map[string]any{
+		"foo":  "bar",
+		"asdf": "qwer",
+	})
+	require.NoError(t, err)
+
+	scope := pcommon.NewInstrumentationScope()
+	scope.SetName("TestScope")
+	scope.SetVersion("v1.2.3")
+	err = scope.Attributes().FromRaw(map[string]any{
+		"aaa": "bbb",
+		"ccc": "ddd",
+	})
+	require.NoError(t, err)
+
+	metric := pmetric.NewMetric()
+
+	sum := metric.SetEmptySum()
+	sum.SetIsMonotonic(true)
+	sum.SetAggregationTemporality(pmetric.AggregationTemporalityDelta)
+
+	dp := sum.DataPoints().AppendEmpty()
+	dp.SetStartTimestamp(678)
+	dp.SetTimestamp(789)
+	dp.SetDoubleValue(123.456)
+	err = dp.Attributes().FromRaw(attributes) // the data point attributes are the only caller-supplied input
+	require.NoError(t, err)
+
+	return identity.OfStream(identity.OfResourceMetric(res, scope, metric), dp)
+}
diff --git a/internal/exp/metrics/staleness/staleness.go b/internal/exp/metrics/staleness/staleness.go
new file mode 100644
index 0000000000000..f5803ccdeb555
--- /dev/null
+++ b/internal/exp/metrics/staleness/staleness.go
@@ -0,0 +1,67 @@
+// Copyright The OpenTelemetry Authors
+// SPDX-License-Identifier: Apache-2.0
+
+package staleness // import "github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/staleness"
+
+import (
+	"time"
+
+	"github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/identity"
+)
+
+// NowFunc returns the current time. It is a package-level variable so that tests can override it and stay deterministic
+var NowFunc = time.Now
+
+// Staleness is a wrapper over a map that adds an additional "staleness" value to each entry. Users can
+// call ExpireOldEntries() to automatically remove all entries from the map whose staleness value is
+// older than the `max`
+//
+// NOTE: Staleness methods are *not* thread-safe. If the user needs to use Staleness in a multi-threaded
+// environment, then it is the user's responsibility to properly serialize calls to Staleness methods
+type Staleness[T any] struct {
+	max time.Duration // age at which ExpireOldEntries starts removing an entry
+
+	items Map[T]
+	pq    PriorityQueue // orders stream identities by the time of their most recent Store
+}
+
+func NewStaleness[T any](max time.Duration, newMap Map[T]) *Staleness[T] {
+	return &Staleness[T]{
+		max:   max,
+		items: newMap,
+		pq:    NewPriorityQueue(),
+	}
+}
+
+// Load the value at key. If it does not exist, the boolean will be false and the value returned will be the zero value. Loading does not refresh the entry's staleness value
+func (s *Staleness[T]) Load(key identity.Stream) (T, bool) {
+	return s.items.Load(key)
+}
+
+// Store the given key value pair in the map, and update the pair's staleness value to "now"
+func (s *Staleness[T]) Store(id identity.Stream, value T) {
+	s.pq.Update(id, NowFunc())
+	s.items.Store(id, value)
+}
+
+// Items returns an iterator function that in future Go versions can be used with range
+// See: https://go.dev/wiki/RangefuncExperiment
+func (s *Staleness[T]) Items() func(yield func(identity.Stream, T) bool) bool {
+	return s.items.Items()
+}
+
+// ExpireOldEntries will remove all entries whose staleness value is `now() - max` or older
+// For example, if an entry has a staleness value of two hours ago, and max == 1 hour, then the entry would
+// be removed. But if an entry had a staleness value of 30 minutes ago, then it *wouldn't* be removed.
+func (s *Staleness[T]) ExpireOldEntries() {
+	now := NowFunc()
+
+	for s.pq.Len() > 0 { // Peek and Pop panic on an empty queue, so stop once every entry has been expired
+		_, ts := s.pq.Peek()
+		if now.Sub(ts) < s.max {
+			break // the queue yields the oldest entry first, so every remaining entry is fresher still
+		}
+		id, _ := s.pq.Pop()
+		s.items.Delete(id)
+	}
+}
diff --git a/internal/exp/metrics/staleness/staleness_test.go b/internal/exp/metrics/staleness/staleness_test.go
new file mode 100644
index 0000000000000..4d96b41061e68
--- /dev/null
+++ b/internal/exp/metrics/staleness/staleness_test.go
@@ -0,0 +1,106 @@
+// Copyright The OpenTelemetry Authors
+// SPDX-License-Identifier: Apache-2.0
+
+package staleness
+
+import (
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/require"
+
+	"github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/identity"
+)
+
+func TestStaleness(t *testing.T) {
+	max := 1 * time.Second
+	stalenessMap := NewStaleness[int](
+		max,
+		&RawMap[identity.Stream, int]{},
+	)
+
+	idA := generateStreamID(t, map[string]any{
+		"aaa": "123",
+	})
+	idB := generateStreamID(t, map[string]any{
+		"bbb": "456",
+	})
+	idC := generateStreamID(t, map[string]any{
+		"ccc": "789",
+	})
+	idD := generateStreamID(t, map[string]any{
+		"ddd": "024",
+	})
+
+	initialTime := time.Time{}
+	timeA := initialTime.Add(2 * time.Second)
+	timeB := initialTime.Add(1 * time.Second)
+	timeC := initialTime.Add(3 * time.Second)
+	timeD := initialTime.Add(4 * time.Second)
+
+	valueA := 1
+	valueB := 4
+	valueC := 7
+	valueD := 0
+
+	// Add the values to the map
+	NowFunc = func() time.Time { return timeA }
+	stalenessMap.Store(idA, valueA)
+	NowFunc = func() time.Time { return timeB }
+	stalenessMap.Store(idB, valueB)
+	NowFunc = func() time.Time { return timeC }
+	stalenessMap.Store(idC, valueC)
+	NowFunc = func() time.Time { return timeD }
+	stalenessMap.Store(idD, valueD)
+
+	// Set the time to 2.5s and run expire
+	// This should remove B, but the others should remain
+	// (now == 2.5s, B == 1s, max == 1s)
+	// now > B + max
+	NowFunc = func() time.Time { return initialTime.Add(2500 * time.Millisecond) }
+	stalenessMap.ExpireOldEntries()
+	validateStalenessMapEntries(t,
+		map[identity.Stream]int{
+			idA: valueA,
+			idC: valueC,
+			idD: valueD,
+		},
+		stalenessMap,
+	)
+
+	// Set the time to 4.5s and run expire
+	// This should remove A and C, but D should remain
+	// (now == 4.5s, A == 2s, C == 3s, max == 1s)
+	// now > A + max AND now > C + max
+	NowFunc = func() time.Time { return initialTime.Add(4500 * time.Millisecond) }
+	stalenessMap.ExpireOldEntries()
+	validateStalenessMapEntries(t,
+		map[identity.Stream]int{
+			idD: valueD,
+		},
+		stalenessMap,
+	)
+
+	// Set the time to 10s and run expire
+	// This should remove D too, leaving the map empty
+	// (now == 10s, D == 4s, max == 1s)
+	// Expiring must stop once the queue is drained rather than peeking into an empty queue
+	NowFunc = func() time.Time { return initialTime.Add(10 * time.Second) }
+	stalenessMap.ExpireOldEntries()
+	validateStalenessMapEntries(t, map[identity.Stream]int{}, stalenessMap)
+
+	// Expiring an already empty map must be a no-op
+	stalenessMap.ExpireOldEntries()
+	validateStalenessMapEntries(t, map[identity.Stream]int{}, stalenessMap)
+}
+
+// validateStalenessMapEntries asserts that iterating sm yields exactly the expected key/value pairs
+func validateStalenessMapEntries(t *testing.T, expected map[identity.Stream]int, sm *Staleness[int]) {
+	actual := map[identity.Stream]int{}
+
+	sm.Items()(func(key identity.Stream, value int) bool {
+		actual[key] = value
+		return true
+	})
+	require.Equal(t, expected, actual)
+}