From 52576e0b5e8687d204fb8e0bf9e02930da7214c5 Mon Sep 17 00:00:00 2001 From: OpenShift Cherrypick Robot Date: Tue, 10 May 2022 10:40:06 -0700 Subject: [PATCH] [release-v1.3] Prober targets service instead of pods directly (#2112) (#257) * Prober targets service instead of pods directly (#2112) Signed-off-by: Pierangelo Di Pilato * Run make generate-release Signed-off-by: Pierangelo Di Pilato Co-authored-by: Pierangelo Di Pilato --- control-plane/pkg/prober/async_prober.go | 30 ++++++++--------- control-plane/pkg/prober/async_prober_test.go | 13 +++++++- control-plane/pkg/prober/prober.go | 7 ++++ control-plane/pkg/prober/prober_test.go | 32 +++++++++++++++++++ .../pkg/reconciler/broker/controller.go | 4 ++- .../pkg/reconciler/channel/controller.go | 4 ++- .../pkg/reconciler/sink/controller.go | 4 ++- data-plane/config/broker/500-receiver.yaml | 4 +++ data-plane/config/channel/500-receiver.yaml | 4 +++ data-plane/config/sink/500-receiver.yaml | 4 +++ .../artifacts/eventing-kafka-broker.yaml | 4 +++ .../artifacts/eventing-kafka-channel.yaml | 4 +++ .../artifacts/eventing-kafka-sink.yaml | 4 +++ 13 files changed, 98 insertions(+), 20 deletions(-) diff --git a/control-plane/pkg/prober/async_prober.go b/control-plane/pkg/prober/async_prober.go index 1d6d2ed6d4..c4f211c09e 100644 --- a/control-plane/pkg/prober/async_prober.go +++ b/control-plane/pkg/prober/async_prober.go @@ -22,57 +22,55 @@ import ( "time" "go.uber.org/zap" - corev1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/types" - podinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/pod" "knative.dev/pkg/logging" ) +type IPsLister func() ([]string, error) + type asyncProber struct { client httpClient enqueue EnqueueFunc logger *zap.Logger cache Cache - podLister func() ([]*corev1.Pod, error) + IPsLister IPsLister port string } // NewAsync creates an async Prober. // // It reports status changes using the provided EnqueueFunc. -func NewAsync(ctx context.Context, client httpClient, port string, podsLabelsSelector labels.Selector, enqueue EnqueueFunc) Prober { +func NewAsync(ctx context.Context, client httpClient, port string, IPsLister IPsLister, enqueue EnqueueFunc) Prober { logger := logging.FromContext(ctx).Desugar(). With(zap.String("scope", "prober")) if len(port) == 0 { logger.Fatal("Port is required") } - podLister := podinformer.Get(ctx).Lister() return &asyncProber{ client: client, enqueue: enqueue, logger: logger, cache: NewLocalExpiringCache(ctx, 30*time.Minute), - podLister: func() ([]*corev1.Pod, error) { return podLister.List(podsLabelsSelector) }, + IPsLister: IPsLister, port: port, } } func (a *asyncProber) Probe(ctx context.Context, addressable Addressable, expected Status) Status { address := addressable.Address - pods, err := a.podLister() + IPs, err := a.IPsLister() if err != nil { - a.logger.Error("Failed to list pods", zap.Error(err)) + a.logger.Error("Failed to list IPs", zap.Error(err)) return StatusUnknown } - // Return `StatusNotReady` when there are no pods. - if len(pods) == 0 { + // Return `StatusNotReady` when there are no IPs. + if len(IPs) == 0 { return StatusNotReady } // aggregatedCurrentKnownStatus keeps track of the current status in the cache excluding `StatusReady` - // since we just skip pods that have `StatusReady`. + // since we just skip IPs that have `StatusReady`. // // If there is a status that is `StatusUnknown` the final status we want to return is `StatusUnknown`, // while we return `StatusNotReady` when the status is known and all probes returned `StatusNotReady`. @@ -83,18 +81,18 @@ func (a *asyncProber) Probe(ctx context.Context, addressable Addressable, expect // It goes to done once we have all probe request results regardless of whether they are coming from // the cache or from an actual request. var wg sync.WaitGroup - wg.Add(len(pods)) + wg.Add(len(IPs)) // enqueueOnce allows requeuing the resource only once, when we have all probe request results. var enqueueOnce sync.Once - for _, p := range pods { + for _, IP := range IPs { podUrl := *address - podUrl.Host = p.Status.PodIP + ":" + a.port + podUrl.Host = IP + ":" + a.port address := podUrl.String() logger := a.logger. - With(zap.String("pod.metadata.name", p.Name)). + With(zap.String("IP", IP)). With(zap.String("address", address)) currentStatus := a.cache.GetStatus(address) diff --git a/control-plane/pkg/prober/async_prober_test.go b/control-plane/pkg/prober/async_prober_test.go index d68fe44816..893a3b6c8a 100644 --- a/control-plane/pkg/prober/async_prober_test.go +++ b/control-plane/pkg/prober/async_prober_test.go @@ -132,7 +132,18 @@ func TestAsyncProber(t *testing.T) { u, _ := url.Parse(s.URL) wantRequeueCountMin := atomic.NewInt64(int64(tc.wantRequeueCountMin)) - prober := NewAsync(ctx, s.Client(), u.Port(), tc.podsLabelsSelector, func(key types.NamespacedName) { + var IPsLister IPsLister = func() ([]string, error) { + pods, err := podinformer.Get(ctx).Lister().List(tc.podsLabelsSelector) + if err != nil { + return nil, err + } + ips := make([]string, 0, len(pods)) + for _, p := range pods { + ips = append(ips, p.Status.PodIP) + } + return ips, nil + } + prober := NewAsync(ctx, s.Client(), u.Port(), IPsLister, func(key types.NamespacedName) { wantRequeueCountMin.Dec() }) diff --git a/control-plane/pkg/prober/prober.go b/control-plane/pkg/prober/prober.go index 857693d9c3..b0cf2f7d87 100644 --- a/control-plane/pkg/prober/prober.go +++ b/control-plane/pkg/prober/prober.go @@ -18,6 +18,7 @@ package prober import ( "context" + "fmt" "net/http" "net/url" @@ -89,3 +90,9 @@ func probe(ctx context.Context, client httpClient, logger *zap.Logger, address s return StatusReady } + +func IPsListerFromService(svc types.NamespacedName) IPsLister { + return func() ([]string, error) { + return []string{fmt.Sprintf("%s.%s.svc", svc.Name, svc.Namespace)}, nil + } +} diff --git a/control-plane/pkg/prober/prober_test.go b/control-plane/pkg/prober/prober_test.go index bb9bdf30bf..596c8985d8 100644 --- a/control-plane/pkg/prober/prober_test.go +++ b/control-plane/pkg/prober/prober_test.go @@ -20,8 +20,10 @@ import ( "context" "testing" + "github.com/google/go-cmp/cmp" "github.com/stretchr/testify/require" "go.uber.org/atomic" + "k8s.io/apimachinery/pkg/types" ) func TestFuncProbe(t *testing.T) { @@ -38,3 +40,33 @@ func TestFuncProbe(t *testing.T) { require.Equal(t, status, s, s.String()) require.Equal(t, int32(1), calls.Load()) } + +func TestIPsListerFromService(t *testing.T) { + tests := []struct { + name string + svc types.NamespacedName + want []string + wantErr bool + }{ + { + name: "ok", + svc: types.NamespacedName{ + Namespace: "ns", + Name: "name", + }, + want: []string{"name.ns.svc"}, + wantErr: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := IPsListerFromService(tt.svc)() + if tt.wantErr != (err != nil) { + t.Errorf("Got err %v, wantErr %v", err, tt.wantErr) + } + if diff := cmp.Diff(tt.want, got); diff != "" { + t.Error("(-want, +got)", diff) + } + }) + } +} diff --git a/control-plane/pkg/reconciler/broker/controller.go b/control-plane/pkg/reconciler/broker/controller.go index 22613cc7dc..dd614d1eb5 100644 --- a/control-plane/pkg/reconciler/broker/controller.go +++ b/control-plane/pkg/reconciler/broker/controller.go @@ -24,6 +24,7 @@ import ( "github.com/Shopify/sarama" "go.uber.org/zap" corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/tools/cache" eventing "knative.dev/eventing/pkg/apis/eventing/v1" kubeclient "knative.dev/pkg/client/injection/kube/client" @@ -88,7 +89,8 @@ func NewController(ctx context.Context, watcher configmap.Watcher, env *config.E }) reconciler.Resolver = resolver.NewURIResolverFromTracker(ctx, impl.Tracker) - reconciler.Prober = prober.NewAsync(ctx, http.DefaultClient, env.IngressPodPort, reconciler.ReceiverSelector(), impl.EnqueueKey) + IPsLister := prober.IPsListerFromService(types.NamespacedName{Namespace: env.SystemNamespace, Name: env.IngressName}) + reconciler.Prober = prober.NewAsync(ctx, http.DefaultClient, env.IngressPodPort, IPsLister, impl.EnqueueKey) reconciler.IngressHost = network.GetServiceHostname(env.IngressName, env.SystemNamespace) brokerInformer := brokerinformer.Get(ctx) diff --git a/control-plane/pkg/reconciler/channel/controller.go b/control-plane/pkg/reconciler/channel/controller.go index 95b327420f..04698e9ecb 100644 --- a/control-plane/pkg/reconciler/channel/controller.go +++ b/control-plane/pkg/reconciler/channel/controller.go @@ -23,6 +23,7 @@ import ( "github.com/Shopify/sarama" "go.uber.org/zap" corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/tools/cache" messagingv1beta "knative.dev/eventing-kafka/pkg/apis/messaging/v1beta1" @@ -86,7 +87,8 @@ func NewController(ctx context.Context, watcher configmap.Watcher, configs *conf } impl := kafkachannelreconciler.NewImpl(ctx, reconciler) - reconciler.Prober = prober.NewAsync(ctx, http.DefaultClient, configs.IngressPodPort, reconciler.ReceiverSelector(), impl.EnqueueKey) + IPsLister := prober.IPsListerFromService(types.NamespacedName{Namespace: configs.SystemNamespace, Name: configs.IngressName}) + reconciler.Prober = prober.NewAsync(ctx, http.DefaultClient, configs.IngressPodPort, IPsLister, impl.EnqueueKey) reconciler.IngressHost = network.GetServiceHostname(configs.IngressName, configs.SystemNamespace) channelInformer := kafkachannelinformer.Get(ctx) diff --git a/control-plane/pkg/reconciler/sink/controller.go b/control-plane/pkg/reconciler/sink/controller.go index 967dcf4b68..b0de028396 100644 --- a/control-plane/pkg/reconciler/sink/controller.go +++ b/control-plane/pkg/reconciler/sink/controller.go @@ -23,6 +23,7 @@ import ( "github.com/Shopify/sarama" "go.uber.org/zap" corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/tools/cache" kubeclient "knative.dev/pkg/client/injection/kube/client" configmapinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/configmap" @@ -75,7 +76,8 @@ func NewController(ctx context.Context, _ configmap.Watcher, configs *config.Env } impl := sinkreconciler.NewImpl(ctx, reconciler) - reconciler.Prober = prober.NewAsync(ctx, http.DefaultClient, configs.IngressPodPort, reconciler.ReceiverSelector(), impl.EnqueueKey) + IPsLister := prober.IPsListerFromService(types.NamespacedName{Namespace: configs.SystemNamespace, Name: configs.IngressName}) + reconciler.Prober = prober.NewAsync(ctx, http.DefaultClient, configs.IngressPodPort, IPsLister, impl.EnqueueKey) reconciler.IngressHost = network.GetServiceHostname(configs.IngressName, configs.SystemNamespace) sinkInformer := sinkinformer.Get(ctx) diff --git a/data-plane/config/broker/500-receiver.yaml b/data-plane/config/broker/500-receiver.yaml index 5b3e01eed2..50be743021 100644 --- a/data-plane/config/broker/500-receiver.yaml +++ b/data-plane/config/broker/500-receiver.yaml @@ -181,6 +181,10 @@ spec: port: 80 protocol: TCP targetPort: 8080 + - name: http-container + port: 8080 + protocol: TCP + targetPort: 8080 - name: http-metrics port: 9090 protocol: TCP diff --git a/data-plane/config/channel/500-receiver.yaml b/data-plane/config/channel/500-receiver.yaml index a2c643c0e4..6afd14ece6 100644 --- a/data-plane/config/channel/500-receiver.yaml +++ b/data-plane/config/channel/500-receiver.yaml @@ -181,6 +181,10 @@ spec: port: 80 protocol: TCP targetPort: 8080 + - name: http-container + port: 8080 + protocol: TCP + targetPort: 8080 - name: http-metrics port: 9090 protocol: TCP diff --git a/data-plane/config/sink/500-receiver.yaml b/data-plane/config/sink/500-receiver.yaml index af91980a7f..a0cbf2124e 100644 --- a/data-plane/config/sink/500-receiver.yaml +++ b/data-plane/config/sink/500-receiver.yaml @@ -181,6 +181,10 @@ spec: port: 80 protocol: TCP targetPort: 8080 + - name: http-container + port: 8080 + protocol: TCP + targetPort: 8080 - name: http-metrics port: 9090 protocol: TCP diff --git a/openshift/release/artifacts/eventing-kafka-broker.yaml b/openshift/release/artifacts/eventing-kafka-broker.yaml index 366bec5e37..6b467c6729 100644 --- a/openshift/release/artifacts/eventing-kafka-broker.yaml +++ b/openshift/release/artifacts/eventing-kafka-broker.yaml @@ -556,6 +556,10 @@ spec: port: 80 protocol: TCP targetPort: 8080 + - name: http-container + port: 8080 + protocol: TCP + targetPort: 8080 - name: http-metrics port: 9090 protocol: TCP diff --git a/openshift/release/artifacts/eventing-kafka-channel.yaml b/openshift/release/artifacts/eventing-kafka-channel.yaml index e45dbcbd13..114f237dad 100644 --- a/openshift/release/artifacts/eventing-kafka-channel.yaml +++ b/openshift/release/artifacts/eventing-kafka-channel.yaml @@ -555,6 +555,10 @@ spec: port: 80 protocol: TCP targetPort: 8080 + - name: http-container + port: 8080 + protocol: TCP + targetPort: 8080 - name: http-metrics port: 9090 protocol: TCP diff --git a/openshift/release/artifacts/eventing-kafka-sink.yaml b/openshift/release/artifacts/eventing-kafka-sink.yaml index ca1aa99c41..490e80e1a5 100644 --- a/openshift/release/artifacts/eventing-kafka-sink.yaml +++ b/openshift/release/artifacts/eventing-kafka-sink.yaml @@ -325,6 +325,10 @@ spec: port: 80 protocol: TCP targetPort: 8080 + - name: http-container + port: 8080 + protocol: TCP + targetPort: 8080 - name: http-metrics port: 9090 protocol: TCP