diff --git a/cmd/controller/app/options.go b/cmd/controller/app/options.go
index 0c7cf43657..0fc1abb7f9 100644
--- a/cmd/controller/app/options.go
+++ b/cmd/controller/app/options.go
@@ -26,6 +26,7 @@ type ServerRunOptions struct {
 	InCluster            bool
 	ApiServerQPS         int
 	ApiServerBurst       int
+	Workers              int
 	EnableLeaderElection bool
 }

@@ -41,5 +42,6 @@ func (s *ServerRunOptions) addAllFlags() {
 	pflag.StringVar(&s.MasterUrl, "masterUrl", s.MasterUrl, "Master Url if not run in cluster.")
 	pflag.IntVar(&s.ApiServerQPS, "qps", 5, "qps of query apiserver.")
 	pflag.IntVar(&s.ApiServerBurst, "burst", 10, "burst of query apiserver.")
+	pflag.IntVar(&s.Workers, "workers", 1, "workers of scheduler-plugin-controllers.")
 	pflag.BoolVar(&s.EnableLeaderElection, "enableLeaderElection", s.EnableLeaderElection, "If EnableLeaderElection for controller.")
 }
diff --git a/cmd/controller/app/server.go b/cmd/controller/app/server.go
index 4389763db6..136dbbfe42 100644
--- a/cmd/controller/app/server.go
+++ b/cmd/controller/app/server.go
@@ -18,9 +18,6 @@ package app

 import (
 	"context"
-	"os"
-	"time"
-
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/util/uuid"
 	"k8s.io/apiserver/pkg/server"
@@ -32,6 +29,7 @@ import (
 	"k8s.io/client-go/tools/leaderelection"
 	"k8s.io/client-go/tools/leaderelection/resourcelock"
 	"k8s.io/klog/v2"
+	"os"

 	"sigs.k8s.io/scheduler-plugins/pkg/controller"
 	pgclientset "sigs.k8s.io/scheduler-plugins/pkg/generated/clientset/versioned"
@@ -68,18 +66,18 @@ func Run(s *ServerRunOptions) error {
 	pgClient := pgclientset.NewForConfigOrDie(config)
 	kubeClient := kubernetes.NewForConfigOrDie(config)

-	scheduleInformer := pgformers.NewSharedInformerFactory(pgClient, 30*time.Minute)
-	pgInformer := scheduleInformer.Scheduling().V1alpha1().PodGroups()
+	pgInformerFactory := pgformers.NewSharedInformerFactory(pgClient, 0)
+	pgInformer := pgInformerFactory.Scheduling().V1alpha1().PodGroups()
 	informerFactory := informers.NewSharedInformerFactoryWithOptions(kubeClient, 0, informers.WithTweakListOptions(func(opt *metav1.ListOptions) {
 		opt.LabelSelector = util.PodGroupLabel
 	}))
 	podInformer := informerFactory.Core().V1().Pods()
 	ctrl := controller.NewPodGroupController(kubeClient, pgInformer, podInformer, pgClient)

+	pgInformerFactory.Start(stopCh)
 	informerFactory.Start(stopCh)
-	scheduleInformer.Start(stopCh)
 	run := func(ctx context.Context) {
-		ctrl.Run(10, ctx.Done())
+		ctrl.Run(s.Workers, ctx.Done())
 	}

 	if !s.EnableLeaderElection {
@@ -114,7 +112,7 @@ func Run(s *ServerRunOptions) error {
 				klog.Fatalf("leaderelection lost")
 			},
 		},
-		Name: "podgroup controller",
+		Name: "scheduler-plugins controller",
 	})
 }

diff --git a/pkg/controller/podgroup.go b/pkg/controller/podgroup.go
index 0770678f93..93236d800d 100644
--- a/pkg/controller/podgroup.go
+++ b/pkg/controller/podgroup.go
@@ -63,11 +63,9 @@ func NewPodGroupController(client kubernetes.Interface,
 	pgClient pgclientset.Interface) *PodGroupController {
 	broadcaster := record.NewBroadcaster()
 	broadcaster.StartRecordingToSink(&corev1.EventSinkImpl{Interface: client.CoreV1().Events(v1.NamespaceAll)})
-	var eventRecorder record.EventRecorder
-	eventRecorder = broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "Coscheduling"})

 	ctrl := &PodGroupController{
-		eventRecorder: eventRecorder,
+		eventRecorder: broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "Coscheduling"}),
 		pgQueue:       workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Coscheduling-queue"),
 	}

