Skip to content

Commit

Permalink
Set DeadLetterSinkURI in Broker, Trigger and Channel status
Browse files Browse the repository at this point in the history
Signed-off-by: Pierangelo Di Pilato <pdipilat@redhat.com>
  • Loading branch information
pierDipi committed Oct 19, 2021
1 parent 58732ea commit 067a605
Show file tree
Hide file tree
Showing 8 changed files with 215 additions and 3 deletions.
10 changes: 10 additions & 0 deletions control-plane/pkg/core/config/egress.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ package config
import (
"google.golang.org/protobuf/proto"
"k8s.io/apimachinery/pkg/types"
eventingduck "knative.dev/eventing/pkg/apis/duck/v1"
"knative.dev/pkg/apis"

"knative.dev/eventing-kafka-broker/control-plane/pkg/contract"
)
Expand Down Expand Up @@ -80,3 +82,11 @@ func KeyTypeFromString(s string) contract.KeyType {
return contract.KeyType_String
}
}

// SetDeadLetterSinkURIFromEgressConfig sets eventingduck.DeliveryStatus.DeadLetterSinkURI from a provided contract.EgressConfig.
func SetDeadLetterSinkURIFromEgressConfig(dStatus *eventingduck.DeliveryStatus, egressConfig *contract.EgressConfig) {
if egressConfig == nil {
return
}
dStatus.DeadLetterSinkURI, _ = apis.ParseURL(egressConfig.DeadLetter)
}
35 changes: 35 additions & 0 deletions control-plane/pkg/core/config/egress_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (

"github.com/google/go-cmp/cmp"
"google.golang.org/protobuf/testing/protocmp"
eventingduck "knative.dev/eventing/pkg/apis/duck/v1"
"knative.dev/pkg/apis"

"knative.dev/eventing-kafka-broker/control-plane/pkg/contract"

Expand Down Expand Up @@ -253,3 +255,36 @@ func TestKeyTypeFromString(t *testing.T) {
})
}
}

func TestSetDeadLetterSinkURIFromEgressConfig(t *testing.T) {
t.Parallel()

uri, _ := apis.ParseURL("https://localhost:433")
tests := []struct {
name string
got *eventingduck.DeliveryStatus
want *eventingduck.DeliveryStatus
egressConfig *contract.EgressConfig
}{
{
name: "nil egressConfig",
got: &eventingduck.DeliveryStatus{DeadLetterSinkURI: uri},
want: &eventingduck.DeliveryStatus{DeadLetterSinkURI: uri},
},
{
name: "nil DeadLetterSink",
got: &eventingduck.DeliveryStatus{DeadLetterSinkURI: nil},
want: &eventingduck.DeliveryStatus{DeadLetterSinkURI: uri},
egressConfig: &contract.EgressConfig{DeadLetter: uri.String()},
},
}
for _, tt := range tests {
tt := tt
t.Run(tt.name, func(t *testing.T) {
SetDeadLetterSinkURIFromEgressConfig(tt.got, tt.egressConfig)
if diff := cmp.Diff(tt.want, tt.got); diff != "" {
t.Error("(-want, +got)", diff)
}
})
}
}
1 change: 1 addition & 0 deletions control-plane/pkg/reconciler/broker/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ func (r *Reconciler) reconcileKind(ctx context.Context, broker *eventing.Broker)
if err != nil {
return statusConditionManager.FailedToGetConfig(err)
}
coreconfig.SetDeadLetterSinkURIFromEgressConfig(&broker.Status.DeliveryStatus, brokerResource.EgressConfig)

