Skip to content

Commit

Permalink
Merge pull request #5934 from simonpasquier/fix-3847
Browse files Browse the repository at this point in the history
fix: detect changes without list/watch perms on namespaces
  • Loading branch information
simonpasquier committed Sep 21, 2023
2 parents 2e7f977 + 7f06be0 commit 4262cd1
Show file tree
Hide file tree
Showing 6 changed files with 174 additions and 91 deletions.
15 changes: 2 additions & 13 deletions pkg/alertmanager/operator.go
Expand Up @@ -262,22 +262,11 @@ func (c *Operator) bootstrap(ctx context.Context) error {
return nil, fmt.Errorf("failed to create namespace lister/watcher: %w", err)
}

// nsResyncPeriod is used to control how often the namespace informer
// should resync. If the unprivileged ListerWatcher is used, then the
// informer must resync more often because it cannot watch for
// namespace changes.
nsResyncPeriod := 15 * time.Second
// If the only namespace is v1.NamespaceAll, then the client must be
// privileged and a regular cache.ListWatch will be used. In this case
// watching works and we do not need to resync so frequently.
if privileged {
nsResyncPeriod = resyncPeriod
}

level.Debug(c.logger).Log("msg", "creating namespace informer", "privileged", privileged)
return cache.NewSharedIndexInformer(
o.metrics.NewInstrumentedListerWatcher(lw),
&v1.Namespace{},
nsResyncPeriod,
resyncPeriod,
cache.Indexers{},
), nil
}
Expand Down
189 changes: 160 additions & 29 deletions pkg/listwatch/listwatch.go
Expand Up @@ -16,9 +16,12 @@ package listwatch

