Skip to content

Commit

Permalink
Prober targets service instead of pods directly (knative-extensions#2112)
Browse files Browse the repository at this point in the history

Signed-off-by: Pierangelo Di Pilato <pierdipi@redhat.com>
  • Loading branch information
pierDipi committed May 10, 2022
1 parent bf22b5a commit ffaf43a
Show file tree
Hide file tree
Showing 10 changed files with 86 additions and 20 deletions.
30 changes: 14 additions & 16 deletions control-plane/pkg/prober/async_prober.go
Expand Up @@ -22,57 +22,55 @@ import (
"time"

"go.uber.org/zap"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
podinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/pod"
"knative.dev/pkg/logging"
)

type IPsLister func() ([]string, error)

// asyncProber is the async Prober implementation: it probes each listed
// host, caches per-address results, and requeues the owning resource via
// enqueue once all probe results are known.
//
// NOTE(review): this is a rendered diff — both the removed podLister field
// and its replacement IPsLister appear below; only one exists in the actual
// file at any given revision.
type asyncProber struct {
client httpClient
enqueue EnqueueFunc
logger *zap.Logger
cache Cache
podLister func() ([]*corev1.Pod, error)
IPsLister IPsLister
port string
}

// NewAsync creates an async Prober.
//
// It reports status changes using the provided EnqueueFunc.
//
// NOTE(review): rendered diff — both the old (labels.Selector-based) and
// new (IPsLister-based) constructor signatures appear below; after this
// change only the IPsLister variant exists.
func NewAsync(ctx context.Context, client httpClient, port string, podsLabelsSelector labels.Selector, enqueue EnqueueFunc) Prober {
func NewAsync(ctx context.Context, client httpClient, port string, IPsLister IPsLister, enqueue EnqueueFunc) Prober {
logger := logging.FromContext(ctx).Desugar().
With(zap.String("scope", "prober"))

// port is mandatory: probe addresses are built as "<host>:<port>".
if len(port) == 0 {
logger.Fatal("Port is required")
}
podLister := podinformer.Get(ctx).Lister()
return &asyncProber{
client: client,
enqueue: enqueue,
logger: logger,
// Cached probe statuses expire after 30 minutes.
cache: NewLocalExpiringCache(ctx, 30*time.Minute),
podLister: func() ([]*corev1.Pod, error) { return podLister.List(podsLabelsSelector) },
IPsLister: IPsLister,
port: port,
}
}

func (a *asyncProber) Probe(ctx context.Context, addressable Addressable, expected Status) Status {
address := addressable.Address
pods, err := a.podLister()
IPs, err := a.IPsLister()
if err != nil {
a.logger.Error("Failed to list pods", zap.Error(err))
a.logger.Error("Failed to list IPs", zap.Error(err))
return StatusUnknown
}
// Return `StatusNotReady` when there are no pods.
if len(pods) == 0 {
// Return `StatusNotReady` when there are no IPs.
if len(IPs) == 0 {
return StatusNotReady
}

// aggregatedCurrentKnownStatus keeps track of the current status in the cache excluding `StatusReady`
// since we just skip pods that have `StatusReady`.
// since we just skip IPs that have `StatusReady`.
//
// If there is a status that is `StatusUnknown` the final status we want to return is `StatusUnknown`,
// while we return `StatusNotReady` when the status is known and all probes returned `StatusNotReady`.
Expand All @@ -83,18 +81,18 @@ func (a *asyncProber) Probe(ctx context.Context, addressable Addressable, expect
// It goes to done once we have all probe request results regardless of whether they are coming from
// the cache or from an actual request.
var wg sync.WaitGroup
wg.Add(len(pods))
wg.Add(len(IPs))

// enqueueOnce allows requeuing the resource only once, when we have all probe request results.
var enqueueOnce sync.Once

for _, p := range pods {
for _, IP := range IPs {
podUrl := *address
podUrl.Host = p.Status.PodIP + ":" + a.port
podUrl.Host = IP + ":" + a.port
address := podUrl.String()

logger := a.logger.
With(zap.String("pod.metadata.name", p.Name)).
With(zap.String("IP", IP)).
With(zap.String("address", address))

currentStatus := a.cache.GetStatus(address)
Expand Down
13 changes: 12 additions & 1 deletion control-plane/pkg/prober/async_prober_test.go
Expand Up @@ -132,7 +132,18 @@ func TestAsyncProber(t *testing.T) {
u, _ := url.Parse(s.URL)

wantRequeueCountMin := atomic.NewInt64(int64(tc.wantRequeueCountMin))
prober := NewAsync(ctx, s.Client(), u.Port(), tc.podsLabelsSelector, func(key types.NamespacedName) {
var IPsLister IPsLister = func() ([]string, error) {
pods, err := podinformer.Get(ctx).Lister().List(tc.podsLabelsSelector)
if err != nil {
return nil, err
}
ips := make([]string, 0, len(pods))
for _, p := range pods {
ips = append(ips, p.Status.PodIP)
}
return ips, nil
}
prober := NewAsync(ctx, s.Client(), u.Port(), IPsLister, func(key types.NamespacedName) {
wantRequeueCountMin.Dec()
})

Expand Down
7 changes: 7 additions & 0 deletions control-plane/pkg/prober/prober.go
Expand Up @@ -18,6 +18,7 @@ package prober

import (
"context"
"fmt"
"net/http"
"net/url"

Expand Down Expand Up @@ -89,3 +90,9 @@ func probe(ctx context.Context, client httpClient, logger *zap.Logger, address s

return StatusReady
}

// IPsListerFromService returns an IPsLister that targets the given Service
// through its cluster-local DNS name ("<name>.<namespace>.svc") instead of
// individual pod IPs. The returned lister never fails.
func IPsListerFromService(svc types.NamespacedName) IPsLister {
	// The host is fixed by svc, so compute it once rather than per call.
	host := fmt.Sprintf("%s.%s.svc", svc.Name, svc.Namespace)
	return func() ([]string, error) {
		return []string{host}, nil
	}
}
32 changes: 32 additions & 0 deletions control-plane/pkg/prober/prober_test.go
Expand Up @@ -20,8 +20,10 @@ import (
"context"
"testing"

"github.com/google/go-cmp/cmp"
"github.com/stretchr/testify/require"
"go.uber.org/atomic"
"k8s.io/apimachinery/pkg/types"
)

func TestFuncProbe(t *testing.T) {
Expand All @@ -38,3 +40,33 @@ func TestFuncProbe(t *testing.T) {
require.Equal(t, status, s, s.String())
require.Equal(t, int32(1), calls.Load())
}

// TestIPsListerFromService checks that the lister derived from a Service
// reference yields the expected cluster-local DNS name and no error.
func TestIPsListerFromService(t *testing.T) {
	cases := []struct {
		name    string
		svc     types.NamespacedName
		want    []string
		wantErr bool
	}{
		{
			name:    "ok",
			svc:     types.NamespacedName{Namespace: "ns", Name: "name"},
			want:    []string{"name.ns.svc"},
			wantErr: false,
		},
	}
	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			got, err := IPsListerFromService(tc.svc)()
			if gotErr := err != nil; gotErr != tc.wantErr {
				t.Errorf("Got err %v, wantErr %v", err, tc.wantErr)
			}
			if diff := cmp.Diff(tc.want, got); diff != "" {
				t.Error("(-want, +got)", diff)
			}
		})
	}
}
4 changes: 3 additions & 1 deletion control-plane/pkg/reconciler/broker/controller.go
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/Shopify/sarama"
"go.uber.org/zap"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/cache"
eventing "knative.dev/eventing/pkg/apis/eventing/v1"
kubeclient "knative.dev/pkg/client/injection/kube/client"
Expand Down Expand Up @@ -88,7 +89,8 @@ func NewController(ctx context.Context, watcher configmap.Watcher, env *config.E
})

reconciler.Resolver = resolver.NewURIResolverFromTracker(ctx, impl.Tracker)
reconciler.Prober = prober.NewAsync(ctx, http.DefaultClient, env.IngressPodPort, reconciler.ReceiverSelector(), impl.EnqueueKey)
IPsLister := prober.IPsListerFromService(types.NamespacedName{Namespace: env.SystemNamespace, Name: env.IngressName})
reconciler.Prober = prober.NewAsync(ctx, http.DefaultClient, env.IngressPodPort, IPsLister, impl.EnqueueKey)
reconciler.IngressHost = network.GetServiceHostname(env.IngressName, env.SystemNamespace)

brokerInformer := brokerinformer.Get(ctx)
Expand Down
4 changes: 3 additions & 1 deletion control-plane/pkg/reconciler/channel/controller.go
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/Shopify/sarama"
"go.uber.org/zap"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/cache"

messagingv1beta "knative.dev/eventing-kafka/pkg/apis/messaging/v1beta1"
Expand Down Expand Up @@ -86,7 +87,8 @@ func NewController(ctx context.Context, watcher configmap.Watcher, configs *conf
}

impl := kafkachannelreconciler.NewImpl(ctx, reconciler)
reconciler.Prober = prober.NewAsync(ctx, http.DefaultClient, configs.IngressPodPort, reconciler.ReceiverSelector(), impl.EnqueueKey)
IPsLister := prober.IPsListerFromService(types.NamespacedName{Namespace: configs.SystemNamespace, Name: configs.IngressName})
reconciler.Prober = prober.NewAsync(ctx, http.DefaultClient, configs.IngressPodPort, IPsLister, impl.EnqueueKey)
reconciler.IngressHost = network.GetServiceHostname(configs.IngressName, configs.SystemNamespace)

channelInformer := kafkachannelinformer.Get(ctx)
Expand Down
4 changes: 3 additions & 1 deletion control-plane/pkg/reconciler/sink/controller.go
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/Shopify/sarama"
"go.uber.org/zap"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/cache"
kubeclient "knative.dev/pkg/client/injection/kube/client"
configmapinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/configmap"
Expand Down Expand Up @@ -75,7 +76,8 @@ func NewController(ctx context.Context, _ configmap.Watcher, configs *config.Env
}

impl := sinkreconciler.NewImpl(ctx, reconciler)
reconciler.Prober = prober.NewAsync(ctx, http.DefaultClient, configs.IngressPodPort, reconciler.ReceiverSelector(), impl.EnqueueKey)
IPsLister := prober.IPsListerFromService(types.NamespacedName{Namespace: configs.SystemNamespace, Name: configs.IngressName})
reconciler.Prober = prober.NewAsync(ctx, http.DefaultClient, configs.IngressPodPort, IPsLister, impl.EnqueueKey)
reconciler.IngressHost = network.GetServiceHostname(configs.IngressName, configs.SystemNamespace)

sinkInformer := sinkinformer.Get(ctx)
Expand Down
4 changes: 4 additions & 0 deletions data-plane/config/broker/500-receiver.yaml
Expand Up @@ -181,6 +181,10 @@ spec:
port: 80
protocol: TCP
targetPort: 8080
- name: http-container
port: 8080
protocol: TCP
targetPort: 8080
- name: http-metrics
port: 9090
protocol: TCP
Expand Down
4 changes: 4 additions & 0 deletions data-plane/config/channel/500-receiver.yaml
Expand Up @@ -181,6 +181,10 @@ spec:
port: 80
protocol: TCP
targetPort: 8080
- name: http-container
port: 8080
protocol: TCP
targetPort: 8080
- name: http-metrics
port: 9090
protocol: TCP
Expand Down
4 changes: 4 additions & 0 deletions data-plane/config/sink/500-receiver.yaml
Expand Up @@ -181,6 +181,10 @@ spec:
port: 80
protocol: TCP
targetPort: 8080
- name: http-container
port: 8080
protocol: TCP
targetPort: 8080
- name: http-metrics
port: 9090
protocol: TCP
Expand Down

0 comments on commit ffaf43a

Please sign in to comment.