@@ -75,6 +73,10 @@ func NewPodGroupController(client kubernetes.Interface,
 		AddFunc:    ctrl.pgAdded,
 		UpdateFunc: ctrl.pgUpdated,
 	})
+	podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
+		AddFunc:    ctrl.podAdded,
+		UpdateFunc: ctrl.podUpdated,
+	})

 	ctrl.pgLister = pgInformer.Lister()
 	ctrl.podLister = podInformer.Lister()
@@ -114,12 +116,12 @@ func (ctrl *PodGroupController) pgAdded(obj interface{}) {
 	if pg.Status.Phase == pgv1.PodGroupFinished || pg.Status.Phase == pgv1.PodGroupFailed {
 		return
 	}
-	// If startScheduleTime - createTime > 2days, do not enqueue again because pod may have be GC
+	// If startScheduleTime - createTime > 2days, do not enqueue again because pod may have been GCed
 	if pg.Status.Scheduled == pg.Spec.MinMember && pg.Status.Running == 0 &&
 		pg.Status.ScheduleStartTime.Sub(pg.CreationTimestamp.Time) > 48*time.Hour {
 		return
 	}
-	klog.Info("enqueue ", "key ", key)
+	klog.Infof("Enqueue key %v", key)
 	ctrl.pgQueue.Add(key)
 }

@@ -128,15 +130,25 @@ func (ctrl *PodGroupController) pgUpdated(old, new interface{}) {
 	ctrl.pgAdded(new)
 }

