Skip to content

Commit

Permalink
add consumer profiles
Browse files Browse the repository at this point in the history
  • Loading branch information
dmathieu committed Jun 25, 2024
1 parent 227fb82 commit 08d4ba8
Show file tree
Hide file tree
Showing 8 changed files with 159 additions and 48 deletions.
44 changes: 6 additions & 38 deletions consumer/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,52 +5,20 @@ package consumer // import "go.opentelemetry.io/collector/consumer"

import (
"errors"
)

// Capabilities describes the capabilities of a Processor.
type Capabilities struct {
// MutatesData is set to true if Consume* function of the
// processor modifies the input Traces, Logs or Metrics argument.
// Processors which modify the input data MUST set this flag to true. If the processor
// does not modify the data it MUST set this flag to false. If the processor creates
// a copy of the data before modifying then this flag can be safely set to false.
MutatesData bool
}
"go.opentelemetry.io/collector/consumer/internal"
)

type baseConsumer interface {
Capabilities() Capabilities
}
type Capabilities = internal.Capabilities

var errNilFunc = errors.New("nil consumer func")

type baseImpl struct {
capabilities Capabilities
}

// Option to construct new consumers.
type Option func(*baseImpl)
type Option = internal.Option

// WithCapabilities overrides the default GetCapabilities function for a processor.
// The default GetCapabilities function returns mutable capabilities.
func WithCapabilities(capabilities Capabilities) Option {
return func(o *baseImpl) {
o.capabilities = capabilities
}
}

// Capabilities returns the capabilities of the component
func (bs baseImpl) Capabilities() Capabilities {
return bs.capabilities
}