import (
"context"
"crypto/rand"
"fmt"
"math/big"
"sort"
"strings"
"time"

"github.com/blang/semver/v4"
"github.com/go-kit/log"
Expand All @@ -27,6 +30,7 @@ import (
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/version"
"k8s.io/apimachinery/pkg/watch"
authv1 "k8s.io/client-go/kubernetes/typed/authorization/v1"
Expand All @@ -36,6 +40,10 @@ import (
"github.com/prometheus-operator/prometheus-operator/pkg/k8sutil"
)

const (
pollInterval = 15 * time.Second
)

// NewNamespaceListWatchFromClient mimics
// cache.NewListWatchFromClient. It allows for the creation of a
// cache.ListWatch for namespaces from a client that does not have `List`
Expand Down Expand Up @@ -105,7 +113,6 @@ func NewNamespaceListWatchFromClient(
}
}

level.Debug(l).Log("msg", "using privileged namespace lister/watcher")
return cache.NewFilteredListWatchFromClient(
corev1Client.RESTClient(),
"namespaces",
Expand Down Expand Up @@ -158,36 +165,12 @@ func NewNamespaceListWatchFromClient(
level.Warn(l).Log("msg", "the operator lacks required permissions which may result in degraded functionalities", "err", err)
}

listFunc := func(options metav1.ListOptions) (runtime.Object, error) {
list := &v1.NamespaceList{}
for name := range allowedNamespaces {
result, err := corev1Client.Namespaces().Get(ctx, name, metav1.GetOptions{})
if err != nil {
if apierrors.IsNotFound(err) {
level.Info(l).Log("msg", "namespace not found", "namespace", name)
continue
}

return nil, fmt.Errorf("unexpected error while listing namespaces: %w", err)
}

list.Items = append(list.Items, *result)
}
return list, nil
}

// Since the client does not have Watch privileges, do not
// actually watch anything. Use a watch.FakeWatcher here to
// implement watch.Interface but not send any events.
watchFunc := func(_ metav1.ListOptions) (watch.Interface, error) {
// TODO(simonpasquier): implement a poll-based watcher that gets the
// list of namespaces perdiocially and send watch.Event() whenever it
// detects a change.
return watch.NewFake(), nil
var namespaces []string
for ns := range allowedNamespaces {
namespaces = append(namespaces, ns)
}

level.Debug(l).Log("msg", "using unprivileged namespace lister/watcher")
return &cache.ListWatch{ListFunc: listFunc, WatchFunc: watchFunc}, false, nil
return newPollBasedListerWatcher(ctx, l, corev1Client, namespaces), false, nil
}

// IsAllNamespaces checks if the given map of namespaces
Expand Down Expand Up @@ -269,3 +252,151 @@ func DenyTweak(options *metav1.ListOptions, field string, valueSet map[string]st

options.FieldSelector = strings.Join(selectors, ",")
}

type pollBasedListerWatcher struct {
corev1Client corev1.CoreV1Interface
ch chan watch.Event

ctx context.Context
l log.Logger

cache map[string]cacheEntry
}

type cacheEntry struct {
present bool
ns *v1.Namespace
}

var _ = watch.Interface(&pollBasedListerWatcher{})
var _ = cache.ListerWatcher(&pollBasedListerWatcher{})

func newPollBasedListerWatcher(ctx context.Context, l log.Logger, corev1Client corev1.CoreV1Interface, namespaces []string) *pollBasedListerWatcher {
if l == nil {
l = log.NewNopLogger()
}

pblw := &pollBasedListerWatcher{
corev1Client: corev1Client,
ch: make(chan watch.Event, 1),
ctx: ctx,
l: l,
cache: make(map[string]cacheEntry, len(namespaces)),
}

for _, ns := range namespaces {
pblw.cache[ns] = cacheEntry{}
}

return pblw
}

func (pblw *pollBasedListerWatcher) List(_ metav1.ListOptions) (runtime.Object, error) {
list := &v1.NamespaceList{}

for ns := range pblw.cache {
result, err := pblw.corev1Client.Namespaces().Get(pblw.ctx, ns, metav1.GetOptions{})
if err != nil {
if apierrors.IsNotFound(err) {
level.Info(pblw.l).Log("msg", "namespace not found", "namespace", ns)
continue
}

return nil, fmt.Errorf("unexpected error while listing namespaces: %w", err)
}

pblw.cache[ns] = cacheEntry{
present: true,
ns: result,
}
list.Items = append(list.Items, *result)
}

return list, nil
}

func (pblw *pollBasedListerWatcher) Watch(_ metav1.ListOptions) (watch.Interface, error) {
return pblw, nil
}

func (pblw *pollBasedListerWatcher) Stop() {}

func (pblw *pollBasedListerWatcher) ResultChan() <-chan watch.Event {
go func() {
jitter, err := rand.Int(rand.Reader, big.NewInt(int64(pollInterval)))
if err == nil {
time.Sleep(time.Duration(jitter.Int64()))
} else {
level.Info(pblw.l).Log("msg", "failed to generate random jitter", "err", err)
}

_ = wait.PollUntilContextCancel(pblw.ctx, pollInterval, false, pblw.poll)
}()

return pblw.ch
}

func (pblw *pollBasedListerWatcher) poll(ctx context.Context) (bool, error) {
var (
updated []*v1.Namespace
deleted []string
)

for ns, entry := range pblw.cache {
result, err := pblw.corev1Client.Namespaces().Get(ctx, ns, metav1.GetOptions{ResourceVersion: entry.ns.ResourceVersion})
if err != nil {
switch {
case apierrors.IsNotFound(err):
if entry.present {
deleted = append(deleted, ns)
}
default:
level.Warn(pblw.l).Log("msg", "watch error", "err", err, "namespace", ns)
}
continue
}

if entry.ns.ResourceVersion != result.ResourceVersion {
updated = append(updated, result)
}
}

for _, ns := range deleted {
entry := pblw.cache[ns]

pblw.ch <- watch.Event{
Type: watch.Deleted,
Object: entry.ns,
}

pblw.cache[ns] = cacheEntry{
present: false,
}
}

for _, ns := range updated {
var (
eventType = watch.Modified
entry = pblw.cache[ns.Name]
)

switch {
case !entry.present:
eventType = watch.Added
case ns.ResourceVersion == entry.ns.ResourceVersion:
continue
}

pblw.ch <- watch.Event{
Type: eventType,
Object: ns,
}

pblw.cache[ns.Name] = cacheEntry{
ns: ns,
present: true,
}
}

return false, nil
}
15 changes: 2 additions & 13 deletions pkg/prometheus/agent/operator.go
Expand Up @@ -271,21 +271,10 @@ func New(ctx context.Context, restConfig *rest.Config, conf operator.Config, log
return nil, err
}

// nsResyncPeriod is used to control how often the namespace informer
// should resync. If the unprivileged ListerWatcher is used, then the
// informer must resync more often because it cannot watch for
// namespace changes.
nsResyncPeriod := 15 * time.Second
// If the only namespace is v1.NamespaceAll, then the client must be
// privileged and a regular cache.ListWatch will be used. In this case
// watching works and we do not need to resync so frequently.
if privileged {
nsResyncPeriod = resyncPeriod
}

level.Debug(c.logger).Log("msg", "creating namespace informer", "privileged", privileged)
return cache.NewSharedIndexInformer(
o.metrics.NewInstrumentedListerWatcher(lw),
&v1.Namespace{}, nsResyncPeriod, cache.Indexers{},
&v1.Namespace{}, resyncPeriod, cache.Indexers{},
), nil
}

Expand Down
15 changes: 2 additions & 13 deletions pkg/prometheus/server/operator.go
Expand Up @@ -332,21 +332,10 @@ func New(ctx context.Context, restConfig *rest.Config, conf operator.Config, log
return nil, err
}

// nsResyncPeriod is used to control how often the namespace informer
// should resync. If the unprivileged ListerWatcher is used, then the
// informer must resync more often because it cannot watch for
// namespace changes.
nsResyncPeriod := 15 * time.Second
// If the only namespace is v1.NamespaceAll, then the client must be
// privileged and a regular cache.ListWatch will be used. In this case
// watching works and we do not need to resync so frequently.
if privileged {
nsResyncPeriod = resyncPeriod
}

level.Debug(c.logger).Log("msg", "creating namespace informer", "privileged", privileged)
return cache.NewSharedIndexInformer(
o.metrics.NewInstrumentedListerWatcher(lw),
&v1.Namespace{}, nsResyncPeriod, cache.Indexers{},
&v1.Namespace{}, resyncPeriod, cache.Indexers{},
), nil
}

