Skip to content

Commit

Permalink
move filter to separate package
Browse files Browse the repository at this point in the history
  • Loading branch information
sbueringer committed Jul 24, 2023
1 parent 66ae79b commit 4b2e71f
Show file tree
Hide file tree
Showing 6 changed files with 204 additions and 133 deletions.
102 changes: 15 additions & 87 deletions pkg/manager/internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,6 @@ import (
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/runtime"
kerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apiserver/pkg/authentication/authenticatorfactory"
"k8s.io/apiserver/pkg/authorization/authorizer"
"k8s.io/apiserver/pkg/authorization/authorizerfactory"
"k8s.io/apiserver/pkg/server/options"
authenticationv1 "k8s.io/client-go/kubernetes/typed/authentication/v1"
authorizationv1 "k8s.io/client-go/kubernetes/typed/authorization/v1"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/leaderelection"
"k8s.io/client-go/tools/leaderelection/resourcelock"
Expand Down Expand Up @@ -93,19 +87,14 @@ type controllerManager struct {
// metricsListener is used to serve prometheus metrics
metricsListener net.Listener

// metricsSecureServing enables secure metrics serving.
// This means metrics will be served via https and with authentication and authorization.
metricsSecureServing bool
// metricsFilter is a func that is added before the metrics handler on the metrics server.
// This can be e.g. used to enforce authentication and authorization on the metrics
// endpoint.
metricsFilter metrics.Filter

// metricsExtraHandlers contains extra handlers to register on http server that serves metrics.
metricsExtraHandlers map[string]http.Handler

// metricsAuthenticationClient is the client used to authenticate requests to the metrics endpoint.
metricsAuthenticationClient authenticationv1.AuthenticationV1Interface

// metricsAuthorizationClient is the client used to authorize requests to the metrics endpoint.
metricsAuthorizationClient authorizationv1.AuthorizationV1Interface

// healthProbeListener is used to serve liveness probe
healthProbeListener net.Listener

Expand Down Expand Up @@ -322,17 +311,24 @@ func (cm *controllerManager) addMetricsServer() error {

log := cm.logger.WithValues("path", defaultMetricsEndpoint)

if cm.metricsSecureServing {
if cm.metricsFilter != nil {
var err error
handler, err = withAuthenticationAndAuthorization(log, cm.metricsAuthenticationClient, cm.metricsAuthorizationClient, handler)
handler, err = cm.metricsFilter(log, handler)
if err != nil {
return fmt.Errorf("failed to add metrics server: %w", err)
return fmt.Errorf("failed to add metrics server: failed to add metrics filter %w", err)
}
}

// TODO(JoelSpeed): Use existing Kubernetes machinery for serving metrics
mux.Handle(defaultMetricsEndpoint, handler)

for path, extraHandler := range cm.metricsExtraHandlers {
if cm.metricsFilter != nil {
var err error
extraHandler, err = cm.metricsFilter(log, extraHandler)
if err != nil {
return fmt.Errorf("failed to add metrics server: failed to add metrics filter to extra handler %w", err)
}
}
mux.Handle(path, extraHandler)
}

Expand All @@ -344,74 +340,6 @@ func (cm *controllerManager) addMetricsServer() error {
})
}

func withAuthenticationAndAuthorization(log logr.Logger, authenticationClient authenticationv1.AuthenticationV1Interface, authorizationClient authorizationv1.AuthorizationV1Interface, handler http.Handler) (http.Handler, error) {
authenticatorConfig := authenticatorfactory.DelegatingAuthenticatorConfig{
Anonymous: false, // Require authentication.
CacheTTL: 1 * time.Minute,
TokenAccessReviewClient: authenticationClient,
TokenAccessReviewTimeout: 10 * time.Second,
WebhookRetryBackoff: options.DefaultAuthWebhookRetryBackoff(),
}
delegatingAuthenticator, _, err := authenticatorConfig.New()
if err != nil {
return nil, fmt.Errorf("failed to create authenticator: %w", err)
}

authorizerConfig := authorizerfactory.DelegatingAuthorizerConfig{
SubjectAccessReviewClient: authorizationClient,
AllowCacheTTL: 5 * time.Minute,
DenyCacheTTL: 30 * time.Second,
WebhookRetryBackoff: options.DefaultAuthWebhookRetryBackoff(),
}
delegatingAuthorizer, err := authorizerConfig.New()
if err != nil {
return nil, fmt.Errorf("failed to create authorizer: %w", err)
}

return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
if req.Method != http.MethodGet {
http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed)
return
}

ctx := req.Context()

res, ok, err := delegatingAuthenticator.AuthenticateRequest(req)
if err != nil {
log.Error(err, "Authentication failed", err)
http.Error(w, "Authentication failed", http.StatusInternalServerError)
return
}
if !ok {
log.V(4).Info("Authentication failed")
http.Error(w, "Unauthorized", http.StatusUnauthorized)
return
}

attributes := authorizer.AttributesRecord{
User: res.User,
Verb: "get",
Path: req.URL.Path,
}

authorized, reason, err := delegatingAuthorizer.Authorize(ctx, attributes)
if err != nil {
msg := fmt.Sprintf("Authorization for user %s failed", attributes.User.GetName())
log.Error(err, msg)
http.Error(w, msg, http.StatusInternalServerError)
return
}
if authorized != authorizer.DecisionAllow {
msg := fmt.Sprintf("Authorization denied for user %s", attributes.User.GetName())
log.V(4).Info(fmt.Sprintf("%s: %s", msg, reason))
http.Error(w, msg, http.StatusForbidden)
return
}

handler.ServeHTTP(w, req)
}), nil
}

