Skip to content

Commit da2dba9

Browse files
authored Aug 5, 2024
Merge pull request #719 from atlassian/htan/retry-test
add tests and refactor retry sending
2 parents 640077e + 7360256 commit da2dba9

File tree

4 files changed

+106
-24
lines changed

4 files changed

+106
-24
lines changed
 

‎pkg/backends/otlp/backend.go

+28-23
Original file line numberDiff line numberDiff line change
@@ -304,39 +304,44 @@ func (bd *Backend) SendMetricsAsync(ctx context.Context, mm *gostatsd.MetricMap,
304304
}
305305

306306
func (c *Backend) postMetrics(ctx context.Context, batch group) error {
307-
resourceMetrics := batch.values()
307+
var (
308+
retries int
309+
req *http.Request
310+
resp *http.Response
311+
err error
312+
)
308313

309-
req, err := data.NewMetricsRequest(ctx, c.metricsEndpoint, resourceMetrics)
314+
resourceMetrics := batch.values()
315+
req, err = data.NewMetricsRequest(ctx, c.metricsEndpoint, resourceMetrics)
310316
if err != nil {
311317
atomic.AddUint64(&c.batchesDropped, 1)
312318
return err
313319
}
314320

315-
c.requestsBufferSem <- struct{}{}
316-
resp, err := c.client.Do(req)
317-
<-c.requestsBufferSem
318-
// OTLP standard specifies 400 will be returned if the request is non-retryable, so we don't retry on 400
319-
if err != nil && resp.StatusCode != http.StatusBadRequest {
320-
for i := 0; i < c.maxRetries; i++ {
321-
atomic.AddUint64(&c.batchesRetried.Cur, 1)
322-
if resp != nil {
323-
resp.Body.Close()
324-
}
325-
c.requestsBufferSem <- struct{}{}
326-
resp, err = c.client.Do(req)
327-
<-c.requestsBufferSem
321+
for {
322+
var dropped int64
323+
c.requestsBufferSem <- struct{}{}
324+
resp, err = c.client.Do(req)
325+
<-c.requestsBufferSem
326+
if err == nil {
327+
dropped, err = data.ProcessMetricResponse(resp)
328328
if err == nil {
329-
break
330-
} else {
331-
c.logger.WithError(err).WithFields(logrus.Fields{
332-
"endpoint": c.metricsEndpoint,
333-
}).Error("failed while retrying")
329+
return nil
334330
}
331+
if dropped > 0 {
332+
// If partial data points were dropped, it shouldn't retry
333+
atomic.AddUint64(&c.seriesDropped, uint64(dropped))
334+
return err
335+
}
336+
}
337+
338+
if retries >= c.maxRetries {
339+
break
335340
}
341+
342+
retries++
343+
atomic.AddUint64(&c.batchesRetried.Cur, 1)
336344
}
337-
defer resp.Body.Close()
338-
dropped, err := data.ProcessMetricResponse(resp)
339-
atomic.AddUint64(&c.seriesDropped, uint64(dropped))
340345

341346
return err
342347
}

‎pkg/backends/otlp/backend_test.go

+72
Original file line numberDiff line numberDiff line change
@@ -380,6 +380,78 @@ func TestBackendSendAsyncMetrics(t *testing.T) {
380380
}
381381
}
382382

383+
func TestRetrySendMetrics(t *testing.T) {
384+
t.Parallel()
385+
for _, tc := range []struct {
386+
name string
387+
numUntilSuccess int
388+
maxRetries int
389+
wantAttempts int
390+
numErrs int
391+
}{
392+
{
393+
name: "should retry sending metrics if it fails for the first time",
394+
numUntilSuccess: 2,
395+
maxRetries: 3,
396+
wantAttempts: 2,
397+
numErrs: 0,
398+
},
399+
{
400+
name: "should give up if it still fails when reach the maximum number of retries",
401+
numUntilSuccess: 5,
402+
maxRetries: 3,
403+
wantAttempts: 4,
404+
numErrs: 1,
405+
},
406+
{
407+
name: "should not retry if it succeeds at the first time",
408+
numUntilSuccess: 1,
409+
maxRetries: 3,
410+
wantAttempts: 1,
411+
numErrs: 0,
412+
},
413+
{
414+
name: "should not retry if maxRetries is 0",
415+
numUntilSuccess: 5,
416+
maxRetries: 0,
417+
wantAttempts: 1,
418+
numErrs: 1,
419+
},
420+
} {
421+
t.Run(tc.name, func(t *testing.T) {
422+
attempts := 0
423+
s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
424+
attempts++
425+
if attempts < tc.numUntilSuccess {
426+
http.Error(w, "im dead", http.StatusServiceUnavailable)
427+
return
428+
}
429+
w.WriteHeader(http.StatusOK)
430+
}))
431+
t.Cleanup(s.Close)
432+
433+
v := viper.New()
434+
v.Set("otlp.metrics_endpoint", fmt.Sprintf("%s/%s", s.URL, "v1/metrics"))
435+
v.Set("otlp.logs_endpoint", fmt.Sprintf("%s/%s", s.URL, "v1/logs"))
436+
v.Set("otlp.max_retries", tc.maxRetries)
437+
438+
logger := fixtures.NewTestLogger(t)
439+
440+
b, err := NewClientFromViper(
441+
v,
442+
logger,
443+
transport.NewTransportPool(logger, v),
444+
)
445+
require.NoError(t, err, "Must not error creating backend")
446+
447+
b.SendMetricsAsync(context.Background(), gostatsd.NewMetricMap(false), func(errs []error) {
448+
assert.Equal(t, tc.numErrs, len(errs))
449+
assert.Equal(t, tc.wantAttempts, attempts, "Must retry sending metrics")
450+
})
451+
})
452+
}
453+
}
454+
383455
func TestSendEvent(t *testing.T) {
384456
t.Parallel()
385457
for _, tc := range []struct {

‎pkg/backends/otlp/config.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ func (c *Config) Validate() (errs error) {
9191
if c.MaxRequests <= 0 {
9292
errs = multierr.Append(errs, errors.New("max request must be a positive value"))
9393
}
94-
if c.MaxRetries <= 0 {
94+
if c.MaxRetries < 0 {
9595
errs = multierr.Append(errs, errors.New("max retries must be a positive value"))
9696
}
9797
if c.MetricsPerBatch <= 0 {

‎pkg/backends/otlp/internal/data/metric_response.go

+5
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package data
22

33
import (
4+
"errors"
45
"fmt"
56
"io"
67
"net/http"
@@ -11,6 +12,10 @@ import (
1112
)
1213

1314
func ProcessMetricResponse(resp *http.Response) (dropped int64, errs error) {
15+
if resp == nil {
16+
return 0, errors.New("empty response")
17+
}
18+
1419
buf, err := io.ReadAll(resp.Body)
1520
if err != nil {
1621
return 0, err

0 commit comments

Comments
 (0)
Failed to load comments.