Skip to content

Commit

Permalink
[chore] [exporterhelper] Move shutdown error from queue package (#9554)
Browse files Browse the repository at this point in the history
The error is created by the retry sender and used by the queue sender.
It doesn't belong to queue package
  • Loading branch information
dmitryax committed Feb 14, 2024
1 parent d455bff commit cc88aee
Show file tree
Hide file tree
Showing 5 changed files with 39 additions and 6 deletions.
4 changes: 2 additions & 2 deletions exporter/exporterhelper/retry_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (
"go.opentelemetry.io/collector/config/configretry"
"go.opentelemetry.io/collector/consumer/consumererror"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/exporter/internal/queue"
"go.opentelemetry.io/collector/exporter/internal/experr"
"go.opentelemetry.io/collector/internal/obsreportconfig/obsmetrics"
)

Expand Down Expand Up @@ -127,7 +127,7 @@ func (rs *retrySender) send(ctx context.Context, req Request) error {
case <-ctx.Done():
return fmt.Errorf("request is cancelled or timed out %w", err)
case <-rs.stopCh:
return queue.NewShutdownErr(err)
return experr.NewShutdownErr(err)
case <-time.After(backoffDelay):
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package queue // import "go.opentelemetry.io/collector/exporter/internal/queue"
package experr // import "go.opentelemetry.io/collector/exporter/internal/experr"

import "errors"

type shutdownErr struct {
err error
Expand All @@ -18,3 +20,8 @@ func (s shutdownErr) Error() string {
func (s shutdownErr) Unwrap() error {
return s.err
}

func IsShutdownErr(err error) bool {
var sdErr shutdownErr
return errors.As(err, &sdErr)
}
24 changes: 24 additions & 0 deletions exporter/internal/experr/err_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package experr

import (
"errors"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestNewShutdownErr(t *testing.T) {
err := NewShutdownErr(errors.New("some error"))
assert.Equal(t, "interrupted due to shutdown: some error", err.Error())
}

func TestIsShutdownErr(t *testing.T) {
err := errors.New("testError")
require.False(t, IsShutdownErr(err))
err = NewShutdownErr(err)
require.True(t, IsShutdownErr(err))
}
3 changes: 2 additions & 1 deletion exporter/internal/queue/persistent_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/exporter/internal/experr"
"go.opentelemetry.io/collector/extension/experimental/storage"
)

Expand Down Expand Up @@ -360,7 +361,7 @@ func (pq *persistentQueue[T]) getNextItem(ctx context.Context) (T, func(error),
pq.mu.Unlock()
}()

if errors.As(consumeErr, &shutdownErr{}) {
if experr.IsShutdownErr(consumeErr) {
// The queue is shutting down, don't mark the item as dispatched, so it's picked up again after restart.
// TODO: Handle partially delivered requests by updating their values in the storage.
return
Expand Down
5 changes: 3 additions & 2 deletions exporter/internal/queue/persistent_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/exporter/exportertest"
"go.opentelemetry.io/collector/exporter/internal/experr"
"go.opentelemetry.io/collector/extension/experimental/storage"
"go.opentelemetry.io/collector/extension/extensiontest"
"go.opentelemetry.io/collector/pdata/pcommon"
Expand Down Expand Up @@ -411,7 +412,7 @@ func TestPersistentQueue_CorruptedData(t *testing.T) {
}
assert.Equal(t, 3, ps.Size())
require.True(t, ps.Consume(func(context.Context, tracesRequest) error {
return NewShutdownErr(nil)
return experr.NewShutdownErr(nil)
}))
assert.Equal(t, 2, ps.Size())

Expand Down Expand Up @@ -523,7 +524,7 @@ func TestPersistentQueueStartWithNonDispatched(t *testing.T) {
// put one more item in
require.NoError(t, ps.Offer(context.Background(), req))
require.Equal(t, 5, ps.Size())
return NewShutdownErr(nil)
return experr.NewShutdownErr(nil)
}))
assert.NoError(t, ps.Shutdown(context.Background()))

Expand Down

0 comments on commit cc88aee

Please sign in to comment.