-// pgDelete reacts to a PG update
-func (ctrl *PodGroupController) pgDelete(new interface{}) {
-	key, err := cache.MetaNamespaceKeyFunc(new)
+// podAdded reacts to a PG creation
+func (ctrl *PodGroupController) podAdded(obj interface{}) {
+	pod := obj.(*v1.Pod)
+	pgName, ok := util.VerifyPodLabelSatisfied(pod)
+	if !ok || len(pgName) == 0 {
+		return
+	}
+	pg, err := ctrl.pgLister.PodGroups(pod.Namespace).Get(pgName)
 	if err != nil {
-		runtime.HandleError(err)
+		klog.Error(err)
 		return
 	}
-	klog.Info("enqueue ", "key ", key)
-	klog.V(3).Infof("pg %q delete change", key)
+	klog.V(5).Infof("Add pg %v when pod %v add", pg.Name, pod.Name)
+	ctrl.pgAdded(pg)
+}
+
+// pgUpdated reacts to a PG update
+func (ctrl *PodGroupController) podUpdated(old, new interface{}) {
+	ctrl.podAdded(new)
 }

 // syncPG deals with one key off the queue. It returns false when it's time to quit.
@@ -172,7 +184,6 @@ func (ctrl *PodGroupController) sync() {

 // syncHandle syncs pod group and convert status
 func (ctrl *PodGroupController) syncHandler(ctx context.Context, pg *pgv1.PodGroup) {
-
 	key, err := cache.MetaNamespaceKeyFunc(pg)
 	if err != nil {
 		runtime.HandleError(err)
@@ -194,6 +205,7 @@ func (ctrl *PodGroupController) syncHandler(ctx context.Context, pg *pgv1.PodGro
 	selector := labels.Set(map[string]string{util.PodGroupLabel: pgCopy.Name}).AsSelector()
 	pods, err := ctrl.podLister.List(selector)
 	if err != nil {
+		klog.Errorf("List pods for group %v failed: %v", pgCopy.Name, err)
 		return
 	}
 	var (
@@ -237,10 +249,6 @@ func (ctrl *PodGroupController) syncHandler(ctx context.Context, pg *pgv1.PodGro
 	if err == nil {
 		ctrl.pgQueue.Forget(pg)
 	}
-	if pgCopy.Status.Phase == pgv1.PodGroupFinished || pgCopy.Status.Phase == pgv1.PodGroupFailed {
-		return
-	}
-	ctrl.pgQueue.AddRateLimited(key)
 }

 func (ctrl *PodGroupController) patchPodGroup(old, new *pgv1.PodGroup) error {
diff --git a/pkg/controller/podgroup_test.go b/pkg/controller/podgroup_test.go
index 30b2fde6ab..2b7e61fd74 100644
--- a/pkg/controller/podgroup_test.go
+++ b/pkg/controller/podgroup_test.go
@@ -2,6 +2,8 @@ package controller

 import (
 	"context"
+	"fmt"
+	"k8s.io/apimachinery/pkg/util/wait"
 	"testing"
 	"time"

@@ -20,15 +22,13 @@ import (

 func Test_Run(t *testing.T) {
 	ctx := context.TODO()
-	podSuccess := v1.PodSucceeded
-	podFailed := v1.PodFailed
 	createTime := metav1.Time{Time: time.Now().Add(-72 * time.Hour)}
 	cases := []struct {
 		name              string
 		pgName            string
 		minMember         uint32
 		podNames          []string
-		podNextPhase      *v1.PodPhase
+		podNextPhase      v1.PodPhase
 		podPhase          v1.PodPhase
 		previousPhase     v1alpha1.PodGroupPhase
 		desiredGroupPhase v1alpha1.PodGroupPhase
@@ -87,7 +87,7 @@
 			podPhase:          v1.PodPending,
 			previousPhase:     v1alpha1.PodGroupScheduling,
 			desiredGroupPhase: v1alpha1.PodGroupFinished,
-			podNextPhase:      &podSuccess,
+			podNextPhase:      v1.PodSucceeded,
 		},
 		{
 			name: "Group status convert from scheduling to succeed",
@@ -97,7 +97,7 @@
 			podPhase:          v1.PodPending,
 			previousPhase:     v1alpha1.PodGroupScheduling,
 			desiredGroupPhase: v1alpha1.PodGroupFailed,
-			podNextPhase:      &podFailed,
+			podNextPhase:      v1.PodSucceeded,
 		},
 		{
 			name: "Group group should not enqueue, created too long",
@@ -112,41 +112,46 @@
 	}
 	for _, c := range cases {
 		t.Run(c.name, func(t *testing.T) {
-			ps := pods(c.podNames, c.pgName, c.podPhase)
+			ps := makePods(c.podNames, c.pgName, c.podPhase)
 			kubeClient := fake.NewSimpleClientset(ps[0], ps[1])
-			pg := pg(c.pgName, 2, c.previousPhase, c.podGroupCreateTime)
+			pg := makePG(c.pgName, 2, c.previousPhase, c.podGroupCreateTime)
 			pgClient := pgfake.NewSimpleClientset(pg)

 			informerFactory := informers.NewSharedInformerFactory(kubeClient, controller.NoResyncPeriodFunc())
-			schedulingInformerFactory := pgformers.NewSharedInformerFactory(pgClient, controller.NoResyncPeriodFunc())
+			pgInformerFactory := pgformers.NewSharedInformerFactory(pgClient, controller.NoResyncPeriodFunc())
 			podInformer := informerFactory.Core().V1().Pods()
-			pgInformer := schedulingInformerFactory.Scheduling().V1alpha1().PodGroups()
+			pgInformer := pgInformerFactory.Scheduling().V1alpha1().PodGroups()
 			ctrl := NewPodGroupController(kubeClient, pgInformer, podInformer, pgClient)

+			pgInformerFactory.Start(ctx.Done())
 			informerFactory.Start(ctx.Done())
-			schedulingInformerFactory.Start(ctx.Done())
-			if c.podNextPhase != nil {
-				ps := pods(c.podNames, c.pgName, *c.podNextPhase)
+			// 0 means not set
+			if len(c.podNextPhase) != 0 {
+				ps := makePods(c.podNames, c.pgName, c.podNextPhase)
 				for _, p := range ps {
 					kubeClient.CoreV1().Pods(p.Namespace).UpdateStatus(ctx, p, metav1.UpdateOptions{})
 				}
 			}

 			go ctrl.Run(1, ctx.Done())
-			time.Sleep(200 * time.Millisecond)
-
-			pg, err := pgClient.SchedulingV1alpha1().PodGroups("default").Get(ctx, c.pgName, metav1.GetOptions{})
+			err := wait.Poll(200*time.Millisecond, 1*time.Second, func() (done bool, err error) {
+				pg, err := pgClient.SchedulingV1alpha1().PodGroups("default").Get(ctx, c.pgName, metav1.GetOptions{})
+				if err != nil {
+					return false, err
+				}
+				if pg.Status.Phase != c.desiredGroupPhase {
+					return false, fmt.Errorf("want %v, got %v", c.desiredGroupPhase, pg.Status.Phase)
+				}
+				return true, nil
+			})
 			if err != nil {
-				t.Fatal(err)
-			}
-			if pg.Status.Phase != c.desiredGroupPhase {
-				t.Fatalf("descired %v, get %v", c.desiredGroupPhase, pg.Status.Phase)
+
 			}
 		})
 	}
 }

-func pods(podNames []string, pgName string, phase v1.PodPhase) []*v1.Pod {
+func makePods(podNames []string, pgName string, phase v1.PodPhase) []*v1.Pod {
 	pds := make([]*v1.Pod, 0)
 	for _, name := range podNames {
 		pod := st.MakePod().Namespace("default").Name(name).Obj()
@@ -157,7 +162,7 @@
 	return pds
 }

-func pg(pgName string, minMember uint32, previousPhase v1alpha1.PodGroupPhase, createTime *metav1.Time) *v1alpha1.PodGroup {
+func makePG(pgName string, minMember uint32, previousPhase v1alpha1.PodGroupPhase, createTime *metav1.Time) *v1alpha1.PodGroup {
 	pg := &v1alpha1.PodGroup{
 		ObjectMeta: metav1.ObjectMeta{
 			Name: pgName,
diff --git a/pkg/util/k8s.go b/pkg/util/podgroup.go
similarity index 90%
rename from pkg/util/k8s.go
rename to pkg/util/podgroup.go
index 5c03e061f1..9cb7e54aca 100644
--- a/pkg/util/k8s.go
+++ b/pkg/util/podgroup.go
@@ -24,7 +24,7 @@ import (

 	jsonpatch "github.com/evanphx/json-patch"
 	v1 "k8s.io/api/core/v1"
-	v12 "sigs.k8s.io/scheduler-plugins/pkg/apis/podgroup/v1alpha1"
+	"sigs.k8s.io/scheduler-plugins/pkg/apis/podgroup/v1alpha1"
 )

 // DefaultWaitTime is 60s if MaxScheduleTime is not specified.
@@ -59,7 +59,7 @@ func VerifyPodLabelSatisfied(pod *v1.Pod) (string, bool) {
 }

 // GetPodGroupFullName verify if pod ann satisfies batch scheduling
-func GetPodGroupFullName(pg *v12.PodGroup) string {
+func GetPodGroupFullName(pg *v1alpha1.PodGroup) string {
 	if pg == nil {
 		return ""
 	}
@@ -68,7 +68,7 @@ func GetPodGroupFullName(pg *v12.PodGroup) string {
 }

 // GetWaitTimeDuration verify if pod ann satisfies batch scheduling
-func GetWaitTimeDuration(pg *v12.PodGroup, defaultMaxScheTime *time.Duration) time.Duration {
+func GetWaitTimeDuration(pg *v1alpha1.PodGroup, defaultMaxScheTime *time.Duration) time.Duration {
 	waitTime := DefaultWaitTime
 	if defaultMaxScheTime != nil || *defaultMaxScheTime != 0 {
 		waitTime = *defaultMaxScheTime
diff --git a/pkg/util/k8s_test.go b/pkg/util/podgroup_test.go
similarity index 100%
rename from pkg/util/k8s_test.go
rename to pkg/util/podgroup_test.go