
Delete Useless options in controller #569

Merged 1 commit on Apr 8, 2023
6 changes: 0 additions & 6 deletions cmd/controller/app/options.go
@@ -21,11 +21,8 @@ import (
)

type ServerRunOptions struct {
KubeConfig string
MasterUrl string
MetricsAddr string
ProbeAddr string
InCluster bool
ApiServerQPS int
ApiServerBurst int
Workers int
@@ -39,9 +36,6 @@ func NewServerRunOptions() *ServerRunOptions {
}

func (s *ServerRunOptions) addAllFlags() {
pflag.BoolVar(&s.InCluster, "incluster", s.InCluster, "If controller run incluster.")
pflag.StringVar(&s.KubeConfig, "kubeConfig", s.KubeConfig, "Kube Config path if not run in cluster.")
pflag.StringVar(&s.MasterUrl, "masterUrl", s.MasterUrl, "Master Url if not run in cluster.")
pflag.StringVar(&s.MetricsAddr, "metricsAddr", ":8080", "Metrics server bind listen address.")
pflag.StringVar(&s.ProbeAddr, "probeAddr", ":8081", "Probe endpoint bind address.")
pflag.IntVar(&s.ApiServerQPS, "qps", 5, "qps of query apiserver.")
105 changes: 10 additions & 95 deletions cmd/controller/app/server.go
@@ -17,31 +17,16 @@ limitations under the License.
package app

import (
"context"
"os"

"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/uuid"
"k8s.io/apiserver/pkg/server"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"
restclient "k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/tools/leaderelection"
"k8s.io/client-go/tools/leaderelection/resourcelock"
"k8s.io/klog/v2"
"k8s.io/klog/v2/klogr"

ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/healthz"

schedulingv1a1 "sigs.k8s.io/scheduler-plugins/apis/scheduling/v1alpha1"
"sigs.k8s.io/scheduler-plugins/pkg/controllers"
schedclientset "sigs.k8s.io/scheduler-plugins/pkg/generated/clientset/versioned"
schedformers "sigs.k8s.io/scheduler-plugins/pkg/generated/informers/externalversions"
)

var (
@@ -55,38 +40,10 @@ func init() {
utilruntime.Must(schedulingv1a1.AddToScheme(scheme))
}

func newConfig(kubeconfig, master string, inCluster bool) (*restclient.Config, error) {
var (
config *rest.Config
err error
)
if inCluster {
config, err = rest.InClusterConfig()
} else {
config, err = clientcmd.BuildConfigFromFlags(master, kubeconfig)
}
if err != nil {
return nil, err
}
return config, nil
}

func Run(s *ServerRunOptions) error {
ctx := context.Background()
config, err := newConfig(s.KubeConfig, s.MasterUrl, s.InCluster)
if err != nil {
klog.ErrorS(err, "Failed to parse config")
os.Exit(1)
}
config := ctrl.GetConfigOrDie()
config.QPS = float32(s.ApiServerQPS)
config.Burst = s.ApiServerBurst
stopCh := server.SetupSignalHandler()
schedClient := schedclientset.NewForConfigOrDie(config)
kubeClient := kubernetes.NewForConfigOrDie(config)

schedInformerFactory := schedformers.NewSharedInformerFactory(schedClient, 0)

coreInformerFactory := informers.NewSharedInformerFactory(kubeClient, 0)

// Controller Runtime Controllers
ctrl.SetLogger(klogr.New())
@@ -105,16 +62,18 @@ func Run(s *ServerRunOptions) error {
}

if err = (&controllers.PodGroupReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
Workers: s.Workers,
}).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "PodGroup")
return err
}

if err = (&controllers.ElasticQuotaReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
Workers: s.Workers,
}).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "ElasticQuota")
return err
@@ -129,53 +88,9 @@ func Run(s *ServerRunOptions) error {
return err
}

run := func(ctx context.Context) {
setupLog.Info("starting manager")
if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
setupLog.Error(err, "unable to start manager")
panic(err)
}

select {}
}
schedInformerFactory.Start(stopCh)
coreInformerFactory.Start(stopCh)
if !s.EnableLeaderElection {
Contributor:
this seems to be totally disabling leader election?

Contributor Author (@nayihz, Apr 3, 2023):
No, it does not. There is no need to implement this manually; controller-runtime handles it instead. Leader election is already enabled when we create the manager with ctrl.NewManager.

Contributor Author (@nayihz):
mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
Scheme: scheme,
MetricsBindAddress: s.MetricsAddr,
Port: 9443,
HealthProbeBindAddress: s.ProbeAddr,
LeaderElection: s.EnableLeaderElection,
LeaderElectionID: "sched-plugins-controllers",
LeaderElectionNamespace: "kube-system",
})

run(ctx)
} else {
id, err := os.Hostname()
if err != nil {
return err
}
// add a uniquifier so that two processes on the same host don't accidentally both become active
id = id + "_" + string(uuid.NewUUID())

rl, err := resourcelock.New("endpoints",
"kube-system",
"sched-plugins-controller",
kubeClient.CoreV1(),
kubeClient.CoordinationV1(),
resourcelock.ResourceLockConfig{
Identity: id,
})
if err != nil {
klog.ErrorS(err, "Resource lock creation failed")
os.Exit(1)
}

leaderelection.RunOrDie(context.TODO(), leaderelection.LeaderElectionConfig{
Lock: rl,
Callbacks: leaderelection.LeaderCallbacks{
OnStartedLeading: run,
OnStoppedLeading: func() {
klog.ErrorS(err, "Leaderelection lost")
os.Exit(1)
},
},
Name: "scheduler-plugins controller",
})
if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
setupLog.Error(err, "unable to start manager")
return err
}

<-stopCh
return nil
}
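
Returning to the leader-election thread above: a minimal, self-contained sketch (not the PR's code; scheme registration and reconciler setup trimmed, names copied from the snippet quoted in the review) of the pattern this PR relies on, where the manager itself campaigns for the lock and only runs its controllers while leading:

package main

import (
	"os"

	clientgoscheme "k8s.io/client-go/kubernetes/scheme"
	"k8s.io/klog/v2"
	ctrl "sigs.k8s.io/controller-runtime"
)

func main() {
	// The manager owns leader election; no resourcelock or
	// leaderelection.RunOrDie plumbing is needed in our own code.
	mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
		Scheme:                  clientgoscheme.Scheme,
		LeaderElection:          true,
		LeaderElectionID:        "sched-plugins-controllers",
		LeaderElectionNamespace: "kube-system",
	})
	if err != nil {
		klog.ErrorS(err, "unable to create manager")
		os.Exit(1)
	}

	// Start first campaigns for the leader lock, runs the registered
	// controllers only while this process holds it, and returns when the
	// signal-handler context is cancelled or leadership is lost.
	if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
		klog.ErrorS(err, "unable to start manager")
		os.Exit(1)
	}
}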
5 changes: 4 additions & 1 deletion pkg/controllers/elasticquota_controller.go
@@ -30,6 +30,7 @@ import (

ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/source"
@@ -41,7 +42,8 @@ type ElasticQuotaReconciler struct {
recorder record.EventRecorder

client.Client
Scheme *runtime.Scheme
Scheme *runtime.Scheme
Workers int
}

// +kubebuilder:rbac:groups=scheduling.x-k8s.io,resources=elasticquota,verbs=get;list;watch;create;update;patch;delete
@@ -173,5 +175,6 @@ func (r *ElasticQuotaReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
Watches(&source.Kind{Type: &v1.Pod{}}, &handler.EnqueueRequestForObject{}).
For(&schedv1alpha1.ElasticQuota{}).
WithOptions(controller.Options{MaxConcurrentReconciles: r.Workers}).
Contributor:
thanks for adding this.

Complete(r)
}
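
As a side note on the new Workers wiring above: a minimal sketch (hypothetical example, not this repository's code) of what WithOptions(controller.Options{MaxConcurrentReconciles: ...}) does — the builder starts that many workers draining the controller's work queue in parallel:

package example // illustrative only, not part of the PR

import (
	"context"

	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/controller"
	"sigs.k8s.io/controller-runtime/pkg/reconcile"

	schedv1alpha1 "sigs.k8s.io/scheduler-plugins/apis/scheduling/v1alpha1"
)

// setupExample registers a no-op reconciler for ElasticQuota with `workers`
// concurrent reconcile workers, mirroring how the PR passes s.Workers through.
func setupExample(mgr ctrl.Manager, workers int) error {
	return ctrl.NewControllerManagedBy(mgr).
		For(&schedv1alpha1.ElasticQuota{}).
		WithOptions(controller.Options{MaxConcurrentReconciles: workers}).
		Complete(reconcile.Func(func(ctx context.Context, req reconcile.Request) (reconcile.Result, error) {
			// Up to `workers` of these calls run in parallel, but never two
			// for the same object key at once.
			return reconcile.Result{}, nil
		}))
}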
5 changes: 4 additions & 1 deletion pkg/controllers/podgroup_controller.go
@@ -33,6 +33,7 @@ import (

ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/source"
@@ -47,7 +48,8 @@ type PodGroupReconciler struct {
recorder record.EventRecorder

client.Client
Scheme *runtime.Scheme
Scheme *runtime.Scheme
Workers int
}

// +kubebuilder:rbac:groups=scheduling.x-k8s.io,resources=podgroups,verbs=get;list;watch;create;update;patch;delete
@@ -195,6 +197,7 @@ func (r *PodGroupReconciler) SetupWithManager(mgr ctrl.Manager) error {
Watches(&source.Kind{Type: &v1.Pod{}},
handler.EnqueueRequestsFromMapFunc(r.podToPodGroup)).
For(&schedv1alpha1.PodGroup{}).
WithOptions(controller.Options{MaxConcurrentReconciles: r.Workers}).
Complete(r)
}
