Skip to content

Commit

Permalink
k8s: remove ability to watch all namespaces (#4363)
Browse files Browse the repository at this point in the history
  • Loading branch information
nicks committed Mar 25, 2021
1 parent 529ac3b commit 5c71161
Show file tree
Hide file tree
Showing 7 changed files with 48 additions and 172 deletions.
2 changes: 1 addition & 1 deletion internal/engine/k8swatch/pod_watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ func (w *PodWatcher) OnChange(ctx context.Context, st store.RStore, _ store.Chan
}

func (w *PodWatcher) setupWatch(ctx context.Context, st store.RStore, ns k8s.Namespace) {
ch, err := w.kCli.WatchPods(ctx, ns, labels.Everything())
ch, err := w.kCli.WatchPods(ctx, ns)
if err != nil {
err = errors.Wrapf(err, "Error watching pods. Are you connected to kubernetes?\nTry running `kubectl get pods -n %q`", ns)
st.Dispatch(store.NewErrorAction(err))
Expand Down
2 changes: 1 addition & 1 deletion internal/engine/k8swatch/service_watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func (w *ServiceWatcher) OnChange(ctx context.Context, st store.RStore, _ store.
}

func (w *ServiceWatcher) setupWatch(ctx context.Context, st store.RStore, ns k8s.Namespace) {
ch, err := w.kCli.WatchServices(ctx, ns, k8s.ManagedByTiltSelector())
ch, err := w.kCli.WatchServices(ctx, ns)
if err != nil {
err = errors.Wrapf(err, "Error watching services. Are you connected to kubernetes?\nTry running `kubectl get services -n %q`", ns)
st.Dispatch(store.NewErrorAction(err))
Expand Down
5 changes: 2 additions & 3 deletions internal/k8s/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/api/validation"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/version"
"k8s.io/apimachinery/pkg/watch"
Expand Down Expand Up @@ -104,9 +103,9 @@ type Client interface {
//
// Over time, we want to remove the ability to watch all namespaces
// https://github.com/tilt-dev/tilt/issues/3792
WatchPods(ctx context.Context, ns Namespace, lps labels.Selector) (<-chan ObjectUpdate, error)
WatchPods(ctx context.Context, ns Namespace) (<-chan ObjectUpdate, error)

WatchServices(ctx context.Context, ns Namespace, lps labels.Selector) (<-chan *v1.Service, error)
WatchServices(ctx context.Context, ns Namespace) (<-chan *v1.Service, error)

WatchEvents(ctx context.Context, ns Namespace) (<-chan *v1.Event, error)

Expand Down
5 changes: 2 additions & 3 deletions internal/k8s/exploding_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"github.com/docker/distribution/reference"
"github.com/pkg/errors"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/watch"

Expand Down Expand Up @@ -62,11 +61,11 @@ func (ec *explodingClient) CreatePortForwarder(ctx context.Context, namespace Na
return nil, errors.Wrap(ec.err, "could not set up k8s client")
}

func (ec *explodingClient) WatchPods(ctx context.Context, ns Namespace, lps labels.Selector) (<-chan ObjectUpdate, error) {
// WatchPods always fails: the exploding client exists only to surface the
// construction error (ec.err) the first time any Kubernetes call is made.
func (ec *explodingClient) WatchPods(ctx context.Context, ns Namespace) (<-chan ObjectUpdate, error) {
	wrapped := errors.Wrap(ec.err, "could not set up k8s client")
	return nil, wrapped
}

func (ec *explodingClient) WatchServices(ctx context.Context, ns Namespace, lps labels.Selector) (<-chan *v1.Service, error) {
// WatchServices always fails: the exploding client exists only to surface the
// construction error (ec.err) the first time any Kubernetes call is made.
func (ec *explodingClient) WatchServices(ctx context.Context, ns Namespace) (<-chan *v1.Service, error) {
	wrapped := errors.Wrap(ec.err, "could not set up k8s client")
	return nil, wrapped
}

Expand Down
42 changes: 9 additions & 33 deletions internal/k8s/fake_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,13 +87,11 @@ type ExecCall struct {

type fakeServiceWatch struct {
ns Namespace
ls labels.Selector
ch chan *v1.Service
}

type fakePodWatch struct {
ns Namespace
ls labels.Selector
ch chan ObjectUpdate
}

Expand All @@ -106,22 +104,18 @@ func (c *FakeK8sClient) EmitService(ls labels.Selector, s *v1.Service) {
c.mu.Lock()
defer c.mu.Unlock()
for _, w := range c.serviceWatches {
if !SelectorEqual(ls, w.ls) {
continue
}

if w.ns != "" && w.ns != Namespace(s.Namespace) {
if w.ns != Namespace(s.Namespace) {
continue
}

w.ch <- s
}
}

func (c *FakeK8sClient) WatchServices(ctx context.Context, ns Namespace, ls labels.Selector) (<-chan *v1.Service, error) {
func (c *FakeK8sClient) WatchServices(ctx context.Context, ns Namespace) (<-chan *v1.Service, error) {
c.mu.Lock()
ch := make(chan *v1.Service, 20)
c.serviceWatches = append(c.serviceWatches, fakeServiceWatch{ns, ls, ch})
c.serviceWatches = append(c.serviceWatches, fakeServiceWatch{ns, ch})
c.mu.Unlock()

go func() {
Expand All @@ -130,7 +124,7 @@ func (c *FakeK8sClient) WatchServices(ctx context.Context, ns Namespace, ls labe
c.mu.Lock()
var newWatches []fakeServiceWatch
for _, e := range c.serviceWatches {
if e.ns != ns || !SelectorEqual(e.ls, ls) {
if e.ns != ns {
newWatches = append(newWatches, e)
}
}
Expand Down Expand Up @@ -184,25 +178,11 @@ func (c *FakeK8sClient) EmitEvent(ctx context.Context, evt *v1.Event) {
}
}

// WatchedSelectors returns the label selector of every currently-registered
// pod watch, in registration order. Returns nil when no watches are active.
func (c *FakeK8sClient) WatchedSelectors() []labels.Selector {
	c.mu.Lock()
	defer c.mu.Unlock()

	var selectors []labels.Selector
	for _, pw := range c.podWatches {
		selectors = append(selectors, pw.ls)
	}
	return selectors
}

func (c *FakeK8sClient) EmitPod(ls labels.Selector, p *v1.Pod) {
c.mu.Lock()
defer c.mu.Unlock()
for _, w := range c.podWatches {
if w.ns != "" && w.ns != Namespace(p.Namespace) {
continue
}

if !SelectorEqual(w.ls, ls) {
if w.ns != Namespace(p.Namespace) {
continue
}

Expand All @@ -214,22 +194,18 @@ func (c *FakeK8sClient) EmitPodDelete(ls labels.Selector, p *v1.Pod) {
c.mu.Lock()
defer c.mu.Unlock()
for _, w := range c.podWatches {
if w.ns != "" && w.ns != Namespace(p.Namespace) {
continue
}

if !SelectorEqual(w.ls, ls) {
if w.ns != Namespace(p.Namespace) {
continue
}

w.ch <- ObjectUpdate{obj: p, isDelete: true}
}
}

func (c *FakeK8sClient) WatchPods(ctx context.Context, ns Namespace, ls labels.Selector) (<-chan ObjectUpdate, error) {
func (c *FakeK8sClient) WatchPods(ctx context.Context, ns Namespace) (<-chan ObjectUpdate, error) {
c.mu.Lock()
ch := make(chan ObjectUpdate, 20)
c.podWatches = append(c.podWatches, fakePodWatch{ns, ls, ch})
c.podWatches = append(c.podWatches, fakePodWatch{ns, ch})
c.mu.Unlock()

go func() {
Expand All @@ -238,7 +214,7 @@ func (c *FakeK8sClient) WatchPods(ctx context.Context, ns Namespace, ls labels.S
c.mu.Lock()
var newWatches []fakePodWatch
for _, e := range c.podWatches {
if e.ns != ns || !SelectorEqual(e.ls, ls) {
if e.ns != ns {
newWatches = append(newWatches, e)
}
}
Expand Down
92 changes: 16 additions & 76 deletions internal/k8s/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,17 @@ package k8s
import (
"context"
"fmt"
"net/http"
"time"

"github.com/blang/semver"
"github.com/pkg/errors"
v1 "k8s.io/api/core/v1"
apiErrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/version"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/informers"
"k8s.io/client-go/metadata/metadatainformer"
"k8s.io/client-go/tools/cache"
Expand Down Expand Up @@ -72,48 +69,6 @@ func (r ObjectUpdate) AsDeletedKey() (Namespace, string, bool) {
return Namespace(ns), name, true
}

// watcherFactory builds a watcher scoped to the given namespace.
// An empty namespace requests all namespaces (see makeWatcher).
type watcherFactory func(namespace string) watcher

// watcher is the minimal subset of the client-go dynamic API needed
// to start a watch for a single resource type.
type watcher interface {
	Watch(ctx context.Context, options metav1.ListOptions) (watch.Interface, error)
}

// makeWatcher starts a watch filtered by the given label selector, first
// across all namespaces, then — on a 403 Forbidden — retried against just the
// client's configured namespace (the user may lack cluster-wide permissions).
//
// Returns the live watch plus the namespace it was narrowed to ("" means all
// namespaces). NOTE(review): when the factory returns nil, this returns
// (nil, "", nil) — a nil watch with a nil error; callers must tolerate that.
func (kCli K8sClient) makeWatcher(ctx context.Context, f watcherFactory, ls labels.Selector) (watch.Interface, Namespace, error) {
	// passing "" gets us all namespaces
	w := f("")
	if w == nil {
		return nil, "", nil
	}

	watcher, err := w.Watch(ctx, metav1.ListOptions{LabelSelector: ls.String()})
	if err == nil {
		return watcher, "", nil
	}

	// If the request failed, we might be able to recover.
	statusErr, isStatusErr := err.(*apiErrors.StatusError)
	if !isStatusErr {
		return nil, "", err
	}

	status := statusErr.ErrStatus
	if status.Code == http.StatusForbidden {
		// If this is a forbidden error, maybe the user just isn't allowed to watch this namespace.
		// Let's narrow our request to just the config namespace, and see if that helps.
		w := f(kCli.configNamespace.String())
		if w == nil {
			return nil, "", nil
		}

		// Shadowed watcher/err on purpose: only the successful retry escapes
		// this branch; a failed retry falls through to the original error.
		watcher, err := w.Watch(ctx, metav1.ListOptions{LabelSelector: ls.String()})
		if err == nil {
			return watcher, kCli.configNamespace, nil
		}

		// ugh, it still failed. return the original error.
	}
	// Flatten the original StatusError into a readable message.
	return nil, "", fmt.Errorf("%s, Reason: %s, Code: %d", status.Message, status.Reason, status.Code)
}

func maybeUnpackStatusError(err error) error {
statusErr, isStatusErr := err.(*apiErrors.StatusError)
if !isStatusErr {
Expand All @@ -126,38 +81,23 @@ func maybeUnpackStatusError(err error) error {
func (kCli K8sClient) makeInformer(
ctx context.Context,
ns Namespace,
gvr schema.GroupVersionResource,
ls labels.Selector) (cache.SharedInformer, error) {
gvr schema.GroupVersionResource) (cache.SharedInformer, error) {
if ns == "" {
return nil, fmt.Errorf("makeInformer no longer supports watching all namespaces")
}

// HACK(dmiller): There's no way to get errors out of an informer. See https://github.com/kubernetes/client-go/issues/155
// In the meantime, at least to get authorization and some other errors let's try to set up a watcher and then just
// throw it away.
if ns == "" {
watcher, narrowNS, err := kCli.makeWatcher(ctx, func(ns string) watcher {
return kCli.dynamic.Resource(gvr).Namespace(ns)
}, ls)
if err != nil {
return nil, errors.Wrap(err, "makeInformer")
}
watcher.Stop()
ns = narrowNS
} else {
watcher, err := kCli.dynamic.Resource(gvr).Namespace(ns.String()).
Watch(ctx, metav1.ListOptions{LabelSelector: ls.String()})
if err != nil {
return nil, errors.Wrap(maybeUnpackStatusError(err), "makeInformer")
}
watcher.Stop()
watcher, err := kCli.dynamic.Resource(gvr).Namespace(ns.String()).
Watch(ctx, metav1.ListOptions{})
if err != nil {
return nil, errors.Wrap(maybeUnpackStatusError(err), "makeInformer")
}
watcher.Stop()

options := []informers.SharedInformerOption{}
if !ls.Empty() {
options = append(options, informers.WithTweakListOptions(func(o *metav1.ListOptions) {
o.LabelSelector = ls.String()
}))
}
if ns != "" {
options = append(options, informers.WithNamespace(ns.String()))
options := []informers.SharedInformerOption{
informers.WithNamespace(ns.String()),
}

factory := informers.NewSharedInformerFactoryWithOptions(kCli.clientset, resyncPeriod, options...)
Expand All @@ -171,7 +111,7 @@ func (kCli K8sClient) makeInformer(

func (kCli K8sClient) WatchEvents(ctx context.Context, ns Namespace) (<-chan *v1.Event, error) {
gvr := EventGVR
informer, err := kCli.makeInformer(ctx, ns, gvr, labels.Everything())
informer, err := kCli.makeInformer(ctx, ns, gvr)
if err != nil {
return nil, errors.Wrap(err, "WatchEvents")
}
Expand Down Expand Up @@ -205,9 +145,9 @@ func (kCli K8sClient) WatchEvents(ctx context.Context, ns Namespace) (<-chan *v1
return ch, nil
}

func (kCli K8sClient) WatchPods(ctx context.Context, ns Namespace, ls labels.Selector) (<-chan ObjectUpdate, error) {
func (kCli K8sClient) WatchPods(ctx context.Context, ns Namespace) (<-chan ObjectUpdate, error) {
gvr := PodGVR
informer, err := kCli.makeInformer(ctx, ns, gvr, ls)
informer, err := kCli.makeInformer(ctx, ns, gvr)
if err != nil {
return nil, errors.Wrap(err, "WatchPods")
}
Expand Down Expand Up @@ -249,9 +189,9 @@ func (kCli K8sClient) WatchPods(ctx context.Context, ns Namespace, ls labels.Sel
return ch, nil
}

func (kCli K8sClient) WatchServices(ctx context.Context, ns Namespace, ls labels.Selector) (<-chan *v1.Service, error) {
func (kCli K8sClient) WatchServices(ctx context.Context, ns Namespace) (<-chan *v1.Service, error) {
gvr := ServiceGVR
informer, err := kCli.makeInformer(ctx, ns, gvr, ls)
informer, err := kCli.makeInformer(ctx, ns, gvr)
if err != nil {
return nil, errors.Wrap(err, "WatchServices")
}
Expand Down

0 comments on commit 5c71161

Please sign in to comment.