Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: detect changes without list/watch perms on namespaces #5934

Merged
merged 1 commit into from
Sep 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
15 changes: 2 additions & 13 deletions pkg/alertmanager/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,22 +262,11 @@ func (c *Operator) bootstrap(ctx context.Context) error {
return nil, fmt.Errorf("failed to create namespace lister/watcher: %w", err)
}

// nsResyncPeriod is used to control how often the namespace informer
// should resync. If the unprivileged ListerWatcher is used, then the
// informer must resync more often because it cannot watch for
// namespace changes.
nsResyncPeriod := 15 * time.Second
// If the only namespace is v1.NamespaceAll, then the client must be
// privileged and a regular cache.ListWatch will be used. In this case
// watching works and we do not need to resync so frequently.
if privileged {
nsResyncPeriod = resyncPeriod
}

level.Debug(c.logger).Log("msg", "creating namespace informer", "privileged", privileged)
return cache.NewSharedIndexInformer(
o.metrics.NewInstrumentedListerWatcher(lw),
&v1.Namespace{},
nsResyncPeriod,
resyncPeriod,
cache.Indexers{},
), nil
}
Expand Down
189 changes: 160 additions & 29 deletions pkg/listwatch/listwatch.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,12 @@ package listwatch

import (
"context"
"crypto/rand"
"fmt"
"math/big"
"sort"
"strings"
"sync"
"time"

"github.com/blang/semver/v4"
"github.com/go-kit/log"
Expand All @@ -27,6 +30,7 @@ import (
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/version"
"k8s.io/apimachinery/pkg/watch"
authv1 "k8s.io/client-go/kubernetes/typed/authorization/v1"
Expand All @@ -36,6 +40,10 @@ import (
"github.com/prometheus-operator/prometheus-operator/pkg/k8sutil"
)

// pollInterval is the delay between two consecutive polls of the namespaces
// by the poll-based lister/watcher. It is also used as the upper bound of the
// random jitter applied before the first poll.
const pollInterval = 15 * time.Second

// NewNamespaceListWatchFromClient mimics
// cache.NewListWatchFromClient. It allows for the creation of a
// cache.ListWatch for namespaces from a client that does not have `List`
Expand Down Expand Up @@ -105,7 +113,6 @@ func NewNamespaceListWatchFromClient(
}
}

level.Debug(l).Log("msg", "using privileged namespace lister/watcher")
return cache.NewFilteredListWatchFromClient(
corev1Client.RESTClient(),
"namespaces",
Expand Down Expand Up @@ -158,36 +165,12 @@ func NewNamespaceListWatchFromClient(
level.Warn(l).Log("msg", "the operator lacks required permissions which may result in degraded functionalities", "err", err)
}

listFunc := func(options metav1.ListOptions) (runtime.Object, error) {
list := &v1.NamespaceList{}
for name := range allowedNamespaces {
result, err := corev1Client.Namespaces().Get(ctx, name, metav1.GetOptions{})
if err != nil {
if apierrors.IsNotFound(err) {
level.Info(l).Log("msg", "namespace not found", "namespace", name)
continue
}

return nil, fmt.Errorf("unexpected error while listing namespaces: %w", err)
}

list.Items = append(list.Items, *result)
}
return list, nil
}

// Since the client does not have Watch privileges, do not
// actually watch anything. Use a watch.FakeWatcher here to
// implement watch.Interface but not send any events.
watchFunc := func(_ metav1.ListOptions) (watch.Interface, error) {
// TODO(simonpasquier): implement a poll-based watcher that gets the
// list of namespaces periodically and sends a watch.Event whenever it
// detects a change.
return watch.NewFake(), nil
var namespaces []string
for ns := range allowedNamespaces {
namespaces = append(namespaces, ns)
}

level.Debug(l).Log("msg", "using unprivileged namespace lister/watcher")
return &cache.ListWatch{ListFunc: listFunc, WatchFunc: watchFunc}, false, nil
return newPollBasedListerWatcher(ctx, l, corev1Client, namespaces), false, nil
}

// IsAllNamespaces checks if the given map of namespaces
Expand Down Expand Up @@ -269,3 +252,151 @@ func DenyTweak(options *metav1.ListOptions, field string, valueSet map[string]st

options.FieldSelector = strings.Join(selectors, ",")
}

// pollBasedListerWatcher is a namespace lister/watcher which only relies on
// "get" requests against individual namespaces. List() retrieves every
// configured namespace one by one while the watch side polls them
// periodically and publishes a watch.Event whenever a change is detected.
//
// It implements both cache.ListerWatcher and watch.Interface: Watch() returns
// the receiver itself.
type pollBasedListerWatcher struct {
	corev1Client corev1.CoreV1Interface
	// ch carries the events produced by poll() to the watch consumer.
	ch chan watch.Event

	// ctx is used for the requests issued by List() and it controls the
	// lifetime of the polling goroutine.
	ctx context.Context
	l   log.Logger

	// cache holds the last observed state of each configured namespace,
	// keyed by namespace name.
	cache map[string]cacheEntry

	// startPolling ensures that a single polling goroutine is started, no
	// matter how many times ResultChan() is called.
	startPolling sync.Once
}

// cacheEntry records the last known state of a namespace.
type cacheEntry struct {
	// present is true when the namespace existed the last time it was
	// retrieved.
	present bool
	// ns is the last namespace object retrieved from the API. It is nil
	// until the namespace has been fetched successfully.
	ns *v1.Namespace
}

// Compile-time checks that pollBasedListerWatcher satisfies the interfaces.
var (
	_ watch.Interface     = (*pollBasedListerWatcher)(nil)
	_ cache.ListerWatcher = (*pollBasedListerWatcher)(nil)
)

// newPollBasedListerWatcher returns a pollBasedListerWatcher tracking the
// given namespaces. A nil logger is replaced by a no-op logger.
func newPollBasedListerWatcher(ctx context.Context, l log.Logger, corev1Client corev1.CoreV1Interface, namespaces []string) *pollBasedListerWatcher {
	if l == nil {
		l = log.NewNopLogger()
	}

	pblw := &pollBasedListerWatcher{
		corev1Client: corev1Client,
		ch:           make(chan watch.Event, 1),
		ctx:          ctx,
		l:            l,
		cache:        make(map[string]cacheEntry, len(namespaces)),
	}

	// Register every namespace as "not seen yet".
	for _, ns := range namespaces {
		pblw.cache[ns] = cacheEntry{}
	}

	return pblw
}

// List retrieves all tracked namespaces with individual "get" requests and
// returns them as a v1.NamespaceList. Namespaces which don't exist are
// skipped; any other error aborts the operation. The cache is refreshed with
// the retrieved objects.
func (pblw *pollBasedListerWatcher) List(_ metav1.ListOptions) (runtime.Object, error) {
	list := &v1.NamespaceList{}

	// Overwriting existing keys while ranging over the map is safe.
	for ns := range pblw.cache {
		result, err := pblw.corev1Client.Namespaces().Get(pblw.ctx, ns, metav1.GetOptions{})
		if err != nil {
			if apierrors.IsNotFound(err) {
				level.Info(pblw.l).Log("msg", "namespace not found", "namespace", ns)
				continue
			}

			return nil, fmt.Errorf("unexpected error while listing namespaces: %w", err)
		}

		pblw.cache[ns] = cacheEntry{
			present: true,
			ns:      result,
		}
		list.Items = append(list.Items, *result)
	}

	return list, nil
}

// Watch returns the receiver which acts as its own watch.Interface.
func (pblw *pollBasedListerWatcher) Watch(_ metav1.ListOptions) (watch.Interface, error) {
	return pblw, nil
}

// Stop is a no-op: the polling goroutine stops when the context is done.
func (pblw *pollBasedListerWatcher) Stop() {}

// ResultChan returns the channel on which the detected changes are sent.
//
// The first call starts the polling goroutine. Subsequent calls only return
// the channel: consumers may call ResultChan() repeatedly (for instance on
// every iteration of a select loop) and starting a goroutine per call would
// leak goroutines and let several pollers access the cache concurrently.
func (pblw *pollBasedListerWatcher) ResultChan() <-chan watch.Event {
	pblw.startPolling.Do(func() {
		go func() {
			// Delay the first poll by a random duration lower than
			// pollInterval.
			jitter, err := rand.Int(rand.Reader, big.NewInt(int64(pollInterval)))
			if err != nil {
				level.Info(pblw.l).Log("msg", "failed to generate random jitter", "err", err)
			} else {
				// Don't outlive the context while waiting.
				select {
				case <-time.After(time.Duration(jitter.Int64())):
				case <-pblw.ctx.Done():
					return
				}
			}

			// The returned error is deliberately ignored: the loop is
			// expected to end only when the context is done.
			_ = wait.PollUntilContextCancel(pblw.ctx, pollInterval, false, pblw.poll)
		}()
	})

	return pblw.ch
}

// poll retrieves every tracked namespace once, compares the result with the
// cached state and sends the corresponding Added, Modified and Deleted events
// to the watch channel, updating the cache as events are delivered.
//
// It has the signature of a wait condition function. It returns (false, nil)
// so that the caller keeps polling, and it returns the context's error when
// the context is done while an event is waiting to be delivered.
func (pblw *pollBasedListerWatcher) poll(ctx context.Context) (bool, error) {
	var (
		updated []*v1.Namespace
		deleted []string
	)

	for ns, entry := range pblw.cache {
		// entry.ns is nil when the namespace has never been retrieved or
		// after it has been deleted: request the latest version in that
		// case instead of dereferencing a nil pointer.
		var lastSeen string
		if entry.ns != nil {
			lastSeen = entry.ns.ResourceVersion
		}

		result, err := pblw.corev1Client.Namespaces().Get(ctx, ns, metav1.GetOptions{ResourceVersion: lastSeen})
		if err != nil {
			switch {
			case apierrors.IsNotFound(err):
				// Only notify the deletion of namespaces which were known
				// to exist.
				if entry.present {
					deleted = append(deleted, ns)
				}
			default:
				level.Warn(pblw.l).Log("msg", "watch error", "err", err, "namespace", ns)
			}
			continue
		}

		// A namespace which wasn't present before is always reported,
		// otherwise only when its resource version changed.
		if !entry.present || lastSeen != result.ResourceVersion {
			updated = append(updated, result)
		}
	}

	// send delivers the event unless the context is done first, so that the
	// polling goroutine can't block forever once the consumer is gone.
	send := func(ev watch.Event) error {
		select {
		case pblw.ch <- ev:
			return nil
		case <-ctx.Done():
			return ctx.Err()
		}
	}

	for _, ns := range deleted {
		entry := pblw.cache[ns]

		if err := send(watch.Event{Type: watch.Deleted, Object: entry.ns}); err != nil {
			return false, err
		}

		pblw.cache[ns] = cacheEntry{
			present: false,
		}
	}

	for _, ns := range updated {
		eventType := watch.Modified
		if !pblw.cache[ns.Name].present {
			eventType = watch.Added
		}

		if err := send(watch.Event{Type: eventType, Object: ns}); err != nil {
			return false, err
		}

		pblw.cache[ns.Name] = cacheEntry{
			ns:      ns,
			present: true,
		}
	}

	return false, nil
}
15 changes: 2 additions & 13 deletions pkg/prometheus/agent/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,21 +271,10 @@ func New(ctx context.Context, restConfig *rest.Config, conf operator.Config, log
return nil, err
}

// nsResyncPeriod is used to control how often the namespace informer
// should resync. If the unprivileged ListerWatcher is used, then the
// informer must resync more often because it cannot watch for
// namespace changes.
nsResyncPeriod := 15 * time.Second
// If the only namespace is v1.NamespaceAll, then the client must be
// privileged and a regular cache.ListWatch will be used. In this case
// watching works and we do not need to resync so frequently.
if privileged {
nsResyncPeriod = resyncPeriod
}

level.Debug(c.logger).Log("msg", "creating namespace informer", "privileged", privileged)
return cache.NewSharedIndexInformer(
o.metrics.NewInstrumentedListerWatcher(lw),
&v1.Namespace{}, nsResyncPeriod, cache.Indexers{},
&v1.Namespace{}, resyncPeriod, cache.Indexers{},
), nil
}

Expand Down
15 changes: 2 additions & 13 deletions pkg/prometheus/server/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -332,21 +332,10 @@ func New(ctx context.Context, restConfig *rest.Config, conf operator.Config, log
return nil, err
}

// nsResyncPeriod is used to control how often the namespace informer
// should resync. If the unprivileged ListerWatcher is used, then the
// informer must resync more often because it cannot watch for
// namespace changes.
nsResyncPeriod := 15 * time.Second
// If the only namespace is v1.NamespaceAll, then the client must be
// privileged and a regular cache.ListWatch will be used. In this case
// watching works and we do not need to resync so frequently.
if privileged {
nsResyncPeriod = resyncPeriod
}

level.Debug(c.logger).Log("msg", "creating namespace informer", "privileged", privileged)
return cache.NewSharedIndexInformer(
o.metrics.NewInstrumentedListerWatcher(lw),
&v1.Namespace{}, nsResyncPeriod, cache.Indexers{},
&v1.Namespace{}, resyncPeriod, cache.Indexers{},
), nil
}

