Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Implement flags to control retry delays #83

Merged
merged 1 commit into from
Jan 13, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 26 additions & 38 deletions cmd/client/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"crypto/x509"
"fmt"
"io/ioutil"
"math/rand"
"net"
"net/http"
"net/url"
Expand All @@ -32,14 +31,15 @@ import (
kingpin "gopkg.in/alecthomas/kingpin.v2"

"github.com/ShowMax/go-fqdn"
"github.com/cenkalti/backoff/v4"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/pkg/errors"
"github.com/prometheus-community/pushprox/util"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/prometheus/common/promlog"
"github.com/prometheus/common/promlog/flag"
"github.com/prometheus-community/pushprox/util"
)

var (
Expand All @@ -49,6 +49,9 @@ var (
tlsCert = kingpin.Flag("tls.cert", "<cert> Client certificate file").String()
tlsKey = kingpin.Flag("tls.key", "<key> Private key file").String()
metricsAddr = kingpin.Flag("metrics-addr", "Serve Prometheus metrics at this address").Default(":9369").String()

retryInitialWait = kingpin.Flag("proxy.retry.initial-wait", "Amount of time to wait after proxy failure").Default("1s").Duration()
retryMaxWait = kingpin.Flag("proxy.retry.max-wait", "Maximum amount of time to wait between proxy poll retries").Default("5s").Duration()
)

var (
Expand Down Expand Up @@ -76,6 +79,15 @@ func init() {
prometheus.MustRegister(pushErrorCounter, pollErrorCounter, scrapeErrorCounter)
}

// newBackOffFromFlags builds the exponential backoff policy used when
// retrying proxy polls. The first wait comes from --proxy.retry.initial-wait,
// each subsequent wait grows by a factor of 1.5, and no single wait exceeds
// --proxy.retry.max-wait.
func newBackOffFromFlags() backoff.BackOff {
	const growthFactor = 1.5

	policy := backoff.NewExponentialBackOff()
	// A zero MaxElapsedTime tells the backoff library never to give up.
	policy.MaxElapsedTime = time.Duration(0)
	policy.MaxInterval = *retryMaxWait
	policy.InitialInterval = *retryInitialWait
	policy.Multiplier = growthFactor
	return policy
}

// Coordinator for scrape requests and responses
type Coordinator struct {
logger log.Logger
Expand Down Expand Up @@ -168,7 +180,7 @@ func (c *Coordinator) doPush(resp *http.Response, origRequest *http.Request, cli
return nil
}

func loop(c Coordinator, client *http.Client) error {
func (c *Coordinator) doPoll(client *http.Client) error {
base, err := url.Parse(*proxyURL)
if err != nil {
level.Error(c.logger).Log("msg", "Error parsing url:", "err", err)
Expand Down Expand Up @@ -201,35 +213,18 @@ func loop(c Coordinator, client *http.Client) error {
return nil
}

// decorrelatedJitter holds the state for a decorrelated-jitter sleep
// schedule: the upper bound of each new random delay is derived from the
// previously chosen delay, and the result is kept between min and cap.
type decorrelatedJitter struct {
	// duration is the most recently computed sleep time.
	duration time.Duration
	// min is the shortest sleep time that may be produced.
	min time.Duration
	// cap is the longest sleep time that may be produced.
	cap time.Duration
}

func newJitter() decorrelatedJitter {
rand.Seed(time.Now().UnixNano())
return decorrelatedJitter{
min: 50 * time.Millisecond,
cap: 5 * time.Second,
func (c *Coordinator) loop(bo backoff.BackOff, client *http.Client) {
op := func() error {
return c.doPoll(client)
}
}

func (d *decorrelatedJitter) calc() time.Duration {
change := rand.Float64() * float64(d.duration*time.Duration(3)-d.min)
d.duration = d.min + time.Duration(change)
if d.duration > d.cap {
d.duration = d.cap
}
if d.duration < d.min {
d.duration = d.min
for {
if err := backoff.RetryNotify(op, bo, func(err error, _ time.Duration) {
pollErrorCounter.Inc()
}); err != nil {
level.Error(c.logger).Log("err", err)
}
}
return d.duration
}

// sleep blocks the calling goroutine for the delay returned by calc.
func (d *decorrelatedJitter) sleep() {
	pause := d.calc()
	time.Sleep(pause)
}

func main() {
Expand Down Expand Up @@ -299,14 +294,7 @@ func main() {
TLSClientConfig: tlsConfig,
}

jitter := newJitter()
client := &http.Client{Transport: transport}
for {
err := loop(coordinator, client)
if err != nil {
pollErrorCounter.Inc()
jitter.sleep()
continue
}
}

coordinator.loop(newBackOffFromFlags(), client)
}
12 changes: 1 addition & 11 deletions cmd/client/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,6 @@ import (
"github.com/pkg/errors"
)

// TestJitter draws a large number of delays from a fresh jitter and fails if
// any of them falls outside the jitter's configured [min, cap] range.
func TestJitter(t *testing.T) {
	jitter := newJitter()
	for i := 0; i < 100000; i++ {
		duration := jitter.calc()
		// Both bounds must hold. Joining them with || (as the check did
		// before) is always true when min <= cap, so the test could never
		// fail.
		if duration < jitter.min || duration > jitter.cap {
			t.Fatal("invalid jitter value: ", duration)
		}
	}
}

// TestLogger is a stateless type whose Log method accepts arbitrary values
// and returns an error. Presumably it stands in for a real logger in these
// tests; confirm against prepareTest.
type TestLogger struct{}

func (tl *TestLogger) Log(vars ...interface{}) error {
Expand Down Expand Up @@ -76,7 +66,7 @@ func TestHandleErr(t *testing.T) {
func TestLoop(t *testing.T) {
ts, c := prepareTest()
defer ts.Close()
if err := loop(c, ts.Client()); err != nil {
if err := c.doPoll(ts.Client()); err != nil {
t.Fatal(err)
}
}
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ go 1.13

require (
github.com/ShowMax/go-fqdn v0.0.0-20180501083314-6f60894d629f
github.com/cenkalti/backoff/v4 v4.1.0
github.com/go-kit/kit v0.10.0
github.com/google/uuid v1.1.1
github.com/pkg/errors v0.9.1
Expand Down
3 changes: 3 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,10 @@ github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs=
github.com/casbin/casbin/v2 v2.1.2/go.mod h1:YcPU1XXisHhLzuxH9coDNf2FbKpjGlbCg3n9yuLkIJQ=
github.com/cenkalti/backoff v2.2.1+incompatible h1:tNowT99t7UNflLxfYYSlKYsBpXdEet03Pg2g16Swow4=
github.com/cenkalti/backoff v2.2.1+incompatible/go.mod h1:90ReRw6GdpyfrHakVjL/QHaoyV4aDUVVkXQJJJ3NXXM=
github.com/cenkalti/backoff/v4 v4.1.0 h1:c8LkOFQTzuO0WBM/ae5HdGQuZPfPxp7lqBRwQRm4fSc=
github.com/cenkalti/backoff/v4 v4.1.0/go.mod h1:scbssz8iZGpm3xbr14ovlUdkxfGXNInqkPWOWmG2CLw=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/cespare/xxhash/v2 v2.1.1 h1:6MnRN8NT7+YBpUIWxHtefFZOKTAPgGjpQSxqLNn0+qY=
github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
Expand Down