func (cm *controllerManager) serveHealthProbes() {
mux := http.NewServeMux()
server := httpserver.New(mux)
Expand Down
52 changes: 13 additions & 39 deletions pkg/manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,6 @@ import (
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/kubernetes"
authenticationv1 "k8s.io/client-go/kubernetes/typed/authentication/v1"
authorizationv1 "k8s.io/client-go/kubernetes/typed/authorization/v1"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/leaderelection/resourcelock"
"k8s.io/client-go/tools/record"
Expand Down Expand Up @@ -263,37 +260,19 @@ type Options struct {
// for serving prometheus metrics.
// It can be set to "0" to disable the metrics serving.
//
// Per default metrics will be served via http and without authentication and authorization.
// If MetricsSecureServing is enabled metrics will be served via https and authenticated (via TokenReviews)
// and authorized (via SubjectAccessReviews) with the kube-apiserver.
// Per default metrics will be served via http. If MetricsSecureServing is enabled
// metrics will be served via https.
MetricsBindAddress string

// MetricsSecureServing enables secure metrics serving.
// This means metrics will be served via https and authenticated (via TokenReviews)
// and authorized (via SubjectAccessReviews) with the kube-apiserver.
//
// For the authentication and authorization the controller needs a ClusterRole
// with the following rules:
// - apiGroups:
// - authentication.k8s.io
// resources:
// - tokenreviews
// verbs:
// - create
// - apiGroups:
// - authorization.k8s.io
// resources:
// - subjectaccessreviews
// verbs:
// - create
// To scrape metrics e.g. via Prometheus the client needs a ClusterRole
// with the following rule:
// - nonResourceURLs:
// - "/metrics"
// verbs:
// - get
// MetricsSecureServing enables serving metrics via https.
MetricsSecureServing bool

// MetricsFilterProvider provides a metrics filter which is a func that is added before
// the metrics handler on the metrics server.
// This can be e.g. used to enforce authentication and authorization on the metrics
// endpoint by setting this field to filters.WithAuthenticationAndAuthorization.
MetricsFilterProvider func(c *rest.Config, httpClient *http.Client) (metrics.Filter, error)

// HealthProbeBindAddress is the TCP address that the controller should bind to
// for serving health probes
// It can be set to "0" or "" to disable serving the health probe.
Expand Down Expand Up @@ -483,15 +462,12 @@ func New(config *rest.Config, options Options) (Manager, error) {
}
}

var metricsAuthenticationClient authenticationv1.AuthenticationV1Interface
var metricsAuthorizationClient authorizationv1.AuthorizationV1Interface
if options.MetricsSecureServing {
metricsKubeClient, err := kubernetes.NewForConfigAndClient(config, cluster.GetHTTPClient())
var metricsFilter metrics.Filter
if options.MetricsFilterProvider != nil {
metricsFilter, err = options.MetricsFilterProvider(config, cluster.GetHTTPClient())
if err != nil {
return nil, err
}
metricsAuthenticationClient = metricsKubeClient.AuthenticationV1()
metricsAuthorizationClient = metricsKubeClient.AuthorizationV1()
}

// Create the metrics listener. This will throw an error if the metrics bind
Expand Down Expand Up @@ -529,10 +505,8 @@ func New(config *rest.Config, options Options) (Manager, error) {
recorderProvider: recorderProvider,
resourceLock: resourceLock,
metricsListener: metricsListener,
metricsSecureServing: options.MetricsSecureServing,
metricsFilter: metricsFilter,
metricsExtraHandlers: metricsExtraHandlers,
metricsAuthenticationClient: metricsAuthenticationClient,
metricsAuthorizationClient: metricsAuthorizationClient,
controllerConfig: options.Controller,
logger: options.Logger,
elected: make(chan struct{}),
Expand Down
32 changes: 25 additions & 7 deletions pkg/manager/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/leaderelection"
fakeleaderelection "sigs.k8s.io/controller-runtime/pkg/leaderelection/fake"
"sigs.k8s.io/controller-runtime/pkg/metrics"
"sigs.k8s.io/controller-runtime/pkg/metrics/filters"
"sigs.k8s.io/controller-runtime/pkg/recorder"
"sigs.k8s.io/controller-runtime/pkg/webhook"
)
Expand Down Expand Up @@ -1380,7 +1381,8 @@ var _ = Describe("manger.Manager", func() {
BeforeEach(func() {
listener = nil
opts = Options{
MetricsSecureServing: true,
MetricsSecureServing: true,
MetricsFilterProvider: filters.WithAuthenticationAndAuthorization,
newMetricsListener: func(addr string, secureServing bool) (net.Listener, error) {
var err error
listener, err = metrics.NewListener(addr, secureServing)
Expand Down Expand Up @@ -1443,7 +1445,7 @@ var _ = Describe("manger.Manager", func() {
}()
<-m.Elected()

// Setup service account with rights to "get" "/metrics"
// Setup service account with rights to "/metrics"
token, cleanup, err := setupServiceAccountForURL(ctx, m.GetClient(), "/metrics")
defer cleanup()
Expect(err).ToNot(HaveOccurred())
Expand All @@ -1456,6 +1458,7 @@ var _ = Describe("manger.Manager", func() {
resp, err := httpClient.Do(req)
Expect(err).NotTo(HaveOccurred())
defer resp.Body.Close()
// This is expected as the token has rights for /metrics.
Expect(resp.StatusCode).To(Equal(200))
body, err := io.ReadAll(resp.Body)
Expect(err).NotTo(HaveOccurred())
Expand Down Expand Up @@ -1503,7 +1506,7 @@ var _ = Describe("manger.Manager", func() {
}()
<-m.Elected()

// Setup service account with rights to "get" "/metrics"
// Setup service account with rights to "/metrics"
token, cleanup, err := setupServiceAccountForURL(ctx, m.GetClient(), "/metrics")
defer cleanup()
Expect(err).ToNot(HaveOccurred())
Expand All @@ -1516,6 +1519,7 @@ var _ = Describe("manger.Manager", func() {
resp, err := httpClient.Do(req)
Expect(err).NotTo(HaveOccurred())
defer resp.Body.Close()
// This is expected as the token has rights for /metrics.
Expect(resp.StatusCode).To(Equal(200))
data, err := io.ReadAll(resp.Body)
Expect(err).NotTo(HaveOccurred())
Expand Down Expand Up @@ -1554,22 +1558,36 @@ var _ = Describe("manger.Manager", func() {
}()
<-m.Elected()

// Setup service account with rights to "get" "/debug"
// Setup service account with rights to "/debug"
token, cleanup, err := setupServiceAccountForURL(ctx, m.GetClient(), "/debug")
defer cleanup()
Expect(err).ToNot(HaveOccurred())

endpoint := fmt.Sprintf("https://%s/debug", listener.Addr().String())
req, err := http.NewRequest("GET", endpoint, nil)
Expect(err).NotTo(HaveOccurred())
req.Header.Add("Authorization", fmt.Sprintf("Bearer %s", token))

resp, err := httpClient.Do(req)
Expect(err).NotTo(HaveOccurred())
defer resp.Body.Close()
Expect(resp.StatusCode).To(Equal(200))
// This is expected as we didn't send a token.
Expect(resp.StatusCode).To(Equal(401))
body, err := io.ReadAll(resp.Body)
Expect(err).NotTo(HaveOccurred())
// Unauthorized is expected as we didn't send a token.
Expect(string(body)).To(ContainSubstring("Unauthorized"))

req, err = http.NewRequest("PUT", endpoint, nil)
Expect(err).NotTo(HaveOccurred())
req.Header.Add("Authorization", fmt.Sprintf("Bearer %s", token))

resp, err = httpClient.Do(req)
Expect(err).NotTo(HaveOccurred())
defer resp.Body.Close()
// This is expected as the token has rights for /debug.
Expect(resp.StatusCode).To(Equal(200))
body, err = io.ReadAll(resp.Body)
Expect(err).NotTo(HaveOccurred())
Expect(string(body)).To(Equal("Some debug info"))

metricsEndpoint := fmt.Sprintf("https://%s/metrics", listener.Addr().String())
Expand Down Expand Up @@ -2065,7 +2083,7 @@ func setupServiceAccountForURL(ctx context.Context, c client.Client, path string
},
Rules: []rbacv1.PolicyRule{
{
Verbs: []string{"get"},
Verbs: []string{"get", "put"},
NonResourceURLs: []string{path},
},
},
Expand Down
31 changes: 31 additions & 0 deletions pkg/metrics/filters.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

/*
Package metrics contains controller related metrics utilities
*/
package metrics

import (
"net/http"

"github.com/go-logr/logr"
)

// Filter is a func that is added before the metrics handler on the metrics server.
// This can be e.g. used to enforce authentication and authorization on the metrics
// endpoint by setting this field to filters.WithAuthenticationAndAuthorization.
type Filter func(log logr.Logger, handler http.Handler) (http.Handler, error)

0 comments on commit 4b2e71f

Please sign in to comment.