Skip to content

Commit

Permalink
fix: reconcile on namespace changes
Browse files Browse the repository at this point in the history
When the operator was configured to select only a limited number of
namespaces, it would not watch for namespace changes. It means that the
operator may not reconcile when a namespace label is added/removed
(affecting which objects should be selected or not).

This change enables the operator to use a privileged namespace
lister/watcher whenever the service account has the needed permissions.

**IMPORTANT:** it also requires Kubernetes >= 1.22 to be effective. If this
condition isn't met, the operator falls back to the previous, suboptimal
implementation that does not watch for namespace changes.

Closes #3847

Signed-off-by: Simon Pasquier <spasquie@redhat.com>
  • Loading branch information
simonpasquier committed Sep 14, 2023
1 parent 0d4d33d commit bd2b1bc
Show file tree
Hide file tree
Showing 14 changed files with 604 additions and 210 deletions.
9 changes: 9 additions & 0 deletions cmd/operator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,15 @@ func run() int {
return 1
}

kubernetesVersion, err := kclient.Discovery().ServerVersion()
if err != nil {
level.Error(logger).Log("msg", "failed to request Kubernetes server version", "err", err)
cancel()
return 1
}
cfg.KubernetesVersion = *kubernetesVersion
level.Info(logger).Log("msg", "connection established", "cluster-version", cfg.KubernetesVersion)

