Skip to content

Commit

Permalink
fixed merge conflicts
Browse files Browse the repository at this point in the history
Signed-off-by: Calum Murray <cmurray@redhat.com>
  • Loading branch information
Cali0707 committed Oct 18, 2023
2 parents 35bc707 + 794302d commit afaffb0
Show file tree
Hide file tree
Showing 225 changed files with 13,756 additions and 2,772 deletions.
2 changes: 2 additions & 0 deletions OWNERS_ALIASES
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ aliases:
- matzew
- pierDipi
eventing-kafka-broker-reviewers:
- Leo6Leo
- cali0707
eventing-kafka-mtsource-approvers:
- steven0711dong
Expand Down Expand Up @@ -83,6 +84,7 @@ aliases:
- lionelvillard
- matzew
eventing-reviewers:
- Leo6Leo
- cali0707
- creydr
eventing-wg-leads:
Expand Down
16 changes: 9 additions & 7 deletions control-plane/pkg/reconciler/channel/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ type Reconciler struct {
ServiceLister corelisters.ServiceLister
SubscriptionLister messaginglisters.SubscriptionLister

Prober prober.Prober
Prober prober.NewProber

IngressHost string

Expand Down Expand Up @@ -346,9 +346,8 @@ func (r *Reconciler) reconcileKind(ctx context.Context, channel *messagingv1beta
addressableStatus.Addresses = []duckv1.Addressable{httpAddress}
}

address := addressableStatus.Address.URL.URL()
proberAddressable := prober.Addressable{
Address: address,
proberAddressable := prober.NewAddressable{
AddressStatus: &addressableStatus,
ResourceKey: types.NamespacedName{
Namespace: channel.GetNamespace(),
Name: channel.GetName(),
Expand Down Expand Up @@ -429,9 +428,12 @@ func (r *Reconciler) finalizeKind(ctx context.Context, channel *messagingv1beta1
// See (under discussions KIPs, unlikely to be accepted as they are):
// - https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=181306446
// - https://cwiki.apache.org/confluence/display/KAFKA/KIP-286%3A+producer.send%28%29+should+not+block+on+metadata+update
address := receiver.Address(r.IngressHost, channel)
proberAddressable := prober.Addressable{
Address: address,
address := receiver.HTTPAddress(r.IngressHost, channel)
proberAddressable := prober.NewAddressable{
AddressStatus: &duckv1.AddressStatus{
Address: &address,
Addresses: []duckv1.Addressable{address},
},
ResourceKey: types.NamespacedName{
Namespace: channel.GetNamespace(),
Name: channel.GetName(),
Expand Down
14 changes: 7 additions & 7 deletions control-plane/pkg/reconciler/channel/channel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ func TestReconcileKind(t *testing.T) {
NewConfigMapWithBinaryData(env.DataPlaneConfigMapNamespace, env.ContractConfigMapName, nil),
},
OtherTestData: map[string]interface{}{
testProber: probertesting.MockProber(prober.StatusNotReady),
testProber: probertesting.MockNewProber(prober.StatusNotReady),
},
},
{
Expand All @@ -157,7 +157,7 @@ func TestReconcileKind(t *testing.T) {
},
WantErr: true,
OtherTestData: map[string]interface{}{
testProber: probertesting.MockProber(prober.StatusReady),
testProber: probertesting.MockNewProber(prober.StatusReady),
},
},
{
Expand Down Expand Up @@ -377,7 +377,7 @@ func TestReconcileKind(t *testing.T) {
finalizerUpdatedEvent,
},
OtherTestData: map[string]interface{}{
testProber: probertesting.MockProber(prober.StatusNotReady),
testProber: probertesting.MockNewProber(prober.StatusNotReady),
},
},
{
Expand Down Expand Up @@ -447,7 +447,7 @@ func TestReconcileKind(t *testing.T) {
finalizerUpdatedEvent,
},
OtherTestData: map[string]interface{}{
testProber: probertesting.MockProber(prober.StatusUnknown),
testProber: probertesting.MockNewProber(prober.StatusUnknown),
},
},
{
Expand Down Expand Up @@ -2048,7 +2048,7 @@ func TestFinalizeKind(t *testing.T) {
},
SkipNamespaceValidation: true, // WantCreates compare the source namespace with configmap namespace, so skip it
OtherTestData: map[string]interface{}{
testProber: probertesting.MockProber(prober.StatusNotReady),
testProber: probertesting.MockNewProber(prober.StatusNotReady),
},
},
}
Expand All @@ -2058,9 +2058,9 @@ func TestFinalizeKind(t *testing.T) {

func useTable(t *testing.T, table TableTest, env config.Env) {
table.Test(t, NewFactory(&env, func(ctx context.Context, listers *Listers, env *config.Env, row *TableRow) controller.Reconciler {
proberMock := probertesting.MockProber(prober.StatusReady)
proberMock := probertesting.MockNewProber(prober.StatusReady)
if p, ok := row.OtherTestData[testProber]; ok {
proberMock = p.(prober.Prober)
proberMock = p.(prober.NewProber)
}

var featureFlags *apisconfig.KafkaFeatureFlags
Expand Down
26 changes: 23 additions & 3 deletions control-plane/pkg/reconciler/channel/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,13 @@ package channel

import (
"context"
"net/http"

"github.com/IBM/sarama"
"go.uber.org/zap"
corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/tools/cache"

"knative.dev/eventing/pkg/apis/feature"
subscriptioninformer "knative.dev/eventing/pkg/client/injection/informers/messaging/v1/subscription"

messagingv1beta "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/messaging/v1beta1"
Expand Down Expand Up @@ -89,10 +89,20 @@ func NewController(ctx context.Context, watcher configmap.Watcher, configs *conf
)
}

features := feature.FromContext(ctx)
caCerts, err := reconciler.getCaCerts()
if err != nil && (features.IsStrictTransportEncryption() || features.IsPermissiveTransportEncryption()) {
// We only need to warn here as the broker won't reconcile properly without the proper certs because the prober won't succeed
logger.Warn("Failed to get CA certs when at least one address uses TLS", zap.Error(err))
}

impl := kafkachannelreconciler.NewImpl(ctx, reconciler)
IPsLister := prober.IdentityIPsLister()
reconciler.Prober = prober.NewAsync(ctx, http.DefaultClient, "", IPsLister, impl.EnqueueKey)
reconciler.IngressHost = network.GetServiceHostname(configs.IngressName, configs.SystemNamespace)
reconciler.Prober, err = prober.NewComposite(ctx, "", "", IPsLister, impl.EnqueueKey, &caCerts)
if err != nil {
logger.Fatal("Failed to create prober", zap.Error(err))
}

channelInformer := kafkachannelinformer.Get(ctx)

Expand All @@ -115,6 +125,16 @@ func NewController(ctx context.Context, watcher configmap.Watcher, configs *conf
Handler: controller.HandleAll(globalResync),
})

rotateCACerts := func(obj interface{}) {
newCerts, err := reconciler.getCaCerts()
if err != nil && (features.IsPermissiveTransportEncryption() || features.IsStrictTransportEncryption()) {
// We only need to warn here as the broker won't reconcile properly without the proper certs because the prober won't succeed
logger.Warn("Failed to get new CA certs while rotating CA certs when at least one address uses TLS", zap.Error(err))
}
reconciler.Prober.RotateRootCaCerts(&newCerts)
globalResync(obj)
}

configmapInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{
FilterFunc: controller.FilterWithNameAndNamespace(configs.DataPlaneConfigMapNamespace, configs.ContractConfigMapName),
Handler: cache.ResourceEventHandlerFuncs{
Expand All @@ -128,7 +148,7 @@ func NewController(ctx context.Context, watcher configmap.Watcher, configs *conf
secretinformer.Get(ctx).Informer().AddEventHandler(controller.HandleAll(reconciler.Tracker.OnChanged))
secretinformer.Get(ctx).Informer().AddEventHandler(cache.FilteringResourceEventHandler{
FilterFunc: controller.FilterWithName(kafkaChannelTLSSecretName),
Handler: controller.HandleAll(globalResync),
Handler: controller.HandleAll(rotateCACerts),
})

configmapinformer.Get(ctx).Informer().AddEventHandler(controller.HandleAll(
Expand Down
17 changes: 9 additions & 8 deletions control-plane/pkg/reconciler/channel/v2/channelv2.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ type Reconciler struct {
ServiceLister corelisters.ServiceLister
SubscriptionLister messaginglisters.SubscriptionLister

Prober prober.Prober
Prober prober.NewProber

IngressHost string

Expand Down Expand Up @@ -313,10 +313,8 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, channel *messagingv1beta
addressableStatus.Addresses = []duckv1.Addressable{httpAddress}
}

address := addressableStatus.Address.URL.URL()

proberAddressable := prober.Addressable{
Address: address,
proberAddressable := prober.NewAddressable{
AddressStatus: &addressableStatus,
ResourceKey: types.NamespacedName{
Namespace: channel.GetNamespace(),
Name: channel.GetName(),
Expand Down Expand Up @@ -420,9 +418,12 @@ func (r *Reconciler) FinalizeKind(ctx context.Context, channel *messagingv1beta1
// See (under discussions KIPs, unlikely to be accepted as they are):
// - https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=181306446
// - https://cwiki.apache.org/confluence/display/KAFKA/KIP-286%3A+producer.send%28%29+should+not+block+on+metadata+update
address := receiver.Address(r.IngressHost, channel)
proberAddressable := prober.Addressable{
Address: address,
address := receiver.HTTPAddress(r.IngressHost, channel)
proberAddressable := prober.NewAddressable{
AddressStatus: &duckv1.AddressStatus{
Address: &address,
Addresses: []duckv1.Addressable{address},
},
ResourceKey: types.NamespacedName{
Namespace: channel.GetNamespace(),
Name: channel.GetName(),
Expand Down
14 changes: 7 additions & 7 deletions control-plane/pkg/reconciler/channel/v2/channelv2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ func TestReconcileKind(t *testing.T) {
NewConfigMapWithBinaryData(env.DataPlaneConfigMapNamespace, env.ContractConfigMapName, nil),
},
OtherTestData: map[string]interface{}{
testProber: probertesting.MockProber(prober.StatusNotReady),
testProber: probertesting.MockNewProber(prober.StatusNotReady),
},
},
{
Expand All @@ -152,7 +152,7 @@ func TestReconcileKind(t *testing.T) {
},
WantErr: true,
OtherTestData: map[string]interface{}{
testProber: probertesting.MockProber(prober.StatusReady),
testProber: probertesting.MockNewProber(prober.StatusReady),
},
},
{
Expand Down Expand Up @@ -186,7 +186,7 @@ func TestReconcileKind(t *testing.T) {
}),
},
OtherTestData: map[string]interface{}{
testProber: probertesting.MockProber(prober.StatusNotReady),
testProber: probertesting.MockNewProber(prober.StatusNotReady),
},
},
{
Expand Down Expand Up @@ -371,7 +371,7 @@ func TestReconcileKind(t *testing.T) {
finalizerUpdatedEvent,
},
OtherTestData: map[string]interface{}{
testProber: probertesting.MockProber(prober.StatusNotReady),
testProber: probertesting.MockNewProber(prober.StatusNotReady),
},
},
{
Expand Down Expand Up @@ -424,7 +424,7 @@ func TestReconcileKind(t *testing.T) {
finalizerUpdatedEvent,
},
OtherTestData: map[string]interface{}{
testProber: probertesting.MockProber(prober.StatusUnknown),
testProber: probertesting.MockNewProber(prober.StatusUnknown),
},
},
{
Expand Down Expand Up @@ -1907,9 +1907,9 @@ func TestReconcileKind(t *testing.T) {
}

table.Test(t, NewFactory(&env, func(ctx context.Context, listers *Listers, env *config.Env, row *TableRow) controller.Reconciler {
proberMock := probertesting.MockProber(prober.StatusReady)
proberMock := probertesting.MockNewProber(prober.StatusReady)
if p, ok := row.OtherTestData[testProber]; ok {
proberMock = p.(prober.Prober)
proberMock = p.(prober.NewProber)
}

var featureFlags *apisconfig.KafkaFeatureFlags
Expand Down
24 changes: 21 additions & 3 deletions control-plane/pkg/reconciler/channel/v2/controllerv2.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,12 @@ package v2

import (
"context"
"net/http"

"github.com/IBM/sarama"
"go.uber.org/zap"
corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/tools/cache"
"knative.dev/eventing/pkg/apis/feature"
serviceinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/service"
"knative.dev/pkg/configmap"
"knative.dev/pkg/logging"
Expand Down Expand Up @@ -90,6 +90,12 @@ func NewController(ctx context.Context, watcher configmap.Watcher, configs *conf
)
}

features := feature.FromContext(ctx)
caCerts, err := reconciler.getCaCerts()
if err != nil && (features.IsStrictTransportEncryption() || features.IsPermissiveTransportEncryption()) {
logger.Warn("Failed to get CA certs when at least one address uses TLS", zap.Error(err))
}

impl := kafkachannelreconciler.NewImpl(ctx, reconciler)

kafkaConfigStore := apisconfig.NewStore(ctx, func(name string, value *apisconfig.KafkaFeatureFlags) {
Expand All @@ -99,15 +105,27 @@ func NewController(ctx context.Context, watcher configmap.Watcher, configs *conf
kafkaConfigStore.WatchConfigs(watcher)

IPsLister := prober.IdentityIPsLister()
reconciler.Prober = prober.NewAsync(ctx, http.DefaultClient, "", IPsLister, impl.EnqueueKey)
reconciler.IngressHost = network.GetServiceHostname(configs.IngressName, configs.SystemNamespace)
reconciler.Resolver = resolver.NewURIResolverFromTracker(ctx, impl.Tracker)
reconciler.Prober, err = prober.NewComposite(ctx, "", "", IPsLister, impl.EnqueueKey, &caCerts)
if err != nil {
logger.Fatal("Failed to create prober", zap.Error(err))
}

rotateCACerts := func(obj interface{}) {
newCerts, err := reconciler.getCaCerts()
if err != nil && (features.IsPermissiveTransportEncryption() || features.IsStrictTransportEncryption()) {
// We only need to warn here as the broker won't reconcile properly without the proper certs because the prober won't succeed
logger.Warn("Failed to get new CA certs while rotating CA certs when at least one address uses TLS", zap.Error(err))
}
reconciler.Prober.RotateRootCaCerts(&newCerts)
consumergroup.Enqueue("kafkachannel", impl.EnqueueKey)
}
reconciler.Tracker = impl.Tracker
secretinformer.Get(ctx).Informer().AddEventHandler(controller.HandleAll(reconciler.Tracker.OnChanged))
secretinformer.Get(ctx).Informer().AddEventHandler(cache.FilteringResourceEventHandler{
FilterFunc: controller.FilterWithName(kafkaChannelTLSSecretName),
Handler: controller.HandleAll(consumergroup.Enqueue("kafkachannel", impl.EnqueueKey)),
Handler: controller.HandleAll(rotateCACerts),
})

reconciler.Tracker = impl.Tracker
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,12 @@ spec:
app.kubernetes.io/component: kafka-broker-receiver
app.kubernetes.io/name: knative-eventing

duration: 2160h # 90d
renewBefore: 360h # 15d
# Use 0m0s so that we don't run into https://github.com/cert-manager/cert-manager/issues/6408 on the operator
duration: 2160h0m0s # 90d
renewBefore: 360h0m0s # 15d
subject:
organizations:
- local
isCA: false
privateKey:
algorithm: RSA
encoding: PKCS1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,12 @@ spec:
app.kubernetes.io/component: kafka-channel-receiver
app.kubernetes.io/name: knative-eventing

duration: 2160h # 90d
renewBefore: 360h # 15d
# Use 0m0s so that we don't run into https://github.com/cert-manager/cert-manager/issues/6408 on the operator
duration: 2160h0m0s # 90d
renewBefore: 360h0m0s # 15d
subject:
organizations:
- local
isCA: false
privateKey:
algorithm: RSA
encoding: PKCS1
Expand Down
6 changes: 3 additions & 3 deletions data-plane/config/sink-tls/sink-ingress-tls-certificate.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,12 @@ spec:
app.kubernetes.io/component: kafka-sink-receiver
app.kubernetes.io/name: knative-eventing

duration: 2160h # 90d
renewBefore: 360h # 15d
# Use 0m0s so that we don't run into https://github.com/cert-manager/cert-manager/issues/6408 on the operator
duration: 2160h0m0s # 90d
renewBefore: 360h0m0s # 15d
subject:
organizations:
- local
isCA: false
privateKey:
algorithm: RSA
encoding: PKCS1
Expand Down
2 changes: 1 addition & 1 deletion data-plane/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@
<kafka.version>3.2.3</kafka.version>
<debezium.version>1.9.6.Final</debezium.version>
<jib.version>3.3.2</jib.version>
<quarkus.version>3.2.0.Final</quarkus.version>
<quarkus.version>3.2.6.Final</quarkus.version>
<antlr.version>4.9.2
</antlr.version> <!-- Overwritting quarkus's antlr version. Reminder: antlr4-maven-plugin,antlr4-runtime, antlr4 need to have the same version -->
<palantirJavaFormat.version>2.33.0</palantirJavaFormat.version>
Expand Down
Loading

0 comments on commit afaffb0

Please sign in to comment.