Skip to content

Commit

Permalink
Make component interfaces uniform (#488)
Browse files Browse the repository at this point in the history
This change fixes inconsistencies in component interfaces. Motivation:

- Uniformness results in reduction of code that currently has to
  deal with differences.
- Processor.Start is missing and is important for allowing processors
  to communicate with the Host.

What's changed:

- Introduced Component interface.
- Unified Host interface.
- Added a Start function to processors (via Component interface).
- Start/Shutdown is now called for Processors from service start/shutdown.
- Receivers, Exporters, Processors, Extensions now embed Component interface.
- Replaced StartTraceReception/StartMetricsReception by single Start function for receivers.
- Replaced StopTraceReception/StopMetricsReception by single Shutdown function for receivers.

Note: before merging this we need to announce the change in Gitter since it
breaks existing implementations in contrib (although the fix is easy).

Resolves #477
Resolves #262
  • Loading branch information
tigrannajaryan committed Jan 10, 2020
1 parent 256ec41 commit 91728bc
Show file tree
Hide file tree
Showing 54 changed files with 499 additions and 332 deletions.
43 changes: 43 additions & 0 deletions component/component.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
// Copyright 2019 OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package component

import "context"

// Component is either a receiver, exporter, processor or extension.
type Component interface {
// Start tells the component to start. Host parameter can be used for communicating
// with the host after Start() has already returned. If error is returned by
// Start() then the collector startup will be aborted.
// If this is an exporter component it may prepare for exporting
// by connecting to the endpoint.
Start(host Host) error

// Shutdown is invoked during service shutdown.
Shutdown() error
}

// Host represents the entity that is hosting a Component. It is used to allow communication
// between the Component and its host (normally the service.Application is the host).
type Host interface {
// ReportFatalError is used to report to the host that the extension
// encountered a fatal error (i.e.: an error that the instance can't recover
// from) after its start function had already returned.
ReportFatalError(err error)

// Context returns a context provided by the host to be used on the component
// operations.
Context() context.Context
}
15 changes: 15 additions & 0 deletions component/component_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
// Copyright 2019 OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package component
8 changes: 3 additions & 5 deletions receiver/receivertest/mock_host.go → component/mock_host.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,19 +14,17 @@

// Package receivertest define types and functions used to help test packages
// implementing the receiver package interfaces.
package receivertest
package component

import (
"context"

"github.com/open-telemetry/opentelemetry-collector/receiver"
)

// MockHost mocks a receiver.ReceiverHost for test purposes.
type MockHost struct {
}

var _ receiver.Host = (*MockHost)(nil)
var _ Host = (*MockHost)(nil)

// Context returns a context provided by the host to be used on the receiver
// operations.
Expand All @@ -43,6 +41,6 @@ func (mh *MockHost) ReportFatalError(err error) {

// NewMockHost returns a new instance of MockHost with proper defaults for most
// tests.
func NewMockHost() receiver.Host {
func NewMockHost() Host {
return &MockHost{}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

// Package receivertest define types and functions used to help test packages
// implementing the receiver package interfaces.
package receivertest
package component

import (
"errors"
Expand Down
67 changes: 43 additions & 24 deletions config/example_factories.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (

"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector/component"
"github.com/open-telemetry/opentelemetry-collector/config/configerror"
"github.com/open-telemetry/opentelemetry-collector/config/configmodels"
"github.com/open-telemetry/opentelemetry-collector/consumer"
Expand Down Expand Up @@ -82,7 +83,20 @@ func (f *ExampleReceiverFactory) CreateTraceReceiver(
if cfg.(*ExampleReceiver).FailTraceCreation {
return nil, configerror.ErrDataTypeIsNotSupported
}
return &ExampleReceiverProducer{TraceConsumer: nextConsumer}, nil

// There must be one receiver for both metrics and traces. We maintain a map of
// receivers per config.

// Check to see if there is already a receiver for this config.
receiver, ok := exampleReceivers[cfg]
if !ok {
receiver = &ExampleReceiverProducer{}
// Remember the receiver in the map
exampleReceivers[cfg] = receiver
}
receiver.TraceConsumer = nextConsumer

return receiver, nil
}

// CreateMetricsReceiver creates a metrics receiver based on this config.
Expand All @@ -94,33 +108,44 @@ func (f *ExampleReceiverFactory) CreateMetricsReceiver(
if cfg.(*ExampleReceiver).FailMetricsCreation {
return nil, configerror.ErrDataTypeIsNotSupported
}
return &ExampleReceiverProducer{MetricsConsumer: nextConsumer}, nil

// There must be one receiver for both metrics and traces. We maintain a map of
// receivers per config.

// Check to see if there is already a receiver for this config.
receiver, ok := exampleReceivers[cfg]
if !ok {
receiver = &ExampleReceiverProducer{}
// Remember the receiver in the map
exampleReceivers[cfg] = receiver
}
receiver.MetricsConsumer = nextConsumer

return receiver, nil
}

// ExampleReceiverProducer allows producing traces and metrics for testing purposes.
type ExampleReceiverProducer struct {
TraceConsumer consumer.TraceConsumer
TraceStarted bool
TraceStopped bool
Started bool
Stopped bool
MetricsConsumer consumer.MetricsConsumer
MetricsStarted bool
MetricsStopped bool
}

// TraceSource returns the name of the trace data source.
func (erp *ExampleReceiverProducer) TraceSource() string {
return ""
}

// StartTraceReception tells the receiver to start its processing.
func (erp *ExampleReceiverProducer) StartTraceReception(host receiver.Host) error {
erp.TraceStarted = true
// Start tells the receiver to start its processing.
func (erp *ExampleReceiverProducer) Start(host component.Host) error {
erp.Started = true
return nil
}

// StopTraceReception tells the receiver that should stop reception,
func (erp *ExampleReceiverProducer) StopTraceReception() error {
erp.TraceStopped = true
// Shutdown tells the receiver that should stop reception,
func (erp *ExampleReceiverProducer) Shutdown() error {
erp.Stopped = true
return nil
}

Expand All @@ -129,17 +154,11 @@ func (erp *ExampleReceiverProducer) MetricsSource() string {
return ""
}

// StartMetricsReception tells the receiver to start its processing.
func (erp *ExampleReceiverProducer) StartMetricsReception(host receiver.Host) error {
erp.MetricsStarted = true
return nil
}

// StopMetricsReception tells the receiver that should stop reception,
func (erp *ExampleReceiverProducer) StopMetricsReception() error {
erp.MetricsStopped = true
return nil
}
// This is the map of already created example receivers for particular configurations.
// We maintain this map because the Factory is asked trace and metric receivers separately
// when it gets CreateTraceReceiver() and CreateMetricsReceiver() but they must not
// create separate objects, they must use one Receiver object per configuration.
var exampleReceivers = map[configmodels.Receiver]*ExampleReceiverProducer{}

// MultiProtoReceiver is for testing purposes. We are defining an example multi protocol
// config and factory for "multireceiver" receiver type.
Expand Down Expand Up @@ -293,7 +312,7 @@ type ExampleExporterConsumer struct {
// Start tells the exporter to start. The exporter may prepare for exporting
// by connecting to the endpoint. Host parameter can be used for communicating
// with the host after Start() has already returned.
func (exp *ExampleExporterConsumer) Start(host exporter.Host) error {
func (exp *ExampleExporterConsumer) Start(host component.Host) error {
exp.ExporterStarted = true
return nil
}
Expand Down
62 changes: 62 additions & 0 deletions config/example_factories_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
// Copyright 2019 OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package config

import (
"context"
"testing"

"github.com/stretchr/testify/assert"

"github.com/open-telemetry/opentelemetry-collector/component"
"github.com/open-telemetry/opentelemetry-collector/consumer/consumerdata"
)

func TestExampleExporterConsumer(t *testing.T) {
exp := &ExampleExporterConsumer{}
host := component.NewMockHost()
assert.Equal(t, false, exp.ExporterStarted)
err := exp.Start(host)
assert.NoError(t, err)
assert.Equal(t, true, exp.ExporterStarted)

assert.Equal(t, 0, len(exp.Traces))
err = exp.ConsumeTraceData(context.Background(), consumerdata.TraceData{})
assert.NoError(t, err)
assert.Equal(t, 1, len(exp.Traces))

assert.Equal(t, 0, len(exp.Metrics))
err = exp.ConsumeMetricsData(context.Background(), consumerdata.MetricsData{})
assert.NoError(t, err)
assert.Equal(t, 1, len(exp.Metrics))

assert.Equal(t, false, exp.ExporterShutdown)
err = exp.Shutdown()
assert.NoError(t, err)
assert.Equal(t, true, exp.ExporterShutdown)
}

func TestExampleReceiverProducer(t *testing.T) {
rcv := &ExampleReceiverProducer{}
host := component.NewMockHost()
assert.Equal(t, false, rcv.Started)
err := rcv.Start(host)
assert.NoError(t, err)
assert.Equal(t, true, rcv.Started)

err = rcv.Shutdown()
assert.NoError(t, err)
assert.Equal(t, true, rcv.Started)
}
19 changes: 2 additions & 17 deletions exporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,28 +16,13 @@
package exporter

import (
"github.com/open-telemetry/opentelemetry-collector/component"
"github.com/open-telemetry/opentelemetry-collector/consumer"
)

// Host represents the entity where the exporter is being hosted. It is used to
// allow communication between the exporter and its host.
type Host interface {
// ReportFatalError is used to report to the host that the exporter encountered
// a fatal error (i.e.: an error that the instance can't recover from) after
// its start function has already returned.
ReportFatalError(err error)
}

// Exporter defines functions that trace and metric exporters must implement.
type Exporter interface {
// Start tells the exporter to start. The exporter may prepare for exporting
// by connecting to the endpoint. Host parameter can be used for communicating
// with the host after Start() has already returned. If error is returned by
// Start() then the collector startup will be aborted.
Start(host Host) error

// Shutdown is invoked during service shutdown.
Shutdown() error
component.Component
}

// TraceExporter composes TraceConsumer with some additional exporter-specific functions.
Expand Down
3 changes: 2 additions & 1 deletion exporter/exporterhelper/metricshelper.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (

"go.opencensus.io/trace"

"github.com/open-telemetry/opentelemetry-collector/component"
"github.com/open-telemetry/opentelemetry-collector/config/configmodels"
"github.com/open-telemetry/opentelemetry-collector/consumer/consumerdata"
"github.com/open-telemetry/opentelemetry-collector/exporter"
Expand All @@ -37,7 +38,7 @@ type metricsExporter struct {

var _ (exporter.MetricsExporter) = (*metricsExporter)(nil)

func (me *metricsExporter) Start(host exporter.Host) error {
func (me *metricsExporter) Start(host component.Host) error {
return nil
}

Expand Down
3 changes: 2 additions & 1 deletion exporter/exporterhelper/tracehelper.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (

"go.opencensus.io/trace"

"github.com/open-telemetry/opentelemetry-collector/component"
"github.com/open-telemetry/opentelemetry-collector/config/configmodels"
"github.com/open-telemetry/opentelemetry-collector/consumer/consumerdata"
"github.com/open-telemetry/opentelemetry-collector/exporter"
Expand All @@ -37,7 +38,7 @@ type traceExporter struct {

var _ (exporter.TraceExporter) = (*traceExporter)(nil)

func (te *traceExporter) Start(host exporter.Host) error {
func (te *traceExporter) Start(host component.Host) error {
return nil
}

Expand Down
3 changes: 2 additions & 1 deletion exporter/exportertest/nop_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package exportertest
import (
"context"

"github.com/open-telemetry/opentelemetry-collector/component"
"github.com/open-telemetry/opentelemetry-collector/consumer/consumerdata"
"github.com/open-telemetry/opentelemetry-collector/exporter"
)
Expand All @@ -32,7 +33,7 @@ type nopExporter struct {
var _ exporter.TraceExporter = (*nopExporter)(nil)
var _ exporter.MetricsExporter = (*nopExporter)(nil)

func (ne *nopExporter) Start(host exporter.Host) error {
func (ne *nopExporter) Start(host component.Host) error {
return nil
}

Expand Down
5 changes: 3 additions & 2 deletions exporter/exportertest/sink_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"sync"

"github.com/open-telemetry/opentelemetry-collector/component"
"github.com/open-telemetry/opentelemetry-collector/consumer/consumerdata"
"github.com/open-telemetry/opentelemetry-collector/exporter"
)
Expand All @@ -33,7 +34,7 @@ var _ exporter.TraceExporter = (*SinkTraceExporter)(nil)
// Start tells the exporter to start. The exporter may prepare for exporting
// by connecting to the endpoint. Host parameter can be used for communicating
// with the host after Start() has already returned.
func (ste *SinkTraceExporter) Start(host exporter.Host) error {
func (ste *SinkTraceExporter) Start(host component.Host) error {
return nil
}

Expand Down Expand Up @@ -76,7 +77,7 @@ var _ exporter.MetricsExporter = (*SinkMetricsExporter)(nil)
// Start tells the exporter to start. The exporter may prepare for exporting
// by connecting to the endpoint. Host parameter can be used for communicating
// with the host after Start() has already returned.
func (sme *SinkMetricsExporter) Start(host exporter.Host) error {
func (sme *SinkMetricsExporter) Start(host component.Host) error {
return nil
}

Expand Down

0 comments on commit 91728bc

Please sign in to comment.