Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ratelimiter #1211

Merged
merged 3 commits into from May 13, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 2 additions & 0 deletions cmd/clair/main.go
Expand Up @@ -18,6 +18,7 @@ import (
yaml "gopkg.in/yaml.v3"

"github.com/quay/clair/v4/config"
"github.com/quay/clair/v4/health"
"github.com/quay/clair/v4/httptransport"
"github.com/quay/clair/v4/initialize"
"github.com/quay/clair/v4/introspection"
Expand Down Expand Up @@ -108,6 +109,7 @@ func main() {
return fmt.Errorf("http transport configuration failed: %w", err)
}
down.Add(h.Server)
health.Ready()
if err := h.ListenAndServe(); err != http.ErrServerClosed {
return fmt.Errorf("http transport failed to launch: %w", err)
}
Expand Down
6 changes: 5 additions & 1 deletion cmd/clairctl/export.go
Expand Up @@ -4,13 +4,16 @@ import (
"errors"
"fmt"
"io"
"net/http"
"os"

"github.com/quay/claircore/libvuln/driver"
"github.com/quay/claircore/libvuln/jsonblob"
"github.com/quay/claircore/libvuln/updates"
_ "github.com/quay/claircore/updater/defaults"
"github.com/urfave/cli/v2"

"github.com/quay/clair/v4/internal/httputil"
)

// ExportCmd is the "export-updaters" subcommand.
Expand Down Expand Up @@ -62,7 +65,8 @@ func exportAction(c *cli.Context) error {
cfgs[name] = node.Decode
}

cl, _, err := cfg.Client(nil, commonClaim)
tr := http.DefaultTransport.(*http.Transport).Clone()
cl, _, err := cfg.Client(httputil.RateLimiter(tr), commonClaim)
if err != nil {
return err
}
Expand Down
15 changes: 13 additions & 2 deletions config/httpclient.go
Expand Up @@ -2,8 +2,10 @@ package config

import (
"net/http"
"net/http/cookiejar"
"time"

"golang.org/x/net/publicsuffix"
"gopkg.in/square/go-jose.v2"
"gopkg.in/square/go-jose.v2/jwt"
)
Expand All @@ -13,7 +15,7 @@ import (
//
// It returns an *http.Client and a boolean indicating whether the client is
// configured for authentication, or an error that occurred during construction.
func (cfg *Config) Client(next *http.Transport, cl jwt.Claims) (c *http.Client, authed bool, err error) {
func (cfg *Config) Client(next http.RoundTripper, cl jwt.Claims) (c *http.Client, authed bool, err error) {
if next == nil {
next = http.DefaultTransport.(*http.Transport).Clone()
}
Expand All @@ -29,11 +31,20 @@ func (cfg *Config) Client(next *http.Transport, cl jwt.Claims) (c *http.Client,
sk.Key = cfg.Auth.PSK.Key
default:
}
jar, err := cookiejar.New(&cookiejar.Options{
PublicSuffixList: publicsuffix.List,
})
if err != nil {
return nil, false, err
}
rt := &transport{
next: next,
base: cl,
}
c = &http.Client{Transport: rt}
c = &http.Client{
Jar: jar,
Transport: rt,
}

// Both of the JWT-based methods set the signing key.
if sk.Key != nil {
Expand Down
3 changes: 2 additions & 1 deletion go.mod
Expand Up @@ -32,8 +32,9 @@ require (
go.opentelemetry.io/otel/exporters/trace/jaeger v0.16.0
go.opentelemetry.io/otel/sdk v0.16.0
golang.org/x/mod v0.4.2 // indirect
golang.org/x/net v0.0.0-20210316092652-d523dce5a7f4
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a
golang.org/x/sys v0.0.0-20210317225723-c4fcb01b228e // indirect
golang.org/x/time v0.0.0-20210220033141-f8bda1e9f3ba
golang.org/x/tools v0.1.0 // indirect
gopkg.in/square/go-jose.v2 v2.4.1
gopkg.in/yaml.v3 v3.0.0-20200506231410-2ff61e1afc86
Expand Down
8 changes: 6 additions & 2 deletions go.sum
Expand Up @@ -823,6 +823,8 @@ golang.org/x/net v0.0.0-20200707034311-ab3426394381/go.mod h1:/O7V0waA8r7cgGh81R
golang.org/x/net v0.0.0-20200822124328-c89045814202/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA=
golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/net v0.0.0-20201031054903-ff519b6c9102/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/net v0.0.0-20210316092652-d523dce5a7f4 h1:b0LrWgu8+q7z4J+0Y3Umo5q1dL7NXBkKBWkaVkAq17E=
golang.org/x/net v0.0.0-20210316092652-d523dce5a7f4/go.mod h1:RBQZq4jEuRlivfhVLdyRGr576XBO4/greRjx4P4O3yc=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
Expand Down Expand Up @@ -894,11 +896,13 @@ golang.org/x/sys v0.0.0-20200625212154-ddb9806d33ae/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20200803210538-64077c9b5642/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200905004654-be1d3432aa8f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20201201145000-ef89a241ccb3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20201214210602-f9fddec55a1e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210119212857-b64e53b001e4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210317225723-c4fcb01b228e h1:XNp2Flc/1eWQGk5BLzqTAN7fQIwIbfyVTuVxXxZh73M=
golang.org/x/sys v0.0.0-20210317225723-c4fcb01b228e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210315160823-c6e025ad8005 h1:pDMpM2zh2MT0kHy037cKlSby2nEhD50SYqwQk76Nm40=
golang.org/x/sys v0.0.0-20210315160823-c6e025ad8005/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/text v0.0.0-20160726164857-2910a502d2bf/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
Expand Down
38 changes: 17 additions & 21 deletions health/readinesshandler.go
Expand Up @@ -3,43 +3,39 @@ package health
import (
"net/http"
"sync/atomic"

je "github.com/quay/claircore/pkg/jsonerr"
)

var ready *uint32 = new(uint32)

// Ready instructs the ReadinessHandler to begin serving 200OK status
// Ready instructs the ReadinessHandler to begin serving 200 OK status.
func Ready() {
atomic.StoreUint32(ready, uint32(1))
}

// NotReady instructs the ReadinessHandler to begin serving 503ServiceUnavailable status
func UnReady() {
// Unready instructs the ReadinessHandler to begin serving 503
// Service Unavailable.
func Unready() {
atomic.StoreUint32(ready, uint32(0))
}

// ReadinessHandler will return a 200OK or 503ServiceUnavailable status dependent
// on whether the exported Ready or NotReady methods have been called.
// ReadinessHandler will return a 200 OK or 503 "Service Unavailable" status
// depending on whether the Ready or Unready functions have been called.
//
// The Ready() method must be called to begin returning 200OK
func ReadinessHandler() http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
// The Ready() method must be called to begin returning 200 OK.
func ReadinessHandler() http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
h := w.Header()
h.Set("X-Content-Type-Options", "nosniff")
h.Set("Content-Type", "text/plain; charset=utf-8")
h.Set("Content-Length", "0")
h.Set("Cache-Control", "no-store")
if r.Method != http.MethodGet {
resp := &je.Response{
Code: "method-not-allowed",
Message: "endpoint only allows GET",
}
je.Error(w, resp, http.StatusMethodNotAllowed)
w.WriteHeader(http.StatusMethodNotAllowed)
return
}

ready := atomic.LoadUint32(ready)
if ready == 1 {
w.WriteHeader(http.StatusOK)
} else {
if atomic.LoadUint32(ready) != 1 {
w.WriteHeader(http.StatusServiceUnavailable)
}
return
}
})
}
2 changes: 1 addition & 1 deletion health/readinesshandler_test.go
Expand Up @@ -36,7 +36,7 @@ func TestReadinessHandler(t *testing.T) {
}

// signal to handler that process is unready. should return StatusServiceUnavailable
health.UnReady()
health.Unready()
resp, err = client.Do(req)
if err != nil {
t.Fatalf("failed to do request: %v", err)
Expand Down
21 changes: 19 additions & 2 deletions initialize/services.go
Expand Up @@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"net/http"
"net/http/cookiejar"
"time"

"github.com/quay/claircore/libindex"
Expand All @@ -13,13 +14,15 @@ import (
"github.com/quay/zlog"
"go.opentelemetry.io/otel/baggage"
"go.opentelemetry.io/otel/label"
"golang.org/x/net/publicsuffix"
"gopkg.in/square/go-jose.v2/jwt"

clairerror "github.com/quay/clair/v4/clair-error"
"github.com/quay/clair/v4/config"
"github.com/quay/clair/v4/httptransport"
"github.com/quay/clair/v4/httptransport/client"
"github.com/quay/clair/v4/indexer"
"github.com/quay/clair/v4/internal/httputil"
"github.com/quay/clair/v4/matcher"
notifier "github.com/quay/clair/v4/notifier/service"
)
Expand Down Expand Up @@ -163,7 +166,8 @@ func remoteIndexer(ctx context.Context, cfg *config.Config, addr string) (indexe
}

func remoteClient(ctx context.Context, cfg *config.Config, claim jwt.Claims, addr string) (*client.HTTP, error) {
c, auth, err := cfg.Client(nil, claim)
tr := http.DefaultTransport.(*http.Transport).Clone()
c, auth, err := cfg.Client(tr, claim)
switch {
case err != nil:
return nil, err
Expand All @@ -182,6 +186,17 @@ func localMatcher(ctx context.Context, cfg *config.Config) (matcher.Service, err
}
}

tr := http.DefaultTransport.(*http.Transport).Clone()
jar, err := cookiejar.New(&cookiejar.Options{
PublicSuffixList: publicsuffix.List,
})
if err != nil {
return nil, err
}
cl := &http.Client{
Jar: jar,
Transport: httputil.RateLimiter(tr),
}
updaterConfigs := make(map[string]driver.ConfigUnmarshaler)
for name, node := range cfg.Updaters.Config {
updaterConfigs[name] = node.Decode
Expand All @@ -200,6 +215,7 @@ func localMatcher(ctx context.Context, cfg *config.Config) (matcher.Service, err
UpdateRetention: cfg.Matcher.UpdateRetention,
MatcherNames: cfg.Matchers.Names,
MatcherConfigs: matcherConfigs,
Client: cl,
})
if err != nil {
return nil, mkErr(err)
Expand Down Expand Up @@ -227,7 +243,8 @@ func localNotifier(ctx context.Context, cfg *config.Config, i indexer.Service, m
}
}

c, _, err := cfg.Client(nil, notifierClaim)
tr := http.DefaultTransport.(*http.Transport).Clone()
c, _, err := cfg.Client(tr, notifierClaim)
if err != nil {
return nil, mkErr(err)
}
Expand Down
67 changes: 67 additions & 0 deletions internal/httputil/ratelimiter.go
@@ -0,0 +1,67 @@
package httputil

import (
"net/http"
"sync"

"golang.org/x/time/rate"
)

// RateLimiter wraps the provided RoundTripper with a limiter allowing 10
// requests/second/host.
//
// It responds to HTTP 429 responses by automatically decreasing the rate.
func RateLimiter(next http.RoundTripper) http.RoundTripper {
return &ratelimiter{
rt: next,
lm: sync.Map{},
}
}

// Ratelimiter implements the limiting by using a concurrent map and Limiter
// structs.
type ratelimiter struct {
rt http.RoundTripper
lm sync.Map
}

const rateCap = 10

// RoundTrip implements http.RoundTripper.
func (r *ratelimiter) RoundTrip(req *http.Request) (*http.Response, error) {
key := req.URL.Host
li, ok := r.lm.Load(key)
if !ok {
// Limiter allows "rateCap" per sec, one at a time.
l := rate.NewLimiter(rate.Limit(rateCap), 1)
li, _ = r.lm.LoadOrStore(key, l)
}
l := li.(*rate.Limiter)
if err := l.Wait(req.Context()); err != nil {
return nil, err
}
res, err := r.rt.RoundTrip(req)
// This seems to be the contract that http.Transport implements.
if err != nil {
return nil, err
}
switch res.StatusCode {
case http.StatusOK:
// Try increasing on OK.
if lim := l.Limit(); lim < rateCap {
l.SetLimit(lim + 1)
}
case http.StatusTooManyRequests:
// Try to allow some requests, eventually.
l.SetLimit(detune(l.Limit()))
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I may be reading this wrong, but when/if the downstream recovers, would the limit be "stuck" at a lower than optimal rate?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes.

I'm not sure what to do to re-ramp the limit. Using SetLimitAt would result in the limit getting similarly stuck, by my thinking. Perhaps a second Limiter with token representing an increase to the limit?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not so familiar with the request patterns or how a certain node can become overwhelmed, but would it make sense to have a second case (or an else case) that increases the Limit (retune) when status == <good status> and then taking the Min(newLimit, lim) to ensure you don't go over the max? (In general, detune should reduce faster than retune increases it).

In the long run it seems like using a client to mitigate against server errors would lead to a different loadbalancing strategy to the RoundTrip, but that seems like a discussion for a different time.

return res, nil
}

// Detune reduces the rate.
func detune(in rate.Limit) rate.Limit {
if in <= 1 {
return in / 2
}
return in - 1
}
56 changes: 56 additions & 0 deletions internal/httputil/ratelimiter_test.go
@@ -0,0 +1,56 @@
package httputil

import (
"net/http"
"net/http/httptest"
"sync"
"testing"
"time"
)

func TestRate(t *testing.T) {
const nReq = 20

var wg sync.WaitGroup
wg.Add(nReq)
begin := make(chan struct{})
var last struct {
sync.Mutex
t time.Time
}
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
last.Lock()
last.t = time.Now()
last.Unlock()
w.WriteHeader(http.StatusOK)
}))
defer srv.Close()
cl := srv.Client()
cl.Transport = RateLimiter(cl.Transport)

for i := 0; i < nReq; i++ {
go func() {
defer wg.Done()
<-begin
res, err := cl.Get(srv.URL)
if err != nil {
t.Error(err)
return
}
res.Body.Close()
}()
}

first := time.Now()
close(begin)
wg.Wait()

t.Logf("begin: %v", first)
t.Logf("end: %v", last.t)
rate := nReq / last.t.Sub(first).Seconds()
t.Logf("rate: %v", rate)

if rate < (rateCap-1) || rate > (rateCap+1) {
t.Error("rate outside acceptable bounds")
}
}