Skip to content

Commit

Permalink
reconcile triggers using v1beta1
Browse files Browse the repository at this point in the history
  • Loading branch information
vaikas committed May 6, 2020
1 parent 1175eec commit fe79477
Show file tree
Hide file tree
Showing 24 changed files with 1,026 additions and 754 deletions.
2 changes: 2 additions & 0 deletions go.sum
Expand Up @@ -1016,6 +1016,8 @@ k8s.io/utils v0.0.0-20191010214722-8d271d903fe4/go.mod h1:sZAwmy6armz5eXlNoLmJcl
knative.dev/pkg v0.0.0-20191101194912-56c2594e4f11/go.mod h1:pgODObA1dTyhNoFxPZTTjNWfx6F0aKsKzn+vaT9XO/Q=
knative.dev/pkg v0.0.0-20200501005942-d980c0865972 h1:N/umsmNgROaU+fIziEBZ+L32OMpgwZRYW3VeHUPR8ZA=
knative.dev/pkg v0.0.0-20200501005942-d980c0865972/go.mod h1:X4wmXb4xUR+1eDBoP6AeVfAqsyxl1yATnRdSgFdjhQw=
knative.dev/pkg v0.0.0-20200505191044-3da93ebb24c2 h1:Qu2NlOHb9p3g+CSL/ok9+FySowN60URFEKRSXfWtDv4=
knative.dev/pkg v0.0.0-20200505212244-1099bd14baae h1:kqhjQYETRl/g8OtxTShmowmzUaWTsofXa7gRqAaip6Q=
knative.dev/test-infra v0.0.0-20200429211942-f4c4853375cf h1:rNWg3NiXNLjZC9C1EJf2qKA+mRnrWMLW1KONsEusLYg=
knative.dev/test-infra v0.0.0-20200429211942-f4c4853375cf/go.mod h1:xcdUkMJrLlBswIZqL5zCuBFOC22WIPMQoVX1L35i0vQ=
knative.dev/test-infra v0.0.0-20200430225942-f7c1fafc1cde h1:QSzxFsf21WXNhODvh0jRKbFR+c5UI7WFjiISy/sMOLg=
Expand Down
4 changes: 2 additions & 2 deletions pkg/apis/eventing/v1alpha1/broker_lifecycle.go
Expand Up @@ -21,7 +21,7 @@ import (

corev1 "k8s.io/api/core/v1"
"knative.dev/eventing/pkg/apis/duck"
duckv1beta1 "knative.dev/eventing/pkg/apis/duck/v1beta1"
duckv1alpha1 "knative.dev/eventing/pkg/apis/duck/v1alpha1"
)

