Skip to content

Commit

Permalink
Split pdata package by telemetry signal type
Browse files Browse the repository at this point in the history
  • Loading branch information
dmitryax committed Mar 30, 2022
1 parent 699a81b commit c8df699
Show file tree
Hide file tree
Showing 126 changed files with 1,832 additions and 752 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@
instance for consistency with other OneOf field accessors (#5035)
- Update OTLP to v0.15.0 (#5064)
- Adding support for transition from older versions of OTLP to OTLP v0.15.0 (#5085)
- Split pdata package separated by telemetry signal type (#5087)

### 🧰 Bug fixes 🧰

Expand Down
4 changes: 2 additions & 2 deletions client/doc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,15 @@ import (

"go.opentelemetry.io/collector/client"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/model/pdata"
"go.opentelemetry.io/collector/model/ptrace"
)

func Example_receiver() {
// Your receiver get a next consumer when it's constructed
var next consumer.Traces

// You'll convert the incoming data into pipeline data
td := pdata.NewTraces()
td := ptrace.NewTraces()

// You probably have a context with client metadata from your listener or
// scraper
Expand Down
10 changes: 6 additions & 4 deletions component/componenttest/nop_exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@ import (
"github.com/stretchr/testify/require"

"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/model/pdata"
"go.opentelemetry.io/collector/model/plog"
"go.opentelemetry.io/collector/model/pmetric"
"go.opentelemetry.io/collector/model/ptrace"
)

func TestNewNopExporterFactory(t *testing.T) {
Expand All @@ -35,18 +37,18 @@ func TestNewNopExporterFactory(t *testing.T) {
traces, err := factory.CreateTracesExporter(context.Background(), NewNopExporterCreateSettings(), cfg)
require.NoError(t, err)
assert.NoError(t, traces.Start(context.Background(), NewNopHost()))
assert.NoError(t, traces.ConsumeTraces(context.Background(), pdata.NewTraces()))
assert.NoError(t, traces.ConsumeTraces(context.Background(), ptrace.NewTraces()))
assert.NoError(t, traces.Shutdown(context.Background()))

metrics, err := factory.CreateMetricsExporter(context.Background(), NewNopExporterCreateSettings(), cfg)
require.NoError(t, err)
assert.NoError(t, metrics.Start(context.Background(), NewNopHost()))
assert.NoError(t, metrics.ConsumeMetrics(context.Background(), pdata.NewMetrics()))
assert.NoError(t, metrics.ConsumeMetrics(context.Background(), pmetric.NewMetrics()))
assert.NoError(t, metrics.Shutdown(context.Background()))

logs, err := factory.CreateLogsExporter(context.Background(), NewNopExporterCreateSettings(), cfg)
require.NoError(t, err)
assert.NoError(t, logs.Start(context.Background(), NewNopHost()))
assert.NoError(t, logs.ConsumeLogs(context.Background(), pdata.NewLogs()))
assert.NoError(t, logs.ConsumeLogs(context.Background(), plog.NewLogs()))
assert.NoError(t, logs.Shutdown(context.Background()))
}
10 changes: 6 additions & 4 deletions component/componenttest/nop_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@ import (
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/model/pdata"
"go.opentelemetry.io/collector/model/plog"
"go.opentelemetry.io/collector/model/pmetric"
"go.opentelemetry.io/collector/model/ptrace"
)

func TestNewNopProcessorFactory(t *testing.T) {
Expand All @@ -38,20 +40,20 @@ func TestNewNopProcessorFactory(t *testing.T) {
require.NoError(t, err)
assert.Equal(t, consumer.Capabilities{MutatesData: false}, traces.Capabilities())
assert.NoError(t, traces.Start(context.Background(), NewNopHost()))
assert.NoError(t, traces.ConsumeTraces(context.Background(), pdata.NewTraces()))
assert.NoError(t, traces.ConsumeTraces(context.Background(), ptrace.NewTraces()))
assert.NoError(t, traces.Shutdown(context.Background()))

metrics, err := factory.CreateMetricsProcessor(context.Background(), NewNopProcessorCreateSettings(), cfg, consumertest.NewNop())
require.NoError(t, err)
assert.Equal(t, consumer.Capabilities{MutatesData: false}, metrics.Capabilities())
assert.NoError(t, metrics.Start(context.Background(), NewNopHost()))
assert.NoError(t, metrics.ConsumeMetrics(context.Background(), pdata.NewMetrics()))
assert.NoError(t, metrics.ConsumeMetrics(context.Background(), pmetric.NewMetrics()))
assert.NoError(t, metrics.Shutdown(context.Background()))

logs, err := factory.CreateLogsProcessor(context.Background(), NewNopProcessorCreateSettings(), cfg, consumertest.NewNop())
require.NoError(t, err)
assert.Equal(t, consumer.Capabilities{MutatesData: false}, logs.Capabilities())
assert.NoError(t, logs.Start(context.Background(), NewNopHost()))
assert.NoError(t, logs.ConsumeLogs(context.Background(), pdata.NewLogs()))
assert.NoError(t, logs.ConsumeLogs(context.Background(), plog.NewLogs()))
assert.NoError(t, logs.Shutdown(context.Background()))
}
6 changes: 3 additions & 3 deletions component/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ type Receiver interface {
// Its purpose is to translate data from any format to the collector's internal trace format.
// TracesReceiver feeds a consumer.Traces with data.
//
// For example it could be Zipkin data source which translates Zipkin spans into pdata.Traces.
// For example it could be Zipkin data source which translates Zipkin spans into ptrace.Traces.
type TracesReceiver interface {
Receiver
}
Expand All @@ -76,7 +76,7 @@ type TracesReceiver interface {
// Its purpose is to translate data from any format to the collector's internal metrics format.
// MetricsReceiver feeds a consumer.Metrics with data.
//
// For example it could be Prometheus data source which translates Prometheus metrics into pdata.Metrics.
// For example it could be Prometheus data source which translates Prometheus metrics into pmetric.Metrics.
type MetricsReceiver interface {
Receiver
}
Expand All @@ -85,7 +85,7 @@ type MetricsReceiver interface {
// Its purpose is to translate data from any format to the collector's internal logs data format.
// LogsReceiver feeds a consumer.Logs with data.
//
// For example a LogsReceiver can read syslogs and convert them into pdata.Logs.
// For example a LogsReceiver can read syslogs and convert them into plog.Logs.
type LogsReceiver interface {
Receiver
}
Expand Down
16 changes: 9 additions & 7 deletions config/configgrpc/configgrpc_benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,9 @@ import (

"go.opentelemetry.io/collector/internal/testdata"
"go.opentelemetry.io/collector/model/otlp"
"go.opentelemetry.io/collector/model/pdata"
"go.opentelemetry.io/collector/model/plog"
"go.opentelemetry.io/collector/model/pmetric"
"go.opentelemetry.io/collector/model/ptrace"
)

func BenchmarkCompressors(b *testing.B) {
Expand Down Expand Up @@ -106,27 +108,27 @@ type marshaler interface {
}

type logMarshaler struct {
pdata.LogsMarshaler
plog.LogsMarshaler
}

func (m *logMarshaler) marshal(e interface{}) ([]byte, error) {
return m.MarshalLogs(e.(pdata.Logs))
return m.MarshalLogs(e.(plog.Logs))
}

type traceMarshaler struct {
pdata.TracesMarshaler
ptrace.TracesMarshaler
}

func (m *traceMarshaler) marshal(e interface{}) ([]byte, error) {
return m.MarshalTraces(e.(pdata.Traces))
return m.MarshalTraces(e.(ptrace.Traces))
}

type metricsMarshaler struct {
pdata.MetricsMarshaler
pmetric.MetricsMarshaler
}

func (m *metricsMarshaler) marshal(e interface{}) ([]byte, error) {
return m.MarshalMetrics(e.(pdata.Metrics))
return m.MarshalMetrics(e.(pmetric.Metrics))
}

func setupTestPayloads() []testPayload {
Expand Down
22 changes: 12 additions & 10 deletions consumer/consumererror/signalerrors.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,26 +15,28 @@
package consumererror // import "go.opentelemetry.io/collector/consumer/consumererror"

import (
"go.opentelemetry.io/collector/model/pdata"
"go.opentelemetry.io/collector/model/plog"
"go.opentelemetry.io/collector/model/pmetric"
"go.opentelemetry.io/collector/model/ptrace"
)

// Traces is an error that may carry associated Trace data for a subset of received data
// that failed to be processed or sent.
type Traces struct {
error
failed pdata.Traces
failed ptrace.Traces
}

// NewTraces creates a Traces that can encapsulate received data that failed to be processed or sent.
func NewTraces(err error, failed pdata.Traces) error {
func NewTraces(err error, failed ptrace.Traces) error {
return Traces{
error: err,
failed: failed,
}
}

// GetTraces returns failed traces from the associated error.
func (err Traces) GetTraces() pdata.Traces {
func (err Traces) GetTraces() ptrace.Traces {
return err.failed
}

Expand All @@ -47,19 +49,19 @@ func (err Traces) Unwrap() error {
// that failed to be processed or sent.
type Logs struct {
error
failed pdata.Logs
failed plog.Logs
}

// NewLogs creates a Logs that can encapsulate received data that failed to be processed or sent.
func NewLogs(err error, failed pdata.Logs) error {
func NewLogs(err error, failed plog.Logs) error {
return Logs{
error: err,
failed: failed,
}
}

// GetLogs returns failed logs from the associated error.
func (err Logs) GetLogs() pdata.Logs {
func (err Logs) GetLogs() plog.Logs {
return err.failed
}

Expand All @@ -72,19 +74,19 @@ func (err Logs) Unwrap() error {
// that failed to be processed or sent.
type Metrics struct {
error
failed pdata.Metrics
failed pmetric.Metrics
}

// NewMetrics creates a Metrics that can encapsulate received data that failed to be processed or sent.
func NewMetrics(err error, failed pdata.Metrics) error {
func NewMetrics(err error, failed pmetric.Metrics) error {
return Metrics{
error: err,
failed: failed,
}
}

// GetMetrics returns failed metrics from the associated error.
func (err Metrics) GetMetrics() pdata.Metrics {
func (err Metrics) GetMetrics() pmetric.Metrics {
return err.failed
}

Expand Down
10 changes: 6 additions & 4 deletions consumer/consumertest/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ import (
"context"

"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/model/pdata"
"go.opentelemetry.io/collector/model/plog"
"go.opentelemetry.io/collector/model/pmetric"
"go.opentelemetry.io/collector/model/ptrace"
)

// Consumer is a convenience interface that implements all consumer interfaces.
Expand All @@ -29,11 +31,11 @@ type Consumer interface {
// Capabilities to implement the base consumer functionality.
Capabilities() consumer.Capabilities
// ConsumeTraces to implement the consumer.Traces.
ConsumeTraces(context.Context, pdata.Traces) error
ConsumeTraces(context.Context, ptrace.Traces) error
// ConsumeMetrics to implement the consumer.Metrics.
ConsumeMetrics(context.Context, pdata.Metrics) error
ConsumeMetrics(context.Context, pmetric.Metrics) error
// ConsumeLogs to implement the consumer.Logs.
ConsumeLogs(context.Context, pdata.Logs) error
ConsumeLogs(context.Context, plog.Logs) error
unexported()
}

Expand Down
10 changes: 6 additions & 4 deletions consumer/consumertest/err.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@ package consumertest // import "go.opentelemetry.io/collector/consumer/consumert
import (
"context"

"go.opentelemetry.io/collector/model/pdata"
"go.opentelemetry.io/collector/model/plog"
"go.opentelemetry.io/collector/model/pmetric"
"go.opentelemetry.io/collector/model/ptrace"
)

type errConsumer struct {
Expand All @@ -27,15 +29,15 @@ type errConsumer struct {

func (er *errConsumer) unexported() {}

func (er *errConsumer) ConsumeTraces(context.Context, pdata.Traces) error {
func (er *errConsumer) ConsumeTraces(context.Context, ptrace.Traces) error {
return er.err
}

func (er *errConsumer) ConsumeMetrics(context.Context, pdata.Metrics) error {
func (er *errConsumer) ConsumeMetrics(context.Context, pmetric.Metrics) error {
return er.err
}

func (er *errConsumer) ConsumeLogs(context.Context, pdata.Logs) error {
func (er *errConsumer) ConsumeLogs(context.Context, plog.Logs) error {
return er.err
}

Expand Down
10 changes: 6 additions & 4 deletions consumer/consumertest/err_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,17 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"go.opentelemetry.io/collector/model/pdata"
"go.opentelemetry.io/collector/model/plog"
"go.opentelemetry.io/collector/model/pmetric"
"go.opentelemetry.io/collector/model/ptrace"
)

func TestErr(t *testing.T) {
err := errors.New("my error")
ec := NewErr(err)
require.NotNil(t, ec)
assert.NotPanics(t, ec.unexported)
assert.Equal(t, err, ec.ConsumeLogs(context.Background(), pdata.NewLogs()))
assert.Equal(t, err, ec.ConsumeMetrics(context.Background(), pdata.NewMetrics()))
assert.Equal(t, err, ec.ConsumeTraces(context.Background(), pdata.NewTraces()))
assert.Equal(t, err, ec.ConsumeLogs(context.Background(), plog.NewLogs()))
assert.Equal(t, err, ec.ConsumeMetrics(context.Background(), pmetric.NewMetrics()))
assert.Equal(t, err, ec.ConsumeTraces(context.Background(), ptrace.NewTraces()))
}
10 changes: 6 additions & 4 deletions consumer/consumertest/nop.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@ package consumertest // import "go.opentelemetry.io/collector/consumer/consumert
import (
"context"

"go.opentelemetry.io/collector/model/pdata"
"go.opentelemetry.io/collector/model/plog"
"go.opentelemetry.io/collector/model/pmetric"
"go.opentelemetry.io/collector/model/ptrace"
)

var (
Expand All @@ -30,15 +32,15 @@ type nopConsumer struct {

func (nc *nopConsumer) unexported() {}

func (nc *nopConsumer) ConsumeTraces(context.Context, pdata.Traces) error {
func (nc *nopConsumer) ConsumeTraces(context.Context, ptrace.Traces) error {
return nil
}

func (nc *nopConsumer) ConsumeMetrics(context.Context, pdata.Metrics) error {
func (nc *nopConsumer) ConsumeMetrics(context.Context, pmetric.Metrics) error {
return nil
}

func (nc *nopConsumer) ConsumeLogs(context.Context, pdata.Logs) error {
func (nc *nopConsumer) ConsumeLogs(context.Context, plog.Logs) error {
return nil
}

Expand Down
10 changes: 6 additions & 4 deletions consumer/consumertest/nop_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,16 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"go.opentelemetry.io/collector/model/pdata"
"go.opentelemetry.io/collector/model/plog"
"go.opentelemetry.io/collector/model/pmetric"
"go.opentelemetry.io/collector/model/ptrace"
)

func TestNop(t *testing.T) {
nc := NewNop()
require.NotNil(t, nc)
assert.NotPanics(t, nc.unexported)
assert.NoError(t, nc.ConsumeLogs(context.Background(), pdata.NewLogs()))
assert.NoError(t, nc.ConsumeMetrics(context.Background(), pdata.NewMetrics()))
assert.NoError(t, nc.ConsumeTraces(context.Background(), pdata.NewTraces()))
assert.NoError(t, nc.ConsumeLogs(context.Background(), plog.NewLogs()))
assert.NoError(t, nc.ConsumeMetrics(context.Background(), pmetric.NewMetrics()))
assert.NoError(t, nc.ConsumeTraces(context.Background(), ptrace.NewTraces()))
}
Loading

0 comments on commit c8df699

Please sign in to comment.