Skip to content

Commit

Permalink
[0.22] Tweak channel template (knative#1273)
Browse files Browse the repository at this point in the history
* Adding downstream checks for KafkaChannel v1a1

Signed-off-by: Matthias Wessendorf <mwessend@redhat.com>

* Adding the hack to the broker too

Signed-off-by: Matthias Wessendorf <mwessend@redhat.com>

* formatting

Signed-off-by: Matthias Wessendorf <mwessend@redhat.com>
  • Loading branch information
matzew committed May 18, 2021
1 parent c9570cc commit bedf975
Show file tree
Hide file tree
Showing 4 changed files with 106 additions and 1 deletion.
6 changes: 6 additions & 0 deletions pkg/reconciler/broker/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,12 @@ func (r *Reconciler) getChannelTemplate(ctx context.Context, b *eventingv1.Broke
if template == nil {
return nil, errors.New("failed to find channelTemplate")
}

// hack for 0.21 downstream to prevent SRVKE-796
if template.Kind == "KafkaChannel" && template.APIVersion == "messaging.knative.dev/v1alpha1" {
template.APIVersion = "messaging.knative.dev/v1beta1"
}

ref.APIVersion = template.APIVersion
ref.Kind = template.Kind

Expand Down
5 changes: 5 additions & 0 deletions pkg/reconciler/channel/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,11 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, c *v1.Channel) pkgreconc

track := r.channelableTracker.TrackInNamespaceKReference(ctx, c)

// hack for 0.21 downstream to prevent SRVKE-796
if c.Spec.ChannelTemplate.Kind == "KafkaChannel" && c.Spec.ChannelTemplate.APIVersion == "messaging.knative.dev/v1alpha1" {
c.Spec.ChannelTemplate.APIVersion = "messaging.knative.dev/v1beta1"
}

backingChannelObjRef := duckv1.KReference{
Kind: c.Spec.ChannelTemplate.Kind,
APIVersion: c.Spec.ChannelTemplate.APIVersion,
Expand Down
94 changes: 94 additions & 0 deletions pkg/reconciler/channel/channel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,28 @@ func TestReconcile(t *testing.T) {
WithChannelNoAddress(),
WithBackingChannelUnknown("BackingChannelNotConfigured", "BackingChannel has not yet been reconciled.")),
}},
}, {
Name: "Backing Kafka channel created",
Key: testKey,
Objects: []runtime.Object{
NewChannel(channelName, testNS,
WithChannelTemplate(channelKafkaCRD()),
WithInitChannelConditions,
WithBackingChannelObjRef(backingKafkaChannelObjRef()),
WithBackingChannelReady,
WithChannelAddress(backingChannelHostname)),
},
WantCreates: []runtime.Object{
createKafkaChannel(testNS, channelName, false),
},
WantStatusUpdates: []clientgotesting.UpdateActionImpl{{
Object: NewChannel(channelName, testNS,
WithChannelTemplate(channelKafkaCRD()),
WithInitChannelConditions,
WithBackingChannelObjRef(backingKafkaChannelObjRef()),
WithChannelNoAddress(),
WithBackingChannelUnknown("BackingChannelNotConfigured", "BackingChannel has not yet been reconciled.")),
}},
}, {
Name: "Backing channel created with delivery",
Key: testKey,
Expand Down Expand Up @@ -310,6 +332,13 @@ func channelCRD() metav1.TypeMeta {
}
}

func channelKafkaCRD() metav1.TypeMeta {
return metav1.TypeMeta{
APIVersion: "messaging.knative.dev/v1alpha1",
Kind: "KafkaChannel",
}
}

func subscribers() []eventingduckv1.SubscriberSpec {
return []eventingduckv1.SubscriberSpec{{
UID: "2f9b5e8e-deb6-11e8-9f32-f2801f1b9fd1",
Expand Down Expand Up @@ -378,6 +407,15 @@ func backingChannelObjRef() *duckv1.KReference {
}
}

func backingKafkaChannelObjRef() *duckv1.KReference {
return &duckv1.KReference{
APIVersion: "messaging.knative.dev/v1beta1",
Kind: "KafkaChannel",
Namespace: testNS,
Name: channelName,
}
}

func createChannel(namespace, name string, ready bool) *unstructured.Unstructured {
var hostname string
var url string
Expand Down Expand Up @@ -433,3 +471,59 @@ func createChannel(namespace, name string, ready bool) *unstructured.Unstructure
},
}
}

func createKafkaChannel(namespace, name string, ready bool) *unstructured.Unstructured {
var hostname string
var url string
if ready {
return &unstructured.Unstructured{
Object: map[string]interface{}{
"apiVersion": "messaging.knative.dev/v1alpha1",
"kind": "KafkaChannel",
"metadata": map[string]interface{}{
"creationTimestamp": nil,
"namespace": namespace,
"name": name,
"ownerReferences": []interface{}{
map[string]interface{}{
"apiVersion": "messaging.knative.dev/v1",
"blockOwnerDeletion": true,
"controller": true,
"kind": "Channel",
"name": name,
"uid": "",
},
},
},
"status": map[string]interface{}{
"address": map[string]interface{}{
"hostname": hostname,
"url": url,
},
},
},
}
}

return &unstructured.Unstructured{
Object: map[string]interface{}{
"apiVersion": "messaging.knative.dev/v1beta1",
"kind": "KafkaChannel",
"metadata": map[string]interface{}{
"creationTimestamp": nil,
"namespace": namespace,
"name": name,
"ownerReferences": []interface{}{
map[string]interface{}{
"apiVersion": "messaging.knative.dev/v1",
"blockOwnerDeletion": true,
"controller": true,
"kind": "Channel",
"name": name,
"uid": "",
},
},
},
},
}
}
2 changes: 1 addition & 1 deletion test/e2e/channel_chain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ EventSource ---> Channel ---> Subscriptions ---> Channel ---> Subscriptions --->
*/
func TestChannelChainV1beta1(t *testing.T) {
t.Skip("upstream flake")
t.Skip("upstream flake")
helpers.ChannelChainTestHelper(context.Background(), t, helpers.SubscriptionV1beta1, channelTestRunner)
}

Expand Down

0 comments on commit bedf975

Please sign in to comment.