var brokerCondSet = apis.NewLivingConditionSet(
Expand Down Expand Up @@ -75,7 +75,7 @@ func (bs *BrokerStatus) MarkTriggerChannelFailed(reason, format string, args ...
brokerCondSet.Manage(bs).MarkFalse(BrokerConditionTriggerChannel, reason, format, args...)
}

func (bs *BrokerStatus) PropagateTriggerChannelReadiness(cs *duckv1beta1.ChannelableStatus) {
func (bs *BrokerStatus) PropagateTriggerChannelReadiness(cs *duckv1alpha1.ChannelableStatus) {
// TODO: Once you can get a Ready status from Channelable in a generic way, use it here...
address := cs.AddressStatus.Address
if address != nil {
Expand Down
8 changes: 8 additions & 0 deletions pkg/apis/eventing/v1beta1/test_helper.go
Expand Up @@ -59,6 +59,14 @@ func (t testHelper) ReadyBrokerStatus() *BrokerStatus {
return bs
}

func (t testHelper) ReadyBrokerCondition() *apis.Condition {
return &apis.Condition{
Type: apis.ConditionReady,
Status: corev1.ConditionTrue,
Severity: apis.ConditionSeverityError,
}
}

func (t testHelper) UnknownBrokerStatus() *BrokerStatus {
bs := &BrokerStatus{}
return bs
Expand Down
3 changes: 1 addition & 2 deletions pkg/apis/eventing/v1beta1/trigger_lifecycle.go
Expand Up @@ -71,8 +71,7 @@ func (ts *TriggerStatus) InitializeConditions() {
triggerCondSet.Manage(ts).InitializeConditions()
}

func (ts *TriggerStatus) PropagateBrokerStatus(bs *BrokerStatus) {
bc := bs.GetTopLevelCondition()
func (ts *TriggerStatus) PropagateBrokerCondition(bc *apis.Condition) {
if bc == nil {
ts.MarkBrokerNotConfigured()
return
Expand Down
2 changes: 1 addition & 1 deletion pkg/apis/eventing/v1beta1/trigger_lifecycle_test.go
Expand Up @@ -322,7 +322,7 @@ func TestTriggerConditionStatus(t *testing.T) {
t.Run(test.name, func(t *testing.T) {
ts := &TriggerStatus{}
if test.brokerStatus != nil {
ts.PropagateBrokerStatus(test.brokerStatus)
ts.PropagateBrokerCondition(test.brokerStatus.GetTopLevelCondition())
}
if test.subscriptionCondition != nil {
ts.PropagateSubscriptionCondition(test.subscriptionCondition)
Expand Down
35 changes: 18 additions & 17 deletions pkg/reconciler/broker/broker.go
Expand Up @@ -40,13 +40,19 @@ import (
"knative.dev/pkg/logging"

duckv1alpha1 "knative.dev/eventing/pkg/apis/duck/v1alpha1"
duckv1beta1 "knative.dev/eventing/pkg/apis/duck/v1beta1"
"knative.dev/eventing/pkg/apis/eventing"
"knative.dev/eventing/pkg/apis/eventing/v1alpha1"
messagingv1beta1 "knative.dev/eventing/pkg/apis/messaging/v1beta1"
clientset "knative.dev/eventing/pkg/client/clientset/versioned"
brokerreconciler "knative.dev/eventing/pkg/client/injection/reconciler/eventing/v1alpha1/broker"
pkgduckv1alpha1 "knative.dev/pkg/apis/duck/v1alpha1"
pkgduckv1beta1 "knative.dev/pkg/apis/duck/v1beta1"

// Need this for Brokers since we reconcile them still as v1alpha1
eventinglisters "knative.dev/eventing/pkg/client/listers/eventing/v1alpha1"
messaginglisters "knative.dev/eventing/pkg/client/listers/messaging/v1alpha1"
eventingv1beta1listers "knative.dev/eventing/pkg/client/listers/eventing/v1beta1"
messaginglisters "knative.dev/eventing/pkg/client/listers/messaging/v1beta1"
"knative.dev/eventing/pkg/duck"
"knative.dev/eventing/pkg/reconciler/broker/resources"
"knative.dev/eventing/pkg/reconciler/names"
Expand All @@ -71,7 +77,7 @@ type Reconciler struct {
endpointsLister corev1listers.EndpointsLister
deploymentLister appsv1listers.DeploymentLister
subscriptionLister messaginglisters.SubscriptionLister
triggerLister eventinglisters.TriggerLister
triggerLister eventingv1beta1listers.TriggerLister

channelableTracker duck.ListableTracker

Expand Down Expand Up @@ -159,21 +165,16 @@ func (r *Reconciler) reconcileKind(ctx context.Context, b *v1alpha1.Broker) (kme
return nil, fmt.Errorf("Failed to reconcile trigger channel: %v", err)
}

if triggerChan.Status.Address == nil {
logging.FromContext(ctx).Debug("Trigger Channel does not have an address", zap.Any("triggerChan", triggerChan))
b.Status.MarkTriggerChannelFailed("NoAddress", "Channel does not have an address.")
// Ok to return nil for error here, once channel address becomes available, this will get requeued.
return nil, nil
}
if url := triggerChan.Status.Address.GetURL(); url.Host == "" {
// We check the trigger Channel's address here because it is needed to create the Ingress Deployment.
if triggerChan.Status.Address == nil || triggerChan.Status.Address.URL == nil {
logging.FromContext(ctx).Debug("Trigger Channel does not have an address", zap.Any("triggerChan", triggerChan))
b.Status.MarkTriggerChannelFailed("NoAddress", "Channel does not have an address.")
// Ok to return nil for error here, once channel address becomes available, this will get requeued.
return nil, nil
}
b.Status.TriggerChannel = &chanMan.ref
b.Status.PropagateTriggerChannelReadiness(&triggerChan.Status)
// We need to downconvert from v1beta1 -> v1alpha1 since broker only handles v1alpha1.
channelStatus := &duckv1alpha1.ChannelableStatus{AddressStatus: pkgduckv1alpha1.AddressStatus{Address: &pkgduckv1alpha1.Addressable{Addressable: pkgduckv1beta1.Addressable{URL: triggerChan.Status.Address.URL}}}}
b.Status.PropagateTriggerChannelReadiness(channelStatus)

if err := r.reconcileFilterDeployment(ctx, b); err != nil {
logging.FromContext(ctx).Error("Problem reconciling filter Deployment", zap.Error(err))
Expand Down Expand Up @@ -305,7 +306,7 @@ func (r *Reconciler) reconcileFilterService(ctx context.Context, b *v1alpha1.Bro
}

// reconcileChannel reconciles Broker's 'b' underlying channel.
func (r *Reconciler) reconcileChannel(ctx context.Context, channelResourceInterface dynamic.ResourceInterface, channelObjRef corev1.ObjectReference, newChannel *unstructured.Unstructured, b *v1alpha1.Broker) (*duckv1alpha1.Channelable, error) {
func (r *Reconciler) reconcileChannel(ctx context.Context, channelResourceInterface dynamic.ResourceInterface, channelObjRef corev1.ObjectReference, newChannel *unstructured.Unstructured, b *v1alpha1.Broker) (*duckv1beta1.Channelable, error) {
lister, err := r.channelableTracker.ListerFor(channelObjRef)
if err != nil {
logging.FromContext(ctx).Error(fmt.Sprintf("Error getting lister for Channel: %s/%s", channelObjRef.Namespace, channelObjRef.Name), zap.Error(err))
Expand All @@ -322,7 +323,7 @@ func (r *Reconciler) reconcileChannel(ctx context.Context, channelResourceInterf
return nil, err
}
logging.FromContext(ctx).Info(fmt.Sprintf("Created Channel: %s/%s", channelObjRef.Namespace, channelObjRef.Name), zap.Any("NewChannel", newChannel))
channelable := &duckv1alpha1.Channelable{}
channelable := &duckv1beta1.Channelable{}
err = duckapis.FromUnstructured(created, channelable)
if err != nil {
logging.FromContext(ctx).Error(fmt.Sprintf("Failed to convert to Channelable Object: %s/%s", channelObjRef.Namespace, channelObjRef.Name), zap.Any("createdChannel", created), zap.Error(err))
Expand All @@ -335,7 +336,7 @@ func (r *Reconciler) reconcileChannel(ctx context.Context, channelResourceInterf
return nil, err
}
logging.FromContext(ctx).Debug(fmt.Sprintf("Found Channel: %s/%s", channelObjRef.Namespace, channelObjRef.Name))
channelable, ok := c.(*duckv1alpha1.Channelable)
channelable, ok := c.(*duckv1beta1.Channelable)
if !ok {
logging.FromContext(ctx).Error(fmt.Sprintf("Failed to convert to Channelable Object: %s/%s", channelObjRef.Namespace, channelObjRef.Name), zap.Error(err))
return nil, err
Expand Down Expand Up @@ -403,12 +404,12 @@ func (r *Reconciler) reconcileService(ctx context.Context, svc *corev1.Service)
}

// reconcileIngressDeploymentCRD reconciles the Ingress Deployment for a CRD backed channel.
func (r *Reconciler) reconcileIngressDeployment(ctx context.Context, b *v1alpha1.Broker, c *duckv1alpha1.Channelable) error {
func (r *Reconciler) reconcileIngressDeployment(ctx context.Context, b *v1alpha1.Broker, c *duckv1beta1.Channelable) error {
expected := resources.MakeIngressDeployment(&resources.IngressArgs{
Broker: b,
Image: r.ingressImage,
ServiceAccountName: r.ingressServiceAccountName,
ChannelAddress: c.Status.Address.GetURL().Host,
ChannelAddress: c.Status.Address.URL.Host,
})
return r.reconcileDeployment(ctx, expected)
}
Expand Down Expand Up @@ -472,7 +473,7 @@ func (r *Reconciler) propagateBrokerStatusToTriggers(ctx context.Context, namesp
if bs == nil {
trigger.Status.MarkBrokerFailed("BrokerDoesNotExist", "Broker %q does not exist", name)
} else {
trigger.Status.PropagateBrokerStatus(bs)
trigger.Status.PropagateBrokerCondition(bs.GetTopLevelCondition())
}
if _, updateStatusErr := r.updateTriggerStatus(ctx, trigger); updateStatusErr != nil {
logging.FromContext(ctx).Error("Failed to update Trigger status", zap.Error(updateStatusErr))
Expand Down

0 comments on commit fe79477

Please sign in to comment.