Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[processor/interval] Implement the main logic #32054

Merged
merged 32 commits into from
Apr 30, 2024
Merged
Show file tree
Hide file tree
Changes from 27 commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
76d0a81
[processor/interval] Implement the main logic
RichieSams Apr 1, 2024
154e7cb
Fix linter
RichieSams Apr 1, 2024
5b5ebfb
Clear all the state after exporting
RichieSams Apr 1, 2024
d1ee4b1
Fix linting
RichieSams Apr 1, 2024
30f03b4
Remove all the staleness handling
RichieSams Apr 17, 2024
018e00b
Add tests for summaries and histograms
RichieSams Apr 17, 2024
99f0dfb
Add tests for Exponential Histograms
RichieSams Apr 17, 2024
b8e0035
Add tests for all Delta metrics
RichieSams Apr 17, 2024
52a7cfa
Update README
RichieSams Apr 17, 2024
92ef4d8
Fix version mismatch
RichieSams Apr 18, 2024
5c1a45b
Declare tests using yaml files and golden
RichieSams Apr 18, 2024
303548a
Implement Clear using builtin clear()
RichieSams Apr 22, 2024
92737eb
Remove reference to state stelness
RichieSams Apr 22, 2024
f6ad46c
Change default export period to 60s
RichieSams Apr 22, 2024
f1b9b18
Fix README admonition syntax
RichieSams Apr 22, 2024
9113d2f
Add a changelog entry
RichieSams Apr 22, 2024
72fdba1
Put exportMetricsLoop inline
RichieSams Apr 22, 2024
e40655b
Simplify the processor logic
RichieSams Apr 23, 2024
d726e44
Revert
RichieSams Apr 23, 2024
aee70ef
Fix dependencies
RichieSams Apr 23, 2024
238cec3
Fix linting
RichieSams Apr 23, 2024
540bca7
One more fix
RichieSams Apr 23, 2024
b5124d3
Run gci
RichieSams Apr 23, 2024
8d3cae8
Fix crosslink
RichieSams Apr 23, 2024
b7e4d35
make tidy
RichieSams Apr 23, 2024
983bab3
Make generate
RichieSams Apr 23, 2024
03a3f79
Fix generate
RichieSams Apr 24, 2024
e2c2d1d
Merge branch 'main' into processor/interval
RichieSams Apr 24, 2024
375c3c5
Use a local variable for the export ticker
RichieSams Apr 24, 2024
b5bf916
Small comment clarification
RichieSams Apr 24, 2024
08282c8
Merge branch 'main' into processor/interval
RichieSams Apr 24, 2024
6e3601a
Merge branch 'main' into processor/interval
RichieSams Apr 29, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions .chloggen/interval-implement.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: "new_component"

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: intervalprocessor

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Implements the new interval processor. See the README for more info about how to use it

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [29461]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
2 changes: 1 addition & 1 deletion .github/CODEOWNERS
Validating CODEOWNERS rules …
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ processor/deltatorateprocessor/ @open-telemetry/collect
processor/filterprocessor/ @open-telemetry/collector-contrib-approvers @TylerHelmuth @boostchicken
processor/groupbyattrsprocessor/ @open-telemetry/collector-contrib-approvers @rnishtala-sumo
processor/groupbytraceprocessor/ @open-telemetry/collector-contrib-approvers @jpkrohling
processor/intervalprocessor/ @open-telemetry/collector-contrib-approvers @RichieSams
processor/intervalprocessor/ @open-telemetry/collector-contrib-approvers @RichieSams @sh0rez
processor/k8sattributesprocessor/ @open-telemetry/collector-contrib-approvers @dmitryax @rmfitzpatrick @fatsheep9146 @TylerHelmuth
processor/logstransformprocessor/ @open-telemetry/collector-contrib-approvers @djaglowski @dehaansa
processor/metricsgenerationprocessor/ @open-telemetry/collector-contrib-approvers @Aneurysm9
Expand Down
1 change: 1 addition & 0 deletions cmd/otelcontribcol/builder-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,7 @@ replaces:
- github.com/open-telemetry/opentelemetry-collector-contrib/pkg/batchpersignal => ../../pkg/batchpersignal
- github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/cwlogs => ../../internal/aws/cwlogs
- github.com/open-telemetry/opentelemetry-collector-contrib/internal/common => ../../internal/common
- github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics => ../../internal/exp/metrics
- github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awsxrayreceiver => ../../receiver/awsxrayreceiver
- github.com/open-telemetry/opentelemetry-collector-contrib/receiver/azureblobreceiver => ../../receiver/azureblobreceiver
- github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sobjectsreceiver => ../../receiver/k8sobjectsreceiver
Expand Down
3 changes: 3 additions & 0 deletions cmd/otelcontribcol/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -553,6 +553,7 @@ require (
github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.99.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/internal/datadog v0.99.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/internal/docker v0.99.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics v0.99.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/internal/filter v0.99.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/internal/k8sconfig v0.99.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/internal/kafka v0.99.0 // indirect
Expand Down Expand Up @@ -751,6 +752,8 @@ replace github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/c

replace github.com/open-telemetry/opentelemetry-collector-contrib/internal/common => ../../internal/common

replace github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics => ../../internal/exp/metrics

replace github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awsxrayreceiver => ../../receiver/awsxrayreceiver

replace github.com/open-telemetry/opentelemetry-collector-contrib/receiver/azureblobreceiver => ../../receiver/azureblobreceiver
Expand Down
4 changes: 4 additions & 0 deletions internal/exp/metrics/staleness/staleness.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,3 +93,7 @@ func (s *Staleness[T]) Evict() identity.Stream {
s.items.Delete(id)
return id
}

func (s *Staleness[T]) Clear() {
s.items.Clear()
}
5 changes: 5 additions & 0 deletions internal/exp/metrics/streams/streams.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ type Map[T any] interface {
Delete(identity.Stream)
Items() func(yield func(identity.Stream, T) bool) bool
Len() int
Clear()
}

var _ Map[any] = HashMap[any](nil)
Expand Down Expand Up @@ -51,6 +52,10 @@ func (m HashMap[T]) Len() int {
return len((map[identity.Stream]T)(m))
}

func (m HashMap[T]) Clear() {
clear(m)
}

// Evictors remove the "least important" stream based on some strategy such as
// the oldest, least active, etc.
type Evictor interface {
Expand Down
35 changes: 33 additions & 2 deletions processor/intervalprocessor/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
| Distributions | [] |
| Warnings | [Statefulness](#warnings) |
| Issues | [![Open issues](https://img.shields.io/github/issues-search/open-telemetry/opentelemetry-collector-contrib?query=is%3Aissue%20is%3Aopen%20label%3Aprocessor%2Finterval%20&label=open&color=orange&logo=opentelemetry)](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues?q=is%3Aopen+is%3Aissue+label%3Aprocessor%2Finterval) [![Closed issues](https://img.shields.io/github/issues-search/open-telemetry/opentelemetry-collector-contrib?query=is%3Aissue%20is%3Aclosed%20label%3Aprocessor%2Finterval%20&label=closed&color=blue&logo=opentelemetry)](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues?q=is%3Aclosed+is%3Aissue+label%3Aprocessor%2Finterval) |
| [Code Owners](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/CONTRIBUTING.md#becoming-a-code-owner) | [@RichieSams](https://www.github.com/RichieSams) |
| [Code Owners](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/CONTRIBUTING.md#becoming-a-code-owner) | [@RichieSams](https://www.github.com/RichieSams), [@sh0rez](https://www.github.com/sh0rez) |

[development]: https://github.com/open-telemetry/opentelemetry-collector#development
<!-- end autogenerated section -->
Expand All @@ -31,4 +31,35 @@ The following metric types will *not* be aggregated, and will instead be passed,

The following settings can be optionally configured:

- `max_staleness`: The total time a state entry will live past the time it was last seen. Set to 0 to retain state indefinitely. Default: 0
* `interval`: The interval in which the processor should export the aggregated metrics. Default: 60s

## Example of metric flows

The following sum metrics come into the processor to be handled

| Timestamp | Metric Name | Aggregation Temporarility | Attributes | Value |
| --------- | ------------ | ------------------------- | ----------------- | ----: |
| 0 | test_metric | Cumulative | labelA: foo | 4.0 |
| 2 | test_metric | Cumulative | labelA: bar | 3.1 |
| 4 | other_metric | Delta | fruitType: orange | 77.4 |
| 6 | test_metric | Cumulative | labelA: foo | 8.2 |
| 8 | test_metric | Cumulative | labelA: foo | 12.8 |
| 10 | test_metric | Cumulative | labelA: bar | 6.4 |

The processor would immediately pass the following metrics to the next processor in the chain

| Timestamp | Metric Name | Aggregation Temporarility | Attributes | Value |
| --------- | ------------ | ------------------------- | ----------------- | ----: |
| 4 | other_metric | Delta | fruitType: orange | 77.4 |

Because it's a Delta metric.

At the next `interval` (15s by default), the processor would pass the following metrics to the next processor in the chain

| Timestamp | Metric Name | Aggregation Temporarility | Attributes | Value |
| --------- | ----------- | ------------------------- | ----------- | ----: |
| 8 | test_metric | Cumulative | labelA: foo | 12.8 |
| 10 | test_metric | Cumulative | labelA: bar | 6.4 |

> [!IMPORTANT]
> After exporting, any internal state is cleared. So if no new metrics come in, the next interval will export nothing.
13 changes: 11 additions & 2 deletions processor/intervalprocessor/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,30 @@
package intervalprocessor // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/intervalprocessor"

import (
"errors"
"time"

"go.opentelemetry.io/collector/component"
)

var (
ErrInvalidIntervalValue = errors.New("invalid interval value")
)

var _ component.Config = (*Config)(nil)

// Config defines the configuration for the processor.
type Config struct {
// MaxStaleness is the total time a state entry will live past the time it was last seen. Set to 0 to retain state indefinitely.
MaxStaleness time.Duration `mapstructure:"max_staleness"`
// Interval is the time
Interval time.Duration `mapstructure:"interval"`
}

// Validate checks whether the input configuration has all of the required fields for the processor.
// An error is returned if there are any invalid inputs.
func (config *Config) Validate() error {
if config.Interval <= 0 {
return ErrInvalidIntervalValue
}

return nil
}
5 changes: 4 additions & 1 deletion processor/intervalprocessor/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package intervalprocessor // import "github.com/open-telemetry/opentelemetry-col
import (
"context"
"fmt"
"time"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
Expand All @@ -23,7 +24,9 @@ func NewFactory() processor.Factory {
}

func createDefaultConfig() component.Config {
return &Config{}
return &Config{
Interval: 60 * time.Second,
}
}

func createMetricsProcessor(_ context.Context, set processor.CreateSettings, cfg component.Config, nextConsumer consumer.Metrics) (processor.Metrics, error) {
Expand Down
14 changes: 13 additions & 1 deletion processor/intervalprocessor/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@ module github.com/open-telemetry/opentelemetry-collector-contrib/processor/inter
go 1.21.0

require (
github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics v0.99.0
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden v0.99.0
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest v0.99.0
github.com/stretchr/testify v1.9.0
go.opentelemetry.io/collector/component v0.99.0
go.opentelemetry.io/collector/confmap v0.99.0
Expand All @@ -17,7 +20,7 @@ require (

require (
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/go-logr/logr v1.4.1 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
Expand All @@ -32,6 +35,7 @@ require (
github.com/mitchellh/reflectwalk v1.0.2 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil v0.99.0 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/client_golang v1.19.0 // indirect
github.com/prometheus/client_model v0.6.1 // indirect
Expand All @@ -52,3 +56,11 @@ require (
google.golang.org/protobuf v1.33.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)

replace github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics => ../../internal/exp/metrics

replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil => ../../pkg/pdatautil

replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest => ../../pkg/pdatatest

replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden => ../../pkg/golden
4 changes: 2 additions & 2 deletions processor/intervalprocessor/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

23 changes: 23 additions & 0 deletions processor/intervalprocessor/internal/metrics/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package metrics // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/intervalprocessor/internal/metrics"

import (
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/pmetric"
)

type DataPointSlice[DP DataPoint[DP]] interface {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

these types are only locally used once in aggregateDataPoints, doesn't need an extra package, right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, that package has gotten much much smaller since the original design. That said, I'm working on a PR for another component, and needed the same interfaces. So I'm potentially going to put them in internal/exp/metrics/streams. See:

RichieSams@d16a215#diff-45d5f87d8148e341ebb68b4125ebe4296ccfbb1a9b63a45009358e7a0b617d26

Thoughts?

Len() int
At(i int) DP
AppendEmpty() DP
}

type DataPoint[Self any] interface {
pmetric.NumberDataPoint | pmetric.HistogramDataPoint | pmetric.ExponentialHistogramDataPoint

Timestamp() pcommon.Timestamp
Attributes() pcommon.Map
CopyTo(dest Self)
}
2 changes: 1 addition & 1 deletion processor/intervalprocessor/metadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,6 @@ status:
distributions: []
warnings: [Statefulness]
codeowners:
active: [RichieSams]
active: [RichieSams, sh0rez]
tests:
config:
Loading
Loading