Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Split pdata package by telemetry signal type #4918

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,10 @@
- The `featuregates` were not configured from the "--feature-gates" flag on windows service (#5060)
- Fix Semantic Convention Schema URL definition for 1.5.0 and 1.6.1 versions (#5103)

### 💡 Enhancements 💡

- Create additional pdata packages separated by type for further split of pdata (#4918)

## v0.47.0 Beta

### 🛑 Breaking changes 🛑
Expand Down
4 changes: 2 additions & 2 deletions client/doc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,15 @@ import (

"go.opentelemetry.io/collector/client"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/model/pdata"
"go.opentelemetry.io/collector/model/pdata/traces"
)

func Example_receiver() {
// Your receiver get a next consumer when it's constructed
var next consumer.Traces

// You'll convert the incoming data into pipeline data
td := pdata.NewTraces()
td := traces.New()

// You probably have a context with client metadata from your listener or
// scraper
Expand Down
28 changes: 15 additions & 13 deletions component/componenttest/nop_exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@ import (
"github.com/stretchr/testify/require"

"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/model/pdata"
"go.opentelemetry.io/collector/model/pdata/logs"
"go.opentelemetry.io/collector/model/pdata/metrics"
"go.opentelemetry.io/collector/model/pdata/traces"
)

func TestNewNopExporterFactory(t *testing.T) {
Expand All @@ -32,21 +34,21 @@ func TestNewNopExporterFactory(t *testing.T) {
cfg := factory.CreateDefaultConfig()
assert.Equal(t, &nopExporterConfig{ExporterSettings: config.NewExporterSettings(config.NewComponentID("nop"))}, cfg)

traces, err := factory.CreateTracesExporter(context.Background(), NewNopExporterCreateSettings(), cfg)
te, err := factory.CreateTracesExporter(context.Background(), NewNopExporterCreateSettings(), cfg)
require.NoError(t, err)
assert.NoError(t, traces.Start(context.Background(), NewNopHost()))
assert.NoError(t, traces.ConsumeTraces(context.Background(), pdata.NewTraces()))
assert.NoError(t, traces.Shutdown(context.Background()))
assert.NoError(t, te.Start(context.Background(), NewNopHost()))
assert.NoError(t, te.ConsumeTraces(context.Background(), traces.New()))
assert.NoError(t, te.Shutdown(context.Background()))

metrics, err := factory.CreateMetricsExporter(context.Background(), NewNopExporterCreateSettings(), cfg)
me, err := factory.CreateMetricsExporter(context.Background(), NewNopExporterCreateSettings(), cfg)
require.NoError(t, err)
assert.NoError(t, metrics.Start(context.Background(), NewNopHost()))
assert.NoError(t, metrics.ConsumeMetrics(context.Background(), pdata.NewMetrics()))
assert.NoError(t, metrics.Shutdown(context.Background()))
assert.NoError(t, me.Start(context.Background(), NewNopHost()))
assert.NoError(t, me.ConsumeMetrics(context.Background(), metrics.New()))
assert.NoError(t, me.Shutdown(context.Background()))

logs, err := factory.CreateLogsExporter(context.Background(), NewNopExporterCreateSettings(), cfg)
le, err := factory.CreateLogsExporter(context.Background(), NewNopExporterCreateSettings(), cfg)
require.NoError(t, err)
assert.NoError(t, logs.Start(context.Background(), NewNopHost()))
assert.NoError(t, logs.ConsumeLogs(context.Background(), pdata.NewLogs()))
assert.NoError(t, logs.Shutdown(context.Background()))
assert.NoError(t, le.Start(context.Background(), NewNopHost()))
assert.NoError(t, le.ConsumeLogs(context.Background(), logs.New()))
assert.NoError(t, le.Shutdown(context.Background()))
}
34 changes: 18 additions & 16 deletions component/componenttest/nop_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@ import (
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/model/pdata"
"go.opentelemetry.io/collector/model/pdata/logs"
"go.opentelemetry.io/collector/model/pdata/metrics"
"go.opentelemetry.io/collector/model/pdata/traces"
)

func TestNewNopProcessorFactory(t *testing.T) {
Expand All @@ -34,24 +36,24 @@ func TestNewNopProcessorFactory(t *testing.T) {
cfg := factory.CreateDefaultConfig()
assert.Equal(t, &nopProcessorConfig{ProcessorSettings: config.NewProcessorSettings(config.NewComponentID("nop"))}, cfg)

traces, err := factory.CreateTracesProcessor(context.Background(), NewNopProcessorCreateSettings(), cfg, consumertest.NewNop())
tp, err := factory.CreateTracesProcessor(context.Background(), NewNopProcessorCreateSettings(), cfg, consumertest.NewNop())
require.NoError(t, err)
assert.Equal(t, consumer.Capabilities{MutatesData: false}, traces.Capabilities())
assert.NoError(t, traces.Start(context.Background(), NewNopHost()))
assert.NoError(t, traces.ConsumeTraces(context.Background(), pdata.NewTraces()))
assert.NoError(t, traces.Shutdown(context.Background()))
assert.Equal(t, consumer.Capabilities{MutatesData: false}, tp.Capabilities())
assert.NoError(t, tp.Start(context.Background(), NewNopHost()))
assert.NoError(t, tp.ConsumeTraces(context.Background(), traces.New()))
assert.NoError(t, tp.Shutdown(context.Background()))

metrics, err := factory.CreateMetricsProcessor(context.Background(), NewNopProcessorCreateSettings(), cfg, consumertest.NewNop())
mp, err := factory.CreateMetricsProcessor(context.Background(), NewNopProcessorCreateSettings(), cfg, consumertest.NewNop())
require.NoError(t, err)
assert.Equal(t, consumer.Capabilities{MutatesData: false}, metrics.Capabilities())
assert.NoError(t, metrics.Start(context.Background(), NewNopHost()))
assert.NoError(t, metrics.ConsumeMetrics(context.Background(), pdata.NewMetrics()))
assert.NoError(t, metrics.Shutdown(context.Background()))
assert.Equal(t, consumer.Capabilities{MutatesData: false}, mp.Capabilities())
assert.NoError(t, mp.Start(context.Background(), NewNopHost()))
assert.NoError(t, mp.ConsumeMetrics(context.Background(), metrics.New()))
assert.NoError(t, mp.Shutdown(context.Background()))

logs, err := factory.CreateLogsProcessor(context.Background(), NewNopProcessorCreateSettings(), cfg, consumertest.NewNop())
lp, err := factory.CreateLogsProcessor(context.Background(), NewNopProcessorCreateSettings(), cfg, consumertest.NewNop())
require.NoError(t, err)
assert.Equal(t, consumer.Capabilities{MutatesData: false}, logs.Capabilities())
assert.NoError(t, logs.Start(context.Background(), NewNopHost()))
assert.NoError(t, logs.ConsumeLogs(context.Background(), pdata.NewLogs()))
assert.NoError(t, logs.Shutdown(context.Background()))
assert.Equal(t, consumer.Capabilities{MutatesData: false}, lp.Capabilities())
assert.NoError(t, lp.Start(context.Background(), NewNopHost()))
assert.NoError(t, lp.ConsumeLogs(context.Background(), logs.New()))
assert.NoError(t, lp.Shutdown(context.Background()))
}
6 changes: 3 additions & 3 deletions component/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ type Receiver interface {
// Its purpose is to translate data from any format to the collector's internal trace format.
// TracesReceiver feeds a consumer.Traces with data.
//
// For example it could be Zipkin data source which translates Zipkin spans into pdata.Traces.
// For example it could be Zipkin data source which translates Zipkin spans into traces.Traces.
type TracesReceiver interface {
Receiver
}
Expand All @@ -76,7 +76,7 @@ type TracesReceiver interface {
// Its purpose is to translate data from any format to the collector's internal metrics format.
// MetricsReceiver feeds a consumer.Metrics with data.
//
// For example it could be Prometheus data source which translates Prometheus metrics into pdata.Metrics.
// For example it could be Prometheus data source which translates Prometheus metrics into metrics.Metrics.
type MetricsReceiver interface {
Receiver
}
Expand All @@ -85,7 +85,7 @@ type MetricsReceiver interface {
// Its purpose is to translate data from any format to the collector's internal logs data format.
// LogsReceiver feeds a consumer.Logs with data.
//
// For example a LogsReceiver can read syslogs and convert them into pdata.Logs.
// For example a LogsReceiver can read syslogs and convert them into logs.Logs.
type LogsReceiver interface {
Receiver
}
Expand Down
16 changes: 9 additions & 7 deletions config/configgrpc/configgrpc_benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,9 @@ import (

"go.opentelemetry.io/collector/internal/testdata"
"go.opentelemetry.io/collector/model/otlp"
"go.opentelemetry.io/collector/model/pdata"
"go.opentelemetry.io/collector/model/pdata/logs"
"go.opentelemetry.io/collector/model/pdata/metrics"
"go.opentelemetry.io/collector/model/pdata/traces"
)

func BenchmarkCompressors(b *testing.B) {
Expand Down Expand Up @@ -106,27 +108,27 @@ type marshaler interface {
}

type logMarshaler struct {
pdata.LogsMarshaler
logs.Marshaler
}

func (m *logMarshaler) marshal(e interface{}) ([]byte, error) {
return m.MarshalLogs(e.(pdata.Logs))
return m.MarshalLogs(e.(logs.Logs))
}

type traceMarshaler struct {
pdata.TracesMarshaler
traces.Marshaler
}

func (m *traceMarshaler) marshal(e interface{}) ([]byte, error) {
return m.MarshalTraces(e.(pdata.Traces))
return m.MarshalTraces(e.(traces.Traces))
}

type metricsMarshaler struct {
pdata.MetricsMarshaler
metrics.Marshaler
}

func (m *metricsMarshaler) marshal(e interface{}) ([]byte, error) {
return m.MarshalMetrics(e.(pdata.Metrics))
return m.MarshalMetrics(e.(metrics.Metrics))
}

func setupTestPayloads() []testPayload {
Expand Down
22 changes: 12 additions & 10 deletions consumer/consumererror/signalerrors.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,26 +15,28 @@
package consumererror // import "go.opentelemetry.io/collector/consumer/consumererror"

import (
"go.opentelemetry.io/collector/model/pdata"
"go.opentelemetry.io/collector/model/pdata/logs"
"go.opentelemetry.io/collector/model/pdata/metrics"
"go.opentelemetry.io/collector/model/pdata/traces"
)

// Traces is an error that may carry associated Trace data for a subset of received data
// that failed to be processed or sent.
type Traces struct {
error
failed pdata.Traces
failed traces.Traces
}

// NewTraces creates a Traces that can encapsulate received data that failed to be processed or sent.
func NewTraces(err error, failed pdata.Traces) error {
func NewTraces(err error, failed traces.Traces) error {
return Traces{
error: err,
failed: failed,
}
}

// GetTraces returns failed traces from the associated error.
func (err Traces) GetTraces() pdata.Traces {
func (err Traces) GetTraces() traces.Traces {
return err.failed
}

Expand All @@ -47,19 +49,19 @@ func (err Traces) Unwrap() error {
// that failed to be processed or sent.
type Logs struct {
error
failed pdata.Logs
failed logs.Logs
}

// NewLogs creates a Logs that can encapsulate received data that failed to be processed or sent.
func NewLogs(err error, failed pdata.Logs) error {
func NewLogs(err error, failed logs.Logs) error {
return Logs{
error: err,
failed: failed,
}
}

// GetLogs returns failed logs from the associated error.
func (err Logs) GetLogs() pdata.Logs {
func (err Logs) GetLogs() logs.Logs {
return err.failed
}

Expand All @@ -72,19 +74,19 @@ func (err Logs) Unwrap() error {
// that failed to be processed or sent.
type Metrics struct {
error
failed pdata.Metrics
failed metrics.Metrics
}

// NewMetrics creates a Metrics that can encapsulate received data that failed to be processed or sent.
func NewMetrics(err error, failed pdata.Metrics) error {
func NewMetrics(err error, failed metrics.Metrics) error {
return Metrics{
error: err,
failed: failed,
}
}

// GetMetrics returns failed metrics from the associated error.
func (err Metrics) GetMetrics() pdata.Metrics {
func (err Metrics) GetMetrics() metrics.Metrics {
return err.failed
}

Expand Down
10 changes: 6 additions & 4 deletions consumer/consumertest/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ import (
"context"

"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/model/pdata"
"go.opentelemetry.io/collector/model/pdata/logs"
"go.opentelemetry.io/collector/model/pdata/metrics"
"go.opentelemetry.io/collector/model/pdata/traces"
)

// Consumer is a convenience interface that implements all consumer interfaces.
Expand All @@ -29,11 +31,11 @@ type Consumer interface {
// Capabilities to implement the base consumer functionality.
Capabilities() consumer.Capabilities
// ConsumeTraces to implement the consumer.Traces.
ConsumeTraces(context.Context, pdata.Traces) error
ConsumeTraces(context.Context, traces.Traces) error
// ConsumeMetrics to implement the consumer.Metrics.
ConsumeMetrics(context.Context, pdata.Metrics) error
ConsumeMetrics(context.Context, metrics.Metrics) error
// ConsumeLogs to implement the consumer.Logs.
ConsumeLogs(context.Context, pdata.Logs) error
ConsumeLogs(context.Context, logs.Logs) error
unexported()
}

Expand Down
10 changes: 6 additions & 4 deletions consumer/consumertest/err.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@ package consumertest // import "go.opentelemetry.io/collector/consumer/consumert
import (
"context"

"go.opentelemetry.io/collector/model/pdata"
"go.opentelemetry.io/collector/model/pdata/logs"
"go.opentelemetry.io/collector/model/pdata/metrics"
"go.opentelemetry.io/collector/model/pdata/traces"
)

type errConsumer struct {
Expand All @@ -27,15 +29,15 @@ type errConsumer struct {

func (er *errConsumer) unexported() {}

func (er *errConsumer) ConsumeTraces(context.Context, pdata.Traces) error {
func (er *errConsumer) ConsumeTraces(context.Context, traces.Traces) error {
return er.err
}

func (er *errConsumer) ConsumeMetrics(context.Context, pdata.Metrics) error {
func (er *errConsumer) ConsumeMetrics(context.Context, metrics.Metrics) error {
return er.err
}

func (er *errConsumer) ConsumeLogs(context.Context, pdata.Logs) error {
func (er *errConsumer) ConsumeLogs(context.Context, logs.Logs) error {
return er.err
}

Expand Down
10 changes: 6 additions & 4 deletions consumer/consumertest/err_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,17 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"go.opentelemetry.io/collector/model/pdata"
"go.opentelemetry.io/collector/model/pdata/logs"
"go.opentelemetry.io/collector/model/pdata/metrics"
"go.opentelemetry.io/collector/model/pdata/traces"
)

func TestErr(t *testing.T) {
err := errors.New("my error")
ec := NewErr(err)
require.NotNil(t, ec)
assert.NotPanics(t, ec.unexported)
assert.Equal(t, err, ec.ConsumeLogs(context.Background(), pdata.NewLogs()))
assert.Equal(t, err, ec.ConsumeMetrics(context.Background(), pdata.NewMetrics()))
assert.Equal(t, err, ec.ConsumeTraces(context.Background(), pdata.NewTraces()))
assert.Equal(t, err, ec.ConsumeLogs(context.Background(), logs.New()))
assert.Equal(t, err, ec.ConsumeMetrics(context.Background(), metrics.New()))
assert.Equal(t, err, ec.ConsumeTraces(context.Background(), traces.New()))
}
10 changes: 6 additions & 4 deletions consumer/consumertest/nop.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@ package consumertest // import "go.opentelemetry.io/collector/consumer/consumert
import (
"context"

"go.opentelemetry.io/collector/model/pdata"
"go.opentelemetry.io/collector/model/pdata/logs"
"go.opentelemetry.io/collector/model/pdata/metrics"
"go.opentelemetry.io/collector/model/pdata/traces"
)

var (
Expand All @@ -30,15 +32,15 @@ type nopConsumer struct {

func (nc *nopConsumer) unexported() {}

func (nc *nopConsumer) ConsumeTraces(context.Context, pdata.Traces) error {
func (nc *nopConsumer) ConsumeTraces(context.Context, traces.Traces) error {
return nil
}

func (nc *nopConsumer) ConsumeMetrics(context.Context, pdata.Metrics) error {
func (nc *nopConsumer) ConsumeMetrics(context.Context, metrics.Metrics) error {
return nil
}

func (nc *nopConsumer) ConsumeLogs(context.Context, pdata.Logs) error {
func (nc *nopConsumer) ConsumeLogs(context.Context, logs.Logs) error {
return nil
}

Expand Down
Loading