Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 18 additions & 6 deletions exp/api/remote/remote_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,13 @@ import (
"github.com/prometheus/client_golang/exp/internal/github.com/efficientgo/core/backoff"
)

// BackoffConfig configures exponential backoff with jitter for retry operations.
type BackoffConfig struct {
Min time.Duration `yaml:"min_period"` // Start backoff at this level
Max time.Duration `yaml:"max_period"` // Increase exponentially to this level
MaxRetries int `yaml:"max_retries"` // Give up after this many; zero means infinite retries
}

// API is a client for Prometheus Remote Protocols.
// NOTE(bwplotka): Only https://prometheus.io/docs/specs/remote_write_spec_2_0/ is currently implemented,
// read protocols to be implemented if there will be a demand.
Expand All @@ -56,14 +63,14 @@ type RetryCallback func(err error)
type apiOpts struct {
logger *slog.Logger
client *http.Client
backoff backoff.Config
backoffConfig BackoffConfig
compression Compression
path string
retryOnRateLimit bool
}

var defaultAPIOpts = &apiOpts{
backoff: backoff.Config{
backoffConfig: BackoffConfig{
Min: 1 * time.Second,
Max: 10 * time.Second,
MaxRetries: 10,
Expand Down Expand Up @@ -107,10 +114,11 @@ func WithAPINoRetryOnRateLimit() APIOption {
}
}

// WithAPIBackoff returns APIOption that allows overriding backoff configuration.
func WithAPIBackoff(backoff backoff.Config) APIOption {
// WithAPIBackoff returns APIOption that allows configuring backoff.
// By default, exponential backoff with jitter is used (see defaultAPIOpts).
func WithAPIBackoff(cfg BackoffConfig) APIOption {
return func(o *apiOpts) error {
o.backoff = backoff
o.backoffConfig = cfg
return nil
}
}
Expand Down Expand Up @@ -259,7 +267,11 @@ func (r *API) Write(ctx context.Context, msgType WriteMessageType, msg any, opts
// across the various attempts.
accumulatedStats := WriteResponseStats{}

b := backoff.New(ctx, r.opts.backoff)
b := backoff.New(ctx, backoff.Config{
Min: r.opts.backoffConfig.Min,
Max: r.opts.backoffConfig.Max,
MaxRetries: r.opts.backoffConfig.MaxRetries,
})
for {
rs, err := r.attemptWrite(ctx, r.opts.compression, msgType, payload, b.NumRetries())
accumulatedStats.Add(rs)
Expand Down
5 changes: 2 additions & 3 deletions exp/api/remote/remote_api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ import (
"google.golang.org/protobuf/testing/protocmp"

writev2 "github.com/prometheus/client_golang/exp/api/remote/genproto/v2"
"github.com/prometheus/client_golang/exp/internal/github.com/efficientgo/core/backoff"
)

func TestRetryAfterDuration(t *testing.T) {
Expand Down Expand Up @@ -189,7 +188,7 @@ func TestRemoteAPI_Write_WithHandler(t *testing.T) {
WithAPIHTTPClient(srv.Client()),
WithAPILogger(tLogger),
WithAPIPath("api/v1/write"),
WithAPIBackoff(backoff.Config{
WithAPIBackoff(BackoffConfig{
Min: 1 * time.Second,
Max: 1 * time.Second,
MaxRetries: 2,
Expand Down Expand Up @@ -226,7 +225,7 @@ func TestRemoteAPI_Write_WithHandler(t *testing.T) {
WithAPIHTTPClient(srv.Client()),
WithAPILogger(tLogger),
WithAPIPath("api/v1/write"),
WithAPIBackoff(backoff.Config{
WithAPIBackoff(BackoffConfig{
Min: 1 * time.Millisecond,
Max: 1 * time.Millisecond,
MaxRetries: 3,
Expand Down
Loading