Skip to content

Commit

Permalink
fix: reconcile on namespace changes
Browse files Browse the repository at this point in the history
When the operator was configured to select only a limited number of
namespaces, it would not watch for namespace changes. It means that the
operator may not react when a namespace label is added/removed and
changes which objects should be selected or not.

This change enables the operator to use a privileged namespace
lister/watcher whenever the service account has the needed permissions.

Signed-off-by: Simon Pasquier <spasquie@redhat.com>
  • Loading branch information
simonpasquier committed Sep 7, 2023
1 parent 2ca2c75 commit d8e91df
Show file tree
Hide file tree
Showing 9 changed files with 238 additions and 86 deletions.
21 changes: 13 additions & 8 deletions cmd/operator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,19 +110,21 @@ func checkPrerequisites(
}

if !installed {
level.Warn(logger).Log("msg", fmt.Sprintf("resource %q (group: %q) not installed in the cluster", resource, groupVersion))
level.Info(logger).Log("msg", fmt.Sprintf("CRD %q not supported", resource), "reason", fmt.Sprintf("resource %q (group: %q) not installed in the cluster", resource, groupVersion))
return false, nil
}

allowed, errs, err := k8sutil.IsAllowed(ctx, kclient.AuthorizationV1(), allowedNamespaces, attributes...)
allowed, errs, err := k8sutil.IsAllowed(ctx, kclient.AuthorizationV1().SelfSubjectAccessReviews(), allowedNamespaces, attributes...)
if err != nil {
return false, fmt.Errorf("failed to check permissions on resource %q (group %q): %w", resource, groupVersion, err)
}