Expand Down
15 changes: 2 additions & 13 deletions pkg/thanos/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,22 +223,11 @@ func New(ctx context.Context, restConfig *rest.Config, conf operator.Config, log
return nil, err
}

// nsResyncPeriod is used to control how often the namespace informer
// should resync. If the unprivileged ListerWatcher is used, then the
// informer must resync more often because it cannot watch for
// namespace changes.
nsResyncPeriod := 15 * time.Second
// If the only namespace is v1.NamespaceAll, then the client must be
// privileged and a regular cache.ListWatch will be used. In this case
// watching works and we do not need to resync so frequently.
if privileged {
nsResyncPeriod = resyncPeriod
}

level.Debug(o.logger).Log("msg", "creating namespace informer", "privileged", privileged)
return cache.NewSharedIndexInformer(
o.metrics.NewInstrumentedListerWatcher(lw),
&v1.Namespace{},
nsResyncPeriod,
resyncPeriod,
cache.Indexers{},
), nil
}
Expand Down
16 changes: 6 additions & 10 deletions test/e2e/prometheus_instance_namespaces_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -324,20 +324,16 @@ func testPrometheusInstanceNamespacesAllowList(t *testing.T) {
t.Fatal(err)
}

// FIXME(simonpasquier): the unprivileged namespace lister/watcher
// isn't notified of updates properly so the code below fails.
// Uncomment the test once the lister/watcher is fixed.
//
// Remove the selecting label on the "allowed" namespace and check that
// the target is removed.
// See https://github.com/prometheus-operator/prometheus-operator/issues/3847
//if err := framework.RemoveLabelsFromNamespace(context.Background(), allowedNs, "monitored"); err != nil {
// t.Fatal(err)
//}
if err := framework.RemoveLabelsFromNamespace(context.Background(), allowedNs, "monitored"); err != nil {
t.Fatal(err)
}

//if err := framework.WaitForActiveTargets(context.Background(), instanceNs, "prometheus-instance", 0); err != nil {
// t.Fatal(err)
//}
if err := framework.WaitForActiveTargets(context.Background(), instanceNs, "prometheus-instance", 0); err != nil {
t.Fatal(err)
}
}

// this is not ideal, as we cannot really find out if prometheus operator did not reconcile the denied prometheus.
Expand Down