brokerIndex := coreconfig.FindResource(ct, broker.UID)
// Update contract data with the new contract configuration
Expand Down
11 changes: 11 additions & 0 deletions control-plane/pkg/reconciler/broker/broker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,7 @@ func brokerReconciliation(t *testing.T, format string, configs Configs) {
BrokerTopicReady,
BrokerConfigParsed,
BrokerAddressable(&configs),
BrokerDLSResolved(ServiceURL),
),
},
},
Expand Down Expand Up @@ -267,6 +268,7 @@ func brokerReconciliation(t *testing.T, format string, configs Configs) {
BrokerTopicReady,
BrokerConfigParsed,
BrokerAddressable(&configs),
BrokerDLSResolved(ServiceURLFrom(BrokerNamespace, ServiceName)),
),
},
},
Expand Down Expand Up @@ -367,6 +369,7 @@ func brokerReconciliation(t *testing.T, format string, configs Configs) {
BrokerConfigMapUpdatedReady(&configs),
BrokerTopicReady,
BrokerAddressable(&configs),
BrokerDLSResolved(ServiceURL),
),
},
},
Expand Down Expand Up @@ -596,6 +599,7 @@ func brokerReconciliation(t *testing.T, format string, configs Configs) {
BrokerConfigParsed,
BrokerTopicReady,
BrokerAddressable(&configs),
BrokerDLSResolved("http://www.my-sink.com/api"),
),
},
},
Expand Down Expand Up @@ -1212,6 +1216,7 @@ func brokerReconciliation(t *testing.T, format string, configs Configs) {
BrokerConfigParsed,
BrokerTopicReady,
BrokerAddressable(&configs),
BrokerDLSResolved(ServiceURL),
),
},
},
Expand Down Expand Up @@ -1303,6 +1308,7 @@ func brokerReconciliation(t *testing.T, format string, configs Configs) {
BrokerTopicReady,
BrokerConfigParsed,
BrokerAddressable(&configs),
BrokerDLSResolved(ServiceURL),
),
},
},
Expand Down Expand Up @@ -1367,6 +1373,7 @@ func brokerReconciliation(t *testing.T, format string, configs Configs) {
BrokerTopicReady,
BrokerConfigParsed,
BrokerAddressable(&configs),
BrokerDLSResolved(ServiceURL),
),
},
},
Expand Down Expand Up @@ -1428,6 +1435,7 @@ func brokerReconciliation(t *testing.T, format string, configs Configs) {
BrokerTopicReady,
BrokerConfigParsed,
BrokerAddressable(&configs),
BrokerDLSResolved(ServiceURL),
),
},
},
Expand Down Expand Up @@ -1492,6 +1500,7 @@ func brokerReconciliation(t *testing.T, format string, configs Configs) {
BrokerTopicReady,
BrokerConfigParsed,
BrokerAddressable(&configs),
BrokerDLSResolved(ServiceURL),
),
},
},
Expand Down Expand Up @@ -1539,6 +1548,7 @@ func brokerReconciliation(t *testing.T, format string, configs Configs) {
BrokerTopicReady,
BrokerConfigParsed,
BrokerAddressable(&configs),
BrokerDLSResolved(ServiceURL),
),
},
},
Expand Down Expand Up @@ -1589,6 +1599,7 @@ func brokerReconciliation(t *testing.T, format string, configs Configs) {
BrokerTopicReady,
BrokerConfigParsed,
BrokerAddressable(&configs),
BrokerDLSResolved(ServiceURL),
),
},
},
Expand Down
8 changes: 5 additions & 3 deletions control-plane/pkg/reconciler/channel/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,6 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
corelisters "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/util/retry"
"knative.dev/eventing-kafka-broker/control-plane/pkg/contract"
"knative.dev/eventing-kafka-broker/control-plane/pkg/receiver"
"knative.dev/eventing-kafka-broker/control-plane/pkg/security"
messagingv1beta1 "knative.dev/eventing-kafka/pkg/apis/messaging/v1beta1"
commonconfig "knative.dev/eventing-kafka/pkg/common/config"
"knative.dev/eventing-kafka/pkg/common/constants"
Expand All @@ -40,6 +37,10 @@ import (
"knative.dev/pkg/resolver"
"knative.dev/pkg/system"

"knative.dev/eventing-kafka-broker/control-plane/pkg/contract"
"knative.dev/eventing-kafka-broker/control-plane/pkg/receiver"
"knative.dev/eventing-kafka-broker/control-plane/pkg/security"

"knative.dev/eventing-kafka-broker/control-plane/pkg/config"
coreconfig "knative.dev/eventing-kafka-broker/control-plane/pkg/core/config"
kafkalogging "knative.dev/eventing-kafka-broker/control-plane/pkg/logging"
Expand Down Expand Up @@ -159,6 +160,7 @@ func (r *Reconciler) reconcileKind(ctx context.Context, channel *messagingv1beta
if err != nil {
return statusConditionManager.FailedToGetConfig(err)
}
coreconfig.SetDeadLetterSinkURIFromEgressConfig(&channel.Status.DeliveryStatus, channelResource.EgressConfig)

// Update contract data with the new contract configuration
channelIndex := coreconfig.FindResource(ct, channel.UID)
Expand Down
6 changes: 6 additions & 0 deletions control-plane/pkg/reconciler/testing/objects_broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,12 @@ func BrokerAddressable(configs *Configs) func(broker *eventing.Broker) {
}
}

func BrokerDLSResolved(uri string) func(broker *eventing.Broker) {
return func(broker *eventing.Broker) {
broker.Status.DeadLetterSinkURI, _ = apis.ParseURL(uri)
}
}

func BrokerFailedToCreateTopic(broker *eventing.Broker) {

broker.GetConditionSet().Manage(broker.GetStatus()).MarkFalse(
Expand Down
2 changes: 2 additions & 0 deletions control-plane/pkg/reconciler/trigger/trigger.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,8 @@ func (r *Reconciler) reconcileKind(ctx context.Context, trigger *eventing.Trigge
}
statusConditionManager.subscriberResolved(triggerConfig)

coreconfig.SetDeadLetterSinkURIFromEgressConfig(&trigger.Status.DeliveryStatus, triggerConfig.EgressConfig)

changed := coreconfig.AddOrUpdateEgressConfig(ct, brokerIndex, triggerConfig, triggerIndex)

coreconfig.IncrementContractGeneration(ct)
Expand Down
145 changes: 145 additions & 0 deletions control-plane/pkg/reconciler/trigger/trigger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,140 @@ func triggerReconciliation(t *testing.T, format string, configs broker.Configs)
},
},
},
{
Name: "Reconciled normal - with Broker DLS",
Objects: []runtime.Object{
NewBroker(
BrokerReady,
WithDelivery(),
),
newTrigger(),
NewService(),
NewConfigMapFromContract(&contract.Contract{
Resources: []*contract.Resource{
{
Uid: BrokerUUID,
Topics: []string{BrokerTopic()},
Ingress: &contract.Ingress{IngressType: &contract.Ingress_Path{Path: receiver.Path(BrokerNamespace, BrokerName)}},
},
},
}, &configs),
BrokerDispatcherPod(configs.SystemNamespace, nil),
},
Key: testKey,
WantEvents: []string{
finalizerUpdatedEvent,
},
WantPatches: []clientgotesting.PatchActionImpl{
patchFinalizers(),
},
WantUpdates: []clientgotesting.UpdateActionImpl{
ConfigMapUpdate(&configs, &contract.Contract{
Resources: []*contract.Resource{
{
Uid: BrokerUUID,
Topics: []string{BrokerTopic()},
Ingress: &contract.Ingress{IngressType: &contract.Ingress_Path{Path: receiver.Path(BrokerNamespace, BrokerName)}},
Egresses: []*contract.Egress{
{
Destination: ServiceURL,
ConsumerGroup: TriggerUUID,
Uid: TriggerUUID,
EgressConfig: &contract.EgressConfig{DeadLetter: ServiceURL},
},
},
},
},
Generation: 1,
}),
BrokerDispatcherPodUpdate(configs.SystemNamespace, map[string]string{
base.VolumeGenerationAnnotationKey: "1",
}),
},
WantStatusUpdates: []clientgotesting.UpdateActionImpl{
{
Object: newTrigger(
reconcilertesting.WithInitTriggerConditions,
reconcilertesting.WithTriggerSubscribed(),
withSubscriberURI,
reconcilertesting.WithTriggerDependencyReady(),
reconcilertesting.WithTriggerBrokerReady(),
withTriggerSubscriberResolvedSucceeded(contract.DeliveryOrder_UNORDERED),
withDeadLetterSinkURI(ServiceURL),
),
},
},
},
{
Name: "Reconciled normal - with Trigger DLS",
Objects: []runtime.Object{
NewBroker(
BrokerReady,
),
newTrigger(withDelivery),
NewService(),
NewConfigMapFromContract(&contract.Contract{
Resources: []*contract.Resource{
{
Uid: BrokerUUID,
Topics: []string{BrokerTopic()},
Ingress: &contract.Ingress{IngressType: &contract.Ingress_Path{Path: receiver.Path(BrokerNamespace, BrokerName)}},
},
},
}, &configs),
BrokerDispatcherPod(configs.SystemNamespace, nil),
},
Key: testKey,
WantEvents: []string{
finalizerUpdatedEvent,
},
WantPatches: []clientgotesting.PatchActionImpl{
patchFinalizers(),
},
WantUpdates: []clientgotesting.UpdateActionImpl{
ConfigMapUpdate(&configs, &contract.Contract{
Resources: []*contract.Resource{
{
Uid: BrokerUUID,
Topics: []string{BrokerTopic()},
Ingress: &contract.Ingress{IngressType: &contract.Ingress_Path{Path: receiver.Path(BrokerNamespace, BrokerName)}},
Egresses: []*contract.Egress{
{
Destination: ServiceURL,
ConsumerGroup: TriggerUUID,
Uid: TriggerUUID,
EgressConfig: &contract.EgressConfig{
DeadLetter: url.String(),
Retry: 3,
BackoffPolicy: contract.BackoffPolicy_Exponential,
BackoffDelay: uint64(time.Second.Milliseconds()),
Timeout: uint64((time.Second * 2).Milliseconds()),
},
},
},
},
},
Generation: 1,
}),
BrokerDispatcherPodUpdate(configs.SystemNamespace, map[string]string{
base.VolumeGenerationAnnotationKey: "1",
}),
},
WantStatusUpdates: []clientgotesting.UpdateActionImpl{
{
Object: newTrigger(
withDelivery,
reconcilertesting.WithInitTriggerConditions,
reconcilertesting.WithTriggerSubscribed(),
withSubscriberURI,
reconcilertesting.WithTriggerDependencyReady(),
reconcilertesting.WithTriggerBrokerReady(),
withTriggerSubscriberResolvedSucceeded(contract.DeliveryOrder_UNORDERED),
withDeadLetterSinkURI(url.String()),
),
},
},
},
{
Name: "Reconciled normal - Trigger with ordered delivery",
Objects: []runtime.Object{
Expand Down Expand Up @@ -2201,6 +2335,17 @@ func withSubscriberURI(trigger *eventing.Trigger) {
trigger.Status.SubscriberURI = u
}

func withDeadLetterSinkURI(uri string) func(trigger *eventing.Trigger) {
return func(trigger *eventing.Trigger) {
u, err := apis.ParseURL(uri)
if err != nil {
panic(err)
}
trigger.Status.DeadLetterSinkURI = u
trigger.Status.MarkDeadLetterSinkResolvedSucceeded()
}
}

func withTriggerSubscriberResolvedSucceeded(deliveryOrder contract.DeliveryOrder) func(*eventing.Trigger) {
return func(t *eventing.Trigger) {
t.GetConditionSet().Manage(&t.Status).MarkTrueWithReason(
Expand Down

0 comments on commit 067a605

Please sign in to comment.