diff --git a/cmd/clair/main.go b/cmd/clair/main.go index 124519d350..d26aaf3ec5 100644 --- a/cmd/clair/main.go +++ b/cmd/clair/main.go @@ -18,6 +18,7 @@ import ( yaml "gopkg.in/yaml.v3" "github.com/quay/clair/v4/config" + "github.com/quay/clair/v4/health" "github.com/quay/clair/v4/httptransport" "github.com/quay/clair/v4/initialize" "github.com/quay/clair/v4/introspection" @@ -108,6 +109,7 @@ func main() { return fmt.Errorf("http transport configuration failed: %w", err) } down.Add(h.Server) + health.Ready() if err := h.ListenAndServe(); err != http.ErrServerClosed { return fmt.Errorf("http transport failed to launch: %w", err) } diff --git a/cmd/clairctl/export.go b/cmd/clairctl/export.go index 438ca2a7a3..b51e082a8f 100644 --- a/cmd/clairctl/export.go +++ b/cmd/clairctl/export.go @@ -4,6 +4,7 @@ import ( "errors" "fmt" "io" + "net/http" "os" "github.com/quay/claircore/libvuln/driver" @@ -11,6 +12,8 @@ import ( "github.com/quay/claircore/libvuln/updates" _ "github.com/quay/claircore/updater/defaults" "github.com/urfave/cli/v2" + + "github.com/quay/clair/v4/internal/httputil" ) // ExportCmd is the "export-updaters" subcommand. @@ -62,7 +65,8 @@ func exportAction(c *cli.Context) error { cfgs[name] = node.Decode } - cl, _, err := cfg.Client(nil, commonClaim) + tr := http.DefaultTransport.(*http.Transport).Clone() + cl, _, err := cfg.Client(httputil.RateLimiter(tr), commonClaim) if err != nil { return err } diff --git a/config/httpclient.go b/config/httpclient.go index 550bc57d79..bfdfe556b3 100644 --- a/config/httpclient.go +++ b/config/httpclient.go @@ -2,8 +2,10 @@ package config import ( "net/http" + "net/http/cookiejar" "time" + "golang.org/x/net/publicsuffix" "gopkg.in/square/go-jose.v2" "gopkg.in/square/go-jose.v2/jwt" ) @@ -13,7 +15,7 @@ import ( // // It returns an *http.Client and a boolean indicating whether the client is // configured for authentication, or an error that occurred during construction. -func (cfg *Config) Client(next *http.Transport, cl jwt.Claims) (c *http.Client, authed bool, err error) { +func (cfg *Config) Client(next http.RoundTripper, cl jwt.Claims) (c *http.Client, authed bool, err error) { if next == nil { next = http.DefaultTransport.(*http.Transport).Clone() } @@ -29,11 +31,20 @@ func (cfg *Config) Client(next *http.Transport, cl jwt.Claims) (c *http.Client, sk.Key = cfg.Auth.PSK.Key default: } + jar, err := cookiejar.New(&cookiejar.Options{ + PublicSuffixList: publicsuffix.List, + }) + if err != nil { + return nil, false, err + } rt := &transport{ next: next, base: cl, } - c = &http.Client{Transport: rt} + c = &http.Client{ + Jar: jar, + Transport: rt, + } // Both of the JWT-based methods set the signing key. if sk.Key != nil { diff --git a/go.mod b/go.mod index 14a7b61eb9..8d0edb1435 100644 --- a/go.mod +++ b/go.mod @@ -32,8 +32,9 @@ require ( go.opentelemetry.io/otel/exporters/trace/jaeger v0.16.0 go.opentelemetry.io/otel/sdk v0.16.0 golang.org/x/mod v0.4.2 // indirect + golang.org/x/net v0.0.0-20210316092652-d523dce5a7f4 golang.org/x/sync v0.0.0-20201207232520-09787c993a3a - golang.org/x/sys v0.0.0-20210317225723-c4fcb01b228e // indirect + golang.org/x/time v0.0.0-20210220033141-f8bda1e9f3ba golang.org/x/tools v0.1.0 // indirect gopkg.in/square/go-jose.v2 v2.4.1 gopkg.in/yaml.v3 v3.0.0-20200506231410-2ff61e1afc86 diff --git a/go.sum b/go.sum index 585cb629ea..0032d5c8a5 100644 --- a/go.sum +++ b/go.sum @@ -823,6 +823,8 @@ golang.org/x/net v0.0.0-20200707034311-ab3426394381/go.mod h1:/O7V0waA8r7cgGh81R golang.org/x/net v0.0.0-20200822124328-c89045814202/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.0.0-20201031054903-ff519b6c9102/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= +golang.org/x/net v0.0.0-20210316092652-d523dce5a7f4 h1:b0LrWgu8+q7z4J+0Y3Umo5q1dL7NXBkKBWkaVkAq17E= +golang.org/x/net v0.0.0-20210316092652-d523dce5a7f4/go.mod h1:RBQZq4jEuRlivfhVLdyRGr576XBO4/greRjx4P4O3yc= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= @@ -894,11 +896,13 @@ golang.org/x/sys v0.0.0-20200625212154-ddb9806d33ae/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20200803210538-64077c9b5642/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200905004654-be1d3432aa8f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20201201145000-ef89a241ccb3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20201214210602-f9fddec55a1e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210119212857-b64e53b001e4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20210317225723-c4fcb01b228e h1:XNp2Flc/1eWQGk5BLzqTAN7fQIwIbfyVTuVxXxZh73M= -golang.org/x/sys v0.0.0-20210317225723-c4fcb01b228e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210315160823-c6e025ad8005 h1:pDMpM2zh2MT0kHy037cKlSby2nEhD50SYqwQk76Nm40= +golang.org/x/sys v0.0.0-20210315160823-c6e025ad8005/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/text v0.0.0-20160726164857-2910a502d2bf/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= diff --git a/health/readinesshandler.go b/health/readinesshandler.go index 5664ab5058..804c7a5c3b 100644 --- a/health/readinesshandler.go +++ b/health/readinesshandler.go @@ -3,43 +3,39 @@ package health import ( "net/http" "sync/atomic" - - je "github.com/quay/claircore/pkg/jsonerr" ) var ready *uint32 = new(uint32) -// Ready instructs the ReadinessHandler to begin serving 200OK status +// Ready instructs the ReadinessHandler to begin serving 200 OK status. func Ready() { atomic.StoreUint32(ready, uint32(1)) } -// NotReady instructs the ReadinessHandler to begin serving 503ServiceUnavailable status -func UnReady() { +// Unready instructs the ReadinessHandler to begin serving 503 +// Service Unavailable. +func Unready() { atomic.StoreUint32(ready, uint32(0)) } -// ReadinessHandler will return a 200OK or 503ServiceUnavailable status dependent -// on whether the exported Ready or NotReady methods have been called. +// ReadinessHandler will return a 200 OK or 503 "Service Unavailable" status +// depending on whether the Ready or Unready functions have been called. // -// The Ready() method must be called to begin returning 200OK -func ReadinessHandler() http.HandlerFunc { - return func(w http.ResponseWriter, r *http.Request) { +// The Ready() method must be called to begin returning 200 OK. +func ReadinessHandler() http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + h := w.Header() + h.Set("X-Content-Type-Options", "nosniff") + h.Set("Content-Type", "text/plain; charset=utf-8") + h.Set("Content-Length", "0") + h.Set("Cache-Control", "no-store") if r.Method != http.MethodGet { - resp := &je.Response{ - Code: "method-not-allowed", - Message: "endpoint only allows GET", - } - je.Error(w, resp, http.StatusMethodNotAllowed) + w.WriteHeader(http.StatusMethodNotAllowed) return } - ready := atomic.LoadUint32(ready) - if ready == 1 { - w.WriteHeader(http.StatusOK) - } else { + if atomic.LoadUint32(ready) != 1 { w.WriteHeader(http.StatusServiceUnavailable) } - return - } + }) } diff --git a/health/readinesshandler_test.go b/health/readinesshandler_test.go index 67b0758a89..c1bf6491a8 100644 --- a/health/readinesshandler_test.go +++ b/health/readinesshandler_test.go @@ -36,7 +36,7 @@ func TestReadinessHandler(t *testing.T) { } // signal to handler that process is unready. should return StatusServiceUnavailable - health.UnReady() + health.Unready() resp, err = client.Do(req) if err != nil { t.Fatalf("failed to do request: %v", err) diff --git a/initialize/services.go b/initialize/services.go index 483c142385..173ef90334 100644 --- a/initialize/services.go +++ b/initialize/services.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "net/http" + "net/http/cookiejar" "time" "github.com/quay/claircore/libindex" @@ -13,6 +14,7 @@ import ( "github.com/quay/zlog" "go.opentelemetry.io/otel/baggage" "go.opentelemetry.io/otel/label" + "golang.org/x/net/publicsuffix" "gopkg.in/square/go-jose.v2/jwt" clairerror "github.com/quay/clair/v4/clair-error" @@ -20,6 +22,7 @@ import ( "github.com/quay/clair/v4/httptransport" "github.com/quay/clair/v4/httptransport/client" "github.com/quay/clair/v4/indexer" + "github.com/quay/clair/v4/internal/httputil" "github.com/quay/clair/v4/matcher" notifier "github.com/quay/clair/v4/notifier/service" ) @@ -163,7 +166,8 @@ func remoteIndexer(ctx context.Context, cfg *config.Config, addr string) (indexe } func remoteClient(ctx context.Context, cfg *config.Config, claim jwt.Claims, addr string) (*client.HTTP, error) { - c, auth, err := cfg.Client(nil, claim) + tr := http.DefaultTransport.(*http.Transport).Clone() + c, auth, err := cfg.Client(tr, claim) switch { case err != nil: return nil, err @@ -182,6 +186,17 @@ func localMatcher(ctx context.Context, cfg *config.Config) (matcher.Service, err } } + tr := http.DefaultTransport.(*http.Transport).Clone() + jar, err := cookiejar.New(&cookiejar.Options{ + PublicSuffixList: publicsuffix.List, + }) + if err != nil { + return nil, err + } + cl := &http.Client{ + Jar: jar, + Transport: httputil.RateLimiter(tr), + } updaterConfigs := make(map[string]driver.ConfigUnmarshaler) for name, node := range cfg.Updaters.Config { updaterConfigs[name] = node.Decode @@ -200,6 +215,7 @@ func localMatcher(ctx context.Context, cfg *config.Config) (matcher.Service, err UpdateRetention: cfg.Matcher.UpdateRetention, MatcherNames: cfg.Matchers.Names, MatcherConfigs: matcherConfigs, + Client: cl, }) if err != nil { return nil, mkErr(err) @@ -227,7 +243,8 @@ func localNotifier(ctx context.Context, cfg *config.Config, i indexer.Service, m } } - c, _, err := cfg.Client(nil, notifierClaim) + tr := http.DefaultTransport.(*http.Transport).Clone() + c, _, err := cfg.Client(tr, notifierClaim) if err != nil { return nil, mkErr(err) } diff --git a/internal/httputil/ratelimiter.go b/internal/httputil/ratelimiter.go new file mode 100644 index 0000000000..28a639b69a --- /dev/null +++ b/internal/httputil/ratelimiter.go @@ -0,0 +1,67 @@ +package httputil + +import ( + "net/http" + "sync" + + "golang.org/x/time/rate" +) + +// RateLimiter wraps the provided RoundTripper with a limiter allowing 10 +// requests/second/host. +// +// It responds to HTTP 429 responses by automatically decreasing the rate. +func RateLimiter(next http.RoundTripper) http.RoundTripper { + return &ratelimiter{ + rt: next, + lm: sync.Map{}, + } +} + +// Ratelimiter implements the limiting by using a concurrent map and Limiter +// structs. +type ratelimiter struct { + rt http.RoundTripper + lm sync.Map +} + +const rateCap = 10 + +// RoundTrip implements http.RoundTripper. +func (r *ratelimiter) RoundTrip(req *http.Request) (*http.Response, error) { + key := req.URL.Host + li, ok := r.lm.Load(key) + if !ok { + // Limiter allows "rateCap" per sec, one at a time. + l := rate.NewLimiter(rate.Limit(rateCap), 1) + li, _ = r.lm.LoadOrStore(key, l) + } + l := li.(*rate.Limiter) + if err := l.Wait(req.Context()); err != nil { + return nil, err + } + res, err := r.rt.RoundTrip(req) + // This seems to be the contract that http.Transport implements. + if err != nil { + return nil, err + } + switch res.StatusCode { + case http.StatusOK: + // Try increasing on OK. + if lim := l.Limit(); lim < rateCap { + l.SetLimit(lim + 1) + } + case http.StatusTooManyRequests: + // Try to allow some requests, eventually. + l.SetLimit(detune(l.Limit())) + } + return res, nil +} + +// Detune reduces the rate. +func detune(in rate.Limit) rate.Limit { + if in <= 1 { + return in / 2 + } + return in - 1 +} diff --git a/internal/httputil/ratelimiter_test.go b/internal/httputil/ratelimiter_test.go new file mode 100644 index 0000000000..02f089e915 --- /dev/null +++ b/internal/httputil/ratelimiter_test.go @@ -0,0 +1,56 @@ +package httputil + +import ( + "net/http" + "net/http/httptest" + "sync" + "testing" + "time" +) + +func TestRate(t *testing.T) { + const nReq = 20 + + var wg sync.WaitGroup + wg.Add(nReq) + begin := make(chan struct{}) + var last struct { + sync.Mutex + t time.Time + } + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + last.Lock() + last.t = time.Now() + last.Unlock() + w.WriteHeader(http.StatusOK) + })) + defer srv.Close() + cl := srv.Client() + cl.Transport = RateLimiter(cl.Transport) + + for i := 0; i < nReq; i++ { + go func() { + defer wg.Done() + <-begin + res, err := cl.Get(srv.URL) + if err != nil { + t.Error(err) + return + } + res.Body.Close() + }() + } + + first := time.Now() + close(begin) + wg.Wait() + + t.Logf("begin: %v", first) + t.Logf("end: %v", last.t) + rate := nReq / last.t.Sub(first).Seconds() + t.Logf("rate: %v", rate) + + if rate < (rateCap-1) || rate > (rateCap+1) { + t.Error("rate outside acceptable bounds") + } +} diff --git a/introspection/server.go b/introspection/server.go index 4c2511fd89..83e8dcf458 100644 --- a/introspection/server.go +++ b/introspection/server.go @@ -17,6 +17,7 @@ import ( sdktrace "go.opentelemetry.io/otel/sdk/trace" "github.com/quay/clair/v4/config" + "github.com/quay/clair/v4/health" ) const ( @@ -27,6 +28,7 @@ const ( Jaeger = "jaeger" DefaultJaegerEndpoint = "localhost:6831" HealthEndpoint = "/healthz" + ReadyEndpoint = "/readyz" DefaultIntrospectionAddr = ":8089" ) @@ -122,6 +124,9 @@ func New(ctx context.Context, conf config.Config, health func() bool) (*Server, if err != nil { return nil, fmt.Errorf("error configuring diagnostics: %v", err) } + if err := i.withReady(ctx); err != nil { + return nil, fmt.Errorf("error configuring ready: %v", err) + } // attach Introspection to server, this works because we embed http.ServeMux i.Server.Handler = i @@ -149,6 +154,11 @@ func (i *Server) withDiagnostics(_ context.Context) error { return nil } +func (i *Server) withReady(_ context.Context) error { + i.ServeMux.Handle(ReadyEndpoint, health.ReadinessHandler()) + return nil +} + // withStdOut configures the stdout exporter for distributed tracing func (i *Server) withStdOut(_ context.Context, traceOpts []sdktrace.TracerProviderOption) error { exporter, err := stdout.NewExporter()