Skip to content

Commit

Permalink
[processor/logdedup] Add aggregated log record histogram metric (#34581)
Browse files Browse the repository at this point in the history
**Description:** 

Adds a aggregated log histogram metric. Updates tests to use helper
functions for creating processor and component settings.

**Link to tracking Issue:**

- Closes #34579

**Testing:**

**Documentation:**
  • Loading branch information
MikeGoldsmith committed Aug 12, 2024
1 parent 2515c52 commit 817d97c
Show file tree
Hide file tree
Showing 14 changed files with 330 additions and 38 deletions.
27 changes: 27 additions & 0 deletions .chloggen/logdedup-add-metric.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: logdedupprocessor

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Adds a histogram metric to record the number of aggregated log records.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [34579]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
11 changes: 9 additions & 2 deletions processor/logdedupprocessor/counter.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,14 @@
package logdedupprocessor // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/logdedupprocessor"

import (
"context"
"time"

"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/plog"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/logdedupprocessor/internal/metadata"
)

// Attributes names for first and last observed timestamps
Expand All @@ -26,19 +28,21 @@ type logAggregator struct {
resources map[uint64]*resourceAggregator
logCountAttribute string
timezone *time.Location
telemetryBuilder *metadata.TelemetryBuilder
}

// newLogAggregator creates a new LogCounter.
func newLogAggregator(logCountAttribute string, timezone *time.Location) *logAggregator {
func newLogAggregator(logCountAttribute string, timezone *time.Location, telemetryBuilder *metadata.TelemetryBuilder) *logAggregator {
return &logAggregator{
resources: make(map[uint64]*resourceAggregator),
logCountAttribute: logCountAttribute,
timezone: timezone,
telemetryBuilder: telemetryBuilder,
}
}

// Export exports the counter as a Logs
func (l *logAggregator) Export() plog.Logs {
func (l *logAggregator) Export(ctx context.Context) plog.Logs {
logs := plog.NewLogs()

for _, resourceAggregator := range l.resources {
Expand All @@ -50,6 +54,9 @@ func (l *logAggregator) Export() plog.Logs {
scopeAggregator.scope.CopyTo(sl.Scope())

for _, logAggregator := range scopeAggregator.logCounters {
// Record aggregated logs records
l.telemetryBuilder.DedupProcessorAggregatedLogs.Record(ctx, logAggregator.count)

lr := sl.LogRecords().AppendEmpty()
logAggregator.logRecord.CopyTo(lr)

Expand Down
24 changes: 19 additions & 5 deletions processor/logdedupprocessor/counter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,25 @@
package logdedupprocessor

import (
"context"
"testing"
"time"

"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/plog"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/logdedupprocessor/internal/metadata"
)

func Test_newLogAggregator(t *testing.T) {
cfg := createDefaultConfig().(*Config)
aggregator := newLogAggregator(cfg.LogCountAttribute, time.UTC)
telemetryBuilder, err := metadata.NewTelemetryBuilder(componenttest.NewNopTelemetrySettings())
require.NoError(t, err)

aggregator := newLogAggregator(cfg.LogCountAttribute, time.UTC, telemetryBuilder)
require.Equal(t, cfg.LogCountAttribute, aggregator.logCountAttribute)
require.Equal(t, time.UTC, aggregator.timezone)
require.NotNil(t, aggregator.resources)
Expand All @@ -34,8 +40,11 @@ func Test_logAggregatorAdd(t *testing.T) {
return firstExpectedTimestamp
}

telemetryBuilder, err := metadata.NewTelemetryBuilder(componenttest.NewNopTelemetrySettings())
require.NoError(t, err)

// Setup aggregator
aggregator := newLogAggregator("log_count", time.UTC)
aggregator := newLogAggregator("log_count", time.UTC, telemetryBuilder)
logRecord := plog.NewLogRecord()

resource := pcommon.NewResource()
Expand Down Expand Up @@ -82,7 +91,10 @@ func Test_logAggregatorAdd(t *testing.T) {
}

func Test_logAggregatorReset(t *testing.T) {
aggregator := newLogAggregator("log_count", time.UTC)
telemetryBuilder, err := metadata.NewTelemetryBuilder(componenttest.NewNopTelemetrySettings())
require.NoError(t, err)

aggregator := newLogAggregator("log_count", time.UTC, telemetryBuilder)
for i := 0; i < 2; i++ {
resource := pcommon.NewResource()
resource.Attributes().PutInt("i", int64(i))
Expand Down Expand Up @@ -114,8 +126,10 @@ func Test_logAggregatorExport(t *testing.T) {
}

// Setup aggregator
telemetryBuilder, err := metadata.NewTelemetryBuilder(componenttest.NewNopTelemetrySettings())
require.NoError(t, err)

aggregator := newLogAggregator(defaultLogCountAttribute, location)
aggregator := newLogAggregator(defaultLogCountAttribute, location, telemetryBuilder)
resource := pcommon.NewResource()
resource.Attributes().PutStr("one", "two")
expectedHash := pdatautil.MapHash(resource.Attributes())
Expand All @@ -127,7 +141,7 @@ func Test_logAggregatorExport(t *testing.T) {
// Add logRecord
aggregator.Add(resource, scope, logRecord)

exportedLogs := aggregator.Export()
exportedLogs := aggregator.Export(context.Background())
require.Equal(t, 1, exportedLogs.LogRecordCount())
require.Equal(t, 1, exportedLogs.ResourceLogs().Len())

Expand Down
15 changes: 15 additions & 0 deletions processor/logdedupprocessor/documentation.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
[comment]: <> (Code generated by mdatagen. DO NOT EDIT.)

# logdedup

## Internal Telemetry

The following telemetry is emitted by this component.

### otelcol_dedup_processor_aggregated_logs

Number of log records that were aggregated together.

| Unit | Metric Type | Value Type |
| ---- | ----------- | ---------- |
| {records} | Histogram | Int |
4 changes: 2 additions & 2 deletions processor/logdedupprocessor/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,11 @@ func NewFactory() processor.Factory {
}

// createLogsProcessor creates a log processor.
func createLogsProcessor(_ context.Context, params processor.Settings, cfg component.Config, consumer consumer.Logs) (processor.Logs, error) {
func createLogsProcessor(_ context.Context, settings processor.Settings, cfg component.Config, consumer consumer.Logs) (processor.Logs, error) {
processorCfg, ok := cfg.(*Config)
if !ok {
return nil, fmt.Errorf("invalid config type: %+v", cfg)
}

return newProcessor(processorCfg, consumer, params.Logger)
return newProcessor(processorCfg, consumer, settings)
}
4 changes: 2 additions & 2 deletions processor/logdedupprocessor/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (

"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/processor"
"go.opentelemetry.io/collector/processor/processortest"

"github.com/open-telemetry/opentelemetry-collector-contrib/processor/logdedupprocessor/internal/metadata"
)
Expand Down Expand Up @@ -42,7 +42,7 @@ func TestCreateLogsProcessor(t *testing.T) {
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
f := NewFactory()
p, err := f.CreateLogsProcessor(context.Background(), processor.Settings{}, tc.cfg, nil)
p, err := f.CreateLogsProcessor(context.Background(), processortest.NewNopSettings(), tc.cfg, nil)
if tc.expectedErr == "" {
require.NoError(t, err)
require.IsType(t, &logDedupProcessor{}, p)
Expand Down
76 changes: 76 additions & 0 deletions processor/logdedupprocessor/generated_component_telemetry_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 4 additions & 4 deletions processor/logdedupprocessor/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,15 @@ require (
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil v0.106.1
github.com/stretchr/testify v1.9.0
go.opentelemetry.io/collector/component v0.106.2-0.20240809191011-ef07ea073562
go.opentelemetry.io/collector/config/configtelemetry v0.106.2-0.20240809191011-ef07ea073562
go.opentelemetry.io/collector/confmap v0.106.2-0.20240809191011-ef07ea073562
go.opentelemetry.io/collector/consumer v0.106.2-0.20240809191011-ef07ea073562
go.opentelemetry.io/collector/consumer/consumertest v0.106.2-0.20240809191011-ef07ea073562
go.opentelemetry.io/collector/pdata v1.12.1-0.20240809191011-ef07ea073562
go.opentelemetry.io/collector/processor v0.106.2-0.20240809191011-ef07ea073562
go.opentelemetry.io/otel/metric v1.28.0
go.opentelemetry.io/otel/sdk/metric v1.28.0
go.opentelemetry.io/otel/trace v1.28.0
go.uber.org/goleak v1.3.0
go.uber.org/zap v1.27.0
)
Expand Down Expand Up @@ -43,18 +47,14 @@ require (
github.com/prometheus/client_model v0.6.1 // indirect
github.com/prometheus/common v0.55.0 // indirect
github.com/prometheus/procfs v0.15.1 // indirect
go.opentelemetry.io/collector/config/configtelemetry v0.106.2-0.20240809191011-ef07ea073562 // indirect
go.opentelemetry.io/collector/consumer/consumerprofiles v0.106.2-0.20240809191011-ef07ea073562 // indirect
go.opentelemetry.io/collector/featuregate v1.12.1-0.20240809191011-ef07ea073562 // indirect
go.opentelemetry.io/collector/internal/globalgates v0.106.2-0.20240809191011-ef07ea073562 // indirect
go.opentelemetry.io/collector/pdata/pprofile v0.106.2-0.20240809191011-ef07ea073562 // indirect
go.opentelemetry.io/collector/pdata/testdata v0.106.2-0.20240809191011-ef07ea073562 // indirect
go.opentelemetry.io/otel v1.28.0 // indirect
go.opentelemetry.io/otel/exporters/prometheus v0.50.0 // indirect
go.opentelemetry.io/otel/metric v1.28.0 // indirect
go.opentelemetry.io/otel/sdk v1.28.0 // indirect
go.opentelemetry.io/otel/sdk/metric v1.28.0 // indirect
go.opentelemetry.io/otel/trace v1.28.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/net v0.26.0 // indirect
golang.org/x/sys v0.22.0 // indirect
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 817d97c

Please sign in to comment.