diff --git a/exp/api/remote/remote_api.go b/exp/api/remote/remote_api.go index 433855cd2..cf912249a 100644 --- a/exp/api/remote/remote_api.go +++ b/exp/api/remote/remote_api.go @@ -48,6 +48,10 @@ type API struct { // APIOption represents a remote API option. type APIOption func(o *apiOpts) error +// RetryCallback is called each time Write() retries a request. +// err is the error that caused the retry. +type RetryCallback func(err error) + // TODO(bwplotka): Add "too old sample" handling one day. type apiOpts struct { logger *slog.Logger @@ -56,6 +60,7 @@ type apiOpts struct { compression Compression path string retryOnRateLimit bool + retryCallback RetryCallback } var defaultAPIOpts = &apiOpts{ @@ -111,6 +116,15 @@ func WithAPIBackoff(backoff backoff.Config) APIOption { } } +// WithAPIRetryCallback sets a callback to be invoked on each retry attempt. +// This is useful for tracking retry metrics and debugging retry behavior. +func WithAPIRetryCallback(callback RetryCallback) APIOption { + return func(o *apiOpts) error { + o.retryCallback = callback + return nil + } +} + type nopSlogHandler struct{} func (n nopSlogHandler) Enabled(context.Context, slog.Level) bool { return false } @@ -257,20 +271,23 @@ func (r *API) Write(ctx context.Context, msgType WriteMessageType, msg any) (_ W var retryableErr retryableError if !errors.As(err, &retryableErr) { - // TODO(bwplotka): More context in the error e.g. about retries. return accumulatedStats, err } if !b.Ongoing() { - // TODO(bwplotka): More context in the error e.g. about retries. return accumulatedStats, err } backoffDelay := b.NextDelay() + retryableErr.RetryAfter() + + // Invoke retry callback if provided (after NextDelay which increments the retry counter). + if r.opts.retryCallback != nil { + r.opts.retryCallback(retryableErr.error) + } + r.opts.logger.Error("failed to send remote write request; retrying after backoff", "err", err, "backoff", backoffDelay) select { case <-ctx.Done(): - // TODO(bwplotka): More context in the error e.g. about retries. return WriteResponseStats{}, ctx.Err() case <-time.After(backoffDelay): // Retry. diff --git a/exp/api/remote/remote_api_test.go b/exp/api/remote/remote_api_test.go index 9f982a4e3..bf06f613e 100644 --- a/exp/api/remote/remote_api_test.go +++ b/exp/api/remote/remote_api_test.go @@ -208,4 +208,89 @@ func TestRemoteAPI_Write_WithHandler(t *testing.T) { t.Fatalf("expected error to contain 'storage error', got %v", err) } }) + + t.Run("retry callback invoked on retries", func(t *testing.T) { + tLogger := slog.Default() + mockCode := http.StatusInternalServerError + mStore := &mockStorage{ + mockErr: errors.New("storage error"), + mockCode: &mockCode, + } + srv := httptest.NewServer(NewWriteHandler(mStore, MessageTypes{WriteV2MessageType}, WithWriteHandlerLogger(tLogger))) + t.Cleanup(srv.Close) + + var retryCount int + var retryErrors []error + + client, err := NewAPI(srv.URL, + WithAPIHTTPClient(srv.Client()), + WithAPILogger(tLogger), + WithAPIPath("api/v1/write"), + WithAPIBackoff(backoff.Config{ + Min: 1 * time.Millisecond, + Max: 1 * time.Millisecond, + MaxRetries: 3, + }), + WithAPIRetryCallback(func(err error) { + retryCount++ + retryErrors = append(retryErrors, err) + }), + ) + if err != nil { + t.Fatal(err) + } + + req := testV2() + _, err = client.Write(context.Background(), WriteV2MessageType, req) + if err == nil { + t.Fatal("expected error, got nil") + } + + // Verify callback was invoked for each retry. + expectedRetries := 3 + if retryCount != expectedRetries { + t.Fatalf("expected %d retry callback invocations, got %d", expectedRetries, retryCount) + } + + // Verify errors were passed correctly. + for i, retryErr := range retryErrors { + if retryErr == nil { + t.Fatalf("expected non-nil error for retry %d", i) + } + if !strings.Contains(retryErr.Error(), "storage error") { + t.Fatalf("expected error to contain 'storage error', got %v", retryErr) + } + } + }) + + t.Run("retry callback not invoked on success", func(t *testing.T) { + tLogger := slog.Default() + mStore := &mockStorage{} + srv := httptest.NewServer(NewWriteHandler(mStore, MessageTypes{WriteV2MessageType}, WithWriteHandlerLogger(tLogger))) + t.Cleanup(srv.Close) + + callbackInvoked := false + client, err := NewAPI(srv.URL, + WithAPIHTTPClient(srv.Client()), + WithAPILogger(tLogger), + WithAPIPath("api/v1/write"), + WithAPIRetryCallback(func(err error) { + callbackInvoked = true + }), + ) + if err != nil { + t.Fatal(err) + } + + req := testV2() + _, err = client.Write(context.Background(), WriteV2MessageType, req) + if err != nil { + t.Fatal(err) + } + + // Verify callback was not invoked for successful request. + if callbackInvoked { + t.Fatal("retry callback should not be invoked on successful request") + } + }) }