diff --git a/pkg/apis/eventing/v1/broker_lifecycle.go b/pkg/apis/eventing/v1/broker_lifecycle.go index ebd71cde536..0dc966323b4 100644 --- a/pkg/apis/eventing/v1/broker_lifecycle.go +++ b/pkg/apis/eventing/v1/broker_lifecycle.go @@ -17,20 +17,11 @@ limitations under the License. package v1 import ( - corev1 "k8s.io/api/core/v1" + "sync" - "knative.dev/eventing/pkg/apis/duck" - duckv1 "knative.dev/eventing/pkg/apis/duck/v1" "knative.dev/pkg/apis" ) -var brokerCondSet = apis.NewLivingConditionSet( - BrokerConditionIngress, - BrokerConditionTriggerChannel, - BrokerConditionFilter, - BrokerConditionAddressable, -) - const ( BrokerConditionReady = apis.ConditionReady BrokerConditionIngress apis.ConditionType = "IngressReady" @@ -39,14 +30,41 @@ const ( BrokerConditionAddressable apis.ConditionType = "Addressable" ) +var brokerCondSet = apis.NewLivingConditionSet( + BrokerConditionIngress, + BrokerConditionTriggerChannel, + BrokerConditionFilter, + BrokerConditionAddressable, +) +var brokerCondSetLock = sync.RWMutex{} + +// RegisterAlternateBrokerConditionSet register a apis.ConditionSet for the given broker class. +func RegisterAlternateBrokerConditionSet(conditionSet apis.ConditionSet) { + brokerCondSetLock.Lock() + defer brokerCondSetLock.Unlock() + + brokerCondSet = conditionSet +} + // GetConditionSet retrieves the condition set for this resource. Implements the KRShaped interface. -func (*Broker) GetConditionSet() apis.ConditionSet { +func (b *Broker) GetConditionSet() apis.ConditionSet { + brokerCondSetLock.RLock() + defer brokerCondSetLock.RUnlock() + + return brokerCondSet +} + +// GetConditionSet retrieves the condition set for this resource. +func (bs *BrokerStatus) GetConditionSet() apis.ConditionSet { + brokerCondSetLock.RLock() + defer brokerCondSetLock.RUnlock() + return brokerCondSet } // GetTopLevelCondition returns the top level Condition. func (bs *BrokerStatus) GetTopLevelCondition() *apis.Condition { - return brokerCondSet.Manage(bs).GetTopLevelCondition() + return bs.GetConditionSet().Manage(bs).GetTopLevelCondition() } // SetAddress makes this Broker addressable by setting the URI. It also @@ -54,61 +72,23 @@ func (bs *BrokerStatus) GetTopLevelCondition() *apis.Condition { func (bs *BrokerStatus) SetAddress(url *apis.URL) { bs.Address.URL = url if url != nil { - brokerCondSet.Manage(bs).MarkTrue(BrokerConditionAddressable) + bs.GetConditionSet().Manage(bs).MarkTrue(BrokerConditionAddressable) } else { - brokerCondSet.Manage(bs).MarkFalse(BrokerConditionAddressable, "nil URL", "URL is nil") + bs.GetConditionSet().Manage(bs).MarkFalse(BrokerConditionAddressable, "nil URL", "URL is nil") } } // GetCondition returns the condition currently associated with the given type, or nil. func (bs *BrokerStatus) GetCondition(t apis.ConditionType) *apis.Condition { - return brokerCondSet.Manage(bs).GetCondition(t) + return bs.GetConditionSet().Manage(bs).GetCondition(t) } // IsReady returns true if the resource is ready overall. func (bs *BrokerStatus) IsReady() bool { - return brokerCondSet.Manage(bs).IsHappy() + return bs.GetConditionSet().Manage(bs).IsHappy() } // InitializeConditions sets relevant unset conditions to Unknown state. func (bs *BrokerStatus) InitializeConditions() { - brokerCondSet.Manage(bs).InitializeConditions() -} - -func (bs *BrokerStatus) MarkIngressFailed(reason, format string, args ...interface{}) { - brokerCondSet.Manage(bs).MarkFalse(BrokerConditionIngress, reason, format, args...) -} - -func (bs *BrokerStatus) PropagateIngressAvailability(ep *corev1.Endpoints) { - if duck.EndpointsAreAvailable(ep) { - brokerCondSet.Manage(bs).MarkTrue(BrokerConditionIngress) - } else { - bs.MarkIngressFailed("EndpointsUnavailable", "Endpoints %q are unavailable.", ep.Name) - } -} - -func (bs *BrokerStatus) MarkTriggerChannelFailed(reason, format string, args ...interface{}) { - brokerCondSet.Manage(bs).MarkFalse(BrokerConditionTriggerChannel, reason, format, args...) -} - -func (bs *BrokerStatus) PropagateTriggerChannelReadiness(cs *duckv1.ChannelableStatus) { - // TODO: Once you can get a Ready status from Channelable in a generic way, use it here... - address := cs.AddressStatus.Address - if address != nil { - brokerCondSet.Manage(bs).MarkTrue(BrokerConditionTriggerChannel) - } else { - bs.MarkTriggerChannelFailed("ChannelNotReady", "trigger Channel is not ready: not addressable") - } -} - -func (bs *BrokerStatus) MarkFilterFailed(reason, format string, args ...interface{}) { - brokerCondSet.Manage(bs).MarkFalse(BrokerConditionFilter, reason, format, args...) -} - -func (bs *BrokerStatus) PropagateFilterAvailability(ep *corev1.Endpoints) { - if duck.EndpointsAreAvailable(ep) { - brokerCondSet.Manage(bs).MarkTrue(BrokerConditionFilter) - } else { - bs.MarkFilterFailed("EndpointsUnavailable", "Endpoints %q are unavailable.", ep.Name) - } + bs.GetConditionSet().Manage(bs).InitializeConditions() } diff --git a/pkg/apis/eventing/v1/broker_lifecycle_mt.go b/pkg/apis/eventing/v1/broker_lifecycle_mt.go new file mode 100644 index 00000000000..ce1cf4fa4b0 --- /dev/null +++ b/pkg/apis/eventing/v1/broker_lifecycle_mt.go @@ -0,0 +1,62 @@ +/* + * Copyright 2020 The Knative Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package v1 + +import ( + corev1 "k8s.io/api/core/v1" + + "knative.dev/eventing/pkg/apis/duck" + duckv1 "knative.dev/eventing/pkg/apis/duck/v1" +) + +func (bs *BrokerStatus) MarkIngressFailed(reason, format string, args ...interface{}) { + bs.GetConditionSet().Manage(bs).MarkFalse(BrokerConditionIngress, reason, format, args...) +} + +func (bs *BrokerStatus) PropagateIngressAvailability(ep *corev1.Endpoints) { + if duck.EndpointsAreAvailable(ep) { + bs.GetConditionSet().Manage(bs).MarkTrue(BrokerConditionIngress) + } else { + bs.MarkIngressFailed("EndpointsUnavailable", "Endpoints %q are unavailable.", ep.Name) + } +} + +func (bs *BrokerStatus) MarkTriggerChannelFailed(reason, format string, args ...interface{}) { + bs.GetConditionSet().Manage(bs).MarkFalse(BrokerConditionTriggerChannel, reason, format, args...) +} + +func (bs *BrokerStatus) PropagateTriggerChannelReadiness(cs *duckv1.ChannelableStatus) { + // TODO: Once you can get a Ready status from Channelable in a generic way, use it here... + address := cs.AddressStatus.Address + if address != nil { + bs.GetConditionSet().Manage(bs).MarkTrue(BrokerConditionTriggerChannel) + } else { + bs.MarkTriggerChannelFailed("ChannelNotReady", "trigger Channel is not ready: not addressable") + } +} + +func (bs *BrokerStatus) MarkFilterFailed(reason, format string, args ...interface{}) { + bs.GetConditionSet().Manage(bs).MarkFalse(BrokerConditionFilter, reason, format, args...) +} + +func (bs *BrokerStatus) PropagateFilterAvailability(ep *corev1.Endpoints) { + if duck.EndpointsAreAvailable(ep) { + bs.GetConditionSet().Manage(bs).MarkTrue(BrokerConditionFilter) + } else { + bs.MarkFilterFailed("EndpointsUnavailable", "Endpoints %q are unavailable.", ep.Name) + } +} diff --git a/pkg/apis/eventing/v1/broker_lifecycle_test.go b/pkg/apis/eventing/v1/broker_lifecycle_test.go index 5c6fd43c112..5b3e87cc554 100644 --- a/pkg/apis/eventing/v1/broker_lifecycle_test.go +++ b/pkg/apis/eventing/v1/broker_lifecycle_test.go @@ -18,8 +18,10 @@ import ( "github.com/google/go-cmp/cmp" corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" eventingduckv1 "knative.dev/eventing/pkg/apis/duck/v1" + "knative.dev/eventing/pkg/apis/eventing" "knative.dev/pkg/apis" duckv1 "knative.dev/pkg/apis/duck/v1" @@ -57,6 +59,54 @@ var ( } ) +func TestBrokerGetConditionSet(t *testing.T) { + + customCondition := apis.NewLivingConditionSet( + apis.ConditionReady, + "ConditionGolangReady", + ) + brokerClass := "Golang" + + tt := []struct { + name string + broker Broker + expectedConditionSet apis.ConditionSet + }{ + { + name: "default condition set", + broker: Broker{}, + expectedConditionSet: brokerCondSet, + }, + { + name: "custom condition set", + broker: Broker{ + ObjectMeta: metav1.ObjectMeta{ + Annotations: map[string]string{ + eventing.BrokerClassKey: brokerClass, + }, + }, + }, + expectedConditionSet: customCondition, + }, + } + + for _, tc := range tt { + tc := tc + t.Run(tc.name, func(t *testing.T) { + defer RegisterAlternateBrokerConditionSet(brokerCondSet) // reset to default condition set + + RegisterAlternateBrokerConditionSet(tc.expectedConditionSet) + + if diff := cmp.Diff(tc.expectedConditionSet, tc.broker.GetConditionSet(), cmp.AllowUnexported(apis.ConditionSet{})); diff != "" { + t.Errorf("unexpected conditions (-want, +got) %s", diff) + } + if diff := cmp.Diff(tc.expectedConditionSet, tc.broker.Status.GetConditionSet(), cmp.AllowUnexported(apis.ConditionSet{})); diff != "" { + t.Errorf("unexpected conditions (-want, +got) %s", diff) + } + }) + } +} + func TestBrokerGetCondition(t *testing.T) { tests := []struct { name string diff --git a/pkg/apis/eventing/v1beta1/broker_lifecycle.go b/pkg/apis/eventing/v1beta1/broker_lifecycle.go index 6095195a125..b4169bccab9 100644 --- a/pkg/apis/eventing/v1beta1/broker_lifecycle.go +++ b/pkg/apis/eventing/v1beta1/broker_lifecycle.go @@ -17,20 +17,11 @@ limitations under the License. package v1beta1 import ( - corev1 "k8s.io/api/core/v1" + "sync" - "knative.dev/eventing/pkg/apis/duck" - duckv1beta1 "knative.dev/eventing/pkg/apis/duck/v1beta1" "knative.dev/pkg/apis" ) -var brokerCondSet = apis.NewLivingConditionSet( - BrokerConditionIngress, - BrokerConditionTriggerChannel, - BrokerConditionFilter, - BrokerConditionAddressable, -) - const ( BrokerConditionReady = apis.ConditionReady BrokerConditionIngress apis.ConditionType = "IngressReady" @@ -39,14 +30,41 @@ const ( BrokerConditionAddressable apis.ConditionType = "Addressable" ) +var brokerCondSet = apis.NewLivingConditionSet( + BrokerConditionIngress, + BrokerConditionTriggerChannel, + BrokerConditionFilter, + BrokerConditionAddressable, +) +var brokerCondSetLock = sync.RWMutex{} + +// RegisterAlternateBrokerConditionSet register a apis.ConditionSet for the given broker class. +func RegisterAlternateBrokerConditionSet(conditionSet apis.ConditionSet) { + brokerCondSetLock.Lock() + defer brokerCondSetLock.Unlock() + + brokerCondSet = conditionSet +} + // GetConditionSet retrieves the condition set for this resource. Implements the KRShaped interface. -func (*Broker) GetConditionSet() apis.ConditionSet { +func (b *Broker) GetConditionSet() apis.ConditionSet { + brokerCondSetLock.RLock() + defer brokerCondSetLock.RUnlock() + + return brokerCondSet +} + +// GetConditionSet retrieves the condition set for this resource. +func (bs *BrokerStatus) GetConditionSet() apis.ConditionSet { + brokerCondSetLock.RLock() + defer brokerCondSetLock.RUnlock() + return brokerCondSet } // GetTopLevelCondition returns the top level Condition. func (bs *BrokerStatus) GetTopLevelCondition() *apis.Condition { - return brokerCondSet.Manage(bs).GetTopLevelCondition() + return bs.GetConditionSet().Manage(bs).GetTopLevelCondition() } // SetAddress makes this Broker addressable by setting the URI. It also @@ -54,61 +72,23 @@ func (bs *BrokerStatus) GetTopLevelCondition() *apis.Condition { func (bs *BrokerStatus) SetAddress(url *apis.URL) { bs.Address.URL = url if url != nil { - brokerCondSet.Manage(bs).MarkTrue(BrokerConditionAddressable) + bs.GetConditionSet().Manage(bs).MarkTrue(BrokerConditionAddressable) } else { - brokerCondSet.Manage(bs).MarkFalse(BrokerConditionAddressable, "nil URL", "URL is nil") + bs.GetConditionSet().Manage(bs).MarkFalse(BrokerConditionAddressable, "nil URL", "URL is nil") } } // GetCondition returns the condition currently associated with the given type, or nil. func (bs *BrokerStatus) GetCondition(t apis.ConditionType) *apis.Condition { - return brokerCondSet.Manage(bs).GetCondition(t) + return bs.GetConditionSet().Manage(bs).GetCondition(t) } // IsReady returns true if the resource is ready overall. func (bs *BrokerStatus) IsReady() bool { - return brokerCondSet.Manage(bs).IsHappy() + return bs.GetConditionSet().Manage(bs).IsHappy() } // InitializeConditions sets relevant unset conditions to Unknown state. func (bs *BrokerStatus) InitializeConditions() { - brokerCondSet.Manage(bs).InitializeConditions() -} - -func (bs *BrokerStatus) MarkIngressFailed(reason, format string, args ...interface{}) { - brokerCondSet.Manage(bs).MarkFalse(BrokerConditionIngress, reason, format, args...) -} - -func (bs *BrokerStatus) PropagateIngressAvailability(ep *corev1.Endpoints) { - if duck.EndpointsAreAvailable(ep) { - brokerCondSet.Manage(bs).MarkTrue(BrokerConditionIngress) - } else { - bs.MarkIngressFailed("EndpointsUnavailable", "Endpoints %q are unavailable.", ep.Name) - } -} - -func (bs *BrokerStatus) MarkTriggerChannelFailed(reason, format string, args ...interface{}) { - brokerCondSet.Manage(bs).MarkFalse(BrokerConditionTriggerChannel, reason, format, args...) -} - -func (bs *BrokerStatus) PropagateTriggerChannelReadiness(cs *duckv1beta1.ChannelableStatus) { - // TODO: Once you can get a Ready status from Channelable in a generic way, use it here... - address := cs.AddressStatus.Address - if address != nil { - brokerCondSet.Manage(bs).MarkTrue(BrokerConditionTriggerChannel) - } else { - bs.MarkTriggerChannelFailed("ChannelNotReady", "trigger Channel is not ready: not addressable") - } -} - -func (bs *BrokerStatus) MarkFilterFailed(reason, format string, args ...interface{}) { - brokerCondSet.Manage(bs).MarkFalse(BrokerConditionFilter, reason, format, args...) -} - -func (bs *BrokerStatus) PropagateFilterAvailability(ep *corev1.Endpoints) { - if duck.EndpointsAreAvailable(ep) { - brokerCondSet.Manage(bs).MarkTrue(BrokerConditionFilter) - } else { - bs.MarkFilterFailed("EndpointsUnavailable", "Endpoints %q are unavailable.", ep.Name) - } + bs.GetConditionSet().Manage(bs).InitializeConditions() } diff --git a/pkg/apis/eventing/v1beta1/broker_lifecycle_mt.go b/pkg/apis/eventing/v1beta1/broker_lifecycle_mt.go new file mode 100644 index 00000000000..2a39ed71a18 --- /dev/null +++ b/pkg/apis/eventing/v1beta1/broker_lifecycle_mt.go @@ -0,0 +1,62 @@ +/* + * Copyright 2020 The Knative Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package v1beta1 + +import ( + corev1 "k8s.io/api/core/v1" + + "knative.dev/eventing/pkg/apis/duck" + duckv1beta1 "knative.dev/eventing/pkg/apis/duck/v1beta1" +) + +func (bs *BrokerStatus) MarkIngressFailed(reason, format string, args ...interface{}) { + bs.GetConditionSet().Manage(bs).MarkFalse(BrokerConditionIngress, reason, format, args...) +} + +func (bs *BrokerStatus) PropagateIngressAvailability(ep *corev1.Endpoints) { + if duck.EndpointsAreAvailable(ep) { + bs.GetConditionSet().Manage(bs).MarkTrue(BrokerConditionIngress) + } else { + bs.MarkIngressFailed("EndpointsUnavailable", "Endpoints %q are unavailable.", ep.Name) + } +} + +func (bs *BrokerStatus) MarkTriggerChannelFailed(reason, format string, args ...interface{}) { + bs.GetConditionSet().Manage(bs).MarkFalse(BrokerConditionTriggerChannel, reason, format, args...) +} + +func (bs *BrokerStatus) PropagateTriggerChannelReadiness(cs *duckv1beta1.ChannelableStatus) { + // TODO: Once you can get a Ready status from Channelable in a generic way, use it here... + address := cs.AddressStatus.Address + if address != nil { + bs.GetConditionSet().Manage(bs).MarkTrue(BrokerConditionTriggerChannel) + } else { + bs.MarkTriggerChannelFailed("ChannelNotReady", "trigger Channel is not ready: not addressable") + } +} + +func (bs *BrokerStatus) MarkFilterFailed(reason, format string, args ...interface{}) { + bs.GetConditionSet().Manage(bs).MarkFalse(BrokerConditionFilter, reason, format, args...) +} + +func (bs *BrokerStatus) PropagateFilterAvailability(ep *corev1.Endpoints) { + if duck.EndpointsAreAvailable(ep) { + bs.GetConditionSet().Manage(bs).MarkTrue(BrokerConditionFilter) + } else { + bs.MarkFilterFailed("EndpointsUnavailable", "Endpoints %q are unavailable.", ep.Name) + } +} diff --git a/pkg/apis/eventing/v1beta1/broker_lifecycle_test.go b/pkg/apis/eventing/v1beta1/broker_lifecycle_test.go index eb1cfe81e3f..5f7fb702da1 100644 --- a/pkg/apis/eventing/v1beta1/broker_lifecycle_test.go +++ b/pkg/apis/eventing/v1beta1/broker_lifecycle_test.go @@ -18,8 +18,10 @@ import ( "github.com/google/go-cmp/cmp" corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" duckv1beta1 "knative.dev/eventing/pkg/apis/duck/v1beta1" + "knative.dev/eventing/pkg/apis/eventing" "knative.dev/pkg/apis" duckv1 "knative.dev/pkg/apis/duck/v1" @@ -57,6 +59,54 @@ var ( } ) +func TestBrokerGetConditionSet(t *testing.T) { + + customCondition := apis.NewLivingConditionSet( + apis.ConditionReady, + "ConditionGolangReady", + ) + brokerClass := "Golang" + + tt := []struct { + name string + broker Broker + expectedConditionSet apis.ConditionSet + }{ + { + name: "default condition set", + broker: Broker{}, + expectedConditionSet: brokerCondSet, + }, + { + name: "custom condition set", + broker: Broker{ + ObjectMeta: metav1.ObjectMeta{ + Annotations: map[string]string{ + eventing.BrokerClassKey: brokerClass, + }, + }, + }, + expectedConditionSet: customCondition, + }, + } + + for _, tc := range tt { + tc := tc + t.Run(tc.name, func(t *testing.T) { + defer RegisterAlternateBrokerConditionSet(brokerCondSet) // reset to default condition set + + RegisterAlternateBrokerConditionSet(tc.expectedConditionSet) + + if diff := cmp.Diff(tc.expectedConditionSet, tc.broker.GetConditionSet(), cmp.AllowUnexported(apis.ConditionSet{})); diff != "" { + t.Errorf("unexpected conditions (-want, +got) %s", diff) + } + if diff := cmp.Diff(tc.expectedConditionSet, tc.broker.Status.GetConditionSet(), cmp.AllowUnexported(apis.ConditionSet{})); diff != "" { + t.Errorf("unexpected conditions (-want, +got) %s", diff) + } + }) + } +} + func TestBrokerGetCondition(t *testing.T) { tests := []struct { name string diff --git a/pkg/apis/eventing/v1beta1/eventtype_lifecycle.go b/pkg/apis/eventing/v1beta1/eventtype_lifecycle.go index 0b906823f94..0d5f6f95203 100644 --- a/pkg/apis/eventing/v1beta1/eventtype_lifecycle.go +++ b/pkg/apis/eventing/v1beta1/eventtype_lifecycle.go @@ -85,7 +85,7 @@ func (et *EventTypeStatus) MarkBrokerNotConfigured() { } func (et *EventTypeStatus) PropagateBrokerStatus(bs *BrokerStatus) { - bc := brokerCondSet.Manage(bs).GetTopLevelCondition() + bc := bs.GetConditionSet().Manage(bs).GetTopLevelCondition() if bc == nil { et.MarkBrokerNotConfigured() return diff --git a/pkg/reconciler/mtbroker/controller.go b/pkg/reconciler/mtbroker/controller.go index 40486b7ba46..0f3366941ca 100644 --- a/pkg/reconciler/mtbroker/controller.go +++ b/pkg/reconciler/mtbroker/controller.go @@ -36,6 +36,7 @@ import ( brokerreconciler "knative.dev/eventing/pkg/client/injection/reconciler/eventing/v1beta1/broker" "knative.dev/eventing/pkg/duck" "knative.dev/eventing/pkg/reconciler/names" + "knative.dev/pkg/apis" "knative.dev/pkg/client/injection/ducks/duck/v1/addressable" "knative.dev/pkg/client/injection/ducks/duck/v1/conditions" client "knative.dev/pkg/client/injection/kube/client" @@ -54,6 +55,14 @@ import ( // when creating events. const controllerAgentName = "mt-broker-controller" +const ( + BrokerConditionReady = apis.ConditionReady + BrokerConditionIngress apis.ConditionType = "IngressReady" + BrokerConditionTriggerChannel apis.ConditionType = "TriggerChannelReady" + BrokerConditionFilter apis.ConditionType = "FilterReady" + BrokerConditionAddressable apis.ConditionType = "Addressable" +) + // NewController initializes the controller and is called by the generated code // Registers event handlers to enqueue events func NewController( @@ -85,6 +94,13 @@ func NewController( }() } + v1beta1.RegisterAlternateBrokerConditionSet(apis.NewLivingConditionSet( + BrokerConditionIngress, + BrokerConditionTriggerChannel, + BrokerConditionFilter, + BrokerConditionAddressable, + )) + r := &Reconciler{ eventingClientSet: eventingclient.Get(ctx), dynamicClientSet: dynamicclient.Get(ctx),