if !allowed {
for _, reason := range errs {
level.Warn(logger).Log("msg", fmt.Sprintf("missing permission on resource %q (group: %q)", resource, groupVersion), "reason", reason)
var reasons []string
for _, err := range errs {
reasons = append(reasons, err.Error())
}
level.Info(logger).Log("msg", fmt.Sprintf("CRD %q not supported", resource), "reason", strings.Join(reasons, ", "))
return false, nil
}

Expand Down Expand Up @@ -179,6 +181,7 @@ func init() {
"If omitted, the default Go cipher suites will be used."+
"Note that TLS 1.3 ciphersuites are not configurable.")
flagset.StringVar(&cfg.Host, "apiserver", "", "API Server addr, e.g. ' - NOT RECOMMENDED FOR PRODUCTION - http://127.0.0.1:8080'. Omit parameter to run in on-cluster mode and utilize the service account token.")
flagset.StringVar(&cfg.ImpersonateUser, "as", "", "Username to impersonate. User could be a regular user or a service account in a namespace.")
flagset.StringVar(&cfg.TLSConfig.CertFile, "cert-file", "", " - NOT RECOMMENDED FOR PRODUCTION - Path to public TLS certificate file.")
flagset.StringVar(&cfg.TLSConfig.KeyFile, "key-file", "", "- NOT RECOMMENDED FOR PRODUCTION - Path to private TLS certificate file.")
flagset.StringVar(&cfg.TLSConfig.CAFile, "ca-file", "", "- NOT RECOMMENDED FOR PRODUCTION - Path to TLS CA file.")
Expand Down Expand Up @@ -283,7 +286,7 @@ func run() int {

k8sutil.MustRegisterClientGoMetrics(r)

kclient, err := k8sutil.NewKubernetesClient(cfg.Host, cfg.TLSInsecure, &cfg.TLSConfig)
kclient, err := k8sutil.NewKubernetesClient(cfg.Host, cfg.TLSInsecure, &cfg.TLSConfig, cfg.ImpersonateUser)
if err != nil {
level.Error(logger).Log("msg", "failed to create Kubernetes client", "err", err)
cancel()
Expand All @@ -305,13 +308,14 @@ func run() int {
)

if err != nil {
level.Error(logger).Log("msg", "failed to check prerequisites for ScrapeConfig", "err", err)
cancel()
return 1
}

po, err := prometheuscontroller.New(ctx, cfg, log.With(logger, "component", "prometheusoperator"), r, scrapeConfigSupported)
if err != nil {
fmt.Fprintln(os.Stderr, "instantiating prometheus controller failed: ", err)
fmt.Fprintln(os.Stderr, "instantiating prometheus controller failed:", err)
cancel()
return 1
}
Expand All @@ -335,6 +339,7 @@ func run() int {
},
)
if err != nil {
level.Error(logger).Log("msg", "failed to check prerequisites for PrometheusAgent", "err", err)
cancel()
return 1
}
Expand All @@ -351,14 +356,14 @@ func run() int {

ao, err := alertmanagercontroller.New(ctx, cfg, log.With(logger, "component", "alertmanageroperator"), r)
if err != nil {
fmt.Fprintln(os.Stderr, "instantiating alertmanager controller failed: ", err)
fmt.Fprintln(os.Stderr, "instantiating alertmanager controller failed:", err)
cancel()
return 1
}

to, err := thanoscontroller.New(ctx, cfg, log.With(logger, "component", "thanosoperator"), r)
if err != nil {
fmt.Fprintln(os.Stderr, "instantiating thanos controller failed: ", err)
fmt.Fprintln(os.Stderr, "instantiating thanos controller failed:", err)
cancel()
return 1
}
Expand Down
37 changes: 27 additions & 10 deletions pkg/alertmanager/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ type Config struct {

// New creates a new controller.
func New(ctx context.Context, c operator.Config, logger log.Logger, r prometheus.Registerer) (*Operator, error) {
cfg, err := k8sutil.NewClusterConfig(c.Host, c.TLSInsecure, &c.TLSConfig)
cfg, err := k8sutil.NewClusterConfig(c.Host, c.TLSInsecure, &c.TLSConfig, c.ImpersonateUser)
if err != nil {
return nil, errors.Wrap(err, "instantiating cluster config failed")
}
Expand Down Expand Up @@ -243,7 +243,7 @@ func (c *Operator) bootstrap(ctx context.Context) error {
return errors.Wrap(err, "error creating statefulset informers")
}

newNamespaceInformer := func(o *Operator, allowList map[string]struct{}) cache.SharedIndexInformer {
newNamespaceInformer := func(o *Operator, allowList map[string]struct{}) (cache.SharedIndexInformer, error) {
// nsResyncPeriod is used to control how often the namespace informer
// should resync. If the unprivileged ListerWatcher is used, then the
// informer must resync more often because it cannot watch for
Expand All @@ -255,20 +255,37 @@ func (c *Operator) bootstrap(ctx context.Context) error {
if listwatch.IsAllNamespaces(allowList) {
nsResyncPeriod = resyncPeriod
}
nsInf := cache.NewSharedIndexInformer(
o.metrics.NewInstrumentedListerWatcher(
listwatch.NewUnprivilegedNamespaceListWatchFromClient(ctx, o.logger, o.kclient.CoreV1().RESTClient(), allowList, o.config.Namespaces.DenyList),
),
&v1.Namespace{}, nsResyncPeriod, cache.Indexers{},

lw, err := listwatch.NewNamespaceListWatchFromClient(
ctx,
o.logger,
o.kclient,
allowList,
o.config.Namespaces.DenyList,
)
if err != nil {
return nil, err
}

return nsInf
return cache.NewSharedIndexInformer(
o.metrics.NewInstrumentedListerWatcher(lw),
&v1.Namespace{},
nsResyncPeriod,
cache.Indexers{},
), nil
}
c.nsAlrtCfgInf = newNamespaceInformer(c, c.config.Namespaces.AlertmanagerConfigAllowList)
c.nsAlrtCfgInf, err = newNamespaceInformer(c, c.config.Namespaces.AlertmanagerConfigAllowList)
if err != nil {
return err
}

if listwatch.IdenticalNamespaces(c.config.Namespaces.AlertmanagerConfigAllowList, c.config.Namespaces.AlertmanagerAllowList) {
c.nsAlrtInf = c.nsAlrtCfgInf
} else {
c.nsAlrtInf = newNamespaceInformer(c, c.config.Namespaces.AlertmanagerAllowList)
c.nsAlrtInf, err = newNamespaceInformer(c, c.config.Namespaces.AlertmanagerAllowList)
if err != nil {
return err
}
}

return nil
Expand Down
13 changes: 8 additions & 5 deletions pkg/k8sutil/k8sutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,8 @@ func init() {
_ = monitoringv1beta1.SchemeBuilder.AddToScheme(scheme)
}

func NewKubernetesClient(host string, tlsInsecure bool, tlsConfig *rest.TLSClientConfig) (kubernetes.Interface, error) {
cfg, err := NewClusterConfig(host, tlsInsecure, tlsConfig)
func NewKubernetesClient(host string, tlsInsecure bool, tlsConfig *rest.TLSClientConfig, asUser string) (kubernetes.Interface, error) {
cfg, err := NewClusterConfig(host, tlsInsecure, tlsConfig, asUser)
if err != nil {
return nil, errors.Wrap(err, "instantiating cluster config failed")
}
Expand Down Expand Up @@ -95,7 +95,7 @@ func PodRunningAndReady(pod v1.Pod) (bool, error) {
return false, nil
}

func NewClusterConfig(host string, tlsInsecure bool, tlsConfig *rest.TLSClientConfig) (*rest.Config, error) {
func NewClusterConfig(host string, tlsInsecure bool, tlsConfig *rest.TLSClientConfig, asUser string) (*rest.Config, error) {
var cfg *rest.Config
var err error

Expand Down Expand Up @@ -130,6 +130,7 @@ func NewClusterConfig(host string, tlsInsecure bool, tlsConfig *rest.TLSClientCo
cfg.Burst = 100

cfg.UserAgent = fmt.Sprintf("PrometheusOperator/%s", promversion.Version)
cfg.Impersonate.UserName = asUser

return cfg, nil
}
Expand All @@ -138,6 +139,7 @@ func NewClusterConfig(host string, tlsInsecure bool, tlsConfig *rest.TLSClientCo
type ResourceAttribute struct {
Verbs []string
Resource string
Name string
Group string
}

Expand All @@ -147,7 +149,7 @@ type ResourceAttribute struct {
// namespace value means "all").
// The second return value returns the list of permissions that are missing if
// the requirements aren't met.
func IsAllowed(ctx context.Context, client clientauthv1.AuthorizationV1Interface, nsAllowList []string, attributes ...ResourceAttribute) (bool, []error, error) {
func IsAllowed(ctx context.Context, ssarClient clientauthv1.SelfSubjectAccessReviewInterface, nsAllowList []string, attributes ...ResourceAttribute) (bool, []error, error) {
if len(attributes) == 0 {
return false, nil, fmt.Errorf("attributes must not be empty")
}
Expand All @@ -162,14 +164,15 @@ func IsAllowed(ctx context.Context, client clientauthv1.AuthorizationV1Interface
Verb: verb,
Group: ra.Group,
Resource: ra.Resource,
Name: ra.Name,
// An empty namespace value means "all" for namespace-scoped resources.
Namespace: ns,
},
},
}

// FIXME(simonpasquier): retry in case of server-side errors.
ssarResponse, err := client.SelfSubjectAccessReviews().Create(ctx, ssar, metav1.CreateOptions{})
ssarResponse, err := ssarClient.Create(ctx, ssar, metav1.CreateOptions{})
if err != nil {
return false, nil, err
}
Expand Down
131 changes: 103 additions & 28 deletions pkg/listwatch/listwatch.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,18 @@ package listwatch

import (
"context"
"fmt"
"strings"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/pkg/errors"
"github.com/prometheus-operator/prometheus-operator/pkg/k8sutil"
v1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
)

Expand All @@ -44,56 +45,110 @@ import (
//
// If the allowed namespaces includes exactly one entry with the value v1.NamespaceAll (empty string),
// the given denied namespaces are applied.
func NewUnprivilegedNamespaceListWatchFromClient(
func NewNamespaceListWatchFromClient(
ctx context.Context,
l log.Logger,
c cache.Getter,
kclient kubernetes.Interface,
allowedNamespaces, deniedNamespaces map[string]struct{},
) cache.ListerWatcher {
) (cache.ListerWatcher, error) {
if l == nil {
l = log.NewNopLogger()
}

// If the only namespace given is `v1.NamespaceAll`, then this
// cache.ListWatch must be privileged. In this case, return a regular
// cache.ListWatch tweaked with denylist fieldselector
// filtering the given denied namespaces.
if IsAllNamespaces(allowedNamespaces) {
tweak := func(options *metav1.ListOptions) {
DenyTweak(options, "metadata.name", deniedNamespaces)
listWatchAllowed, reasons, err := k8sutil.IsAllowed(
ctx,
kclient.AuthorizationV1().SelfSubjectAccessReviews(),
[]string{v1.NamespaceAll},
k8sutil.ResourceAttribute{
Resource: "namespaces",
Verbs: []string{"list", "watch"},
},
)
if err != nil {
return nil, err
}

if IsAllNamespaces(allowedNamespaces) && !listWatchAllowed {
err := fmt.Errorf("missing list/watch permissions on the 'namespaces' resource")
for _, r := range reasons {
err = fmt.Errorf("%w: %w", err, r)
}

return nil, err
}

if listWatchAllowed {
var tweak func(options *metav1.ListOptions)
if IsAllNamespaces(allowedNamespaces) {
// deniedNamespaces is only supported when allowedNamespaces = "all".
tweak = func(options *metav1.ListOptions) {
DenyTweak(options, "metadata.name", deniedNamespaces)
}
} else {
tweak = func(options *metav1.ListOptions) {
DenyTweak(options, "metadata.name", allowedNamespaces)
}
}

level.Debug(l).Log("msg", "using privileged namespace lister/watcher")
return cache.NewFilteredListWatchFromClient(kclient.CoreV1().RESTClient(), "namespaces", metav1.NamespaceAll, tweak), nil
}

// At this point, the operator has no list/watch permissions on the namespaces resource.
attrs := make([]k8sutil.ResourceAttribute, 0, len(allowedNamespaces))
for n := range allowedNamespaces {
attrs = append(attrs, k8sutil.ResourceAttribute{
Verbs: []string{"get"},
Resource: "namespaces",
Name: n,
})
}
getAllowed, reasons, err := k8sutil.IsAllowed(
ctx,
kclient.AuthorizationV1().SelfSubjectAccessReviews(),
[]string{v1.NamespaceAll},
attrs...,
)
if err != nil {
return nil, err
}

if !getAllowed {
err := fmt.Errorf("missing get permissions on the 'namespaces' resource")
for _, r := range reasons {
err = fmt.Errorf("%w: %w", err, r)
}

return cache.NewFilteredListWatchFromClient(c, "namespaces", metav1.NamespaceAll, tweak)
return nil, err
}

listFunc := func(options metav1.ListOptions) (runtime.Object, error) {
list := &v1.NamespaceList{}
for name := range allowedNamespaces {
result := &v1.Namespace{}
err := c.Get().
Resource("namespaces").
Name(name).
VersionedParams(&options, scheme.ParameterCodec).
Do(ctx).
Into(result)
if apierrors.IsNotFound(err) {
level.Info(l).Log("msg", "namespace not found", "namespace", name)
continue
}
result, err := kclient.CoreV1().Namespaces().Get(ctx, name, metav1.GetOptions{})
if err != nil {
return nil, errors.Wrap(err, "unexpected error while listing namespaces")
if apierrors.IsNotFound(err) {
level.Info(l).Log("msg", "namespace not found", "namespace", name)
continue
}

return nil, fmt.Errorf("unexpected error while listing namespaces: %w", err)
}

list.Items = append(list.Items, *result)
}
return list, nil
}

watchFunc := func(_ metav1.ListOptions) (watch.Interface, error) {
// Since the client does not have Watch privileges, do not
// actually watch anything. Use a watch.FakeWatcher here to
// implement watch.Interface but not send any events.
return watch.NewFake(), nil
}
return &cache.ListWatch{ListFunc: listFunc, WatchFunc: watchFunc}

level.Debug(l).Log("msg", "using privileged namespace lister/watcher")
return &cache.ListWatch{ListFunc: listFunc, WatchFunc: watchFunc}, nil
}

// IsAllNamespaces checks if the given map of namespaces
Expand All @@ -118,8 +173,28 @@ func IdenticalNamespaces(a, b map[string]struct{}) bool {
return true
}

// DenyTweak modifies the given list options
// by adding a field selector not matching the given values.
// OnlyTweak modifies the given list options by adding a field selector
// matching only the given values.
func OnlyTweak(options *metav1.ListOptions, field string, valueSet map[string]struct{}) {
if len(valueSet) == 0 {
return
}

var selectors []string

for value := range valueSet {
selectors = append(selectors, field+"="+value)
}

if options.FieldSelector != "" {
selectors = append(selectors, options.FieldSelector)
}

options.FieldSelector = strings.Join(selectors, ",")
}

// DenyTweak modifies the given list options by adding a field selector not
// matching the given values.
func DenyTweak(options *metav1.ListOptions, field string, valueSet map[string]struct{}) {
if len(valueSet) == 0 {
return
Expand Down

0 comments on commit d8e91df

Please sign in to comment.