Skip to content

Commit

Permalink
Cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
mwear committed Sep 11, 2023
1 parent 239c635 commit 97aa3cb
Show file tree
Hide file tree
Showing 17 changed files with 105 additions and 45 deletions.
2 changes: 0 additions & 2 deletions component/componenttest/nop_host.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@ func NewNopHost() component.Host {

func (nh *nopHost) ReportFatalError(_ error) {}

func (nh *nopHost) ReportComponentStatus(_ component.Status, _ ...component.StatusEventOption) {}

func (nh *nopHost) GetFactory(_ component.Kind, _ component.Type) component.Factory {
return nil
}
Expand Down
1 change: 0 additions & 1 deletion component/componenttest/nop_host_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ func TestNewNopHost(t *testing.T) {
require.IsType(t, &nopHost{}, nh)

nh.ReportFatalError(errors.New("TestError"))

assert.Nil(t, nh.GetExporters()) // nolint: staticcheck
assert.Nil(t, nh.GetExtensions())
assert.Nil(t, nh.GetFactory(component.KindReceiver, "test"))
Expand Down
2 changes: 1 addition & 1 deletion component/host.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ type Host interface {
//
// ReportFatalError should be called by the component anytime after Component.Start() ends and
// before Component.Shutdown() begins.
// Deprecated: [0.65.0] Use ReportComponentStatus instead (with an event component.StatusFatalError)
// Deprecated: [x.x.x] Use ReportComponentStatus instead (with an event component.StatusFatalError)
ReportFatalError(err error)

// GetFactory of the specified kind. Returns the factory for a component type.
Expand Down
1 change: 1 addition & 0 deletions component/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,4 +137,5 @@ type StatusWatcher interface {
ComponentStatusChanged(source *InstanceID, event *StatusEvent)
}

// StatusFunc is the expected type of ReportComponentStatus for compoment.TelemetrySettings
type StatusFunc func(Status, ...StatusEventOption) error
2 changes: 2 additions & 0 deletions component/telemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,4 +44,6 @@ type TelemetrySettingsBase[T any] struct {
ReportComponentStatus T
}

// TelemetrySettings and servicetelemetry.Settings differ in the method signature for
// ReportComponentStatus
type TelemetrySettings TelemetrySettingsBase[StatusFunc]
26 changes: 20 additions & 6 deletions otelcol/collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,20 +158,20 @@ func TestComponentStatusWatcher(t *testing.T) {
assert.NoError(t, err)

// Use a processor factory that creates "unhealthy" processor: one that
// always reports StatusError after successful Start.
// always reports StatusRecoverableError after successful Start.
unhealthyProcessorFactory := processortest.NewUnhealthyProcessorFactory()
factories.Processors[unhealthyProcessorFactory.Type()] = unhealthyProcessorFactory

// Keep track of all status changes in a map.
changedComponents := map[*component.InstanceID]component.Status{}
changedComponents := map[*component.InstanceID][]component.Status{}
var mux sync.Mutex
onStatusChanged := func(source *component.InstanceID, event *component.StatusEvent) {
if event.Status() != component.StatusRecoverableError {
if source.ID.Type() != unhealthyProcessorFactory.Type() {
return
}
mux.Lock()
defer mux.Unlock()
changedComponents[source] = event.Status()
changedComponents[source] = append(changedComponents[source], event.Status())
}

// Add a "statuswatcher" extension that will receive notifications when processor
Expand All @@ -194,6 +194,13 @@ func TestComponentStatusWatcher(t *testing.T) {
// Start the newly created collector.
wg := startCollector(context.Background(), t, col)

// An unhealthy processor will successfully start, then report a recoverable error.
expectedStatuses := []component.Status{
component.StatusStarting,
component.StatusOK,
component.StatusRecoverableError,
}

// The "unhealthy" processors will now begin to asynchronously report StatusError.
// We expect to see these reports.
assert.Eventually(t, func() bool {
Expand All @@ -203,8 +210,8 @@ func TestComponentStatusWatcher(t *testing.T) {
for k, v := range changedComponents {
// All processors must report a status change with the same ID
assert.EqualValues(t, component.NewID(unhealthyProcessorFactory.Type()), k.ID)
// And all must be in StatusError
assert.EqualValues(t, component.StatusRecoverableError, v)
// And all must have the expected statuses
assert.Equal(t, expectedStatuses, v)
}
// We have 3 processors with exactly the same ID in otelcol-statuswatcher.yaml
// We must have exactly 3 items in our map. This ensures that the "source" argument
Expand All @@ -216,6 +223,13 @@ func TestComponentStatusWatcher(t *testing.T) {

col.Shutdown()
wg.Wait()

// Check for additional statuses after Shutdown.
expectedStatuses = append(expectedStatuses, component.StatusStopping, component.StatusStopped)
for _, v := range changedComponents {
assert.Equal(t, expectedStatuses, v)
}

assert.Equal(t, StateClosed, col.GetState())
}

Expand Down
2 changes: 1 addition & 1 deletion processor/processortest/unhealthy_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ type unhealthyProcessor struct {
telemetry component.TelemetrySettings
}

func (p unhealthyProcessor) Start(_ context.Context, host component.Host) error {
func (p unhealthyProcessor) Start(_ context.Context, _ component.Host) error {
go func() {
_ = p.telemetry.ReportComponentStatus(component.StatusRecoverableError)
}()
Expand Down
6 changes: 1 addition & 5 deletions service/extensions/extensions.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,17 +154,13 @@ func New(ctx context.Context, set Settings, cfg Config) (*Extensions, error) {
extMap: make(map[component.ID]extension.Extension),
}
for _, extID := range cfg {

instanceID := &component.InstanceID{
ID: extID,
Kind: component.KindExtension,
}

telSet := set.Telemetry.ToComponentTelemetrySettings(instanceID)

extSet := extension.CreateSettings{
ID: extID,
TelemetrySettings: telSet,
TelemetrySettings: set.Telemetry.ToComponentTelemetrySettings(instanceID),
BuildInfo: set.BuildInfo,
}
extSet.TelemetrySettings.Logger = components.ExtensionLogger(set.Telemetry.Logger, extID)
Expand Down
2 changes: 1 addition & 1 deletion service/internal/components/host_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func NewHostWrapper(host component.Host, logger *zap.Logger) component.Host {
func (hw *hostWrapper) ReportFatalError(err error) {
// The logger from the built component already identifies the component.
hw.Logger.Error("Component fatal error", zap.Error(err))
hw.Host.ReportFatalError(err) // nolint:staticcheck
hw.Host.ReportFatalError(err)
}

// RegisterZPages is used by zpages extension to register handles from service.
Expand Down
3 changes: 2 additions & 1 deletion service/internal/components/host_wrapper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,9 @@ import (
"errors"
"testing"

"go.opentelemetry.io/collector/component/componenttest"
"go.uber.org/zap"

"go.opentelemetry.io/collector/component/componenttest"
)

func Test_newHostWrapper(_ *testing.T) {
Expand Down
4 changes: 2 additions & 2 deletions service/internal/servicetelemetry/nop_settings_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,13 @@ import (
"testing"

"github.com/stretchr/testify/require"
"go.opentelemetry.io/otel/metric/noop"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configtelemetry"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/otel/metric/noop"
"go.opentelemetry.io/otel/trace"
)

func TestNewNopSettings(t *testing.T) {
Expand Down
7 changes: 6 additions & 1 deletion service/internal/servicetelemetry/settings.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,20 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package servicetelemetry // import "go.opentelemetry.io/collector/internal/servicetelemetry"
package servicetelemetry // import "go.opentelemetry.io/collector/service/internal/servicetelemetry"

import (
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/service/internal/status"
)

// Settings mirrors component.TelemetrySettings except for the method signature of
// ReportComponentStatus. The service level Settings is not bound a specific component, and
// therefore takes a component.InstanceID as an argument.
type Settings component.TelemetrySettingsBase[status.ServiceStatusFunc]

// ToComponentTelemetrySettings returns a TelemetrySettings for a specific component derived from
// this service level Settings object.
func (s Settings) ToComponentTelemetrySettings(id *component.InstanceID) component.TelemetrySettings {
return component.TelemetrySettings{
Logger: s.Logger,
Expand Down
34 changes: 34 additions & 0 deletions service/internal/servicetelemetry/settings_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package servicetelemetry

import (
"testing"

"github.com/stretchr/testify/require"
"go.opentelemetry.io/otel/metric/noop"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configtelemetry"
"go.opentelemetry.io/collector/pdata/pcommon"
)

func TestSettings(t *testing.T) {
set := Settings{
Logger: zap.NewNop(),
TracerProvider: trace.NewNoopTracerProvider(),
MeterProvider: noop.NewMeterProvider(),
MetricsLevel: configtelemetry.LevelNone,
Resource: pcommon.NewResource(),
ReportComponentStatus: func(*component.InstanceID, component.Status, ...component.StatusEventOption) error {
return nil
},
}
require.NoError(t, set.ReportComponentStatus(&component.InstanceID{}, component.StatusOK))

compSet := set.ToComponentTelemetrySettings(&component.InstanceID{})
require.NoError(t, compSet.ReportComponentStatus(component.StatusOK))
}
20 changes: 15 additions & 5 deletions service/internal/status/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
// onTransitionFunc receives a component.StatusEvent on a successful state transition
type onTransitionFunc func(*component.StatusEvent)

// errInvalidStateTransition is returned for invalid state transitions
var errInvalidStateTransition = errors.New("invalid state transition")

// fsm is a finite state machine that models transitions for component status
Expand All @@ -23,10 +24,10 @@ type fsm struct {
onTransition onTransitionFunc
}

// Transition will attempt to execute a state transition. If successful, it calls the onTransitionFunc
// with a StatusEvent representing the new state. Returns an error if the arguments result in an
// invalid status, or if the state transition is not valid.
func (m *fsm) Transition(status component.Status, options ...component.StatusEventOption) error {
// transition will attempt to execute a state transition. If it's successful, it calls the
// onTransitionFunc with a StatusEvent representing the new state. Returns an error if the arguments
// result in an invalid status, or if the state transition is not valid.
func (m *fsm) transition(status component.Status, options ...component.StatusEventOption) error {
if _, ok := m.transitions[m.current.Status()][status]; !ok {
return fmt.Errorf(
"cannot transition from %s to %s: %w",
Expand Down Expand Up @@ -93,9 +94,13 @@ func newFSM(onTransition onTransitionFunc) *fsm {
}
}

// InitFunc can be used to toggle a ready flag to true
type InitFunc func()

// readFunc can be used to check the value of a ready flag
type readyFunc func() bool

// initAndReadyFuncs returns a pair of functions to set and check a boolean ready flag
func initAndReadyFuncs() (InitFunc, readyFunc) {
mu := sync.RWMutex{}
isReady := false
Expand All @@ -115,9 +120,13 @@ func initAndReadyFuncs() (InitFunc, readyFunc) {
return init, ready
}

// NotifyStatusFunc is the receiver of status events after successful state transitions
type NotifyStatusFunc func(*component.InstanceID, *component.StatusEvent)

// ServiceStatusFunc is the expected type of ReportComponentStatus for servicetelemetry.Settings
type ServiceStatusFunc func(id *component.InstanceID, status component.Status, opts ...component.StatusEventOption) error

// errStatusNotReady is returned when trying to report status before service start
var errStatusNotReady = errors.New("report component status is not ready until service start")

// NewServiceStatusFunc returns a function to be used as ReportComponentStatus for
Expand All @@ -126,6 +135,7 @@ var errStatusNotReady = errors.New("report component status is not ready until s
// the a component.InstanceID as a parameter.
func NewServiceStatusFunc(notifyStatusChange NotifyStatusFunc) (InitFunc, ServiceStatusFunc) {
init, isReady := initAndReadyFuncs()
// mu synchronizes access to the fsmMap and the underlying fsm during a state transition
mu := sync.Mutex{}
fsmMap := make(map[*component.InstanceID]*fsm)
return init,
Expand All @@ -142,7 +152,7 @@ func NewServiceStatusFunc(notifyStatusChange NotifyStatusFunc) (InitFunc, Servic
})
fsmMap[id] = fsm
}
return fsm.Transition(status, opts...)
return fsm.transition(status, opts...)
}

}
Expand Down
6 changes: 3 additions & 3 deletions service/internal/status/status_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ func TestStatusFSM(t *testing.T) {

errorCount := 0
for _, status := range tc.reportedStatuses {
if err := fsm.Transition(status); err != nil {
if err := fsm.transition(status); err != nil {
errorCount++
require.ErrorIs(t, err, errInvalidStateTransition)
}
Expand All @@ -146,11 +146,11 @@ func TestStatusFSM(t *testing.T) {

func TestStatusEventError(t *testing.T) {
fsm := newFSM(func(*component.StatusEvent) {})
err := fsm.Transition(component.StatusStarting)
err := fsm.transition(component.StatusStarting)
require.NoError(t, err)

// the combination of StatusOK with an error is invalid
err = fsm.Transition(component.StatusOK, component.WithError(assert.AnError))
err = fsm.transition(component.StatusOK, component.WithError(assert.AnError))

require.Error(t, err)
require.ErrorIs(t, err, component.ErrStatusEventInvalidArgument)
Expand Down
1 change: 1 addition & 0 deletions service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ func (srv *Service) Start(ctx context.Context) error {
zap.Int("NumCPU", runtime.NumCPU()),
)

// enable status reporting
srv.statusInit()

if err := srv.host.serviceExtensions.Start(ctx, srv.host); err != nil {
Expand Down
31 changes: 15 additions & 16 deletions service/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -404,27 +404,26 @@ func TestNilCollectorEffectiveConfig(t *testing.T) {
}

func TestServiceFatalError(t *testing.T) {
//TODO: restore this test
// set := newNopSettings()
// set.AsyncErrorChannel = make(chan error)
set := newNopSettings()
set.AsyncErrorChannel = make(chan error)

// srv, err := New(context.Background(), set, newNopConfig())
// require.NoError(t, err)
srv, err := New(context.Background(), set, newNopConfig())
require.NoError(t, err)

// assert.NoError(t, srv.Start(context.Background()))
// t.Cleanup(func() {
// assert.NoError(t, srv.Shutdown(context.Background()))
// })
assert.NoError(t, srv.Start(context.Background()))
t.Cleanup(func() {
assert.NoError(t, srv.Shutdown(context.Background()))
})

// go func() {
// ev, _ := component.NewStatusEvent(component.StatusFatalError, component.WithError(assert.AnError))
// srv.host.ReportComponentStatus(&component.InstanceID{}, ev)
// }()
go func() {
ev, _ := component.NewStatusEvent(component.StatusFatalError, component.WithError(assert.AnError))
srv.host.notifyComponentStatusChange(&component.InstanceID{}, ev)
}()

// err = <-srv.host.asyncErrorChannel
err = <-srv.host.asyncErrorChannel

// require.Error(t, err)
// require.ErrorIs(t, err, assert.AnError)
require.Error(t, err)
require.ErrorIs(t, err, assert.AnError)
}

func assertResourceLabels(t *testing.T, res pcommon.Resource, expectedLabels map[string]labelValue) {
Expand Down

0 comments on commit 97aa3cb

Please sign in to comment.