Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 20 additions & 3 deletions exp/api/remote/remote_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,10 @@ type API struct {
// APIOption represents a remote API option.
// Each option is a function that receives the *apiOpts being configured and
// returns an error.
type APIOption func(o *apiOpts) error

// RetryCallback is called each time Write() retries a request.
// err is the error that caused the retry; Write passes the error carried by
// its internal retryable error rather than the wrapper itself.
//
// The callback is a plain function call made from Write before it waits out
// the backoff delay, so a slow callback postpones the retry.
type RetryCallback func(err error)

// TODO(bwplotka): Add "too old sample" handling one day.
type apiOpts struct {
logger *slog.Logger
Expand All @@ -56,6 +60,7 @@ type apiOpts struct {
compression Compression
path string
retryOnRateLimit bool
retryCallback RetryCallback
}

var defaultAPIOpts = &apiOpts{
Expand Down Expand Up @@ -111,6 +116,15 @@ func WithAPIBackoff(backoff backoff.Config) APIOption {
}
}

// WithAPIRetryCallback returns an option that registers callback as the
// function to run whenever a request is retried, e.g. to count retries or to
// trace retry behavior while debugging.
func WithAPIRetryCallback(callback RetryCallback) APIOption {
	register := func(opts *apiOpts) error {
		// Storing the callback cannot fail, so the option never errors.
		opts.retryCallback = callback
		return nil
	}
	return register
}

// nopSlogHandler is an empty type whose Enabled method rejects every log level.
type nopSlogHandler struct{}

// Enabled always reports false, regardless of the context or level passed in.
func (nopSlogHandler) Enabled(_ context.Context, _ slog.Level) bool {
	return false
}
Expand Down Expand Up @@ -257,20 +271,23 @@ func (r *API) Write(ctx context.Context, msgType WriteMessageType, msg any) (_ W

var retryableErr retryableError
if !errors.As(err, &retryableErr) {
// TODO(bwplotka): More context in the error e.g. about retries.
return accumulatedStats, err
}

if !b.Ongoing() {
// TODO(bwplotka): More context in the error e.g. about retries.
return accumulatedStats, err
}

backoffDelay := b.NextDelay() + retryableErr.RetryAfter()

// Invoke retry callback if provided (after NextDelay which increments the retry counter).
if r.opts.retryCallback != nil {
r.opts.retryCallback(retryableErr.error)
}

r.opts.logger.Error("failed to send remote write request; retrying after backoff", "err", err, "backoff", backoffDelay)
select {
case <-ctx.Done():
// TODO(bwplotka): More context in the error e.g. about retries.
return WriteResponseStats{}, ctx.Err()
case <-time.After(backoffDelay):
// Retry.
Expand Down
85 changes: 85 additions & 0 deletions exp/api/remote/remote_api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,4 +208,89 @@ func TestRemoteAPI_Write_WithHandler(t *testing.T) {
t.Fatalf("expected error to contain 'storage error', got %v", err)
}
})

t.Run("retry callback invoked on retries", func(t *testing.T) {
tLogger := slog.Default()
mockCode := http.StatusInternalServerError
mStore := &mockStorage{
mockErr: errors.New("storage error"),
mockCode: &mockCode,
}
srv := httptest.NewServer(NewWriteHandler(mStore, MessageTypes{WriteV2MessageType}, WithWriteHandlerLogger(tLogger)))
t.Cleanup(srv.Close)

var retryCount int
var retryErrors []error

client, err := NewAPI(srv.URL,
WithAPIHTTPClient(srv.Client()),
WithAPILogger(tLogger),
WithAPIPath("api/v1/write"),
WithAPIBackoff(backoff.Config{
Min: 1 * time.Millisecond,
Max: 1 * time.Millisecond,
MaxRetries: 3,
}),
WithAPIRetryCallback(func(err error) {
retryCount++
retryErrors = append(retryErrors, err)
}),
)
if err != nil {
t.Fatal(err)
}

req := testV2()
_, err = client.Write(context.Background(), WriteV2MessageType, req)
if err == nil {
t.Fatal("expected error, got nil")
}

// Verify callback was invoked for each retry.
expectedRetries := 3
if retryCount != expectedRetries {
t.Fatalf("expected %d retry callback invocations, got %d", expectedRetries, retryCount)
}

// Verify errors were passed correctly.
for i, retryErr := range retryErrors {
if retryErr == nil {
t.Fatalf("expected non-nil error for retry %d", i)
}
if !strings.Contains(retryErr.Error(), "storage error") {
t.Fatalf("expected error to contain 'storage error', got %v", retryErr)
}
}
})

t.Run("retry callback not invoked on success", func(t *testing.T) {
tLogger := slog.Default()
mStore := &mockStorage{}
srv := httptest.NewServer(NewWriteHandler(mStore, MessageTypes{WriteV2MessageType}, WithWriteHandlerLogger(tLogger)))
t.Cleanup(srv.Close)

callbackInvoked := false
client, err := NewAPI(srv.URL,
WithAPIHTTPClient(srv.Client()),
WithAPILogger(tLogger),
WithAPIPath("api/v1/write"),
WithAPIRetryCallback(func(err error) {
callbackInvoked = true
}),
)
if err != nil {
t.Fatal(err)
}

req := testV2()
_, err = client.Write(context.Background(), WriteV2MessageType, req)
if err != nil {
t.Fatal(err)
}

// Verify callback was not invoked for successful request.
if callbackInvoked {
t.Fatal("retry callback should not be invoked on successful request")
}
})
}
Loading