func newBaseImpl(options ...Option) *baseImpl {
bs := &baseImpl{
capabilities: Capabilities{MutatesData: false},
return func(o *internal.BaseImpl) {
o.Cap = capabilities
}

for _, op := range options {
op(bs)
}

return bs
}
47 changes: 47 additions & 0 deletions consumer/consumerprofiles/profiles.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package consumerprofiles // import "go.opentelemetry.io/collector/consumer/consumerprofiles"

import (
"context"
"errors"

"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/internal"
"go.opentelemetry.io/collector/pdata/pprofile"
)

var errNilFunc = errors.New("nil consumer func")

// Profiles is an interface that receives pprofile.Profiles, processes it
// as needed, and sends it to the next processing node if any or to the destination.
type Profiles interface {
internal.BaseConsumer
// ConsumeProfiles receives pprofile.Profiles for consumption.
ConsumeProfiles(ctx context.Context, td pprofile.Profiles) error
}

// ConsumeProfilesFunc is a helper function that is similar to ConsumeProfiles.
type ConsumeProfilesFunc func(ctx context.Context, td pprofile.Profiles) error

// ConsumeProfiles calls f(ctx, td).
func (f ConsumeProfilesFunc) ConsumeProfiles(ctx context.Context, td pprofile.Profiles) error {
return f(ctx, td)
}

type baseProfiles struct {
*internal.BaseImpl
ConsumeProfilesFunc
}

// NewProfiles returns a Profiles configured with the provided options.
func NewProfiles(consume ConsumeProfilesFunc, options ...consumer.Option) (Profiles, error) {
if consume == nil {
return nil, errNilFunc
}
return &baseProfiles{
BaseImpl: internal.NewBaseImpl(options...),
ConsumeProfilesFunc: consume,
}, nil
}
51 changes: 51 additions & 0 deletions consumer/consumerprofiles/profiles_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package consumerprofiles

import (
"context"
"errors"
"testing"

"github.com/stretchr/testify/assert"

"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/pdata/pprofile"
)

func TestDefaultProfiles(t *testing.T) {
cp, err := NewProfiles(func(context.Context, pprofile.Profiles) error { return nil })
assert.NoError(t, err)
assert.NoError(t, cp.ConsumeProfiles(context.Background(), pprofile.NewProfiles()))
assert.Equal(t, consumer.Capabilities{MutatesData: false}, cp.Capabilities())
}

func TestNilFuncProfiles(t *testing.T) {
_, err := NewProfiles(nil)
assert.Equal(t, errNilFunc, err)
}

func TestWithCapabilitiesProfiles(t *testing.T) {
cp, err := NewProfiles(
func(context.Context, pprofile.Profiles) error { return nil },
consumer.WithCapabilities(consumer.Capabilities{MutatesData: true}))
assert.NoError(t, err)
assert.NoError(t, cp.ConsumeProfiles(context.Background(), pprofile.NewProfiles()))
assert.Equal(t, consumer.Capabilities{MutatesData: true}, cp.Capabilities())
}

func TestConsumeProfiles(t *testing.T) {
consumeCalled := false
cp, err := NewProfiles(func(context.Context, pprofile.Profiles) error { consumeCalled = true; return nil })
assert.NoError(t, err)
assert.NoError(t, cp.ConsumeProfiles(context.Background(), pprofile.NewProfiles()))
assert.True(t, consumeCalled)
}

func TestConsumeProfiles_ReturnError(t *testing.T) {
want := errors.New("my_error")
cp, err := NewProfiles(func(context.Context, pprofile.Profiles) error { return want })
assert.NoError(t, err)
assert.Equal(t, want, cp.ConsumeProfiles(context.Background(), pprofile.NewProfiles()))
}
2 changes: 1 addition & 1 deletion consumer/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ go 1.21.0
require (
github.com/stretchr/testify v1.9.0
go.opentelemetry.io/collector/pdata v1.10.0
go.opentelemetry.io/collector/pdata/pprofile v0.103.0
go.opentelemetry.io/collector/pdata/testdata v0.103.0
go.uber.org/goleak v1.3.0
)
Expand All @@ -16,7 +17,6 @@ require (
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
go.opentelemetry.io/collector/pdata/pprofile v0.103.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/net v0.23.0 // indirect
golang.org/x/sys v0.18.0 // indirect
Expand Down
42 changes: 42 additions & 0 deletions consumer/internal/consumer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package internal // import "go.opentelemetry.io/collector/consumer/internal"

// Capabilities describes the capabilities of a Processor.
type Capabilities struct {
// MutatesData is set to true if Consume* function of the
// processor modifies the input Traces, Logs or Metrics argument.
// Processors which modify the input data MUST set this flag to true. If the processor
// does not modify the data it MUST set this flag to false. If the processor creates
// a copy of the data before modifying then this flag can be safely set to false.
MutatesData bool
}

type BaseConsumer interface {
Capabilities() Capabilities
}

type BaseImpl struct {
Cap Capabilities
}

// Option to construct new consumers.
type Option func(*BaseImpl)

// Capabilities returns the capabilities of the component
func (bs BaseImpl) Capabilities() Capabilities {
return bs.Cap
}

func NewBaseImpl(options ...Option) *BaseImpl {
bs := &BaseImpl{
Cap: Capabilities{MutatesData: false},
}

for _, op := range options {
op(bs)
}

return bs
}
7 changes: 4 additions & 3 deletions consumer/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,14 @@ package consumer // import "go.opentelemetry.io/collector/consumer"
import (
"context"

"go.opentelemetry.io/collector/consumer/internal"
"go.opentelemetry.io/collector/pdata/plog"
)

// Logs is an interface that receives plog.Logs, processes it
// as needed, and sends it to the next processing node if any or to the destination.
type Logs interface {
baseConsumer
internal.BaseConsumer
// ConsumeLogs receives plog.Logs for consumption.
ConsumeLogs(ctx context.Context, ld plog.Logs) error
}
Expand All @@ -26,7 +27,7 @@ func (f ConsumeLogsFunc) ConsumeLogs(ctx context.Context, ld plog.Logs) error {
}

type baseLogs struct {
*baseImpl
*internal.BaseImpl
ConsumeLogsFunc
}

Expand All @@ -36,7 +37,7 @@ func NewLogs(consume ConsumeLogsFunc, options ...Option) (Logs, error) {
return nil, errNilFunc
}
return &baseLogs{
baseImpl: newBaseImpl(options...),
BaseImpl: internal.NewBaseImpl(options...),
ConsumeLogsFunc: consume,
}, nil
}
7 changes: 4 additions & 3 deletions consumer/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,14 @@ package consumer // import "go.opentelemetry.io/collector/consumer"
import (
"context"

"go.opentelemetry.io/collector/consumer/internal"
"go.opentelemetry.io/collector/pdata/pmetric"
)

// Metrics is an interface that receives pmetric.Metrics, processes it
// as needed, and sends it to the next processing node if any or to the destination.
type Metrics interface {
baseConsumer
internal.BaseConsumer
// ConsumeMetrics receives pmetric.Metrics for consumption.
ConsumeMetrics(ctx context.Context, md pmetric.Metrics) error
}
Expand All @@ -26,7 +27,7 @@ func (f ConsumeMetricsFunc) ConsumeMetrics(ctx context.Context, md pmetric.Metri
}

type baseMetrics struct {
*baseImpl
*internal.BaseImpl
ConsumeMetricsFunc
}

Expand All @@ -36,7 +37,7 @@ func NewMetrics(consume ConsumeMetricsFunc, options ...Option) (Metrics, error)
return nil, errNilFunc
}
return &baseMetrics{
baseImpl: newBaseImpl(options...),
BaseImpl: internal.NewBaseImpl(options...),
ConsumeMetricsFunc: consume,
}, nil
}
7 changes: 4 additions & 3 deletions consumer/traces.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,14 @@ package consumer // import "go.opentelemetry.io/collector/consumer"
import (
"context"

"go.opentelemetry.io/collector/consumer/internal"
"go.opentelemetry.io/collector/pdata/ptrace"
)

// Traces is an interface that receives ptrace.Traces, processes it
// as needed, and sends it to the next processing node if any or to the destination.
type Traces interface {
baseConsumer
internal.BaseConsumer
// ConsumeTraces receives ptrace.Traces for consumption.
ConsumeTraces(ctx context.Context, td ptrace.Traces) error
}
Expand All @@ -26,7 +27,7 @@ func (f ConsumeTracesFunc) ConsumeTraces(ctx context.Context, td ptrace.Traces)
}

type baseTraces struct {
*baseImpl
*internal.BaseImpl
ConsumeTracesFunc
}

Expand All @@ -36,7 +37,7 @@ func NewTraces(consume ConsumeTracesFunc, options ...Option) (Traces, error) {
return nil, errNilFunc
}
return &baseTraces{
baseImpl: newBaseImpl(options...),
BaseImpl: internal.NewBaseImpl(options...),
ConsumeTracesFunc: consume,
}, nil
}

0 comments on commit 08d4ba8

Please sign in to comment.