Skip to content

Commit

Permalink
Ensure that all exporters are started/shutdown (#1943)
Browse files Browse the repository at this point in the history
* Ensure that all exporters are started/shutdown

Signed-off-by: Bogdan Drutu <bogdandrutu@gmail.com>

* Add changelog entry

Signed-off-by: Bogdan Drutu <bogdandrutu@gmail.com>

* Address review feedback

Signed-off-by: Bogdan Drutu <bogdandrutu@gmail.com>
  • Loading branch information
bogdandrutu committed Oct 12, 2020
1 parent 3f96743 commit d495cf4
Show file tree
Hide file tree
Showing 6 changed files with 79 additions and 82 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

## Unreleased

- Fix bug where the service does not correctly start/stop the log exporters (#1943)

## v0.12.0 Beta

## 🚀 New components 🚀
Expand Down
78 changes: 38 additions & 40 deletions service/builder/exporters_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,24 +29,15 @@ import (
// builtExporter is an exporter that is built based on a config. It can have
// a trace and/or a metrics consumer and have a shutdown function.
type builtExporter struct {
logger *zap.Logger
te component.TraceExporter
me component.MetricsExporter
le component.LogsExporter
logger *zap.Logger
expByDataType map[configmodels.DataType]component.Exporter
}

// Start the exporter.
func (exp *builtExporter) Start(ctx context.Context, host component.Host) error {
func (bexp *builtExporter) Start(ctx context.Context, host component.Host) error {
var errors []error
if exp.te != nil {
err := exp.te.Start(ctx, host)
if err != nil {
errors = append(errors, err)
}
}

if exp.me != nil {
err := exp.me.Start(ctx, host)
for _, exporter := range bexp.expByDataType {
err := exporter.Start(ctx, host)
if err != nil {
errors = append(errors, err)
}
Expand All @@ -56,29 +47,40 @@ func (exp *builtExporter) Start(ctx context.Context, host component.Host) error
}

// Shutdown the trace component and the metrics component of an exporter.
func (exp *builtExporter) Shutdown(ctx context.Context) error {
func (bexp *builtExporter) Shutdown(ctx context.Context) error {
var errors []error
if exp.te != nil {
if err := exp.te.Shutdown(ctx); err != nil {
errors = append(errors, err)
}
}

if exp.me != nil {
if err := exp.me.Shutdown(ctx); err != nil {
for _, exporter := range bexp.expByDataType {
err := exporter.Shutdown(ctx)
if err != nil {
errors = append(errors, err)
}
}

return componenterror.CombineErrors(errors)
}

func (exp *builtExporter) GetTraceExporter() component.TraceExporter {
return exp.te
func (bexp *builtExporter) getTraceExporter() component.TraceExporter {
exp := bexp.expByDataType[configmodels.TracesDataType]
if exp == nil {
return nil
}
return exp.(component.TraceExporter)
}

func (bexp *builtExporter) getMetricExporter() component.MetricsExporter {
exp := bexp.expByDataType[configmodels.MetricsDataType]
if exp == nil {
return nil
}
return exp.(component.MetricsExporter)
}

func (exp *builtExporter) GetMetricExporter() component.MetricsExporter {
return exp.me
func (bexp *builtExporter) getLogExporter() component.LogsExporter {
exp := bexp.expByDataType[configmodels.LogsDataType]
if exp == nil {
return nil
}
return exp.(component.LogsExporter)
}

// Exporters is a map of exporters created from exporter configs.
Expand Down Expand Up @@ -116,15 +118,11 @@ func (exps Exporters) ToMapByDataType() map[configmodels.DataType]map[configmode

exportersMap[configmodels.TracesDataType] = make(map[configmodels.Exporter]component.Exporter, len(exps))
exportersMap[configmodels.MetricsDataType] = make(map[configmodels.Exporter]component.Exporter, len(exps))
exportersMap[configmodels.LogsDataType] = make(map[configmodels.Exporter]component.Exporter, len(exps))

for cfg, exp := range exps {
te := exp.GetTraceExporter()
if te != nil {
exportersMap[configmodels.TracesDataType][cfg] = te
}
me := exp.GetMetricExporter()
if me != nil {
exportersMap[configmodels.MetricsDataType][cfg] = me
for cfg, bexp := range exps {
for t, exp := range bexp.expByDataType {
exportersMap[t][cfg] = exp
}
}

Expand Down Expand Up @@ -228,7 +226,8 @@ func (eb *ExportersBuilder) buildExporter(
}

exporter := &builtExporter{
logger: logger,
logger: logger,
expByDataType: make(map[configmodels.DataType]component.Exporter, 3),
}

inputDataTypes := exportersInputDataTypes[config]
Expand All @@ -243,7 +242,6 @@ func (eb *ExportersBuilder) buildExporter(
}

for dataType, requirement := range inputDataTypes {

switch dataType {
case configmodels.TracesDataType:
// Traces data type is required. Create a trace exporter based on config.
Expand All @@ -261,7 +259,7 @@ func (eb *ExportersBuilder) buildExporter(
return nil, fmt.Errorf("factory for %q produced a nil exporter", config.Name())
}

exporter.te = te
exporter.expByDataType[configmodels.TracesDataType] = te

case configmodels.MetricsDataType:
// Metrics data type is required. Create a trace exporter based on config.
Expand All @@ -280,7 +278,7 @@ func (eb *ExportersBuilder) buildExporter(
return nil, fmt.Errorf("factory for %q produced a nil exporter", config.Name())
}

exporter.me = me
exporter.expByDataType[configmodels.MetricsDataType] = me

case configmodels.LogsDataType:
le, err := factory.CreateLogsExporter(ctx, creationParams, config)
Expand All @@ -297,7 +295,7 @@ func (eb *ExportersBuilder) buildExporter(
return nil, fmt.Errorf("factory for %q produced a nil exporter", config.Name())
}

exporter.le = le
exporter.expByDataType[configmodels.LogsDataType] = le

default:
// Could not create because this exporter does not support this data type.
Expand Down
42 changes: 28 additions & 14 deletions service/builder/exporters_builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,9 @@ func TestExportersBuilder_Build(t *testing.T) {

// Ensure exporter has its fields correctly populated.
require.NotNil(t, e1)
assert.NotNil(t, e1.te)
assert.Nil(t, e1.me)
assert.NotNil(t, e1.getTraceExporter())
assert.Nil(t, e1.getMetricExporter())
assert.Nil(t, e1.getLogExporter())

// Ensure it can be started.
assert.NoError(t, exporters.StartAll(context.Background(), componenttest.NewNopHost()))
Expand All @@ -98,8 +99,9 @@ func TestExportersBuilder_Build(t *testing.T) {
// Ensure exporter has its fields correctly populated, ie Trace Exporter and
// Metrics Exporter are nil.
require.NotNil(t, e1)
assert.Nil(t, e1.te)
assert.Nil(t, e1.me)
assert.Nil(t, e1.getTraceExporter())
assert.Nil(t, e1.getMetricExporter())
assert.Nil(t, e1.getLogExporter())

// TODO: once we have an exporter that supports metrics data type test it too.
}
Expand Down Expand Up @@ -138,9 +140,9 @@ func TestExportersBuilder_BuildLogs(t *testing.T) {

// Ensure exporter has its fields correctly populated.
require.NotNil(t, e1)
assert.NotNil(t, e1.le)
assert.Nil(t, e1.te)
assert.Nil(t, e1.me)
assert.NotNil(t, e1.getLogExporter())
assert.Nil(t, e1.getTraceExporter())
assert.Nil(t, e1.getMetricExporter())

// Ensure it can be started.
err = exporters.StartAll(context.Background(), componenttest.NewNopHost())
Expand All @@ -163,46 +165,58 @@ func TestExportersBuilder_BuildLogs(t *testing.T) {
// Ensure exporter has its fields correctly populated, ie Trace Exporter and
// Metrics Exporter are nil.
require.NotNil(t, e1)
assert.Nil(t, e1.te)
assert.Nil(t, e1.me)
assert.Nil(t, e1.le)
assert.Nil(t, e1.getTraceExporter())
assert.Nil(t, e1.getMetricExporter())
assert.Nil(t, e1.getLogExporter())
}

func TestExportersBuilder_StartAll(t *testing.T) {
exporters := make(Exporters)
expCfg := &configmodels.ExporterSettings{}
traceExporter := &componenttest.ExampleExporterConsumer{}
metricExporter := &componenttest.ExampleExporterConsumer{}
logsExporter := &componenttest.ExampleExporterConsumer{}
exporters[expCfg] = &builtExporter{
logger: zap.NewNop(),
te: traceExporter,
me: metricExporter,
expByDataType: map[configmodels.DataType]component.Exporter{
configmodels.TracesDataType: traceExporter,
configmodels.MetricsDataType: metricExporter,
configmodels.LogsDataType: logsExporter,
},
}
assert.False(t, traceExporter.ExporterStarted)
assert.False(t, metricExporter.ExporterStarted)
assert.False(t, logsExporter.ExporterStarted)

assert.NoError(t, exporters.StartAll(context.Background(), componenttest.NewNopHost()))

assert.True(t, traceExporter.ExporterStarted)
assert.True(t, metricExporter.ExporterStarted)
assert.True(t, logsExporter.ExporterStarted)
}

func TestExportersBuilder_StopAll(t *testing.T) {
exporters := make(Exporters)
expCfg := &configmodels.ExporterSettings{}
traceExporter := &componenttest.ExampleExporterConsumer{}
metricExporter := &componenttest.ExampleExporterConsumer{}
logsExporter := &componenttest.ExampleExporterConsumer{}
exporters[expCfg] = &builtExporter{
logger: zap.NewNop(),
te: traceExporter,
me: metricExporter,
expByDataType: map[configmodels.DataType]component.Exporter{
configmodels.TracesDataType: traceExporter,
configmodels.MetricsDataType: metricExporter,
configmodels.LogsDataType: logsExporter,
},
}
assert.False(t, traceExporter.ExporterShutdown)
assert.False(t, metricExporter.ExporterShutdown)
assert.False(t, logsExporter.ExporterShutdown)
assert.NoError(t, exporters.ShutdownAll(context.Background()))

assert.True(t, traceExporter.ExporterShutdown)
assert.True(t, metricExporter.ExporterShutdown)
assert.True(t, logsExporter.ExporterShutdown)
}

func TestExportersBuilder_ErrorOnNilExporter(t *testing.T) {
Expand Down
25 changes: 4 additions & 21 deletions service/builder/pipelines_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,14 +232,9 @@ func (pb *PipelinesBuilder) getBuiltExportersByNames(exporterNames []string) []*
func (pb *PipelinesBuilder) buildFanoutExportersTraceConsumer(exporterNames []string) consumer.TraceConsumer {
builtExporters := pb.getBuiltExportersByNames(exporterNames)

// Optimize for the case when there is only one exporter, no need to create junction point.
if len(builtExporters) == 1 {
return builtExporters[0].te
}

var exporters []consumer.TraceConsumer
for _, builtExp := range builtExporters {
exporters = append(exporters, builtExp.te)
exporters = append(exporters, builtExp.getTraceExporter())
}

// Create a junction point that fans out to all exporters.
Expand All @@ -249,33 +244,21 @@ func (pb *PipelinesBuilder) buildFanoutExportersTraceConsumer(exporterNames []st
func (pb *PipelinesBuilder) buildFanoutExportersMetricsConsumer(exporterNames []string) consumer.MetricsConsumer {
builtExporters := pb.getBuiltExportersByNames(exporterNames)

// Optimize for the case when there is only one exporter, no need to create junction point.
if len(builtExporters) == 1 {
return builtExporters[0].me
}

var exporters []consumer.MetricsConsumer
for _, builtExp := range builtExporters {
exporters = append(exporters, builtExp.me)
exporters = append(exporters, builtExp.getMetricExporter())
}

// Create a junction point that fans out to all exporters.
return processor.NewMetricsFanOutConnector(exporters)
}

func (pb *PipelinesBuilder) buildFanoutExportersLogConsumer(
exporterNames []string,
) consumer.LogsConsumer {
func (pb *PipelinesBuilder) buildFanoutExportersLogConsumer(exporterNames []string) consumer.LogsConsumer {
builtExporters := pb.getBuiltExportersByNames(exporterNames)

// Optimize for the case when there is only one exporter, no need to create junction point.
if len(builtExporters) == 1 {
return builtExporters[0].le
}

exporters := make([]consumer.LogsConsumer, len(builtExporters))
for i, builtExp := range builtExporters {
exporters[i] = builtExp.le
exporters[i] = builtExp.getLogExporter()
}

// Create a junction point that fans out to all exporters.
Expand Down
4 changes: 2 additions & 2 deletions service/builder/pipelines_builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ func TestPipelinesBuilder_BuildVarious(t *testing.T) {
// First check that there are no logs in the exporters yet.
var exporterConsumers []*componenttest.ExampleExporterConsumer
for _, exporter := range exporters {
consumer := exporter.le.(*componenttest.ExampleExporterConsumer)
consumer := exporter.getLogExporter().(*componenttest.ExampleExporterConsumer)
exporterConsumers = append(exporterConsumers, consumer)
require.Equal(t, len(consumer.Logs), 0)
}
Expand Down Expand Up @@ -235,7 +235,7 @@ func testPipeline(t *testing.T, pipelineName string, exporterNames []string) {
// First check that there are no traces in the exporters yet.
var exporterConsumers []*componenttest.ExampleExporterConsumer
for _, exporter := range exporters {
consumer := exporter.te.(*componenttest.ExampleExporterConsumer)
consumer := exporter.getTraceExporter().(*componenttest.ExampleExporterConsumer)
exporterConsumers = append(exporterConsumers, consumer)
require.Equal(t, len(consumer.Traces), 0)
}
Expand Down
10 changes: 5 additions & 5 deletions service/builder/receivers_builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func testReceivers(

// First check that there are no traces in the exporters yet.
for _, exporter := range exporters {
consumer := exporter.te.(*componenttest.ExampleExporterConsumer)
consumer := exporter.getTraceExporter().(*componenttest.ExampleExporterConsumer)
require.Equal(t, len(consumer.Traces), 0)
require.Equal(t, len(consumer.Metrics), 0)
}
Expand Down Expand Up @@ -158,7 +158,7 @@ func testReceivers(
spanDuplicationCount = 1
}

traceConsumer := exporter.te.(*componenttest.ExampleExporterConsumer)
traceConsumer := exporter.getTraceExporter().(*componenttest.ExampleExporterConsumer)
require.Equal(t, spanDuplicationCount, len(traceConsumer.Traces))

for i := 0; i < spanDuplicationCount; i++ {
Expand All @@ -168,7 +168,7 @@ func testReceivers(

// Validate metrics.
if test.hasMetrics {
metricsConsumer := exporter.me.(*componenttest.ExampleExporterConsumer)
metricsConsumer := exporter.getMetricExporter().(*componenttest.ExampleExporterConsumer)
require.Equal(t, 1, len(metricsConsumer.Metrics))
assert.EqualValues(t, metrics, metricsConsumer.Metrics[0])
}
Expand Down Expand Up @@ -234,7 +234,7 @@ func TestReceiversBuilder_BuildCustom(t *testing.T) {

// First check that there are no traces in the exporters yet.
for _, exporter := range exporters {
consumer := exporter.le.(*componenttest.ExampleExporterConsumer)
consumer := exporter.getLogExporter().(*componenttest.ExampleExporterConsumer)
require.Equal(t, len(consumer.Logs), 0)
}

Expand All @@ -249,7 +249,7 @@ func TestReceiversBuilder_BuildCustom(t *testing.T) {
exporter := allExporters[cfg.Exporters[name]]

// Validate exported data.
consumer := exporter.le.(*componenttest.ExampleExporterConsumer)
consumer := exporter.getLogExporter().(*componenttest.ExampleExporterConsumer)
require.Equal(t, 1, len(consumer.Logs))
assert.EqualValues(t, log, consumer.Logs[0])
}
Expand Down

0 comments on commit d495cf4

Please sign in to comment.