Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prober targets service instead of pods directly #2112

Merged
merged 1 commit into from
May 5, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 14 additions & 16 deletions control-plane/pkg/prober/async_prober.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,57 +22,55 @@ import (
"time"

"go.uber.org/zap"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
podinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/pod"
"knative.dev/pkg/logging"
)

type IPsLister func() ([]string, error)

type asyncProber struct {
client httpClient
enqueue EnqueueFunc
logger *zap.Logger
cache Cache
podLister func() ([]*corev1.Pod, error)
IPsLister IPsLister
port string
}

// NewAsync creates an async Prober.
//
// It reports status changes using the provided EnqueueFunc.
func NewAsync(ctx context.Context, client httpClient, port string, podsLabelsSelector labels.Selector, enqueue EnqueueFunc) Prober {
func NewAsync(ctx context.Context, client httpClient, port string, IPsLister IPsLister, enqueue EnqueueFunc) Prober {
logger := logging.FromContext(ctx).Desugar().
With(zap.String("scope", "prober"))

if len(port) == 0 {
logger.Fatal("Port is required")
}
podLister := podinformer.Get(ctx).Lister()
return &asyncProber{
client: client,
enqueue: enqueue,
logger: logger,
cache: NewLocalExpiringCache(ctx, 30*time.Minute),
podLister: func() ([]*corev1.Pod, error) { return podLister.List(podsLabelsSelector) },
IPsLister: IPsLister,
port: port,
}
}

func (a *asyncProber) Probe(ctx context.Context, addressable Addressable, expected Status) Status {
address := addressable.Address
pods, err := a.podLister()
IPs, err := a.IPsLister()
if err != nil {
a.logger.Error("Failed to list pods", zap.Error(err))
a.logger.Error("Failed to list IPs", zap.Error(err))
return StatusUnknown
}
// Return `StatusNotReady` when there are no pods.
if len(pods) == 0 {
// Return `StatusNotReady` when there are no IPs.
if len(IPs) == 0 {
return StatusNotReady
}

// aggregatedCurrentKnownStatus keeps track of the current status in the cache excluding `StatusReady`
// since we just skip pods that have `StatusReady`.
// since we just skip IPs that have `StatusReady`.
//
// If there is a status that is `StatusUnknown` the final status we want to return is `StatusUnknown`,
// while we return `StatusNotReady` when the status is known and all probes returned `StatusNotReady`.
Expand All @@ -83,18 +81,18 @@ func (a *asyncProber) Probe(ctx context.Context, addressable Addressable, expect
// It goes to done once we have all probe request results regardless of whether they are coming from
// the cache or from an actual request.
var wg sync.WaitGroup
wg.Add(len(pods))
wg.Add(len(IPs))

// enqueueOnce allows requeuing the resource only once, when we have all probe request results.
var enqueueOnce sync.Once

for _, p := range pods {
for _, IP := range IPs {
podUrl := *address
podUrl.Host = p.Status.PodIP + ":" + a.port
podUrl.Host = IP + ":" + a.port
address := podUrl.String()

logger := a.logger.
With(zap.String("pod.metadata.name", p.Name)).
With(zap.String("IP", IP)).
With(zap.String("address", address))

currentStatus := a.cache.GetStatus(address)
Expand Down
13 changes: 12 additions & 1 deletion control-plane/pkg/prober/async_prober_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,18 @@ func TestAsyncProber(t *testing.T) {
u, _ := url.Parse(s.URL)

wantRequeueCountMin := atomic.NewInt64(int64(tc.wantRequeueCountMin))
prober := NewAsync(ctx, s.Client(), u.Port(), tc.podsLabelsSelector, func(key types.NamespacedName) {
var IPsLister IPsLister = func() ([]string, error) {
pods, err := podinformer.Get(ctx).Lister().List(tc.podsLabelsSelector)
if err != nil {
return nil, err
}
ips := make([]string, 0, len(pods))
for _, p := range pods {
ips = append(ips, p.Status.PodIP)
}
return ips, nil
}
prober := NewAsync(ctx, s.Client(), u.Port(), IPsLister, func(key types.NamespacedName) {
wantRequeueCountMin.Dec()
})

Expand Down
7 changes: 7 additions & 0 deletions control-plane/pkg/prober/prober.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package prober

import (
"context"
"fmt"
"net/http"
"net/url"

Expand Down Expand Up @@ -89,3 +90,9 @@ func probe(ctx context.Context, client httpClient, logger *zap.Logger, address s

return StatusReady
}

func IPsListerFromService(svc types.NamespacedName) IPsLister {
return func() ([]string, error) {
return []string{fmt.Sprintf("%s.%s.svc", svc.Name, svc.Namespace)}, nil
}
}
32 changes: 32 additions & 0 deletions control-plane/pkg/prober/prober_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@ import (
"context"
"testing"

"github.com/google/go-cmp/cmp"
"github.com/stretchr/testify/require"
"go.uber.org/atomic"
"k8s.io/apimachinery/pkg/types"
)

func TestFuncProbe(t *testing.T) {
Expand All @@ -38,3 +40,33 @@ func TestFuncProbe(t *testing.T) {
require.Equal(t, status, s, s.String())
require.Equal(t, int32(1), calls.Load())
}

func TestIPsListerFromService(t *testing.T) {
tests := []struct {
name string
svc types.NamespacedName
want []string
wantErr bool
}{
{
name: "ok",
svc: types.NamespacedName{
Namespace: "ns",
Name: "name",
},
want: []string{"name.ns.svc"},
wantErr: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := IPsListerFromService(tt.svc)()
if tt.wantErr != (err != nil) {
t.Errorf("Got err %v, wantErr %v", err, tt.wantErr)
}
if diff := cmp.Diff(tt.want, got); diff != "" {
t.Error("(-want, +got)", diff)
}
})
}
}
4 changes: 3 additions & 1 deletion control-plane/pkg/reconciler/broker/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/Shopify/sarama"
"go.uber.org/zap"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/cache"
eventing "knative.dev/eventing/pkg/apis/eventing/v1"
kubeclient "knative.dev/pkg/client/injection/kube/client"
Expand Down Expand Up @@ -88,7 +89,8 @@ func NewController(ctx context.Context, watcher configmap.Watcher, env *config.E
})

reconciler.Resolver = resolver.NewURIResolverFromTracker(ctx, impl.Tracker)
reconciler.Prober = prober.NewAsync(ctx, http.DefaultClient, env.IngressPodPort, reconciler.ReceiverSelector(), impl.EnqueueKey)
IPsLister := prober.IPsListerFromService(types.NamespacedName{Namespace: env.SystemNamespace, Name: env.IngressName})
reconciler.Prober = prober.NewAsync(ctx, http.DefaultClient, env.IngressPodPort, IPsLister, impl.EnqueueKey)
reconciler.IngressHost = network.GetServiceHostname(env.IngressName, env.SystemNamespace)

brokerInformer := brokerinformer.Get(ctx)
Expand Down
4 changes: 3 additions & 1 deletion control-plane/pkg/reconciler/channel/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/Shopify/sarama"
"go.uber.org/zap"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/cache"

messagingv1beta "knative.dev/eventing-kafka/pkg/apis/messaging/v1beta1"
Expand Down Expand Up @@ -84,7 +85,8 @@ func NewController(ctx context.Context, configs *config.Env) *controller.Impl {
}

impl := kafkachannelreconciler.NewImpl(ctx, reconciler)
reconciler.Prober = prober.NewAsync(ctx, http.DefaultClient, configs.IngressPodPort, reconciler.ReceiverSelector(), impl.EnqueueKey)
IPsLister := prober.IPsListerFromService(types.NamespacedName{Namespace: configs.SystemNamespace, Name: configs.IngressName})
reconciler.Prober = prober.NewAsync(ctx, http.DefaultClient, configs.IngressPodPort, IPsLister, impl.EnqueueKey)
reconciler.IngressHost = network.GetServiceHostname(configs.IngressName, configs.SystemNamespace)

channelInformer := kafkachannelinformer.Get(ctx)
Expand Down
4 changes: 3 additions & 1 deletion control-plane/pkg/reconciler/channel/v2/controllerv2.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

"github.com/Shopify/sarama"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/cache"
"knative.dev/pkg/resolver"

Expand Down Expand Up @@ -73,7 +74,8 @@ func NewController(ctx context.Context, configs *config.Env) *controller.Impl {
}

impl := kafkachannelreconciler.NewImpl(ctx, reconciler)
reconciler.Prober = prober.NewAsync(ctx, http.DefaultClient, configs.IngressPodPort, reconciler.ReceiverSelector(), impl.EnqueueKey)
IPsLister := prober.IPsListerFromService(types.NamespacedName{Namespace: configs.SystemNamespace, Name: configs.IngressName})
reconciler.Prober = prober.NewAsync(ctx, http.DefaultClient, configs.IngressPodPort, IPsLister, impl.EnqueueKey)
reconciler.IngressHost = network.GetServiceHostname(configs.IngressName, configs.SystemNamespace)
reconciler.Resolver = resolver.NewURIResolverFromTracker(ctx, impl.Tracker)

Expand Down
4 changes: 3 additions & 1 deletion control-plane/pkg/reconciler/sink/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/Shopify/sarama"
"go.uber.org/zap"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/cache"
kubeclient "knative.dev/pkg/client/injection/kube/client"
configmapinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/configmap"
Expand Down Expand Up @@ -75,7 +76,8 @@ func NewController(ctx context.Context, _ configmap.Watcher, configs *config.Env
}

impl := sinkreconciler.NewImpl(ctx, reconciler)
reconciler.Prober = prober.NewAsync(ctx, http.DefaultClient, configs.IngressPodPort, reconciler.ReceiverSelector(), impl.EnqueueKey)
IPsLister := prober.IPsListerFromService(types.NamespacedName{Namespace: configs.SystemNamespace, Name: configs.IngressName})
reconciler.Prober = prober.NewAsync(ctx, http.DefaultClient, configs.IngressPodPort, IPsLister, impl.EnqueueKey)
reconciler.IngressHost = network.GetServiceHostname(configs.IngressName, configs.SystemNamespace)

sinkInformer := sinkinformer.Get(ctx)
Expand Down
4 changes: 4 additions & 0 deletions data-plane/config/broker/500-receiver.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,10 @@ spec:
port: 80
protocol: TCP
targetPort: 8080
- name: http-container
port: 8080
protocol: TCP
targetPort: 8080
- name: http-metrics
port: 9090
protocol: TCP
Expand Down
4 changes: 4 additions & 0 deletions data-plane/config/channel/500-receiver.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,10 @@ spec:
port: 80
protocol: TCP
targetPort: 8080
- name: http-container
port: 8080
protocol: TCP
targetPort: 8080
- name: http-metrics
port: 9090
protocol: TCP
Expand Down
4 changes: 4 additions & 0 deletions data-plane/config/sink/500-receiver.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,10 @@ spec:
port: 80
protocol: TCP
targetPort: 8080
- name: http-container
port: 8080
protocol: TCP
targetPort: 8080
- name: http-metrics
port: 9090
protocol: TCP
Expand Down