Skip to content

Commit

Permalink
Adopt usage of split pdata in core
Browse files Browse the repository at this point in the history
  • Loading branch information
dmitryax committed Mar 29, 2022
1 parent e3bc29d commit b59c1ab
Show file tree
Hide file tree
Showing 85 changed files with 673 additions and 602 deletions.
4 changes: 2 additions & 2 deletions client/doc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,15 @@ import (

"go.opentelemetry.io/collector/client"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/model/pdata"
"go.opentelemetry.io/collector/model/pdata/traces"
)

func Example_receiver() {
// Your receiver get a next consumer when it's constructed
var next consumer.Traces

// You'll convert the incoming data into pipeline data
td := pdata.NewTraces()
td := traces.New()

// You probably have a context with client metadata from your listener or
// scraper
Expand Down
28 changes: 15 additions & 13 deletions component/componenttest/nop_exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@ import (
"github.com/stretchr/testify/require"

"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/model/pdata"
"go.opentelemetry.io/collector/model/pdata/logs"
"go.opentelemetry.io/collector/model/pdata/metrics"
"go.opentelemetry.io/collector/model/pdata/traces"
)

func TestNewNopExporterFactory(t *testing.T) {
Expand All @@ -32,21 +34,21 @@ func TestNewNopExporterFactory(t *testing.T) {
cfg := factory.CreateDefaultConfig()
assert.Equal(t, &nopExporterConfig{ExporterSettings: config.NewExporterSettings(config.NewComponentID("nop"))}, cfg)

traces, err := factory.CreateTracesExporter(context.Background(), NewNopExporterCreateSettings(), cfg)
te, err := factory.CreateTracesExporter(context.Background(), NewNopExporterCreateSettings(), cfg)
require.NoError(t, err)
assert.NoError(t, traces.Start(context.Background(), NewNopHost()))
assert.NoError(t, traces.ConsumeTraces(context.Background(), pdata.NewTraces()))
assert.NoError(t, traces.Shutdown(context.Background()))
assert.NoError(t, te.Start(context.Background(), NewNopHost()))
assert.NoError(t, te.ConsumeTraces(context.Background(), traces.New()))
assert.NoError(t, te.Shutdown(context.Background()))

metrics, err := factory.CreateMetricsExporter(context.Background(), NewNopExporterCreateSettings(), cfg)
me, err := factory.CreateMetricsExporter(context.Background(), NewNopExporterCreateSettings(), cfg)
require.NoError(t, err)
assert.NoError(t, metrics.Start(context.Background(), NewNopHost()))
assert.NoError(t, metrics.ConsumeMetrics(context.Background(), pdata.NewMetrics()))
assert.NoError(t, metrics.Shutdown(context.Background()))
assert.NoError(t, me.Start(context.Background(), NewNopHost()))
assert.NoError(t, me.ConsumeMetrics(context.Background(), metrics.New()))
assert.NoError(t, me.Shutdown(context.Background()))

logs, err := factory.CreateLogsExporter(context.Background(), NewNopExporterCreateSettings(), cfg)
le, err := factory.CreateLogsExporter(context.Background(), NewNopExporterCreateSettings(), cfg)
require.NoError(t, err)
assert.NoError(t, logs.Start(context.Background(), NewNopHost()))
assert.NoError(t, logs.ConsumeLogs(context.Background(), pdata.NewLogs()))
assert.NoError(t, logs.Shutdown(context.Background()))
assert.NoError(t, le.Start(context.Background(), NewNopHost()))
assert.NoError(t, le.ConsumeLogs(context.Background(), logs.New()))
assert.NoError(t, le.Shutdown(context.Background()))
}
34 changes: 18 additions & 16 deletions component/componenttest/nop_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@ import (
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/model/pdata"
"go.opentelemetry.io/collector/model/pdata/logs"
"go.opentelemetry.io/collector/model/pdata/metrics"
"go.opentelemetry.io/collector/model/pdata/traces"
)

func TestNewNopProcessorFactory(t *testing.T) {
Expand All @@ -34,24 +36,24 @@ func TestNewNopProcessorFactory(t *testing.T) {
cfg := factory.CreateDefaultConfig()
assert.Equal(t, &nopProcessorConfig{ProcessorSettings: config.NewProcessorSettings(config.NewComponentID("nop"))}, cfg)

traces, err := factory.CreateTracesProcessor(context.Background(), NewNopProcessorCreateSettings(), cfg, consumertest.NewNop())
tp, err := factory.CreateTracesProcessor(context.Background(), NewNopProcessorCreateSettings(), cfg, consumertest.NewNop())
require.NoError(t, err)
assert.Equal(t, consumer.Capabilities{MutatesData: false}, traces.Capabilities())
assert.NoError(t, traces.Start(context.Background(), NewNopHost()))
assert.NoError(t, traces.ConsumeTraces(context.Background(), pdata.NewTraces()))
assert.NoError(t, traces.Shutdown(context.Background()))
assert.Equal(t, consumer.Capabilities{MutatesData: false}, tp.Capabilities())
assert.NoError(t, tp.Start(context.Background(), NewNopHost()))
assert.NoError(t, tp.ConsumeTraces(context.Background(), traces.New()))
assert.NoError(t, tp.Shutdown(context.Background()))

metrics, err := factory.CreateMetricsProcessor(context.Background(), NewNopProcessorCreateSettings(), cfg, consumertest.NewNop())
mp, err := factory.CreateMetricsProcessor(context.Background(), NewNopProcessorCreateSettings(), cfg, consumertest.NewNop())
require.NoError(t, err)
assert.Equal(t, consumer.Capabilities{MutatesData: false}, metrics.Capabilities())
assert.NoError(t, metrics.Start(context.Background(), NewNopHost()))
assert.NoError(t, metrics.ConsumeMetrics(context.Background(), pdata.NewMetrics()))
assert.NoError(t, metrics.Shutdown(context.Background()))
assert.Equal(t, consumer.Capabilities{MutatesData: false}, mp.Capabilities())
assert.NoError(t, mp.Start(context.Background(), NewNopHost()))
assert.NoError(t, mp.ConsumeMetrics(context.Background(), metrics.New()))
assert.NoError(t, mp.Shutdown(context.Background()))

logs, err := factory.CreateLogsProcessor(context.Background(), NewNopProcessorCreateSettings(), cfg, consumertest.NewNop())
lp, err := factory.CreateLogsProcessor(context.Background(), NewNopProcessorCreateSettings(), cfg, consumertest.NewNop())
require.NoError(t, err)
assert.Equal(t, consumer.Capabilities{MutatesData: false}, logs.Capabilities())
assert.NoError(t, logs.Start(context.Background(), NewNopHost()))
assert.NoError(t, logs.ConsumeLogs(context.Background(), pdata.NewLogs()))
assert.NoError(t, logs.Shutdown(context.Background()))
assert.Equal(t, consumer.Capabilities{MutatesData: false}, lp.Capabilities())
assert.NoError(t, lp.Start(context.Background(), NewNopHost()))
assert.NoError(t, lp.ConsumeLogs(context.Background(), logs.New()))
assert.NoError(t, lp.Shutdown(context.Background()))
}
6 changes: 3 additions & 3 deletions component/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ type Receiver interface {
// Its purpose is to translate data from any format to the collector's internal trace format.
// TracesReceiver feeds a consumer.Traces with data.
//
// For example it could be Zipkin data source which translates Zipkin spans into pdata.Traces.
// For example it could be Zipkin data source which translates Zipkin spans into traces.Traces.
type TracesReceiver interface {
Receiver
}
Expand All @@ -76,7 +76,7 @@ type TracesReceiver interface {
// Its purpose is to translate data from any format to the collector's internal metrics format.
// MetricsReceiver feeds a consumer.Metrics with data.
//
// For example it could be Prometheus data source which translates Prometheus metrics into pdata.Metrics.
// For example it could be Prometheus data source which translates Prometheus metrics into metrics.Metrics.
type MetricsReceiver interface {
Receiver
}
Expand All @@ -85,7 +85,7 @@ type MetricsReceiver interface {
// Its purpose is to translate data from any format to the collector's internal logs data format.
// LogsReceiver feeds a consumer.Logs with data.
//
// For example a LogsReceiver can read syslogs and convert them into pdata.Logs.
// For example a LogsReceiver can read syslogs and convert them into logs.Logs.
type LogsReceiver interface {
Receiver
}
Expand Down
16 changes: 9 additions & 7 deletions config/configgrpc/configgrpc_benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,9 @@ import (

"go.opentelemetry.io/collector/internal/testdata"
"go.opentelemetry.io/collector/model/otlp"
"go.opentelemetry.io/collector/model/pdata"
"go.opentelemetry.io/collector/model/pdata/logs"
"go.opentelemetry.io/collector/model/pdata/metrics"
"go.opentelemetry.io/collector/model/pdata/traces"
)

func BenchmarkCompressors(b *testing.B) {
Expand Down Expand Up @@ -106,27 +108,27 @@ type marshaler interface {
}

type logMarshaler struct {
pdata.LogsMarshaler
logs.Marshaler
}

func (m *logMarshaler) marshal(e interface{}) ([]byte, error) {
return m.MarshalLogs(e.(pdata.Logs))
return m.MarshalLogs(e.(logs.Logs))
}

type traceMarshaler struct {
pdata.TracesMarshaler
traces.Marshaler
}

func (m *traceMarshaler) marshal(e interface{}) ([]byte, error) {
return m.MarshalTraces(e.(pdata.Traces))
return m.MarshalTraces(e.(traces.Traces))
}

type metricsMarshaler struct {
pdata.MetricsMarshaler
metrics.Marshaler
}

func (m *metricsMarshaler) marshal(e interface{}) ([]byte, error) {
return m.MarshalMetrics(e.(pdata.Metrics))
return m.MarshalMetrics(e.(metrics.Metrics))
}

func setupTestPayloads() []testPayload {
Expand Down
22 changes: 12 additions & 10 deletions consumer/consumererror/signalerrors.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,26 +15,28 @@
package consumererror // import "go.opentelemetry.io/collector/consumer/consumererror"

import (
"go.opentelemetry.io/collector/model/pdata"
"go.opentelemetry.io/collector/model/pdata/logs"
"go.opentelemetry.io/collector/model/pdata/metrics"
"go.opentelemetry.io/collector/model/pdata/traces"
)

// Traces is an error that may carry associated Trace data for a subset of received data
// that failed to be processed or sent.
type Traces struct {
error
failed pdata.Traces
failed traces.Traces
}

// NewTraces creates a Traces that can encapsulate received data that failed to be processed or sent.
func NewTraces(err error, failed pdata.Traces) error {
func NewTraces(err error, failed traces.Traces) error {
return Traces{
error: err,
failed: failed,
}
}

// GetTraces returns failed traces from the associated error.
func (err Traces) GetTraces() pdata.Traces {
func (err Traces) GetTraces() traces.Traces {
return err.failed
}

Expand All @@ -47,19 +49,19 @@ func (err Traces) Unwrap() error {
// that failed to be processed or sent.
type Logs struct {
error
failed pdata.Logs
failed logs.Logs
}

// NewLogs creates a Logs that can encapsulate received data that failed to be processed or sent.
func NewLogs(err error, failed pdata.Logs) error {
func NewLogs(err error, failed logs.Logs) error {
return Logs{
error: err,
failed: failed,
}
}

// GetLogs returns failed logs from the associated error.
func (err Logs) GetLogs() pdata.Logs {
func (err Logs) GetLogs() logs.Logs {
return err.failed
}

Expand All @@ -72,19 +74,19 @@ func (err Logs) Unwrap() error {
// that failed to be processed or sent.
type Metrics struct {
error
failed pdata.Metrics
failed metrics.Metrics
}

// NewMetrics creates a Metrics that can encapsulate received data that failed to be processed or sent.
func NewMetrics(err error, failed pdata.Metrics) error {
func NewMetrics(err error, failed metrics.Metrics) error {
return Metrics{
error: err,
failed: failed,
}
}

// GetMetrics returns failed metrics from the associated error.
func (err Metrics) GetMetrics() pdata.Metrics {
func (err Metrics) GetMetrics() metrics.Metrics {
return err.failed
}

Expand Down
10 changes: 6 additions & 4 deletions consumer/consumertest/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ import (
"context"

"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/model/pdata"
"go.opentelemetry.io/collector/model/pdata/logs"
"go.opentelemetry.io/collector/model/pdata/metrics"
"go.opentelemetry.io/collector/model/pdata/traces"
)

// Consumer is a convenience interface that implements all consumer interfaces.
Expand All @@ -29,11 +31,11 @@ type Consumer interface {
// Capabilities to implement the base consumer functionality.
Capabilities() consumer.Capabilities
// ConsumeTraces to implement the consumer.Traces.
ConsumeTraces(context.Context, pdata.Traces) error
ConsumeTraces(context.Context, traces.Traces) error
// ConsumeMetrics to implement the consumer.Metrics.
ConsumeMetrics(context.Context, pdata.Metrics) error
ConsumeMetrics(context.Context, metrics.Metrics) error
// ConsumeLogs to implement the consumer.Logs.
ConsumeLogs(context.Context, pdata.Logs) error
ConsumeLogs(context.Context, logs.Logs) error
unexported()
}

Expand Down
10 changes: 6 additions & 4 deletions consumer/consumertest/err.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@ package consumertest // import "go.opentelemetry.io/collector/consumer/consumert
import (
"context"

"go.opentelemetry.io/collector/model/pdata"
"go.opentelemetry.io/collector/model/pdata/logs"
"go.opentelemetry.io/collector/model/pdata/metrics"
"go.opentelemetry.io/collector/model/pdata/traces"
)

type errConsumer struct {
Expand All @@ -27,15 +29,15 @@ type errConsumer struct {

func (er *errConsumer) unexported() {}

func (er *errConsumer) ConsumeTraces(context.Context, pdata.Traces) error {
func (er *errConsumer) ConsumeTraces(context.Context, traces.Traces) error {
return er.err
}

func (er *errConsumer) ConsumeMetrics(context.Context, pdata.Metrics) error {
func (er *errConsumer) ConsumeMetrics(context.Context, metrics.Metrics) error {
return er.err
}

func (er *errConsumer) ConsumeLogs(context.Context, pdata.Logs) error {
func (er *errConsumer) ConsumeLogs(context.Context, logs.Logs) error {
return er.err
}

Expand Down
10 changes: 6 additions & 4 deletions consumer/consumertest/err_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,17 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"go.opentelemetry.io/collector/model/pdata"
"go.opentelemetry.io/collector/model/pdata/logs"
"go.opentelemetry.io/collector/model/pdata/metrics"
"go.opentelemetry.io/collector/model/pdata/traces"
)

func TestErr(t *testing.T) {
err := errors.New("my error")
ec := NewErr(err)
require.NotNil(t, ec)
assert.NotPanics(t, ec.unexported)
assert.Equal(t, err, ec.ConsumeLogs(context.Background(), pdata.NewLogs()))
assert.Equal(t, err, ec.ConsumeMetrics(context.Background(), pdata.NewMetrics()))
assert.Equal(t, err, ec.ConsumeTraces(context.Background(), pdata.NewTraces()))
assert.Equal(t, err, ec.ConsumeLogs(context.Background(), logs.New()))
assert.Equal(t, err, ec.ConsumeMetrics(context.Background(), metrics.New()))
assert.Equal(t, err, ec.ConsumeTraces(context.Background(), traces.New()))
}
10 changes: 6 additions & 4 deletions consumer/consumertest/nop.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@ package consumertest // import "go.opentelemetry.io/collector/consumer/consumert
import (
"context"

"go.opentelemetry.io/collector/model/pdata"
"go.opentelemetry.io/collector/model/pdata/logs"
"go.opentelemetry.io/collector/model/pdata/metrics"
"go.opentelemetry.io/collector/model/pdata/traces"
)

var (
Expand All @@ -30,15 +32,15 @@ type nopConsumer struct {

func (nc *nopConsumer) unexported() {}

func (nc *nopConsumer) ConsumeTraces(context.Context, pdata.Traces) error {
func (nc *nopConsumer) ConsumeTraces(context.Context, traces.Traces) error {
return nil
}

func (nc *nopConsumer) ConsumeMetrics(context.Context, pdata.Metrics) error {
func (nc *nopConsumer) ConsumeMetrics(context.Context, metrics.Metrics) error {
return nil
}

func (nc *nopConsumer) ConsumeLogs(context.Context, pdata.Logs) error {
func (nc *nopConsumer) ConsumeLogs(context.Context, logs.Logs) error {
return nil
}

Expand Down
10 changes: 6 additions & 4 deletions consumer/consumertest/nop_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,16 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"go.opentelemetry.io/collector/model/pdata"
"go.opentelemetry.io/collector/model/pdata/logs"
"go.opentelemetry.io/collector/model/pdata/metrics"
"go.opentelemetry.io/collector/model/pdata/traces"
)

func TestNop(t *testing.T) {
nc := NewNop()
require.NotNil(t, nc)
assert.NotPanics(t, nc.unexported)
assert.NoError(t, nc.ConsumeLogs(context.Background(), pdata.NewLogs()))
assert.NoError(t, nc.ConsumeMetrics(context.Background(), pdata.NewMetrics()))
assert.NoError(t, nc.ConsumeTraces(context.Background(), pdata.NewTraces()))
assert.NoError(t, nc.ConsumeLogs(context.Background(), logs.New()))
assert.NoError(t, nc.ConsumeMetrics(context.Background(), metrics.New()))
assert.NoError(t, nc.ConsumeTraces(context.Background(), traces.New()))
}
Loading

0 comments on commit b59c1ab

Please sign in to comment.