Skip to content

Commit

Permalink
Revert "Add serviceaccount in parallel (#7373)"
Browse files Browse the repository at this point in the history
This reverts commit dc96522.
  • Loading branch information
creydr committed May 7, 2024
1 parent 49275e5 commit a32d1da
Show file tree
Hide file tree
Showing 7 changed files with 82 additions and 344 deletions.
21 changes: 2 additions & 19 deletions pkg/apis/flows/v1/parallel_lifecycle.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
pkgduckv1 "knative.dev/pkg/apis/duck/v1"
)

var pCondSet = apis.NewLivingConditionSet(ParallelConditionReady, ParallelConditionChannelsReady, ParallelConditionSubscriptionsReady, ParallelConditionAddressable, ParallelConditionOIDCIdentityCreated)
var pCondSet = apis.NewLivingConditionSet(ParallelConditionReady, ParallelConditionChannelsReady, ParallelConditionSubscriptionsReady, ParallelConditionAddressable)

const (
// ParallelConditionReady has status True when all subconditions below have been set to True.
Expand All @@ -41,8 +41,7 @@ const (

// ParallelConditionAddressable has status true when this Parallel meets
// the Addressable contract and has a non-empty hostname.
ParallelConditionAddressable apis.ConditionType = "Addressable"
ParallelConditionOIDCIdentityCreated apis.ConditionType = "OIDCIdentityCreated"
ParallelConditionAddressable apis.ConditionType = "Addressable"
)

// GetConditionSet retrieves the condition set for this resource. Implements the KRShaped interface.
Expand Down Expand Up @@ -196,22 +195,6 @@ func (ps *ParallelStatus) MarkAddressableNotReady(reason, messageFormat string,
pCondSet.Manage(ps).MarkFalse(ParallelConditionAddressable, reason, messageFormat, messageA...)
}

func (ps *ParallelStatus) MarkOIDCIdentityCreatedSucceeded() {
pCondSet.Manage(ps).MarkTrue(ParallelConditionOIDCIdentityCreated)
}

func (ps *ParallelStatus) MarkOIDCIdentityCreatedSucceededWithReason(reason, messageFormat string, messageA ...interface{}) {
pCondSet.Manage(ps).MarkTrueWithReason(ParallelConditionOIDCIdentityCreated, reason, messageFormat, messageA...)
}

func (ps *ParallelStatus) MarkOIDCIdentityCreatedFailed(reason, messageFormat string, messageA ...interface{}) {
pCondSet.Manage(ps).MarkFalse(ParallelConditionOIDCIdentityCreated, reason, messageFormat, messageA...)
}

func (ps *ParallelStatus) MarkOIDCIdentityCreatedUnknown(reason, messageFormat string, messageA ...interface{}) {
pCondSet.Manage(ps).MarkUnknown(ParallelConditionOIDCIdentityCreated, reason, messageFormat, messageA...)
}

func (ps *ParallelStatus) setAddress(address *pkgduckv1.Addressable) {
ps.Address = address
if address == nil {
Expand Down
173 changes: 66 additions & 107 deletions pkg/apis/flows/v1/parallel_lifecycle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,9 +88,6 @@ func TestParallelInitializeConditions(t *testing.T) {
}, {
Type: ParallelConditionChannelsReady,
Status: corev1.ConditionUnknown,
}, {
Type: ParallelConditionOIDCIdentityCreated,
Status: corev1.ConditionUnknown,
}, {
Type: ParallelConditionReady,
Status: corev1.ConditionUnknown,
Expand Down Expand Up @@ -118,9 +115,6 @@ func TestParallelInitializeConditions(t *testing.T) {
}, {
Type: ParallelConditionChannelsReady,
Status: corev1.ConditionFalse,
}, {
Type: ParallelConditionOIDCIdentityCreated,
Status: corev1.ConditionUnknown,
}, {
Type: ParallelConditionReady,
Status: corev1.ConditionUnknown,
Expand Down Expand Up @@ -148,9 +142,6 @@ func TestParallelInitializeConditions(t *testing.T) {
}, {
Type: ParallelConditionChannelsReady,
Status: corev1.ConditionUnknown,
}, {
Type: ParallelConditionOIDCIdentityCreated,
Status: corev1.ConditionUnknown,
}, {
Type: ParallelConditionReady,
Status: corev1.ConditionUnknown,
Expand Down Expand Up @@ -336,121 +327,89 @@ func TestParallelPropagateSubscriptionStatusUpdated(t *testing.T) {

func TestParallelReady(t *testing.T) {
tests := []struct {
name string
fsubs []*messagingv1.Subscription
subs []*messagingv1.Subscription
ichannel *eventingduckv1.Channelable
channels []*eventingduckv1.Channelable
markOIDCServiceAccountCreated bool
want bool
name string
fsubs []*messagingv1.Subscription
subs []*messagingv1.Subscription
ichannel *eventingduckv1.Channelable
channels []*eventingduckv1.Channelable
want bool
}{{
name: "ingress false, empty, serviceAccount ready",
fsubs: []*messagingv1.Subscription{},
subs: []*messagingv1.Subscription{},
ichannel: getChannelable(false),
channels: []*eventingduckv1.Channelable{},
markOIDCServiceAccountCreated: true,
want: false,
}, {
name: "ingress true, empty, serviceAccount ready",
fsubs: []*messagingv1.Subscription{},
subs: []*messagingv1.Subscription{},
ichannel: getChannelable(true),
channels: []*eventingduckv1.Channelable{},
markOIDCServiceAccountCreated: true,
want: false,
}, {
name: "ingress true, one channelable not ready, one subscription ready, serviceAccount ready",
ichannel: getChannelable(true),
channels: []*eventingduckv1.Channelable{getChannelable(false)},
fsubs: []*messagingv1.Subscription{getSubscription("fsub0", true)},
subs: []*messagingv1.Subscription{getSubscription("sub0", true)},
markOIDCServiceAccountCreated: true,
want: false,
name: "ingress false, empty",
fsubs: []*messagingv1.Subscription{},
subs: []*messagingv1.Subscription{},
ichannel: getChannelable(false),
channels: []*eventingduckv1.Channelable{},
want: false,
}, {
name: "ingress true, one channelable ready, one subscription not ready, serviceAccount ready",
ichannel: getChannelable(true),
channels: []*eventingduckv1.Channelable{getChannelable(true)},
fsubs: []*messagingv1.Subscription{getSubscription("fsub0", false)},
subs: []*messagingv1.Subscription{getSubscription("sub0", false)},
markOIDCServiceAccountCreated: true,
want: false,
name: "ingress true, empty",
fsubs: []*messagingv1.Subscription{},
subs: []*messagingv1.Subscription{},
ichannel: getChannelable(true),
channels: []*eventingduckv1.Channelable{},
want: false,
}, {
name: "ingress false, one channelable ready, one subscription ready,serviceAccount ready",
ichannel: getChannelable(false),
channels: []*eventingduckv1.Channelable{getChannelable(true)},
fsubs: []*messagingv1.Subscription{getSubscription("fsub0", true)},
subs: []*messagingv1.Subscription{getSubscription("sub0", true)},
markOIDCServiceAccountCreated: true,
want: false,
name: "ingress true, one channelable not ready, one subscription ready",
ichannel: getChannelable(true),
channels: []*eventingduckv1.Channelable{getChannelable(false)},
fsubs: []*messagingv1.Subscription{getSubscription("fsub0", true)},
subs: []*messagingv1.Subscription{getSubscription("sub0", true)},
want: false,
}, {
name: "ingress true, one channelable ready, one subscription ready, serviceAccount ready",
ichannel: getChannelable(true),
channels: []*eventingduckv1.Channelable{getChannelable(true)},
fsubs: []*messagingv1.Subscription{getSubscription("fsub0", true)},
subs: []*messagingv1.Subscription{getSubscription("sub0", true)},
markOIDCServiceAccountCreated: true,
want: true,
name: "ingress true, one channelable ready, one subscription not ready",
ichannel: getChannelable(true),
channels: []*eventingduckv1.Channelable{getChannelable(true)},
fsubs: []*messagingv1.Subscription{getSubscription("fsub0", false)},
subs: []*messagingv1.Subscription{getSubscription("sub0", false)},
want: false,
}, {
name: "ingress true, one channelable ready, one subscription ready, serviceAccount not ready",
ichannel: getChannelable(true),
channels: []*eventingduckv1.Channelable{getChannelable(true)},
fsubs: []*messagingv1.Subscription{getSubscription("fsub0", true)},
subs: []*messagingv1.Subscription{getSubscription("sub0", true)},
markOIDCServiceAccountCreated: false,
want: false,
name: "ingress false, one channelable ready, one subscription ready",
ichannel: getChannelable(false),
channels: []*eventingduckv1.Channelable{getChannelable(true)},
fsubs: []*messagingv1.Subscription{getSubscription("fsub0", true)},
subs: []*messagingv1.Subscription{getSubscription("sub0", true)},
want: false,
}, {
name: "ingress true, one channelable ready, one not, two subscriptions ready, serviceAccount ready",
ichannel: getChannelable(true),
channels: []*eventingduckv1.Channelable{getChannelable(true), getChannelable(false)},
fsubs: []*messagingv1.Subscription{getSubscription("fsub0", true), getSubscription("fsub1", true)},
subs: []*messagingv1.Subscription{getSubscription("sub0", true), getSubscription("sub1", true)},
markOIDCServiceAccountCreated: true,
want: false,
name: "ingress true, one channelable ready, one subscription ready",
ichannel: getChannelable(true),
channels: []*eventingduckv1.Channelable{getChannelable(true)},
fsubs: []*messagingv1.Subscription{getSubscription("fsub0", true)},
subs: []*messagingv1.Subscription{getSubscription("sub0", true)},
want: true,
}, {
name: "ingress true, two channelables ready, one subscription ready, one not, serviceAccount ready",
ichannel: getChannelable(true),
channels: []*eventingduckv1.Channelable{getChannelable(true), getChannelable(true)},
fsubs: []*messagingv1.Subscription{getSubscription("fsub0", true), getSubscription("fsub1", false)},
subs: []*messagingv1.Subscription{getSubscription("sub0", true), getSubscription("sub1", false)},
markOIDCServiceAccountCreated: true,
want: false,
name: "ingress true, one channelable ready, one not, two subscriptions ready",
ichannel: getChannelable(true),
channels: []*eventingduckv1.Channelable{getChannelable(true), getChannelable(false)},
fsubs: []*messagingv1.Subscription{getSubscription("fsub0", true), getSubscription("fsub1", true)},
subs: []*messagingv1.Subscription{getSubscription("sub0", true), getSubscription("sub1", true)},
want: false,
}, {
name: "ingress false, two channelables ready, two subscriptions ready, serviceAccount ready",
ichannel: getChannelable(false),
channels: []*eventingduckv1.Channelable{getChannelable(true), getChannelable(true)},
fsubs: []*messagingv1.Subscription{getSubscription("fsub0", true), getSubscription("fsub1", true)},
subs: []*messagingv1.Subscription{getSubscription("sub0", true), getSubscription("sub1", true)},
markOIDCServiceAccountCreated: true,
want: false,
name: "ingress true, two channelables ready, one subscription ready, one not",
ichannel: getChannelable(true),
channels: []*eventingduckv1.Channelable{getChannelable(true), getChannelable(true)},
fsubs: []*messagingv1.Subscription{getSubscription("fsub0", true), getSubscription("fsub1", false)},
subs: []*messagingv1.Subscription{getSubscription("sub0", true), getSubscription("sub1", false)},
want: false,
}, {
name: "ingress true, two channelables ready, two subscriptions ready, serviceAccount not ready",
ichannel: getChannelable(true),
channels: []*eventingduckv1.Channelable{getChannelable(true), getChannelable(true)},
fsubs: []*messagingv1.Subscription{getSubscription("fsub0", true), getSubscription("fsub1", true)},
subs: []*messagingv1.Subscription{getSubscription("sub0", true), getSubscription("sub1", true)},
markOIDCServiceAccountCreated: false,
want: false,
name: "ingress false, two channelables ready, two subscriptions ready",
ichannel: getChannelable(false),
channels: []*eventingduckv1.Channelable{getChannelable(true), getChannelable(true)},
fsubs: []*messagingv1.Subscription{getSubscription("fsub0", true), getSubscription("fsub1", true)},
subs: []*messagingv1.Subscription{getSubscription("sub0", true), getSubscription("sub1", true)},
want: false,
}, {
name: "ingress true, two channelables ready, two subscriptions ready, serviceAccount ready",
ichannel: getChannelable(true),
channels: []*eventingduckv1.Channelable{getChannelable(true), getChannelable(true)},
fsubs: []*messagingv1.Subscription{getSubscription("fsub0", true), getSubscription("fsub1", true)},
subs: []*messagingv1.Subscription{getSubscription("sub0", true), getSubscription("sub1", true)},
markOIDCServiceAccountCreated: true,
want: true,
name: "ingress true, two channelables ready, two subscriptions ready",
ichannel: getChannelable(true),
channels: []*eventingduckv1.Channelable{getChannelable(true), getChannelable(true)},
fsubs: []*messagingv1.Subscription{getSubscription("fsub0", true), getSubscription("fsub1", true)},
subs: []*messagingv1.Subscription{getSubscription("sub0", true), getSubscription("sub1", true)},
want: true,
}}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
ps := ParallelStatus{}
ps.PropagateChannelStatuses(test.ichannel, test.channels)
ps.PropagateSubscriptionStatuses(test.fsubs, test.subs)
if test.markOIDCServiceAccountCreated {
ps.MarkOIDCIdentityCreatedSucceeded()
} else {
ps.MarkOIDCIdentityCreatedFailed("Unable to create serviceaccount", "")
}
got := ps.IsReady()
want := test.want
if want != got {
Expand Down
40 changes: 5 additions & 35 deletions pkg/reconciler/parallel/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,12 @@ package parallel
import (
"context"

"knative.dev/eventing/pkg/auth"

"k8s.io/client-go/tools/cache"
"knative.dev/eventing/pkg/apis/feature"
v1 "knative.dev/eventing/pkg/apis/flows/v1"
"knative.dev/eventing/pkg/duck"
kubeclient "knative.dev/pkg/client/injection/kube/client"
serviceaccountinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/serviceaccount/filtered"
"knative.dev/pkg/configmap"
"knative.dev/pkg/controller"
"knative.dev/pkg/injection/clients/dynamicclient"
"knative.dev/pkg/logging"

eventingclient "knative.dev/eventing/pkg/client/injection/client"
"knative.dev/eventing/pkg/client/injection/ducks/duck/v1/channelable"
Expand All @@ -48,33 +42,14 @@ func NewController(

parallelInformer := parallel.Get(ctx)
subscriptionInformer := subscription.Get(ctx)
oidcServiceaccountInformer := serviceaccountinformer.Get(ctx, auth.OIDCLabelSelector)

var globalResync func(obj interface{})
featureStore := feature.NewStore(logging.FromContext(ctx).Named("feature-config-store"), func(name string, value interface{}) {
if globalResync != nil {
globalResync(nil)
}
})
featureStore.WatchConfigs(cmw)

r := &Reconciler{
parallelLister: parallelInformer.Lister(),
subscriptionLister: subscriptionInformer.Lister(),
serviceAccountLister: oidcServiceaccountInformer.Lister(),
kubeclient: kubeclient.Get(ctx),
dynamicClientSet: dynamicclient.Get(ctx),
eventingClientSet: eventingclient.Get(ctx),
}
impl := parallelreconciler.NewImpl(ctx, r, func(impl *controller.Impl) controller.Options {
return controller.Options{
ConfigStore: featureStore,
}
})

globalResync = func(_ interface{}) {
impl.GlobalResync(parallelInformer.Informer())
parallelLister: parallelInformer.Lister(),
subscriptionLister: subscriptionInformer.Lister(),
dynamicClientSet: dynamicclient.Get(ctx),
eventingClientSet: eventingclient.Get(ctx),
}
impl := parallelreconciler.NewImpl(ctx, r)

r.channelableTracker = duck.NewListableTrackerFromTracker(ctx, channelable.Get, impl.Tracker)
parallelInformer.Informer().AddEventHandler(controller.HandleAll(impl.Enqueue))
Expand All @@ -85,11 +60,6 @@ func NewController(
FilterFunc: controller.FilterController(&v1.Parallel{}),
Handler: controller.HandleAll(impl.EnqueueControllerOf),
})
// Reconcile Parallel when the OIDC service account changes
oidcServiceaccountInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{
FilterFunc: controller.FilterController(&v1.Parallel{}),
Handler: controller.HandleAll(impl.EnqueueControllerOf),
})

return impl
}
24 changes: 2 additions & 22 deletions pkg/reconciler/parallel/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,43 +17,23 @@ limitations under the License.
package parallel

import (
"context"
"testing"

"knative.dev/eventing/pkg/auth"
filteredFactory "knative.dev/pkg/client/injection/kube/informers/factory/filtered"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"knative.dev/pkg/configmap"
. "knative.dev/pkg/reconciler/testing"

// Fake injection informers
"knative.dev/eventing/pkg/apis/feature"
_ "knative.dev/eventing/pkg/client/injection/ducks/duck/v1/channelable/fake"
_ "knative.dev/eventing/pkg/client/injection/informers/flows/v1/parallel/fake"
_ "knative.dev/eventing/pkg/client/injection/informers/messaging/v1/subscription/fake"
_ "knative.dev/pkg/client/injection/kube/informers/core/v1/serviceaccount/filtered/fake"
_ "knative.dev/pkg/client/injection/kube/informers/factory/filtered/fake"
)

func TestNew(t *testing.T) {
ctx, _ := SetupFakeContext(t, SetUpInformerSelector)
ctx, _ := SetupFakeContext(t)

c := NewController(ctx, configmap.NewStaticWatcher(
&corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: feature.FlagsConfigName,
},
},
))
c := NewController(ctx, configmap.NewStaticWatcher())

if c == nil {
t.Fatal("Expected NewController to return a non-nil value")
}
}

func SetUpInformerSelector(ctx context.Context) context.Context {
ctx = filteredFactory.WithSelectors(ctx, auth.OIDCLabelSelector)
return ctx
}

0 comments on commit a32d1da

Please sign in to comment.