scrapeConfigSupported, err := checkPrerequisites(
ctx,
logger,
Expand Down
82 changes: 44 additions & 38 deletions pkg/alertmanager/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,9 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/version"
"k8s.io/client-go/kubernetes"
authv1 "k8s.io/client-go/kubernetes/typed/authorization/v1"
"k8s.io/client-go/metadata"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
Expand Down Expand Up @@ -70,9 +72,11 @@ var (
// Operator manages life cycle of Alertmanager deployments and
// monitoring configurations.
type Operator struct {
kclient kubernetes.Interface
mdClient metadata.Interface
mclient monitoringclient.Interface
kclient kubernetes.Interface
mdClient metadata.Interface
mclient monitoringclient.Interface
ssarClient authv1.SelfSubjectAccessReviewInterface

logger log.Logger
accessor *operator.Accessor

Expand All @@ -93,6 +97,7 @@ type Operator struct {
}

type Config struct {
KubernetesVersion version.Info
LocalHost string
ClusterDomain string
ReloaderConfig operator.ContainerConfig
Expand Down Expand Up @@ -125,15 +130,18 @@ func New(ctx context.Context, restConfig *rest.Config, c operator.Config, logger
r = prometheus.WrapRegistererWith(prometheus.Labels{"controller": "alertmanager"}, r)

o := &Operator{
kclient: client,
mdClient: mdClient,
mclient: mclient,
kclient: client,
mdClient: mdClient,
mclient: mclient,
ssarClient: client.AuthorizationV1().SelfSubjectAccessReviews(),

logger: logger,
accessor: operator.NewAccessor(logger),

metrics: operator.NewMetrics(r),
reconciliations: &operator.ReconciliationTracker{},
config: Config{
KubernetesVersion: c.KubernetesVersion,
LocalHost: c.LocalHost,
ClusterDomain: c.ClusterDomain,
ReloaderConfig: c.ReloaderConfig,
Expand Down Expand Up @@ -241,7 +249,20 @@ func (c *Operator) bootstrap(ctx context.Context) error {
return errors.Wrap(err, "error creating statefulset informers")
}

newNamespaceInformer := func(o *Operator, allowList map[string]struct{}) cache.SharedIndexInformer {
newNamespaceInformer := func(o *Operator, allowList map[string]struct{}) (cache.SharedIndexInformer, error) {
lw, privileged, err := listwatch.NewNamespaceListWatchFromClient(
ctx,
o.logger,
o.config.KubernetesVersion,
o.kclient.CoreV1(),
o.ssarClient,
allowList,
o.config.Namespaces.DenyList,
)
if err != nil {
return nil, fmt.Errorf("failed to create namespace lister/watcher: %w", err)
}

// nsResyncPeriod is used to control how often the namespace informer
// should resync. If the unprivileged ListerWatcher is used, then the
// informer must resync more often because it cannot watch for
Expand All @@ -250,23 +271,29 @@ func (c *Operator) bootstrap(ctx context.Context) error {
// If the only namespace is v1.NamespaceAll, then the client must be
// privileged and a regular cache.ListWatch will be used. In this case
// watching works and we do not need to resync so frequently.
if listwatch.IsAllNamespaces(allowList) {
if privileged {
nsResyncPeriod = resyncPeriod
}
nsInf := cache.NewSharedIndexInformer(
o.metrics.NewInstrumentedListerWatcher(
listwatch.NewUnprivilegedNamespaceListWatchFromClient(ctx, o.logger, o.kclient.CoreV1().RESTClient(), allowList, o.config.Namespaces.DenyList),
),
&v1.Namespace{}, nsResyncPeriod, cache.Indexers{},
)

return nsInf
return cache.NewSharedIndexInformer(
o.metrics.NewInstrumentedListerWatcher(lw),
&v1.Namespace{},
nsResyncPeriod,
cache.Indexers{},
), nil
}
c.nsAlrtCfgInf, err = newNamespaceInformer(c, c.config.Namespaces.AlertmanagerConfigAllowList)
if err != nil {
return err
}
c.nsAlrtCfgInf = newNamespaceInformer(c, c.config.Namespaces.AlertmanagerConfigAllowList)

if listwatch.IdenticalNamespaces(c.config.Namespaces.AlertmanagerConfigAllowList, c.config.Namespaces.AlertmanagerAllowList) {
c.nsAlrtInf = c.nsAlrtCfgInf
} else {
c.nsAlrtInf = newNamespaceInformer(c, c.config.Namespaces.AlertmanagerAllowList)
c.nsAlrtInf, err = newNamespaceInformer(c, c.config.Namespaces.AlertmanagerAllowList)
if err != nil {
return err
}
}

return nil
Expand Down Expand Up @@ -464,27 +491,6 @@ func (c *Operator) enqueueForNamespace(nsName string) {

// Run the controller.
func (c *Operator) Run(ctx context.Context) error {
errChan := make(chan error)
go func() {
v, err := c.kclient.Discovery().ServerVersion()
if err != nil {
errChan <- errors.Wrap(err, "communicating with server failed")
return
}
level.Info(c.logger).Log("msg", "connection established", "cluster-version", v)
errChan <- nil
}()

select {
case err := <-errChan:
if err != nil {
return err
}
level.Info(c.logger).Log("msg", "CRD API endpoints ready")
case <-ctx.Done():
return nil
}

go c.rr.Run(ctx)
defer c.rr.Stop()

Expand Down
31 changes: 27 additions & 4 deletions pkg/alertmanager/operator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/go-kit/log/level"
"github.com/prometheus/client_golang/prometheus"
appsv1 "k8s.io/api/apps/v1"
authv1 "k8s.io/api/authorization/v1"
v1 "k8s.io/api/core/v1"
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
"k8s.io/apimachinery/pkg/api/resource"
Expand Down Expand Up @@ -1233,10 +1234,21 @@ func TestProvisionAlertmanagerConfiguration(t *testing.T) {
c := fake.NewSimpleClientset(tc.objects...)

o := &Operator{
kclient: c,
mclient: monitoringfake.NewSimpleClientset(),
logger: level.NewFilter(log.NewLogfmtLogger(os.Stderr), level.AllowInfo()),
metrics: operator.NewMetrics(prometheus.NewRegistry()),
kclient: c,
mclient: monitoringfake.NewSimpleClientset(),
ssarClient: &alwaysAllowed{},
logger: level.NewFilter(log.NewLogfmtLogger(os.Stderr), level.AllowInfo()),
metrics: operator.NewMetrics(prometheus.NewRegistry()),
config: Config{
Namespaces: operator.Namespaces{
AlertmanagerConfigAllowList: map[string]struct{}{
v1.NamespaceAll: {},
},
AlertmanagerAllowList: map[string]struct{}{
"foo": {},
},
},
},
}

err := o.bootstrap(context.Background())
Expand Down Expand Up @@ -1275,3 +1287,14 @@ func TestProvisionAlertmanagerConfiguration(t *testing.T) {
})
}
}

// alwaysAllowed is a test double for SelfSubjectAccessReviewInterface that
// reports every requested access as permitted.
type alwaysAllowed struct{}

// Create ignores its arguments and returns a review whose status is always
// Allowed, mimicking a fully privileged service account.
func (*alwaysAllowed) Create(_ context.Context, _ *authv1.SelfSubjectAccessReview, _ metav1.CreateOptions) (*authv1.SelfSubjectAccessReview, error) {
	ssar := &authv1.SelfSubjectAccessReview{}
	ssar.Status.Allowed = true
	return ssar, nil
}

0 comments on commit bd2b1bc

Please sign in to comment.