Skip to content

Commit

Permalink
Merge branch 'main' into main
Browse files Browse the repository at this point in the history
  • Loading branch information
n-oden committed Oct 31, 2023
2 parents 4110835 + ce7b40a commit 59af688
Show file tree
Hide file tree
Showing 16 changed files with 411 additions and 104 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
- Add `Version` function in `go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp`. (#4660)
- Add Summary, SummaryDataPoint, and QuantileValue to `go.opentelemetry.io/sdk/metric/metricdata`. (#4622)
- `go.opentelemetry.io/otel/bridge/opencensus.NewMetricProducer` now supports exemplars from OpenCensus. (#4585)
- Add support for `WithExplicitBucketBoundaries` in `go.opentelemetry.io/otel/sdk/metric`. (#4605)

### Deprecated

Expand Down Expand Up @@ -49,6 +50,10 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
- `go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp` no longer depends on `go.opentelemetry.io/otel/exporters/otlp/otlpmetric`. (#4660)
- Retry for `502 Bad Gateway` and `504 Gateway Timeout` HTTP statuses in `go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp`. (#4670)
- Retry for `502 Bad Gateway` and `504 Gateway Timeout` HTTP statuses in `go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp`. (#4670)
- Retry for `RESOURCE_EXHAUSTED` only if RetryInfo is returned in `go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc`. (#4669)
- Retry for `RESOURCE_EXHAUSTED` only if RetryInfo is returned in `go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc`. (#4669)
- Retry temporary HTTP request failures in `go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp`. (#4679)
- Retry temporary HTTP request failures in `go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp`. (#4679)

### Fixed

Expand Down
22 changes: 15 additions & 7 deletions exporters/otlp/otlpmetric/otlpmetricgrpc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,28 +176,36 @@ func (c *client) exportContext(parent context.Context) (context.Context, context
// duration to wait for if an explicit throttle time is included in err.
func retryable(err error) (bool, time.Duration) {
s := status.Convert(err)
return retryableGRPCStatus(s)
}

func retryableGRPCStatus(s *status.Status) (bool, time.Duration) {
switch s.Code() {
case codes.Canceled,
codes.DeadlineExceeded,
codes.ResourceExhausted,
codes.Aborted,
codes.OutOfRange,
codes.Unavailable,
codes.DataLoss:
return true, throttleDelay(s)
// Additionally, handle RetryInfo.
_, d := throttleDelay(s)
return true, d
case codes.ResourceExhausted:
// Retry only if the server signals that the recovery from resource exhaustion is possible.
return throttleDelay(s)
}

// Not a retry-able error.
return false, 0
}

// throttleDelay returns a duration to wait for if an explicit throttle time
// is included in the response status.
func throttleDelay(s *status.Status) time.Duration {
// throttleDelay reports whether the status carries a RetryInfo detail and,
// if it does, the explicit throttle duration to wait for before retrying.
func throttleDelay(s *status.Status) (bool, time.Duration) {
for _, detail := range s.Details() {
if t, ok := detail.(*errdetails.RetryInfo); ok {
return t.RetryDelay.AsDuration()
return true, t.RetryDelay.AsDuration()
}
}
return 0
return false, 0
}
44 changes: 33 additions & 11 deletions exporters/otlp/otlpmetric/otlpmetricgrpc/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,17 @@ import (
"go.opentelemetry.io/otel/sdk/metric/metricdata"
)

func TestThrottleDuration(t *testing.T) {
func TestThrottleDelay(t *testing.T) {
c := codes.ResourceExhausted
testcases := []struct {
status *status.Status
expected time.Duration
status *status.Status
wantOK bool
wantDuration time.Duration
}{
{
status: status.New(c, "NoRetryInfo"),
expected: 0,
status: status.New(c, "NoRetryInfo"),
wantOK: false,
wantDuration: 0,
},
{
status: func() *status.Status {
Expand All @@ -53,7 +55,8 @@ func TestThrottleDuration(t *testing.T) {
require.NoError(t, err)
return s
}(),
expected: 15 * time.Millisecond,
wantOK: true,
wantDuration: 15 * time.Millisecond,
},
{
status: func() *status.Status {
Expand All @@ -63,7 +66,8 @@ func TestThrottleDuration(t *testing.T) {
require.NoError(t, err)
return s
}(),
expected: 0,
wantOK: false,
wantDuration: 0,
},
{
status: func() *status.Status {
Expand All @@ -76,7 +80,8 @@ func TestThrottleDuration(t *testing.T) {
require.NoError(t, err)
return s
}(),
expected: 13 * time.Minute,
wantOK: true,
wantDuration: 13 * time.Minute,
},
{
status: func() *status.Status {
Expand All @@ -91,13 +96,16 @@ func TestThrottleDuration(t *testing.T) {
require.NoError(t, err)
return s
}(),
expected: 13 * time.Minute,
wantOK: true,
wantDuration: 13 * time.Minute,
},
}

for _, tc := range testcases {
t.Run(tc.status.Message(), func(t *testing.T) {
require.Equal(t, tc.expected, throttleDelay(tc.status))
ok, d := throttleDelay(tc.status)
assert.Equal(t, tc.wantOK, ok)
assert.Equal(t, tc.wantDuration, d)
})
}
}
Expand All @@ -112,7 +120,7 @@ func TestRetryable(t *testing.T) {
codes.NotFound: false,
codes.AlreadyExists: false,
codes.PermissionDenied: false,
codes.ResourceExhausted: true,
codes.ResourceExhausted: false,
codes.FailedPrecondition: false,
codes.Aborted: true,
codes.OutOfRange: true,
Expand All @@ -129,6 +137,20 @@ func TestRetryable(t *testing.T) {
}
}

func TestRetryableGRPCStatusResourceExhaustedWithRetryInfo(t *testing.T) {
delay := 15 * time.Millisecond
s, err := status.New(codes.ResourceExhausted, "WithRetryInfo").WithDetails(
&errdetails.RetryInfo{
RetryDelay: durationpb.New(delay),
},
)
require.NoError(t, err)

ok, d := retryableGRPCStatus(s)
assert.True(t, ok)
assert.Equal(t, delay, d)
}

type clientShim struct {
*client
}
Expand Down
5 changes: 5 additions & 0 deletions exporters/otlp/otlpmetric/otlpmetrichttp/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"bytes"
"compress/gzip"
"context"
"errors"
"fmt"
"io"
"net"
Expand Down Expand Up @@ -147,6 +148,10 @@ func (c *client) UploadMetrics(ctx context.Context, protoMetrics *metricpb.Resou

request.reset(iCtx)
resp, err := c.httpClient.Do(request.Request)
var urlErr *url.Error
if errors.As(err, &urlErr) && urlErr.Temporary() {
return newResponseError(http.Header{})
}
if err != nil {
return err
}
Expand Down
20 changes: 8 additions & 12 deletions exporters/otlp/otlpmetric/otlpmetrichttp/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,13 @@ func TestClient(t *testing.T) {
t.Run("Integration", otest.RunClientTests(factory))
}

func TestNewWithInvalidEndpoint(t *testing.T) {
ctx := context.Background()
exp, err := New(ctx, WithEndpoint("host:invalid-port"))
assert.Error(t, err)
assert.Nil(t, exp)
}

func TestConfig(t *testing.T) {
factoryFunc := func(ePt string, rCh <-chan otest.ExportResult, o ...Option) (metric.Exporter, *otest.HTTPCollector) {
coll, err := otest.NewHTTPCollector(ePt, rCh)
Expand Down Expand Up @@ -113,7 +120,7 @@ func TestConfig(t *testing.T) {
t.Cleanup(func() { close(rCh) })
t.Cleanup(func() { require.NoError(t, exp.Shutdown(ctx)) })
err := exp.Export(ctx, &metricdata.ResourceMetrics{})
assert.ErrorContains(t, err, context.DeadlineExceeded.Error())
assert.ErrorAs(t, err, new(retryableError))
})

t.Run("WithCompressionGZip", func(t *testing.T) {
Expand Down Expand Up @@ -174,17 +181,6 @@ func TestConfig(t *testing.T) {
assert.Len(t, coll.Collect().Dump(), 1)
})

t.Run("WithURLPath", func(t *testing.T) {
path := "/prefix/v2/metrics"
ePt := fmt.Sprintf("http://localhost:0%s", path)
exp, coll := factoryFunc(ePt, nil, WithURLPath(path))
ctx := context.Background()
t.Cleanup(func() { require.NoError(t, coll.Shutdown(ctx)) })
t.Cleanup(func() { require.NoError(t, exp.Shutdown(ctx)) })
assert.NoError(t, exp.Export(ctx, &metricdata.ResourceMetrics{}))
assert.Len(t, coll.Collect().Dump(), 1)
})

t.Run("WithTLSClientConfig", func(t *testing.T) {
ePt := "https://localhost:0"
tlsCfg := &tls.Config{InsecureSkipVerify: true}
Expand Down
22 changes: 15 additions & 7 deletions exporters/otlp/otlptrace/otlptracegrpc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,30 +260,38 @@ func (c *client) exportContext(parent context.Context) (context.Context, context
// duration to wait for if an explicit throttle time is included in err.
func retryable(err error) (bool, time.Duration) {
s := status.Convert(err)
return retryableGRPCStatus(s)
}

func retryableGRPCStatus(s *status.Status) (bool, time.Duration) {
switch s.Code() {
case codes.Canceled,
codes.DeadlineExceeded,
codes.ResourceExhausted,
codes.Aborted,
codes.OutOfRange,
codes.Unavailable,
codes.DataLoss:
return true, throttleDelay(s)
// Additionally handle RetryInfo.
_, d := throttleDelay(s)
return true, d
case codes.ResourceExhausted:
// Retry only if the server signals that the recovery from resource exhaustion is possible.
return throttleDelay(s)
}

// Not a retry-able error.
return false, 0
}

// throttleDelay returns a duration to wait for if an explicit throttle time
// is included in the response status.
func throttleDelay(s *status.Status) time.Duration {
// throttleDelay reports whether the status carries a RetryInfo detail and,
// if it does, the explicit throttle duration to wait for before retrying.
func throttleDelay(s *status.Status) (bool, time.Duration) {
for _, detail := range s.Details() {
if t, ok := detail.(*errdetails.RetryInfo); ok {
return t.RetryDelay.AsDuration()
return true, t.RetryDelay.AsDuration()
}
}
return 0
return false, 0
}

// MarshalLog is the marshaling function used by the logging system to represent this Client.
Expand Down
52 changes: 37 additions & 15 deletions exporters/otlp/otlptrace/otlptracegrpc/client_unit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,41 +27,45 @@ import (
"google.golang.org/protobuf/types/known/durationpb"
)

func TestThrottleDuration(t *testing.T) {
func TestThrottleDelay(t *testing.T) {
c := codes.ResourceExhausted
testcases := []struct {
status *status.Status
expected time.Duration
status *status.Status
wantOK bool
wantDuration time.Duration
}{
{
status: status.New(c, "no retry info"),
expected: 0,
status: status.New(c, "NoRetryInfo"),
wantOK: false,
wantDuration: 0,
},
{
status: func() *status.Status {
s, err := status.New(c, "single retry info").WithDetails(
s, err := status.New(c, "SingleRetryInfo").WithDetails(
&errdetails.RetryInfo{
RetryDelay: durationpb.New(15 * time.Millisecond),
},
)
require.NoError(t, err)
return s
}(),
expected: 15 * time.Millisecond,
wantOK: true,
wantDuration: 15 * time.Millisecond,
},
{
status: func() *status.Status {
s, err := status.New(c, "error info").WithDetails(
s, err := status.New(c, "ErrorInfo").WithDetails(
&errdetails.ErrorInfo{Reason: "no throttle detail"},
)
require.NoError(t, err)
return s
}(),
expected: 0,
wantOK: false,
wantDuration: 0,
},
{
status: func() *status.Status {
s, err := status.New(c, "error and retry info").WithDetails(
s, err := status.New(c, "ErrorAndRetryInfo").WithDetails(
&errdetails.ErrorInfo{Reason: "with throttle detail"},
&errdetails.RetryInfo{
RetryDelay: durationpb.New(13 * time.Minute),
Expand All @@ -70,11 +74,12 @@ func TestThrottleDuration(t *testing.T) {
require.NoError(t, err)
return s
}(),
expected: 13 * time.Minute,
wantOK: true,
wantDuration: 13 * time.Minute,
},
{
status: func() *status.Status {
s, err := status.New(c, "double retry info").WithDetails(
s, err := status.New(c, "DoubleRetryInfo").WithDetails(
&errdetails.RetryInfo{
RetryDelay: durationpb.New(13 * time.Minute),
},
Expand All @@ -85,13 +90,16 @@ func TestThrottleDuration(t *testing.T) {
require.NoError(t, err)
return s
}(),
expected: 13 * time.Minute,
wantOK: true,
wantDuration: 13 * time.Minute,
},
}

for _, tc := range testcases {
t.Run(tc.status.Message(), func(t *testing.T) {
require.Equal(t, tc.expected, throttleDelay(tc.status))
ok, d := throttleDelay(tc.status)
assert.Equal(t, tc.wantOK, ok)
assert.Equal(t, tc.wantDuration, d)
})
}
}
Expand All @@ -106,7 +114,7 @@ func TestRetryable(t *testing.T) {
codes.NotFound: false,
codes.AlreadyExists: false,
codes.PermissionDenied: false,
codes.ResourceExhausted: true,
codes.ResourceExhausted: false,
codes.FailedPrecondition: false,
codes.Aborted: true,
codes.OutOfRange: true,
Expand All @@ -123,6 +131,20 @@ func TestRetryable(t *testing.T) {
}
}

func TestRetryableGRPCStatusResourceExhaustedWithRetryInfo(t *testing.T) {
delay := 15 * time.Millisecond
s, err := status.New(codes.ResourceExhausted, "WithRetryInfo").WithDetails(
&errdetails.RetryInfo{
RetryDelay: durationpb.New(delay),
},
)
require.NoError(t, err)

ok, d := retryableGRPCStatus(s)
assert.True(t, ok)
assert.Equal(t, delay, d)
}

func TestUnstartedStop(t *testing.T) {
client := NewClient()
assert.ErrorIs(t, client.Stop(context.Background()), errAlreadyStopped)
Expand Down

0 comments on commit 59af688

Please sign in to comment.