Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Set DeadLetterSinkURI in the Broker and Trigger status #1349

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
10 changes: 10 additions & 0 deletions control-plane/pkg/core/config/egress.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ package config
import (
"google.golang.org/protobuf/proto"
"k8s.io/apimachinery/pkg/types"
eventingduck "knative.dev/eventing/pkg/apis/duck/v1"
"knative.dev/pkg/apis"

"knative.dev/eventing-kafka-broker/control-plane/pkg/contract"
)
Expand Down Expand Up @@ -80,3 +82,11 @@ func KeyTypeFromString(s string) contract.KeyType {
return contract.KeyType_String
}
}

// SetDeadLetterSinkURIFromEgressConfig sets eventingduck.DeliveryStatus.DeadLetterSinkURI from a provided contract.EgressConfig.
func SetDeadLetterSinkURIFromEgressConfig(dStatus *eventingduck.DeliveryStatus, egressConfig *contract.EgressConfig) {
if egressConfig == nil {
return
}
dStatus.DeadLetterSinkURI, _ = apis.ParseURL(egressConfig.DeadLetter)
}
35 changes: 35 additions & 0 deletions control-plane/pkg/core/config/egress_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (

"github.com/google/go-cmp/cmp"
"google.golang.org/protobuf/testing/protocmp"
eventingduck "knative.dev/eventing/pkg/apis/duck/v1"
"knative.dev/pkg/apis"

"knative.dev/eventing-kafka-broker/control-plane/pkg/contract"

Expand Down Expand Up @@ -253,3 +255,36 @@ func TestKeyTypeFromString(t *testing.T) {
})
}
}

func TestSetDeadLetterSinkURIFromEgressConfig(t *testing.T) {
t.Parallel()

uri, _ := apis.ParseURL("https://localhost:433")
tests := []struct {
name string
got *eventingduck.DeliveryStatus
want *eventingduck.DeliveryStatus
egressConfig *contract.EgressConfig
}{
{
name: "nil egressConfig",
got: &eventingduck.DeliveryStatus{DeadLetterSinkURI: uri},
want: &eventingduck.DeliveryStatus{DeadLetterSinkURI: uri},
},
{
name: "nil DeadLetterSink",
got: &eventingduck.DeliveryStatus{DeadLetterSinkURI: nil},
want: &eventingduck.DeliveryStatus{DeadLetterSinkURI: uri},
egressConfig: &contract.EgressConfig{DeadLetter: uri.String()},
},
}
for _, tt := range tests {
tt := tt
t.Run(tt.name, func(t *testing.T) {
SetDeadLetterSinkURIFromEgressConfig(tt.got, tt.egressConfig)
if diff := cmp.Diff(tt.want, tt.got); diff != "" {
t.Error("(-want, +got)", diff)
}
})
}
}
1 change: 1 addition & 0 deletions control-plane/pkg/reconciler/broker/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ func (r *Reconciler) reconcileKind(ctx context.Context, broker *eventing.Broker)
if err != nil {
return statusConditionManager.FailedToGetConfig(err)
}
coreconfig.SetDeadLetterSinkURIFromEgressConfig(&broker.Status.DeliveryStatus, brokerResource.EgressConfig)

