Skip to content

Commit

Permalink
Tidy up consumer/consumererror package. (#2768)
Browse files Browse the repository at this point in the history
* Tidy up `consumer/consumererror` package.

* Updated docblocks for grammar and consistency
* Added `IsPartial()` predicate to match `IsPermanent()`
* Ensured tests for `PartialError` test the public interface

Remove `PartialError` and replace with individual signal error types

Refactor consumererror signal extraction to simplify exporterhelper request interface

* Rename consumererror signal error types to align with rest of codebase

* Rename `onPartialError` to `onError` in `exporterhelper.request` interface

* Provide conversion methods to consumererror signal error types.

This moves the accessors for signal data to methods on the individual error types
and provides As<Signal>() package functions that behave as targeted versions of
the errors.As() function.

* Avoid unnecessary allocation, fixup docs
  • Loading branch information
Aneurysm9 authored Mar 23, 2021
1 parent c81a01b commit a65a5a6
Show file tree
Hide file tree
Showing 33 changed files with 216 additions and 144 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,12 @@

## Unreleased

## 🛑 Breaking changes 🛑

- Refactored `consumererror` package (#2768)
- Eliminated `PartialError` type in favor of signal-specific types
- Renamed `CombineErrors` to `Combine`

## v0.23.0 Beta

## 🛑 Breaking changes 🛑
Expand Down
4 changes: 2 additions & 2 deletions config/configcheck/configcheck.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func ValidateConfigFromFactories(factories component.Factories) error {
}
}

return consumererror.CombineErrors(errs)
return consumererror.Combine(errs)
}

// ValidateConfig enforces that given configuration object is following the patterns
Expand Down Expand Up @@ -109,7 +109,7 @@ func validateConfigDataType(t reflect.Type) error {
// reflect.UnsafePointer.
}