Expand Down
15 changes: 2 additions & 13 deletions pkg/thanos/operator.go
Expand Up @@ -223,22 +223,11 @@ func New(ctx context.Context, restConfig *rest.Config, conf operator.Config, log
return nil, err
}

// nsResyncPeriod is used to control how often the namespace informer
// should resync. If the unprivileged ListerWatcher is used, then the
// informer must resync more often because it cannot watch for
// namespace changes.
nsResyncPeriod := 15 * time.Second
// If the only namespace is v1.NamespaceAll, then the client must be
// privileged and a regular cache.ListWatch will be used. In this case
// watching works and we do not need to resync so frequently.
if privileged {
nsResyncPeriod = resyncPeriod
}

level.Debug(o.logger).Log("msg", "creating namespace informer", "privileged", privileged)
return cache.NewSharedIndexInformer(
o.metrics.NewInstrumentedListerWatcher(lw),
&v1.Namespace{},
nsResyncPeriod,
resyncPeriod,
cache.Indexers{},
), nil
}
Expand Down
16 changes: 6 additions & 10 deletions test/e2e/prometheus_instance_namespaces_test.go
Expand Up @@ -324,20 +324,16 @@ func testPrometheusInstanceNamespacesAllowList(t *testing.T) {
t.Fatal(err)
}

// FIXME(simonpasquier): the unprivileged namespace lister/watcher
// isn't notified of updates properly so the code below fails.
// Uncomment the test once the lister/watcher is fixed.
//
// Remove the selecting label on the "allowed" namespace and check that
// the target is removed.
// See https://github.com/prometheus-operator/prometheus-operator/issues/3847
//if err := framework.RemoveLabelsFromNamespace(context.Background(), allowedNs, "monitored"); err != nil {
// t.Fatal(err)
//}
if err := framework.RemoveLabelsFromNamespace(context.Background(), allowedNs, "monitored"); err != nil {
t.Fatal(err)
}

//if err := framework.WaitForActiveTargets(context.Background(), instanceNs, "prometheus-instance", 0); err != nil {
// t.Fatal(err)
//}
if err := framework.WaitForActiveTargets(context.Background(), instanceNs, "prometheus-instance", 0); err != nil {
t.Fatal(err)
}
}

// this is not ideal, as we cannot really find out if prometheus operator did not reconcile the denied prometheus.
Expand Down

0 comments on commit 4262cd1

Please sign in to comment.