brokerIndex := coreconfig.FindResource(ct, broker.UID)
// Update contract data with the new contract configuration
Expand Down
11 changes: 11 additions & 0 deletions control-plane/pkg/reconciler/broker/broker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,7 @@ func brokerReconciliation(t *testing.T, format string, configs Configs) {
BrokerTopicReady,
BrokerConfigParsed,
BrokerAddressable(&configs),
BrokerDLSResolved(ServiceURL),
),
},
},
Expand Down Expand Up @@ -267,6 +268,7 @@ func brokerReconciliation(t *testing.T, format string, configs Configs) {
BrokerTopicReady,
BrokerConfigParsed,
BrokerAddressable(&configs),
BrokerDLSResolved(ServiceURLFrom(BrokerNamespace, ServiceName)),
),
},
},
Expand Down Expand Up @@ -367,6 +369,7 @@ func brokerReconciliation(t *testing.T, format string, configs Configs) {
BrokerConfigMapUpdatedReady(&configs),
BrokerTopicReady,
BrokerAddressable(&configs),
BrokerDLSResolved(ServiceURL),
),
},
},
Expand Down Expand Up @@ -596,6 +599,7 @@ func brokerReconciliation(t *testing.T, format string, configs Configs) {
BrokerConfigParsed,
BrokerTopicReady,
BrokerAddressable(&configs),
BrokerDLSResolved("http://www.my-sink.com/api"),
),
},
},
Expand Down Expand Up @@ -1212,6 +1216,7 @@ func brokerReconciliation(t *testing.T, format string, configs Configs) {
BrokerConfigParsed,
BrokerTopicReady,
BrokerAddressable(&configs),
BrokerDLSResolved(ServiceURL),
),
},
},
Expand Down Expand Up @@ -1303,6 +1308,7 @@ func brokerReconciliation(t *testing.T, format string, configs Configs) {
BrokerTopicReady,
BrokerConfigParsed,
BrokerAddressable(&configs),
BrokerDLSResolved(ServiceURL),
),
},
},
Expand Down Expand Up @@ -1367,6 +1373,7 @@ func brokerReconciliation(t *testing.T, format string, configs Configs) {
BrokerTopicReady,
BrokerConfigParsed,
BrokerAddressable(&configs),
BrokerDLSResolved(ServiceURL),
),
},
},
Expand Down Expand Up @@ -1428,6 +1435,7 @@ func brokerReconciliation(t *testing.T, format string, configs Configs) {
BrokerTopicReady,
BrokerConfigParsed,
BrokerAddressable(&configs),
BrokerDLSResolved(ServiceURL),
),
},
},
Expand Down Expand Up @@ -1492,6 +1500,7 @@ func brokerReconciliation(t *testing.T, format string, configs Configs) {
BrokerTopicReady,
BrokerConfigParsed,
BrokerAddressable(&configs),
BrokerDLSResolved(ServiceURL),
),
},
},
Expand Down Expand Up @@ -1539,6 +1548,7 @@ func brokerReconciliation(t *testing.T, format string, configs Configs) {
BrokerTopicReady,
BrokerConfigParsed,
BrokerAddressable(&configs),
BrokerDLSResolved(ServiceURL),
),
},
},
Expand Down Expand Up @@ -1589,6 +1599,7 @@ func brokerReconciliation(t *testing.T, format string, configs Configs) {
BrokerTopicReady,
BrokerConfigParsed,
BrokerAddressable(&configs),
BrokerDLSResolved(ServiceURL),
),
},
},
Expand Down
8 changes: 5 additions & 3 deletions control-plane/pkg/reconciler/channel/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,6 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
corelisters "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/util/retry"
"knative.dev/eventing-kafka-broker/control-plane/pkg/contract"
"knative.dev/eventing-kafka-broker/control-plane/pkg/receiver"
"knative.dev/eventing-kafka-broker/control-plane/pkg/security"
messagingv1beta1 "knative.dev/eventing-kafka/pkg/apis/messaging/v1beta1"
commonconfig "knative.dev/eventing-kafka/pkg/common/config"
"knative.dev/eventing-kafka/pkg/common/constants"
Expand All @@ -40,6 +37,10 @@ import (
"knative.dev/pkg/resolver"
"knative.dev/pkg/system"

"knative.dev/eventing-kafka-broker/control-plane/pkg/contract"
"knative.dev/eventing-kafka-broker/control-plane/pkg/receiver"
"knative.dev/eventing-kafka-broker/control-plane/pkg/security"

"knative.dev/eventing-kafka-broker/control-plane/pkg/config"
coreconfig "knative.dev/eventing-kafka-broker/control-plane/pkg/core/config"
kafkalogging "knative.dev/eventing-kafka-broker/control-plane/pkg/logging"
Expand Down Expand Up @@ -159,6 +160,7 @@ func (r *Reconciler) reconcileKind(ctx context.Context, channel *messagingv1beta
if err != nil {
return statusConditionManager.FailedToGetConfig(err)
}
coreconfig.SetDeadLetterSinkURIFromEgressConfig(&channel.Status.DeliveryStatus, channelResource.EgressConfig)

// Update contract data with the new contract configuration
channelIndex := coreconfig.FindResource(ct, channel.UID)
Expand Down
6 changes: 6 additions & 0 deletions control-plane/pkg/reconciler/testing/objects_broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,12 @@ func BrokerAddressable(configs *Configs) func(broker *eventing.Broker) {
}
}

func BrokerDLSResolved(uri string) func(broker *eventing.Broker) {
return func(broker *eventing.Broker) {
broker.Status.DeadLetterSinkURI, _ = apis.ParseURL(uri)
}
}

func BrokerFailedToCreateTopic(broker *eventing.Broker) {

broker.GetConditionSet().Manage(broker.GetStatus()).MarkFalse(
Expand Down
2 changes: 2 additions & 0 deletions control-plane/pkg/reconciler/trigger/trigger.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,8 @@ func (r *Reconciler) reconcileKind(ctx context.Context, trigger *eventing.Trigge
}
statusConditionManager.subscriberResolved(triggerConfig)

coreconfig.SetDeadLetterSinkURIFromEgressConfig(&trigger.Status.DeliveryStatus, triggerConfig.EgressConfig)

changed := coreconfig.AddOrUpdateEgressConfig(ct, brokerIndex, triggerConfig, triggerIndex)

coreconfig.IncrementContractGeneration(ct)
Expand Down
145 changes: 145 additions & 0 deletions control-plane/pkg/reconciler/trigger/trigger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,140 @@ func triggerReconciliation(t *testing.T, format string, configs broker.Configs)
},
},
},
{
Name: "Reconciled normal - with Broker DLS",
Objects: []runtime.Object{
NewBroker(
BrokerReady,
WithDelivery(),
),
newTrigger(),
NewService(),
NewConfigMapFromContract(&contract.Contract{
Resources: []*contract.Resource{
{
Uid: BrokerUUID,
Topics: []string{BrokerTopic()},
Ingress: &contract.Ingress{IngressType: &contract.Ingress_Path{Path: receiver.Path(BrokerNamespace, BrokerName)}},
},
},
}, &configs),
BrokerDispatcherPod(configs.SystemNamespace, nil),
},
Key: testKey,
WantEvents: []string{
finalizerUpdatedEvent,
},
WantPatches: []clientgotesting.PatchActionImpl{
patchFinalizers(),
},
WantUpdates: []clientgotesting.UpdateActionImpl{
ConfigMapUpdate(&configs, &contract.Contract{
Resources: []*contract.Resource{
{
Uid: BrokerUUID,
Topics: []string{BrokerTopic()},
Ingress: &contract.Ingress{IngressType: &contract.Ingress_Path{Path: receiver.Path(BrokerNamespace, BrokerName)}},
Egresses: []*contract.Egress{
{
Destination: ServiceURL,
ConsumerGroup: TriggerUUID,
Uid: TriggerUUID,
EgressConfig: &contract.EgressConfig{DeadLetter: ServiceURL},
},
},
},
},
Generation: 1,
}),
BrokerDispatcherPodUpdate(configs.SystemNamespace, map[string]string{
base.VolumeGenerationAnnotationKey: "1",
}),
},
WantStatusUpdates: []clientgotesting.UpdateActionImpl{
{
Object: newTrigger(
reconcilertesting.WithInitTriggerConditions,
reconcilertesting.WithTriggerSubscribed(),
withSubscriberURI,
reconcilertesting.WithTriggerDependencyReady(),
reconcilertesting.WithTriggerBrokerReady(),
withTriggerSubscriberResolvedSucceeded(contract.DeliveryOrder_UNORDERED),
withDeadLetterSinkURI(ServiceURL),
),
},
},
},
{
Name: "Reconciled normal - with Trigger DLS",
Objects: []runtime.Object{
NewBroker(
BrokerReady,
),
newTrigger(withDelivery),
NewService(),
NewConfigMapFromContract(&contract.Contract{
Resources: []*contract.Resource{
{
Uid: BrokerUUID,
Topics: []string{BrokerTopic()},
Ingress: &contract.Ingress{IngressType: &contract.Ingress_Path{Path: receiver.Path(BrokerNamespace, BrokerName)}},
},
},
}, &configs),
BrokerDispatcherPod(configs.SystemNamespace, nil),
},
Key: testKey,
WantEvents: []string{
finalizerUpdatedEvent,
},
WantPatches: []clientgotesting.PatchActionImpl{
patchFinalizers(),
},
WantUpdates: []clientgotesting.UpdateActionImpl{
ConfigMapUpdate(&configs, &contract.Contract{
Resources: []*contract.Resource{
{
Uid: BrokerUUID,
Topics: []string{BrokerTopic()},
Ingress: &contract.Ingress{IngressType: &contract.Ingress_Path{Path: receiver.Path(BrokerNamespace, BrokerName)}},
Egresses: []*contract.Egress{
{
Destination: ServiceURL,
ConsumerGroup: TriggerUUID,
Uid: TriggerUUID,
EgressConfig: &contract.EgressConfig{
DeadLetter: url.String(),
Retry: 3,
BackoffPolicy: contract.BackoffPolicy_Exponential,
BackoffDelay: uint64(time.Second.Milliseconds()),
Timeout: uint64((time.Second * 2).Milliseconds()),
},
},
},
},
},
Generation: 1,
}),
BrokerDispatcherPodUpdate(configs.SystemNamespace, map[string]string{
base.VolumeGenerationAnnotationKey: "1",
}),
},
WantStatusUpdates: []clientgotesting.UpdateActionImpl{
{
Object: newTrigger(
withDelivery,
reconcilertesting.WithInitTriggerConditions,
reconcilertesting.WithTriggerSubscribed(),
withSubscriberURI,
reconcilertesting.WithTriggerDependencyReady(),
reconcilertesting.WithTriggerBrokerReady(),
withTriggerSubscriberResolvedSucceeded(contract.DeliveryOrder_UNORDERED),
withDeadLetterSinkURI(url.String()),
),
},
},
},
{
Name: "Reconciled normal - Trigger with ordered delivery",
Objects: []runtime.Object{
Expand Down Expand Up @@ -2201,6 +2335,17 @@ func withSubscriberURI(trigger *eventing.Trigger) {
trigger.Status.SubscriberURI = u
}

func withDeadLetterSinkURI(uri string) func(trigger *eventing.Trigger) {
return func(trigger *eventing.Trigger) {
u, err := apis.ParseURL(uri)
if err != nil {
panic(err)
}
trigger.Status.DeadLetterSinkURI = u
trigger.Status.MarkDeadLetterSinkResolvedSucceeded()
}
}

func withTriggerSubscriberResolvedSucceeded(deliveryOrder contract.DeliveryOrder) func(*eventing.Trigger) {
return func(t *eventing.Trigger) {
t.GetConditionSet().Manage(&t.Status).MarkTrueWithReason(
Expand Down
8 changes: 4 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@ require (
k8s.io/apiserver v0.21.4
k8s.io/client-go v0.21.4
k8s.io/utils v0.0.0-20210111153108-fddb29f9d009
knative.dev/eventing v0.26.1-0.20211014072442-a6a819dc71cf
knative.dev/eventing v0.26.1-0.20211018174236-a34aaa09f7d2
knative.dev/eventing-kafka v0.26.1-0.20211014113242-ebe7bfbe9c0d
knative.dev/hack v0.0.0-20210806075220-815cd312d65c
knative.dev/pkg v0.0.0-20211013152848-fa2f8f19557b
knative.dev/reconciler-test v0.0.0-20210930064245-45904ca4383d
knative.dev/hack v0.0.0-20211018110626-47ac3b032e60
knative.dev/pkg v0.0.0-20211018141937-a34efd6b409d
knative.dev/reconciler-test v0.0.0-20211018075026-816dbd1c57f7
)