diff --git a/pkg/reconciler/broker/broker.go b/pkg/reconciler/broker/broker.go index c4a92eb7388..6ed35bee0e8 100644 --- a/pkg/reconciler/broker/broker.go +++ b/pkg/reconciler/broker/broker.go @@ -211,6 +211,12 @@ func (r *Reconciler) getChannelTemplate(ctx context.Context, b *eventingv1.Broke if template == nil { return nil, errors.New("failed to find channelTemplate") } + + // hack for 0.21 downstream to prevent SRVKE-796 + if template.Kind == "KafkaChannel" && template.APIVersion == "messaging.knative.dev/v1alpha1" { + template.APIVersion = "messaging.knative.dev/v1beta1" + } + ref.APIVersion = template.APIVersion ref.Kind = template.Kind diff --git a/pkg/reconciler/channel/channel.go b/pkg/reconciler/channel/channel.go index c0107743592..191428cdb2b 100644 --- a/pkg/reconciler/channel/channel.go +++ b/pkg/reconciler/channel/channel.go @@ -65,6 +65,11 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, c *v1.Channel) pkgreconc track := r.channelableTracker.TrackInNamespaceKReference(ctx, c) + // hack for 0.21 downstream to prevent SRVKE-796 + if c.Spec.ChannelTemplate.Kind == "KafkaChannel" && c.Spec.ChannelTemplate.APIVersion == "messaging.knative.dev/v1alpha1" { + c.Spec.ChannelTemplate.APIVersion = "messaging.knative.dev/v1beta1" + } + backingChannelObjRef := duckv1.KReference{ Kind: c.Spec.ChannelTemplate.Kind, APIVersion: c.Spec.ChannelTemplate.APIVersion, diff --git a/pkg/reconciler/channel/channel_test.go b/pkg/reconciler/channel/channel_test.go index d131ffbbbfc..5bcfdaa291e 100644 --- a/pkg/reconciler/channel/channel_test.go +++ b/pkg/reconciler/channel/channel_test.go @@ -174,6 +174,28 @@ func TestReconcile(t *testing.T) { WithChannelNoAddress(), WithBackingChannelUnknown("BackingChannelNotConfigured", "BackingChannel has not yet been reconciled.")), }}, + }, { + Name: "Backing Kafka channel created", + Key: testKey, + Objects: []runtime.Object{ + NewChannel(channelName, testNS, + WithChannelTemplate(channelKafkaCRD()), + WithInitChannelConditions, + WithBackingChannelObjRef(backingKafkaChannelObjRef()), + WithBackingChannelReady, + WithChannelAddress(backingChannelHostname)), + }, + WantCreates: []runtime.Object{ + createKafkaChannel(testNS, channelName, false), + }, + WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ + Object: NewChannel(channelName, testNS, + WithChannelTemplate(channelKafkaCRD()), + WithInitChannelConditions, + WithBackingChannelObjRef(backingKafkaChannelObjRef()), + WithChannelNoAddress(), + WithBackingChannelUnknown("BackingChannelNotConfigured", "BackingChannel has not yet been reconciled.")), + }}, }, { Name: "Backing channel created with delivery", Key: testKey, @@ -310,6 +332,13 @@ func channelCRD() metav1.TypeMeta { } } +func channelKafkaCRD() metav1.TypeMeta { + return metav1.TypeMeta{ + APIVersion: "messaging.knative.dev/v1alpha1", + Kind: "KafkaChannel", + } +} + func subscribers() []eventingduckv1.SubscriberSpec { return []eventingduckv1.SubscriberSpec{{ UID: "2f9b5e8e-deb6-11e8-9f32-f2801f1b9fd1", @@ -378,6 +407,15 @@ func backingChannelObjRef() *duckv1.KReference { } } +func backingKafkaChannelObjRef() *duckv1.KReference { + return &duckv1.KReference{ + APIVersion: "messaging.knative.dev/v1beta1", + Kind: "KafkaChannel", + Namespace: testNS, + Name: channelName, + } +} + func createChannel(namespace, name string, ready bool) *unstructured.Unstructured { var hostname string var url string @@ -433,3 +471,59 @@ func createChannel(namespace, name string, ready bool) *unstructured.Unstructure }, } } + +func createKafkaChannel(namespace, name string, ready bool) *unstructured.Unstructured { + var hostname string + var url string + if ready { + return &unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": "messaging.knative.dev/v1alpha1", + "kind": "KafkaChannel", + "metadata": map[string]interface{}{ + "creationTimestamp": nil, + "namespace": namespace, + "name": name, + "ownerReferences": []interface{}{ + map[string]interface{}{ + "apiVersion": "messaging.knative.dev/v1", + "blockOwnerDeletion": true, + "controller": true, + "kind": "Channel", + "name": name, + "uid": "", + }, + }, + }, + "status": map[string]interface{}{ + "address": map[string]interface{}{ + "hostname": hostname, + "url": url, + }, + }, + }, + } + } + + return &unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": "messaging.knative.dev/v1beta1", + "kind": "KafkaChannel", + "metadata": map[string]interface{}{ + "creationTimestamp": nil, + "namespace": namespace, + "name": name, + "ownerReferences": []interface{}{ + map[string]interface{}{ + "apiVersion": "messaging.knative.dev/v1", + "blockOwnerDeletion": true, + "controller": true, + "kind": "Channel", + "name": name, + "uid": "", + }, + }, + }, + }, + } +} diff --git a/test/e2e/channel_chain_test.go b/test/e2e/channel_chain_test.go index 00c82b74b38..cd12b42bb26 100644 --- a/test/e2e/channel_chain_test.go +++ b/test/e2e/channel_chain_test.go @@ -32,7 +32,7 @@ EventSource ---> Channel ---> Subscriptions ---> Channel ---> Subscriptions ---> */ func TestChannelChainV1beta1(t *testing.T) { - t.Skip("upstream flake") + t.Skip("upstream flake") helpers.ChannelChainTestHelper(context.Background(), t, helpers.SubscriptionV1beta1, channelTestRunner) }