Skip to content

Commit

Permalink
Merge pull request #3808 from ctripcloud/refactor-execution-workstatus
Browse files Browse the repository at this point in the history
only update object in member cluster by execution controller
  • Loading branch information
karmada-bot committed Aug 21, 2023
2 parents 24bb582 + 89e3673 commit 90ccbcd
Show file tree
Hide file tree
Showing 15 changed files with 2,085 additions and 790 deletions.
58 changes: 31 additions & 27 deletions cmd/agent/app/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import (
"github.com/karmada-io/karmada/pkg/util/fedinformer/typedmanager"
"github.com/karmada-io/karmada/pkg/util/gclient"
"github.com/karmada-io/karmada/pkg/util/helper"
"github.com/karmada-io/karmada/pkg/util/memberclusterinformer"
"github.com/karmada-io/karmada/pkg/util/names"
"github.com/karmada-io/karmada/pkg/util/objectwatcher"
"github.com/karmada-io/karmada/pkg/util/restmapper"
Expand Down Expand Up @@ -217,7 +218,7 @@ func run(ctx context.Context, opts *options.Options) error {
crtlmetrics.Registry.MustRegister(metrics.ResourceCollectorsForAgent()...)
crtlmetrics.Registry.MustRegister(metrics.PoolCollectors()...)

if err = setupControllers(controllerManager, opts, ctx.Done()); err != nil {
if err = setupControllers(ctx, controllerManager, opts); err != nil {
return err
}

Expand All @@ -229,27 +230,30 @@ func run(ctx context.Context, opts *options.Options) error {
return nil
}

func setupControllers(mgr controllerruntime.Manager, opts *options.Options, stopChan <-chan struct{}) error {
func setupControllers(ctx context.Context, mgr controllerruntime.Manager, opts *options.Options) error {
restConfig := mgr.GetConfig()
dynamicClientSet := dynamic.NewForConfigOrDie(restConfig)
controlPlaneInformerManager := genericmanager.NewSingleClusterInformerManager(dynamicClientSet, 0, stopChan)
controlPlaneInformerManager := genericmanager.NewSingleClusterInformerManager(dynamicClientSet, 0, ctx.Done())
controlPlaneKubeClientSet := kubeclientset.NewForConfigOrDie(restConfig)

// We need a service lister to build a resource interpreter with `ClusterIPServiceResolver`
// witch allows connection to the customized interpreter webhook without a cluster DNS service.
sharedFactory := informers.NewSharedInformerFactory(controlPlaneKubeClientSet, 0)
serviceLister := sharedFactory.Core().V1().Services().Lister()
sharedFactory.Start(stopChan)
sharedFactory.WaitForCacheSync(stopChan)
sharedFactory.Start(ctx.Done())
sharedFactory.WaitForCacheSync(ctx.Done())

resourceInterpreter := resourceinterpreter.NewResourceInterpreter(controlPlaneInformerManager, serviceLister)
if err := mgr.Add(resourceInterpreter); err != nil {
return fmt.Errorf("failed to setup custom resource interpreter: %w", err)
}

objectWatcher := objectwatcher.NewObjectWatcher(mgr.GetClient(), mgr.GetRESTMapper(), util.NewClusterDynamicClientSetForAgent, resourceInterpreter)
memberClusterInformer := memberclusterinformer.NewMemberClusterInformer(mgr.GetClient(), mgr.GetRESTMapper(), genericmanager.GetInstance(), opts.ClusterCacheSyncTimeout, util.NewClusterDynamicClientSetForAgent)

controllerContext := controllerscontext.Context{
Mgr: mgr,
Ctx: ctx,
ObjectWatcher: objectWatcher,
Opts: controllerscontext.Options{
Controllers: opts.Controllers,
Expand All @@ -269,8 +273,9 @@ func setupControllers(mgr controllerruntime.Manager, opts *options.Options, stop
CertRotationRemainingTimeThreshold: opts.CertRotationRemainingTimeThreshold,
KarmadaKubeconfigNamespace: opts.KarmadaKubeconfigNamespace,
},
StopChan: stopChan,
ResourceInterpreter: resourceInterpreter,
StopChan: ctx.Done(),
ResourceInterpreter: resourceInterpreter,
MemberClusterInformer: memberClusterInformer,
}

if err := controllers.StartControllers(controllerContext, controllersDisabledByDefault); err != nil {
Expand All @@ -279,7 +284,7 @@ func setupControllers(mgr controllerruntime.Manager, opts *options.Options, stop

// Ensure the InformerManager stops when the stop channel closes
go func() {
<-stopChan
<-ctx.Done()
genericmanager.StopInstance()
}()

Expand Down Expand Up @@ -315,14 +320,17 @@ func startClusterStatusController(ctx controllerscontext.Context) (bool, error)

func startExecutionController(ctx controllerscontext.Context) (bool, error) {
executionController := &execution.Controller{
Client: ctx.Mgr.GetClient(),
EventRecorder: ctx.Mgr.GetEventRecorderFor(execution.ControllerName),
RESTMapper: ctx.Mgr.GetRESTMapper(),
ObjectWatcher: ctx.ObjectWatcher,
PredicateFunc: helper.NewExecutionPredicateOnAgent(),
InformerManager: genericmanager.GetInstance(),
RatelimiterOptions: ctx.Opts.RateLimiterOptions,
Ctx: ctx.Ctx,
Client: ctx.Mgr.GetClient(),
EventRecorder: ctx.Mgr.GetEventRecorderFor(execution.ControllerName),
ObjectWatcher: ctx.ObjectWatcher,
PredicateFunc: helper.NewExecutionPredicateOnAgent(),
RatelimiterOptions: ctx.Opts.RateLimiterOptions,
ConcurrentWorkSyncs: ctx.Opts.ConcurrentWorkSyncs,
StopChan: ctx.StopChan,
MemberClusterInformer: ctx.MemberClusterInformer,
}
executionController.RunWorkQueue()
if err := executionController.SetupWithManager(ctx.Mgr); err != nil {
return false, err
}
Expand All @@ -331,18 +339,14 @@ func startExecutionController(ctx controllerscontext.Context) (bool, error) {

func startWorkStatusController(ctx controllerscontext.Context) (bool, error) {
workStatusController := &status.WorkStatusController{
Client: ctx.Mgr.GetClient(),
EventRecorder: ctx.Mgr.GetEventRecorderFor(status.WorkStatusControllerName),
RESTMapper: ctx.Mgr.GetRESTMapper(),
InformerManager: genericmanager.GetInstance(),
StopChan: ctx.StopChan,
ObjectWatcher: ctx.ObjectWatcher,
PredicateFunc: helper.NewExecutionPredicateOnAgent(),
ClusterDynamicClientSetFunc: util.NewClusterDynamicClientSetForAgent,
ClusterCacheSyncTimeout: ctx.Opts.ClusterCacheSyncTimeout,
ConcurrentWorkStatusSyncs: ctx.Opts.ConcurrentWorkSyncs,
RateLimiterOptions: ctx.Opts.RateLimiterOptions,
ResourceInterpreter: ctx.ResourceInterpreter,
Client: ctx.Mgr.GetClient(),
EventRecorder: ctx.Mgr.GetEventRecorderFor(status.WorkStatusControllerName),
StopChan: ctx.StopChan,
PredicateFunc: helper.NewExecutionPredicateOnAgent(),
ConcurrentWorkStatusSyncs: ctx.Opts.ConcurrentWorkSyncs,
RateLimiterOptions: ctx.Opts.RateLimiterOptions,
ResourceInterpreter: ctx.ResourceInterpreter,
MemberClusterInformer: ctx.MemberClusterInformer,
}
workStatusController.RunWorkQueue()
if err := workStatusController.SetupWithManager(ctx.Mgr); err != nil {
Expand Down
59 changes: 31 additions & 28 deletions cmd/controller-manager/app/controllermanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ import (
"github.com/karmada-io/karmada/pkg/util/fedinformer/typedmanager"
"github.com/karmada-io/karmada/pkg/util/gclient"
"github.com/karmada-io/karmada/pkg/util/helper"
"github.com/karmada-io/karmada/pkg/util/memberclusterinformer"
"github.com/karmada-io/karmada/pkg/util/objectwatcher"
"github.com/karmada-io/karmada/pkg/util/overridemanager"
"github.com/karmada-io/karmada/pkg/util/restmapper"
Expand Down Expand Up @@ -170,7 +171,7 @@ func Run(ctx context.Context, opts *options.Options) error {
crtlmetrics.Registry.MustRegister(metrics.ResourceCollectors()...)
crtlmetrics.Registry.MustRegister(metrics.PoolCollectors()...)

setupControllers(controllerManager, opts, ctx.Done())
setupControllers(ctx, controllerManager, opts)

// blocks until the context is done.
if err := controllerManager.Start(ctx); err != nil {
Expand Down Expand Up @@ -369,14 +370,17 @@ func startBindingStatusController(ctx controllerscontext.Context) (enabled bool,

func startExecutionController(ctx controllerscontext.Context) (enabled bool, err error) {
executionController := &execution.Controller{
Client: ctx.Mgr.GetClient(),
EventRecorder: ctx.Mgr.GetEventRecorderFor(execution.ControllerName),
RESTMapper: ctx.Mgr.GetRESTMapper(),
ObjectWatcher: ctx.ObjectWatcher,
PredicateFunc: helper.NewExecutionPredicate(ctx.Mgr),
InformerManager: genericmanager.GetInstance(),
RatelimiterOptions: ctx.Opts.RateLimiterOptions,
}
Ctx: ctx.Ctx,
Client: ctx.Mgr.GetClient(),
EventRecorder: ctx.Mgr.GetEventRecorderFor(execution.ControllerName),
ObjectWatcher: ctx.ObjectWatcher,
PredicateFunc: helper.NewExecutionPredicate(ctx.Mgr),
RatelimiterOptions: ctx.Opts.RateLimiterOptions,
ConcurrentWorkSyncs: ctx.Opts.ConcurrentWorkSyncs,
StopChan: ctx.StopChan,
MemberClusterInformer: ctx.MemberClusterInformer,
}
executionController.RunWorkQueue()
if err := executionController.SetupWithManager(ctx.Mgr); err != nil {
return false, err
}
Expand All @@ -386,18 +390,14 @@ func startExecutionController(ctx controllerscontext.Context) (enabled bool, err
func startWorkStatusController(ctx controllerscontext.Context) (enabled bool, err error) {
opts := ctx.Opts
workStatusController := &status.WorkStatusController{
Client: ctx.Mgr.GetClient(),
EventRecorder: ctx.Mgr.GetEventRecorderFor(status.WorkStatusControllerName),
RESTMapper: ctx.Mgr.GetRESTMapper(),
InformerManager: genericmanager.GetInstance(),
StopChan: ctx.StopChan,
ObjectWatcher: ctx.ObjectWatcher,
PredicateFunc: helper.NewExecutionPredicate(ctx.Mgr),
ClusterDynamicClientSetFunc: util.NewClusterDynamicClientSet,
ClusterCacheSyncTimeout: opts.ClusterCacheSyncTimeout,
ConcurrentWorkStatusSyncs: opts.ConcurrentWorkSyncs,
RateLimiterOptions: ctx.Opts.RateLimiterOptions,
ResourceInterpreter: ctx.ResourceInterpreter,
Client: ctx.Mgr.GetClient(),
EventRecorder: ctx.Mgr.GetEventRecorderFor(status.WorkStatusControllerName),
StopChan: ctx.StopChan,
PredicateFunc: helper.NewExecutionPredicate(ctx.Mgr),
ConcurrentWorkStatusSyncs: opts.ConcurrentWorkSyncs,
RateLimiterOptions: ctx.Opts.RateLimiterOptions,
ResourceInterpreter: ctx.ResourceInterpreter,
MemberClusterInformer: ctx.MemberClusterInformer,
}
workStatusController.RunWorkQueue()
if err := workStatusController.SetupWithManager(ctx.Mgr); err != nil {
Expand Down Expand Up @@ -592,7 +592,7 @@ func startCronFederatedHorizontalPodAutoscalerController(ctx controllerscontext.
}

// setupControllers initialize controllers and setup one by one.
func setupControllers(mgr controllerruntime.Manager, opts *options.Options, stopChan <-chan struct{}) {
func setupControllers(ctx context.Context, mgr controllerruntime.Manager, opts *options.Options) {
restConfig := mgr.GetConfig()
dynamicClientSet := dynamic.NewForConfigOrDie(restConfig)
discoverClientSet := discovery.NewDiscoveryClientForConfigOrDie(restConfig)
Expand All @@ -605,20 +605,21 @@ func setupControllers(mgr controllerruntime.Manager, opts *options.Options, stop
return
}

controlPlaneInformerManager := genericmanager.NewSingleClusterInformerManager(dynamicClientSet, 0, stopChan)
controlPlaneInformerManager := genericmanager.NewSingleClusterInformerManager(dynamicClientSet, 0, ctx.Done())
// We need a service lister to build a resource interpreter with `ClusterIPServiceResolver`
// witch allows connection to the customized interpreter webhook without a cluster DNS service.
sharedFactory := informers.NewSharedInformerFactory(kubeClientSet, 0)
serviceLister := sharedFactory.Core().V1().Services().Lister()
sharedFactory.Start(stopChan)
sharedFactory.WaitForCacheSync(stopChan)
sharedFactory.Start(ctx.Done())
sharedFactory.WaitForCacheSync(ctx.Done())

resourceInterpreter := resourceinterpreter.NewResourceInterpreter(controlPlaneInformerManager, serviceLister)
if err := mgr.Add(resourceInterpreter); err != nil {
klog.Fatalf("Failed to setup custom resource interpreter: %v", err)
}

objectWatcher := objectwatcher.NewObjectWatcher(mgr.GetClient(), mgr.GetRESTMapper(), util.NewClusterDynamicClientSet, resourceInterpreter)
memberClusterInformer := memberclusterinformer.NewMemberClusterInformer(mgr.GetClient(), mgr.GetRESTMapper(), genericmanager.GetInstance(), opts.ClusterCacheSyncTimeout, util.NewClusterDynamicClientSet)

resourceDetector := &detector.ResourceDetector{
DiscoveryClientSet: discoverClientSet,
Expand Down Expand Up @@ -653,9 +654,10 @@ func setupControllers(mgr controllerruntime.Manager, opts *options.Options, stop
klog.Fatalf("Failed to setup dependencies distributor: %v", err)
}
}
setupClusterAPIClusterDetector(mgr, opts, stopChan)
setupClusterAPIClusterDetector(mgr, opts, ctx.Done())
controllerContext := controllerscontext.Context{
Mgr: mgr,
Ctx: ctx,
ObjectWatcher: objectWatcher,
Opts: controllerscontext.Options{
Controllers: opts.Controllers,
Expand All @@ -679,12 +681,13 @@ func setupControllers(mgr controllerruntime.Manager, opts *options.Options, stop
EnableClusterResourceModeling: opts.EnableClusterResourceModeling,
HPAControllerConfiguration: opts.HPAControllerConfiguration,
},
StopChan: stopChan,
StopChan: ctx.Done(),
DynamicClientSet: dynamicClientSet,
KubeClientSet: kubeClientSet,
OverrideManager: overrideManager,
ControlPlaneInformerManager: controlPlaneInformerManager,
ResourceInterpreter: resourceInterpreter,
MemberClusterInformer: memberClusterInformer,
}

if err := controllers.StartControllers(controllerContext, controllersDisabledByDefault); err != nil {
Expand All @@ -693,7 +696,7 @@ func setupControllers(mgr controllerruntime.Manager, opts *options.Options, stop

// Ensure the InformerManager stops when the stop channel closes
go func() {
<-stopChan
<-ctx.Done()
genericmanager.StopInstance()
}()
}
Expand Down
4 changes: 4 additions & 0 deletions pkg/controllers/context/context.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package context

import (
"context"
"regexp"
"time"

Expand All @@ -15,6 +16,7 @@ import (
"github.com/karmada-io/karmada/pkg/resourceinterpreter"
"github.com/karmada-io/karmada/pkg/sharedcli/ratelimiterflag"
"github.com/karmada-io/karmada/pkg/util/fedinformer/genericmanager"
"github.com/karmada-io/karmada/pkg/util/memberclusterinformer"
"github.com/karmada-io/karmada/pkg/util/objectwatcher"
"github.com/karmada-io/karmada/pkg/util/overridemanager"
)
Expand Down Expand Up @@ -88,6 +90,7 @@ type Options struct {
// Context defines the context object for controller.
type Context struct {
Mgr controllerruntime.Manager
Ctx context.Context
ObjectWatcher objectwatcher.ObjectWatcher
Opts Options
StopChan <-chan struct{}
Expand All @@ -96,6 +99,7 @@ type Context struct {
OverrideManager overridemanager.OverrideManager
ControlPlaneInformerManager genericmanager.SingleClusterInformerManager
ResourceInterpreter resourceinterpreter.ResourceInterpreter
MemberClusterInformer memberclusterinformer.MemberClusterInformer
}

// IsControllerEnabled check if a specified controller enabled or not.
Expand Down

0 comments on commit 90ccbcd

Please sign in to comment.