Skip to content

Commit

Permalink
Remove old receiver factories and receiver base factory
Browse files Browse the repository at this point in the history
Signed-off-by: Bogdan Drutu <bogdandrutu@gmail.com>
  • Loading branch information
bogdandrutu committed Aug 18, 2020
1 parent 30c3c34 commit 34c08f9
Show file tree
Hide file tree
Showing 10 changed files with 75 additions and 109 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
## 🛑 Breaking changes 🛑

- Remove `reconnection_delay` from OpenCensus exporter #1516.
- Remove old receiver factories and receiver base factory $1583.

## v0.8.0 Beta

Expand Down
6 changes: 5 additions & 1 deletion component/componenttest/example_factories.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ type ExampleReceiver struct {
type ExampleReceiverFactory struct {
}

var _ component.ReceiverFactory = (*ExampleReceiverFactory)(nil)

// Type gets the type of the Receiver config created by this factory.
func (f *ExampleReceiverFactory) Type() configmodels.Type {
return "examplereceiver"
Expand Down Expand Up @@ -159,7 +161,7 @@ func (erp *ExampleReceiverProducer) Shutdown(context.Context) error {
}

// This is the map of already created example receivers for particular configurations.
// We maintain this map because the ReceiverFactoryBase is asked trace and metric receivers separately
// We maintain this map because the ReceiverFactory is asked trace and metric receivers separately
// when it gets CreateTraceReceiver() and CreateMetricsReceiver() but they must not
// create separate objects, they must use one Receiver object per configuration.
var exampleReceivers = map[configmodels.Receiver]*ExampleReceiverProducer{}
Expand All @@ -181,6 +183,8 @@ type MultiProtoReceiverOneCfg struct {
type MultiProtoReceiverFactory struct {
}

var _ component.ReceiverFactory = (*MultiProtoReceiverFactory)(nil)

// Type gets the type of the Receiver config created by this factory.
func (f *MultiProtoReceiverFactory) Type() configmodels.Type {
return "multireceiver"
Expand Down
6 changes: 3 additions & 3 deletions component/factories.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
// can be handled by the Config.
type Factories struct {
// Receivers maps receiver type names in the config to the respective factory.
Receivers map[configmodels.Type]ReceiverFactoryBase
Receivers map[configmodels.Type]ReceiverFactory

// Processors maps processor type names in the config to the respective factory.
Processors map[configmodels.Type]ProcessorFactoryBase
Expand All @@ -39,8 +39,8 @@ type Factories struct {
// MakeReceiverFactoryMap takes a list of receiver factories and returns a map
// with factory type as keys. It returns a non-nil error when more than one factories
// have the same type.
func MakeReceiverFactoryMap(factories ...ReceiverFactoryBase) (map[configmodels.Type]ReceiverFactoryBase, error) {
fMap := map[configmodels.Type]ReceiverFactoryBase{}
func MakeReceiverFactoryMap(factories ...ReceiverFactory) (map[configmodels.Type]ReceiverFactory, error) {
fMap := map[configmodels.Type]ReceiverFactory{}
for _, f := range factories {
if _, ok := fMap[f.Type()]; ok {
return fMap, fmt.Errorf("duplicate receiver factory %q", f.Type())
Expand Down
53 changes: 20 additions & 33 deletions component/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,37 +55,6 @@ type LogsReceiver interface {
Receiver
}

// ReceiverFactoryBase defines the common functions for all receiver factories.
type ReceiverFactoryBase interface {
Factory

// CreateDefaultConfig creates the default configuration for the Receiver.
// This method can be called multiple times depending on the pipeline
// configuration and should not cause side-effects that prevent the creation
// of multiple instances of the Receiver.
// The object returned by this method needs to pass the checks implemented by
// 'configcheck.ValidateConfig'. It is recommended to have such check in the
// tests of any implementation of the Factory interface.
CreateDefaultConfig() configmodels.Receiver
}

// ReceiverFactoryOld can create TraceReceiver and MetricsReceiver.
type ReceiverFactoryOld interface {
ReceiverFactoryBase

// CreateTraceReceiver creates a trace receiver based on this config.
// If the receiver type does not support tracing or if the config is not valid
// error will be returned instead.
CreateTraceReceiver(ctx context.Context, logger *zap.Logger, cfg configmodels.Receiver,
nextConsumer consumer.TraceConsumerOld) (TraceReceiver, error)

// CreateMetricsReceiver creates a metrics receiver based on this config.
// If the receiver type does not support metrics or if the config is not valid
// error will be returned instead.
CreateMetricsReceiver(ctx context.Context, logger *zap.Logger, cfg configmodels.Receiver,
nextConsumer consumer.MetricsConsumerOld) (MetricsReceiver, error)
}

// ReceiverCreateParams is passed to ReceiverFactory.Create* functions.
type ReceiverCreateParams struct {
// Logger that the factory can use during creation and can pass to the created
Expand All @@ -96,7 +65,16 @@ type ReceiverCreateParams struct {
// ReceiverFactory can create TraceReceiver and MetricsReceiver. This is the
// new factory type that can create new style receivers.
type ReceiverFactory interface {
ReceiverFactoryBase
Factory

// CreateDefaultConfig creates the default configuration for the Receiver.
// This method can be called multiple times depending on the pipeline
// configuration and should not cause side-effects that prevent the creation
// of multiple instances of the Receiver.
// The object returned by this method needs to pass the checks implemented by
// 'configcheck.ValidateConfig'. It is recommended to have such check in the
// tests of any implementation of the Factory interface.
CreateDefaultConfig() configmodels.Receiver

// CreateTraceReceiver creates a trace receiver based on this config.
// If the receiver type does not support tracing or if the config is not valid
Expand All @@ -113,7 +91,16 @@ type ReceiverFactory interface {

// LogsReceiverFactory can create a LogsReceiver.
type LogsReceiverFactory interface {
ReceiverFactoryBase
Factory

// CreateDefaultConfig creates the default configuration for the Receiver.
// This method can be called multiple times depending on the pipeline
// configuration and should not cause side-effects that prevent the creation
// of multiple instances of the Receiver.
// The object returned by this method needs to pass the checks implemented by
// 'configcheck.ValidateConfig'. It is recommended to have such check in the
// tests of any implementation of the Factory interface.
CreateDefaultConfig() configmodels.Receiver

// CreateLogsReceiver creates a log receiver based on this config.
// If the receiver type does not support the data type or if the config is not valid
Expand Down
15 changes: 7 additions & 8 deletions component/receiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"testing"

"github.com/stretchr/testify/assert"
"go.uber.org/zap"

"go.opentelemetry.io/collector/config/configmodels"
"go.opentelemetry.io/collector/consumer"
Expand All @@ -40,36 +39,36 @@ func (f *TestReceiverFactory) CreateDefaultConfig() configmodels.Receiver {
}

// CreateTraceReceiver creates a trace receiver based on this config.
func (f *TestReceiverFactory) CreateTraceReceiver(context.Context, *zap.Logger, configmodels.Receiver, consumer.TraceConsumerOld) (TraceReceiver, error) {
func (f *TestReceiverFactory) CreateTraceReceiver(context.Context, ReceiverCreateParams, configmodels.Receiver, consumer.TraceConsumer) (TraceReceiver, error) {
// Not used for this test, just return nil
return nil, nil
}

// CreateMetricsReceiver creates a metrics receiver based on this config.
func (f *TestReceiverFactory) CreateMetricsReceiver(context.Context, *zap.Logger, configmodels.Receiver, consumer.MetricsConsumerOld) (MetricsReceiver, error) {
func (f *TestReceiverFactory) CreateMetricsReceiver(context.Context, ReceiverCreateParams, configmodels.Receiver, consumer.MetricsConsumer) (MetricsReceiver, error) {
// Not used for this test, just return nil
return nil, nil
}

func TestBuildReceivers(t *testing.T) {
type testCase struct {
in []ReceiverFactoryBase
out map[configmodels.Type]ReceiverFactoryBase
in []ReceiverFactory
out map[configmodels.Type]ReceiverFactory
}

testCases := []testCase{
{
in: []ReceiverFactoryBase{
in: []ReceiverFactory{
&TestReceiverFactory{"e1"},
&TestReceiverFactory{"e2"},
},
out: map[configmodels.Type]ReceiverFactoryBase{
out: map[configmodels.Type]ReceiverFactory{
"e1": &TestReceiverFactory{"e1"},
"e2": &TestReceiverFactory{"e2"},
},
},
{
in: []ReceiverFactoryBase{
in: []ReceiverFactory{
&TestReceiverFactory{"e1"},
&TestReceiverFactory{"e1"},
},
Expand Down
4 changes: 2 additions & 2 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,7 @@ func loadService(v *viper.Viper) (configmodels.Service, error) {
}

// LoadReceiver loads a receiver config from componentConfig using the provided factories.
func LoadReceiver(componentConfig *viper.Viper, typeStr configmodels.Type, fullName string, factory component.ReceiverFactoryBase) (configmodels.Receiver, error) {
func LoadReceiver(componentConfig *viper.Viper, typeStr configmodels.Type, fullName string, factory component.ReceiverFactory) (configmodels.Receiver, error) {
// Create the default config for this receiver.
receiverCfg := factory.CreateDefaultConfig()
receiverCfg.SetType(typeStr)
Expand All @@ -326,7 +326,7 @@ func LoadReceiver(componentConfig *viper.Viper, typeStr configmodels.Type, fullN
return receiverCfg, nil
}

func loadReceivers(v *viper.Viper, factories map[configmodels.Type]component.ReceiverFactoryBase) (configmodels.Receivers, error) {
func loadReceivers(v *viper.Viper, factories map[configmodels.Type]component.ReceiverFactory) (configmodels.Receivers, error) {
// Get the list of all "receivers" sub vipers from config source.
receiversConfig := ViperSub(v, receiversKeyName)
expandEnvConfig(receiversConfig)
Expand Down
2 changes: 1 addition & 1 deletion service/builder/pipelines_builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func createExampleFactories() component.Factories {
exampleExporterFactory := &componenttest.ExampleExporterFactory{}

factories := component.Factories{
Receivers: map[configmodels.Type]component.ReceiverFactoryBase{
Receivers: map[configmodels.Type]component.ReceiverFactory{
exampleReceiverFactory.Type(): exampleReceiverFactory,
},
Processors: map[configmodels.Type]component.ProcessorFactoryBase{
Expand Down
78 changes: 24 additions & 54 deletions service/builder/receivers_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,15 +83,15 @@ type ReceiversBuilder struct {
logger *zap.Logger
config *configmodels.Config
builtPipelines BuiltPipelines
factories map[configmodels.Type]component.ReceiverFactoryBase
factories map[configmodels.Type]component.ReceiverFactory
}

// NewReceiversBuilder creates a new ReceiversBuilder. Call BuildProcessors() on the returned value.
func NewReceiversBuilder(
logger *zap.Logger,
config *configmodels.Config,
builtPipelines BuiltPipelines,
factories map[configmodels.Type]component.ReceiverFactoryBase,
factories map[configmodels.Type]component.ReceiverFactory,
) *ReceiversBuilder {
return &ReceiversBuilder{logger.With(zap.String(kindLogKey, kindLogsReceiver)), config, builtPipelines, factories}
}
Expand Down Expand Up @@ -164,7 +164,7 @@ func (rb *ReceiversBuilder) findPipelinesToAttach(config configmodels.Receiver)

func (rb *ReceiversBuilder) attachReceiverToPipelines(
logger *zap.Logger,
factory component.ReceiverFactoryBase,
factory component.ReceiverFactory,
dataType configmodels.DataType,
config configmodels.Receiver,
rcv *builtReceiver,
Expand Down Expand Up @@ -351,82 +351,52 @@ func buildFanoutLogConsumer(pipelines []*builtPipeline) consumer.LogsConsumer {
// and type of the next consumer.
func createTraceReceiver(
ctx context.Context,
factoryBase component.ReceiverFactoryBase,
factory component.ReceiverFactory,
logger *zap.Logger,
cfg configmodels.Receiver,
nextConsumer consumer.TraceConsumerBase,
) (component.TraceReceiver, error) {
if factory, ok := factoryBase.(component.ReceiverFactory); ok {
creationParams := component.ReceiverCreateParams{Logger: logger}

// If both receiver and consumer are of the new type (can manipulate on internal data structure),
// use ProcessorFactory.CreateTraceReceiver.
if nextConsumer, ok := nextConsumer.(consumer.TraceConsumer); ok {
return factory.CreateTraceReceiver(ctx, creationParams, cfg, nextConsumer)
}

// If receiver is of the new type, but downstream consumer is of the old type,
// use internalToOCTraceConverter compatibility shim.
traceConverter := converter.NewInternalToOCTraceConverter(nextConsumer.(consumer.TraceConsumerOld))
return factory.CreateTraceReceiver(ctx, creationParams, cfg, traceConverter)
}

factoryOld := factoryBase.(component.ReceiverFactoryOld)
creationParams := component.ReceiverCreateParams{Logger: logger}

// If both receiver and consumer are of the old type (can manipulate on OC traces only),
// use Factory.CreateTraceReceiver.
if nextConsumer, ok := nextConsumer.(consumer.TraceConsumerOld); ok {
return factoryOld.CreateTraceReceiver(ctx, logger, cfg, nextConsumer)
// If both receiver and consumer are of the new type (can manipulate on internal data structure),
// use ProcessorFactory.CreateTraceReceiver.
if nextConsumer, ok := nextConsumer.(consumer.TraceConsumer); ok {
return factory.CreateTraceReceiver(ctx, creationParams, cfg, nextConsumer)
}

// If receiver is of the old type, but downstream consumer is of the new type,
// use NewInternalToOCTraceConverter compatibility shim to convert traces from internal format to OC.
traceConverter := converter.NewOCToInternalTraceConverter(nextConsumer.(consumer.TraceConsumer))
return factoryOld.CreateTraceReceiver(ctx, logger, cfg, traceConverter)
// If receiver is of the new type, but downstream consumer is of the old type,
// use internalToOCTraceConverter compatibility shim.
traceConverter := converter.NewInternalToOCTraceConverter(nextConsumer.(consumer.TraceConsumerOld))
return factory.CreateTraceReceiver(ctx, creationParams, cfg, traceConverter)
}

// createMetricsReceiver is a helper function that creates metric receiver based
// on the current receiver type and type of the next consumer.
func createMetricsReceiver(
ctx context.Context,
factoryBase component.ReceiverFactoryBase,
factory component.ReceiverFactory,
logger *zap.Logger,
cfg configmodels.Receiver,
nextConsumer consumer.MetricsConsumerBase,
) (component.MetricsReceiver, error) {
if factory, ok := factoryBase.(component.ReceiverFactory); ok {
creationParams := component.ReceiverCreateParams{Logger: logger}

// If both receiver and consumer are of the new type (can manipulate on internal data structure),
// use ProcessorFactory.CreateMetricsReceiver.
if nextConsumer, ok := nextConsumer.(consumer.MetricsConsumer); ok {
return factory.CreateMetricsReceiver(ctx, creationParams, cfg, nextConsumer)
}

// If receiver is of the new type, but downstream consumer is of the old type,
// use internalToOCMetricsConverter compatibility shim.
metricsConverter := converter.NewInternalToOCMetricsConverter(nextConsumer.(consumer.MetricsConsumerOld))
return factory.CreateMetricsReceiver(ctx, creationParams, cfg, metricsConverter)
}

factoryOld := factoryBase.(component.ReceiverFactoryOld)
creationParams := component.ReceiverCreateParams{Logger: logger}

// If both receiver and consumer are of the old type (can manipulate on OC metrics only),
// use Factory.CreateMetricsReceiver.
if nextConsumer, ok := nextConsumer.(consumer.MetricsConsumerOld); ok {
return factoryOld.CreateMetricsReceiver(context.Background(), logger, cfg, nextConsumer)
// If both receiver and consumer are of the new type (can manipulate on internal data structure),
// use ProcessorFactory.CreateMetricsReceiver.
if nextConsumer, ok := nextConsumer.(consumer.MetricsConsumer); ok {
return factory.CreateMetricsReceiver(ctx, creationParams, cfg, nextConsumer)
}

// If receiver is of the old type, but downstream consumer is of the new type,
// use NewInternalToOCMetricsConverter compatibility shim to convert metrics from internal format to OC.
metricsConverter := converter.NewOCToInternalMetricsConverter(nextConsumer.(consumer.MetricsConsumer))
return factoryOld.CreateMetricsReceiver(context.Background(), logger, cfg, metricsConverter)
// If receiver is of the new type, but downstream consumer is of the old type,
// use internalToOCMetricsConverter compatibility shim.
metricsConverter := converter.NewInternalToOCMetricsConverter(nextConsumer.(consumer.MetricsConsumerOld))
return factory.CreateMetricsReceiver(ctx, creationParams, cfg, metricsConverter)
}

// createLogsReceiver creates a log receiver using given factory and next consumer.
func createLogsReceiver(
ctx context.Context,
factoryBase component.ReceiverFactoryBase,
factoryBase component.Factory,
logger *zap.Logger,
cfg configmodels.Receiver,
nextConsumer consumer.LogsConsumer,
Expand Down
15 changes: 10 additions & 5 deletions service/builder/receivers_builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -424,15 +424,20 @@ func (b *badReceiverFactory) CreateDefaultConfig() configmodels.Receiver {
}

func (b *badReceiverFactory) CreateTraceReceiver(
_ context.Context,
_ *zap.Logger,
_ configmodels.Receiver,
_ consumer.TraceConsumerOld,
context.Context,
component.ReceiverCreateParams,
configmodels.Receiver,
consumer.TraceConsumer,
) (component.TraceReceiver, error) {
return nil, nil
}

func (b *badReceiverFactory) CreateMetricsReceiver(context.Context, *zap.Logger, configmodels.Receiver, consumer.MetricsConsumerOld) (component.MetricsReceiver, error) {
func (b *badReceiverFactory) CreateMetricsReceiver(
context.Context,
component.ReceiverCreateParams,
configmodels.Receiver,
consumer.MetricsConsumer,
) (component.MetricsReceiver, error) {
return nil, nil
}

Expand Down
4 changes: 2 additions & 2 deletions service/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -354,7 +354,7 @@ func TestApplication_GetFactory(t *testing.T) {
exampleExtensionFactory := &componenttest.ExampleExtensionFactory{}

factories := component.Factories{
Receivers: map[configmodels.Type]component.ReceiverFactoryBase{
Receivers: map[configmodels.Type]component.ReceiverFactory{
exampleReceiverFactory.Type(): exampleReceiverFactory,
},
Processors: map[configmodels.Type]component.ProcessorFactoryBase{
Expand Down Expand Up @@ -402,7 +402,7 @@ func createExampleApplication(t *testing.T) *Application {
exampleExporterFactory := &componenttest.ExampleExporterFactory{}
exampleExtensionFactory := &componenttest.ExampleExtensionFactory{}
factories := component.Factories{
Receivers: map[configmodels.Type]component.ReceiverFactoryBase{
Receivers: map[configmodels.Type]component.ReceiverFactory{
exampleReceiverFactory.Type(): exampleReceiverFactory,
},
Processors: map[configmodels.Type]component.ProcessorFactoryBase{
Expand Down

0 comments on commit 34c08f9

Please sign in to comment.