if err := consumererror.CombineErrors(errs); err != nil {
if err := consumererror.Combine(errs); err != nil {
return fmt.Errorf(
"type %q from package %q has invalid config settings: %v",
t.Name(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,14 @@ import (
"strings"
)

// CombineErrors converts a list of errors into one error.
func CombineErrors(errs []error) error {
// Combine converts a list of errors into one error.
//
// If any of the errors in errs are Permanent then the returned
// error will also be Permanent.
//
// Any signal data associated with an error from this package
// will be discarded.
func Combine(errs []error) error {
numErrors := len(errs)
if numErrors == 0 {
// No errors
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (
"testing"
)

func TestCombineErrors(t *testing.T) {
func TestCombine(t *testing.T) {
testCases := []struct {
errors []error
expected string
Expand Down Expand Up @@ -53,15 +53,15 @@ func TestCombineErrors(t *testing.T) {
}

for _, tc := range testCases {
got := CombineErrors(tc.errors)
got := Combine(tc.errors)
if (got == nil) != tc.expectNil {
t.Errorf("CombineErrors(%v) == nil? Got: %t. Want: %t", tc.errors, got == nil, tc.expectNil)
t.Errorf("Combine(%v) == nil? Got: %t. Want: %t", tc.errors, got == nil, tc.expectNil)
}
if got != nil && tc.expected != got.Error() {
t.Errorf("CombineErrors(%v) = %q. Want: %q", tc.errors, got, tc.expected)
t.Errorf("Combine(%v) = %q. Want: %q", tc.errors, got, tc.expected)
}
if tc.expectedPermanent && !IsPermanent(got) {
t.Errorf("CombineErrors(%v) = %q. Want: consumererror.permanent", tc.errors, got)
t.Errorf("Combine(%v) = %q. Want: consumererror.permanent", tc.errors, got)
}
}
}
68 changes: 0 additions & 68 deletions consumer/consumererror/partialerror.go

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,6 @@ type permanent struct {
err error
}

// permanentError exists to test errors for "IsPermanent"
var permanentError = &permanent{}

// Permanent wraps an error to indicate that it is a permanent error, i.e.: an
// error that will be always returned if its source receives the same inputs.
func Permanent(err error) error {
Expand All @@ -42,8 +39,8 @@ func (p permanent) Error() string {
// is used to indicate that a given error will always be returned in the case
// that its sources receives the same input.
func IsPermanent(err error) bool {
if err != nil {
return errors.As(err, permanentError)
if err == nil {
return false
}
return false
return errors.As(err, &permanent{})
}
File renamed without changes.
108 changes: 108 additions & 0 deletions consumer/consumererror/signalerrors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package consumererror

import (
"errors"

"go.opentelemetry.io/collector/consumer/pdata"
)

// Traces is an error that may carry associated Trace data for a subset of received data
// that faiiled to be processed or sent.
type Traces struct {
error
failed pdata.Traces
}

// NewTraces creates a Traces that can encapsulate received data that failed to be processed or sent.
func NewTraces(err error, failed pdata.Traces) error {
return Traces{
error: err,
failed: failed,
}
}

// AsTraces finds the first error in err's chain that can be assigned to target. If such an error is found
// it is assigned to target and true is returned, otherwise false is returned.
func AsTraces(err error, target *Traces) bool {
if err == nil {
return false
}
return errors.As(err, target)
}

// GetTraces returns failed traces from the associated error.
func (err Traces) GetTraces() pdata.Traces {
return err.failed
}

// Logs is an error that may carry associated Log data for a subset of received data
// that faiiled to be processed or sent.
type Logs struct {
error
failed pdata.Logs
}

// NewLogs creates a Logs that can encapsulate received data that failed to be processed or sent.
func NewLogs(err error, failed pdata.Logs) error {
return Logs{
error: err,
failed: failed,
}
}

// AsLogs finds the first error in err's chain that can be assigned to target. If such an error is found
// it is assigned to target and true is returned, otherwise false is returned.
func AsLogs(err error, target *Logs) bool {
if err == nil {
return false
}
return errors.As(err, target)
}

// GetLogs returns failed logs from the associated error.
func (err Logs) GetLogs() pdata.Logs {
return err.failed
}

// Metrics is an error that may carry associated Metrics data for a subset of received data
// that faiiled to be processed or sent.
type Metrics struct {
error
failed pdata.Metrics
}

// NewMetrics creates a Metrics that can encapsulate received data that failed to be processed or sent.
func NewMetrics(err error, failed pdata.Metrics) error {
return Metrics{
error: err,
failed: failed,
}
}

// AsMetrics finds the first error in err's chain that can be assigned to target. If such an error is found
// it is assigned to target and true is returned, otherwise false is returned.
func AsMetrics(err error, target *Metrics) bool {
if err == nil {
return false
}
return errors.As(err, target)
}

// GetMetrics returns failed metrics from the associated error.
func (err Metrics) GetMetrics() pdata.Metrics {
return err.failed
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,26 +23,38 @@ import (
"go.opentelemetry.io/collector/internal/testdata"
)

func TestPartialError(t *testing.T) {
func TestTraces(t *testing.T) {
td := testdata.GenerateTraceDataOneSpan()
err := fmt.Errorf("some error")
partialErr := PartialTracesError(err, td)
assert.Equal(t, err.Error(), partialErr.Error())
assert.Equal(t, td, partialErr.(PartialError).failed)
traceErr := NewTraces(err, td)
assert.Equal(t, err.Error(), traceErr.Error())
var target Traces
assert.False(t, AsTraces(nil, &target))
assert.False(t, AsTraces(err, &target))
assert.True(t, AsTraces(traceErr, &target))
assert.Equal(t, td, target.GetTraces())
}

func TestPartialErrorLogs(t *testing.T) {
func TestLogs(t *testing.T) {
td := testdata.GenerateLogDataOneLog()
err := fmt.Errorf("some error")
partialErr := PartialLogsError(err, td)
assert.Equal(t, err.Error(), partialErr.Error())
assert.Equal(t, td, partialErr.(PartialError).failedLogs)
logsErr := NewLogs(err, td)
assert.Equal(t, err.Error(), logsErr.Error())
var target Logs
assert.False(t, AsLogs(nil, &target))
assert.False(t, AsLogs(err, &target))
assert.True(t, AsLogs(logsErr, &target))
assert.Equal(t, td, target.GetLogs())
}

func TestPartialErrorMetrics(t *testing.T) {
func TestMetrics(t *testing.T) {
td := testdata.GenerateMetricsOneMetric()
err := fmt.Errorf("some error")
partialErr := PartialMetricsError(err, td)
assert.Equal(t, err.Error(), partialErr.Error())
assert.Equal(t, td, partialErr.(PartialError).failedMetrics)
metricErr := NewMetrics(err, td)
assert.Equal(t, err.Error(), metricErr.Error())
var target Metrics
assert.False(t, AsMetrics(nil, &target))
assert.False(t, AsMetrics(err, &target))
assert.True(t, AsMetrics(metricErr, &target))
assert.Equal(t, td, target.GetMetrics())
}
6 changes: 3 additions & 3 deletions consumer/fanoutconsumer/cloningconsumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func (mfc metricsCloningConsumer) ConsumeMetrics(ctx context.Context, md pdata.M
}
}

return consumererror.CombineErrors(errs)
return consumererror.Combine(errs)
}

// NewTracesCloning wraps multiple traces consumers in a single one and clones the data
Expand Down Expand Up @@ -93,7 +93,7 @@ func (tfc tracesCloningConsumer) ConsumeTraces(ctx context.Context, td pdata.Tra
}
}

return consumererror.CombineErrors(errs)
return consumererror.Combine(errs)
}

// NewLogsCloning wraps multiple trace consumers in a single one and clones the data
Expand Down Expand Up @@ -130,5 +130,5 @@ func (lfc logsCloningConsumer) ConsumeLogs(ctx context.Context, ld pdata.Logs) e
}
}

return consumererror.CombineErrors(errs)
return consumererror.Combine(errs)
}
6 changes: 3 additions & 3 deletions consumer/fanoutconsumer/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func (mfc metricsConsumer) ConsumeMetrics(ctx context.Context, md pdata.Metrics)
errs = append(errs, err)
}
}
return consumererror.CombineErrors(errs)
return consumererror.Combine(errs)
}

// NewTraces wraps multiple trace consumers in a single one.
Expand All @@ -72,7 +72,7 @@ func (tfc traceConsumer) ConsumeTraces(ctx context.Context, td pdata.Traces) err
errs = append(errs, err)
}
}
return consumererror.CombineErrors(errs)
return consumererror.Combine(errs)
}

// NewLogs wraps multiple log consumers in a single one.
Expand All @@ -96,5 +96,5 @@ func (lfc logsConsumer) ConsumeLogs(ctx context.Context, ld pdata.Logs) error {
errs = append(errs, err)
}
}
return consumererror.CombineErrors(errs)
return consumererror.Combine(errs)
}
6 changes: 3 additions & 3 deletions exporter/exporterhelper/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenthelper"
"go.opentelemetry.io/collector/config/configmodels"
"go.opentelemetry.io/collector/consumer/consumererror"
)

// ComponentSettings for timeout. The timeout applies to individual attempts to send data to the backend.
Expand All @@ -46,8 +45,9 @@ type request interface {
// setContext updates the Context of the requests.
setContext(context.Context)
export(ctx context.Context) error
// Returns a new request that contains the items left to be sent.
onPartialError(consumererror.PartialError) request
// Returns a new request may contain the items left to be sent if some items failed to process and can be retried.
// Otherwise, it should return the original request.
onError(error) request
// Returns the count of spans/metric points or log records.
count() int
}
Expand Down
Loading

0 comments on commit a65a5a6

Please sign in to comment.