
Change codes according to round 1 suggestions
cwdsuzhou committed Sep 18, 2020
1 parent bf7529d commit 2c00efe
Showing 6 changed files with 61 additions and 48 deletions.
2 changes: 2 additions & 0 deletions cmd/controller/app/options.go
@@ -26,6 +26,7 @@ type ServerRunOptions struct {
InCluster bool
ApiServerQPS int
ApiServerBurst int
Workers int
EnableLeaderElection bool
}

@@ -41,5 +42,6 @@ func (s *ServerRunOptions) addAllFlags() {
pflag.StringVar(&s.MasterUrl, "masterUrl", s.MasterUrl, "Master Url if not run in cluster.")
pflag.IntVar(&s.ApiServerQPS, "qps", 5, "qps of query apiserver.")
pflag.IntVar(&s.ApiServerBurst, "burst", 10, "burst of query apiserver.")
pflag.IntVar(&s.Workers, "workers", 1, "workers of scheduler-plugin-controllers.")
pflag.BoolVar(&s.EnableLeaderElection, "enableLeaderElection", s.EnableLeaderElection, "If EnableLeaderElection for controller.")
}
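
For readers following the flag change, here is a minimal, hedged sketch of how a pflag-backed --workers option like the one added above is typically defined and read; the cut-down struct, the main function, and the print are illustrative stand-ins, not the repository's actual wiring (in the real server the value flows into ctrl.Run in server.go below).

```go
package main

import (
	"fmt"

	"github.com/spf13/pflag"
)

// serverRunOptions is a reduced stand-in for the options struct above; only
// the newly added field is kept.
type serverRunOptions struct {
	Workers int
}

func (s *serverRunOptions) addAllFlags() {
	pflag.IntVar(&s.Workers, "workers", 1, "workers of scheduler-plugin-controllers.")
}

func main() {
	opts := &serverRunOptions{}
	opts.addAllFlags()
	pflag.Parse()

	// The real server passes this value to ctrl.Run; printing it stands in here.
	fmt.Printf("controller would run with %d worker(s)\n", opts.Workers)
}
```

Running the sketch with --workers=4 prints 4; with no flag it falls back to the default of 1, matching the flag definition above.
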
14 changes: 6 additions & 8 deletions cmd/controller/app/server.go
@@ -18,9 +18,6 @@ package app

import (
"context"
"os"
"time"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/uuid"
"k8s.io/apiserver/pkg/server"
@@ -32,6 +29,7 @@ import (
"k8s.io/client-go/tools/leaderelection"
"k8s.io/client-go/tools/leaderelection/resourcelock"
"k8s.io/klog/v2"
"os"

"sigs.k8s.io/scheduler-plugins/pkg/controller"
pgclientset "sigs.k8s.io/scheduler-plugins/pkg/generated/clientset/versioned"
@@ -68,18 +66,18 @@ func Run(s *ServerRunOptions) error {
pgClient := pgclientset.NewForConfigOrDie(config)
kubeClient := kubernetes.NewForConfigOrDie(config)

scheduleInformer := pgformers.NewSharedInformerFactory(pgClient, 30*time.Minute)
pgInformer := scheduleInformer.Scheduling().V1alpha1().PodGroups()
pgInformerFactory := pgformers.NewSharedInformerFactory(pgClient, 0)
pgInformer := pgInformerFactory.Scheduling().V1alpha1().PodGroups()

informerFactory := informers.NewSharedInformerFactoryWithOptions(kubeClient, 0, informers.WithTweakListOptions(func(opt *metav1.ListOptions) {
opt.LabelSelector = util.PodGroupLabel
}))
podInformer := informerFactory.Core().V1().Pods()
ctrl := controller.NewPodGroupController(kubeClient, pgInformer, podInformer, pgClient)
pgInformerFactory.Start(stopCh)
informerFactory.Start(stopCh)
scheduleInformer.Start(stopCh)
run := func(ctx context.Context) {
ctrl.Run(10, ctx.Done())
ctrl.Run(s.Workers, ctx.Done())
}

if !s.EnableLeaderElection {
@@ -114,7 +112,7 @@ func Run(s *ServerRunOptions) error {
klog.Fatalf("leaderelection lost")
},
},
Name: "podgroup controller",
Name: "scheduler-plugins controller",
})
}
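
The hunk above only renames the leader-election lock holder, but for context, here is a hedged, standalone sketch of the client-go leader-election pattern the surrounding code uses. The lock namespace, lock name, and lease timings are assumptions chosen for illustration, not the values used by this controller.

```go
package main

import (
	"context"
	"time"

	"k8s.io/apimachinery/pkg/util/uuid"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"
	"k8s.io/client-go/tools/leaderelection"
	"k8s.io/client-go/tools/leaderelection/resourcelock"
	"k8s.io/klog/v2"
)

func main() {
	// Assumes a reachable kubeconfig at the default location.
	config, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
	if err != nil {
		klog.Fatal(err)
	}
	client := kubernetes.NewForConfigOrDie(config)

	// Each replica needs a unique identity for the Lease lock.
	id := string(uuid.NewUUID())
	lock, err := resourcelock.New(resourcelock.LeasesResourceLock,
		"kube-system", "example-controller",
		client.CoreV1(), client.CoordinationV1(),
		resourcelock.ResourceLockConfig{Identity: id})
	if err != nil {
		klog.Fatal(err)
	}

	// run is what the elected leader executes; here it just blocks.
	run := func(ctx context.Context) {
		klog.Info("became leader, starting workers")
		<-ctx.Done()
	}

	leaderelection.RunOrDie(context.Background(), leaderelection.LeaderElectionConfig{
		Lock:          lock,
		LeaseDuration: 15 * time.Second,
		RenewDeadline: 10 * time.Second,
		RetryPeriod:   2 * time.Second,
		Callbacks: leaderelection.LeaderCallbacks{
			OnStartedLeading: run,
			OnStoppedLeading: func() { klog.Fatalf("leaderelection lost") },
		},
		Name: "example controller",
	})
}
```
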

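
server.go now starts both informer factories and scopes the pod informer to labelled pods via WithTweakListOptions. Below is a hedged, standalone sketch of that pattern; the kubeconfig lookup and the literal label key are assumptions for illustration (the controller itself filters on util.PodGroupLabel).

```go
package main

import (
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/tools/clientcmd"
)

func main() {
	// Assumes a reachable kubeconfig at the default location; error handling is minimal.
	config, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
	if err != nil {
		panic(err)
	}
	client := kubernetes.NewForConfigOrDie(config)

	// Restrict the list/watch to pods carrying the (hypothetical) pod-group
	// label key, so the shared cache only ever holds group-managed pods.
	factory := informers.NewSharedInformerFactoryWithOptions(client, 0,
		informers.WithTweakListOptions(func(opt *metav1.ListOptions) {
			opt.LabelSelector = "example.com/pod-group"
		}))
	podInformer := factory.Core().V1().Pods()

	stopCh := make(chan struct{})
	defer close(stopCh)
	factory.Start(stopCh)
	cache.WaitForCacheSync(stopCh, podInformer.Informer().HasSynced)

	pods, err := podInformer.Lister().List(labels.Everything())
	if err != nil {
		panic(err)
	}
	fmt.Printf("cache holds %d label-filtered pods\n", len(pods))
}
```
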
40 changes: 24 additions & 16 deletions pkg/controller/podgroup.go
@@ -63,18 +63,20 @@ func NewPodGroupController(client kubernetes.Interface,
pgClient pgclientset.Interface) *PodGroupController {
broadcaster := record.NewBroadcaster()
broadcaster.StartRecordingToSink(&corev1.EventSinkImpl{Interface: client.CoreV1().Events(v1.NamespaceAll)})
var eventRecorder record.EventRecorder
eventRecorder = broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "Coscheduling"})

ctrl := &PodGroupController{
eventRecorder: eventRecorder,
eventRecorder: broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "Coscheduling"}),
pgQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Coscheduling-queue"),
}

pgInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: ctrl.pgAdded,
UpdateFunc: ctrl.pgUpdated,
})
podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: ctrl.podAdded,
UpdateFunc: ctrl.podUpdated,
})

ctrl.pgLister = pgInformer.Lister()
ctrl.podLister = podInformer.Lister()
@@ -114,12 +116,12 @@ func (ctrl *PodGroupController) pgAdded(obj interface{}) {
if pg.Status.Phase == pgv1.PodGroupFinished || pg.Status.Phase == pgv1.PodGroupFailed {
return
}
// If startScheduleTime - createTime > 2days, do not enqueue again because pod may have be GC
// If startScheduleTime - createTime > 2days, do not enqueue again because pod may have been GCed
if pg.Status.Scheduled == pg.Spec.MinMember && pg.Status.Running == 0 &&
pg.Status.ScheduleStartTime.Sub(pg.CreationTimestamp.Time) > 48*time.Hour {
return
}
klog.Info("enqueue ", "key ", key)
klog.Infof("Enqueue key %v", key)
ctrl.pgQueue.Add(key)
}

@@ -128,15 +130,25 @@ func (ctrl *PodGroupController) pgUpdated(old, new interface{}) {
ctrl.pgAdded(new)
}

// pgDelete reacts to a PG update
func (ctrl *PodGroupController) pgDelete(new interface{}) {
key, err := cache.MetaNamespaceKeyFunc(new)
// podAdded reacts to a Pod creation
func (ctrl *PodGroupController) podAdded(obj interface{}) {
pod := obj.(*v1.Pod)
pgName, ok := util.VerifyPodLabelSatisfied(pod)
if !ok || len(pgName) == 0 {
return
}
pg, err := ctrl.pgLister.PodGroups(pod.Namespace).Get(pgName)
if err != nil {
runtime.HandleError(err)
klog.Error(err)
return
}
klog.Info("enqueue ", "key ", key)
klog.V(3).Infof("pg %q delete change", key)
klog.V(5).Infof("Add pg %v when pod %v add", pg.Name, pod.Name)
ctrl.pgAdded(pg)
}

// podUpdated reacts to a Pod update
func (ctrl *PodGroupController) podUpdated(old, new interface{}) {
ctrl.podAdded(new)
}

// syncPG deals with one key off the queue. It returns false when it's time to quit.
@@ -172,7 +184,6 @@ func (ctrl *PodGroupController) sync() {

// syncHandle syncs pod group and convert status
func (ctrl *PodGroupController) syncHandler(ctx context.Context, pg *pgv1.PodGroup) {

key, err := cache.MetaNamespaceKeyFunc(pg)
if err != nil {
runtime.HandleError(err)
@@ -194,6 +205,7 @@ func (ctrl *PodGroupController) syncHandler(ctx context.Context, pg *pgv1.PodGro
selector := labels.Set(map[string]string{util.PodGroupLabel: pgCopy.Name}).AsSelector()
pods, err := ctrl.podLister.List(selector)
if err != nil {
klog.Errorf("List pods for group %v failed: %v", pgCopy.Name, err)
return
}
var (
@@ -237,10 +249,6 @@ func (ctrl *PodGroupController) syncHandler(ctx context.Context, pg *pgv1.PodGro
if err == nil {
ctrl.pgQueue.Forget(pg)
}
if pgCopy.Status.Phase == pgv1.PodGroupFinished || pgCopy.Status.Phase == pgv1.PodGroupFailed {
return
}
ctrl.pgQueue.AddRateLimited(key)
}

func (ctrl *PodGroupController) patchPodGroup(old, new *pgv1.PodGroup) error {
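
Taken together, the handler and queue changes above follow the standard informer/workqueue controller skeleton: event handlers reduce objects to namespace/name keys, and workers pop keys and sync them. The sketch below shows only that skeleton with stand-in objects and a trivial sync step; it is not the PodGroup controller's actual status handling.

```go
package main

import (
	"fmt"
	"time"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/util/workqueue"
)

// miniController keeps only the queue; the real controller also holds listers
// and an event recorder.
type miniController struct {
	queue workqueue.RateLimitingInterface
}

// onAdd mirrors pgAdded/podAdded: derive a namespace/name key and enqueue it.
func (c *miniController) onAdd(obj interface{}) {
	key, err := cache.MetaNamespaceKeyFunc(obj)
	if err != nil {
		return
	}
	c.queue.Add(key)
}

// worker mirrors the sync loop: pop a key, handle it, then Forget and Done.
func (c *miniController) worker() {
	for {
		item, quit := c.queue.Get()
		if quit {
			return
		}
		fmt.Println("syncing", item.(string)) // stand-in for the real status handling
		c.queue.Forget(item)
		c.queue.Done(item)
	}
}

func main() {
	c := &miniController{
		queue: workqueue.NewNamedRateLimitingQueue(
			workqueue.DefaultControllerRateLimiter(), "example-queue"),
	}
	c.onAdd(&corev1.Pod{ObjectMeta: metav1.ObjectMeta{Namespace: "default", Name: "pod-0"}})
	go c.worker()
	time.Sleep(100 * time.Millisecond)
	c.queue.ShutDown()
}
```
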
47 changes: 26 additions & 21 deletions pkg/controller/podgroup_test.go
@@ -2,6 +2,8 @@ package controller

import (
"context"
"fmt"
"k8s.io/apimachinery/pkg/util/wait"
"testing"
"time"

@@ -20,15 +22,13 @@

func Test_Run(t *testing.T) {
ctx := context.TODO()
podSuccess := v1.PodSucceeded
podFailed := v1.PodFailed
createTime := metav1.Time{Time: time.Now().Add(-72 * time.Hour)}
cases := []struct {
name string
pgName string
minMember uint32
podNames []string
podNextPhase *v1.PodPhase
podNextPhase v1.PodPhase
podPhase v1.PodPhase
previousPhase v1alpha1.PodGroupPhase
desiredGroupPhase v1alpha1.PodGroupPhase
@@ -87,7 +87,7 @@
podPhase: v1.PodPending,
previousPhase: v1alpha1.PodGroupScheduling,
desiredGroupPhase: v1alpha1.PodGroupFinished,
podNextPhase: &podSuccess,
podNextPhase: v1.PodSucceeded,
},
{
name: "Group status convert from scheduling to succeed",
@@ -97,7 +97,7 @@
podPhase: v1.PodPending,
previousPhase: v1alpha1.PodGroupScheduling,
desiredGroupPhase: v1alpha1.PodGroupFailed,
podNextPhase: &podFailed,
podNextPhase: v1.PodSucceeded,
},
{
name: "Group group should not enqueue, created too long",
@@ -112,41 +112,46 @@
}
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
ps := pods(c.podNames, c.pgName, c.podPhase)
ps := makePods(c.podNames, c.pgName, c.podPhase)
kubeClient := fake.NewSimpleClientset(ps[0], ps[1])
pg := pg(c.pgName, 2, c.previousPhase, c.podGroupCreateTime)
pg := makePG(c.pgName, 2, c.previousPhase, c.podGroupCreateTime)
pgClient := pgfake.NewSimpleClientset(pg)

informerFactory := informers.NewSharedInformerFactory(kubeClient, controller.NoResyncPeriodFunc())
schedulingInformerFactory := pgformers.NewSharedInformerFactory(pgClient, controller.NoResyncPeriodFunc())
pgInformerFactory := pgformers.NewSharedInformerFactory(pgClient, controller.NoResyncPeriodFunc())
podInformer := informerFactory.Core().V1().Pods()
pgInformer := schedulingInformerFactory.Scheduling().V1alpha1().PodGroups()
pgInformer := pgInformerFactory.Scheduling().V1alpha1().PodGroups()
ctrl := NewPodGroupController(kubeClient, pgInformer, podInformer, pgClient)

pgInformerFactory.Start(ctx.Done())
informerFactory.Start(ctx.Done())
schedulingInformerFactory.Start(ctx.Done())
if c.podNextPhase != nil {
ps := pods(c.podNames, c.pgName, *c.podNextPhase)
// 0 means not set
if len(c.podNextPhase) != 0 {
ps := makePods(c.podNames, c.pgName, c.podNextPhase)
for _, p := range ps {
kubeClient.CoreV1().Pods(p.Namespace).UpdateStatus(ctx, p, metav1.UpdateOptions{})
}
}
go ctrl.Run(1, ctx.Done())
time.Sleep(200 * time.Millisecond)

pg, err := pgClient.SchedulingV1alpha1().PodGroups("default").Get(ctx, c.pgName, metav1.GetOptions{})
err := wait.Poll(200*time.Millisecond, 1*time.Second, func() (done bool, err error) {
pg, err := pgClient.SchedulingV1alpha1().PodGroups("default").Get(ctx, c.pgName, metav1.GetOptions{})
if err != nil {
return false, err
}
if pg.Status.Phase != c.desiredGroupPhase {
return false, fmt.Errorf("want %v, got %v", c.desiredGroupPhase, pg.Status.Phase)
}
return true, nil
})
if err != nil {
t.Fatal(err)
}
if pg.Status.Phase != c.desiredGroupPhase {
t.Fatalf("descired %v, get %v", c.desiredGroupPhase, pg.Status.Phase)

}
})
}

}

func pods(podNames []string, pgName string, phase v1.PodPhase) []*v1.Pod {
func makePods(podNames []string, pgName string, phase v1.PodPhase) []*v1.Pod {
pds := make([]*v1.Pod, 0)
for _, name := range podNames {
pod := st.MakePod().Namespace("default").Name(name).Obj()
Expand All @@ -157,7 +162,7 @@ func pods(podNames []string, pgName string, phase v1.PodPhase) []*v1.Pod {
return pds
}

func pg(pgName string, minMember uint32, previousPhase v1alpha1.PodGroupPhase, createTime *metav1.Time) *v1alpha1.PodGroup {
func makePG(pgName string, minMember uint32, previousPhase v1alpha1.PodGroupPhase, createTime *metav1.Time) *v1alpha1.PodGroup {
pg := &v1alpha1.PodGroup{
ObjectMeta: metav1.ObjectMeta{
Name: pgName,
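
The test no longer sleeps a fixed 200ms and asserts once; it polls for the desired phase with a timeout. Here is a tiny standalone sketch of that wait.Poll pattern, with a placeholder condition standing in for "the PodGroup reached the desired phase":

```go
package main

import (
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

func main() {
	start := time.Now()

	// Poll every 200ms and give up after 1s, mirroring the intervals in the test.
	err := wait.Poll(200*time.Millisecond, 1*time.Second, func() (bool, error) {
		// Placeholder condition: "done" once half a second has passed.
		return time.Since(start) > 500*time.Millisecond, nil
	})
	if err != nil {
		fmt.Println("condition never became true:", err)
		return
	}
	fmt.Println("condition met after", time.Since(start))
}
```

wait.PollImmediate has the same shape but checks the condition once before the first interval, which is sometimes preferable in tests where the state may already be correct.
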
6 changes: 3 additions & 3 deletions pkg/util/k8s.go → pkg/util/podgroup.go
@@ -24,7 +24,7 @@ import (
jsonpatch "github.com/evanphx/json-patch"
v1 "k8s.io/api/core/v1"

v12 "sigs.k8s.io/scheduler-plugins/pkg/apis/podgroup/v1alpha1"
"sigs.k8s.io/scheduler-plugins/pkg/apis/podgroup/v1alpha1"
)

// DefaultWaitTime is 60s if MaxScheduleTime is not specified.
@@ -59,7 +59,7 @@ func VerifyPodLabelSatisfied(pod *v1.Pod) (string, bool) {
}

// GetPodGroupFullName verify if pod ann satisfies batch scheduling
func GetPodGroupFullName(pg *v12.PodGroup) string {
func GetPodGroupFullName(pg *v1alpha1.PodGroup) string {
if pg == nil {
return ""
}
@@ -68,7 +68,7 @@ func GetPodGroupFullName(pg *v12.PodGroup) string {
}

// GetWaitTimeDuration verify if pod ann satisfies batch scheduling
func GetWaitTimeDuration(pg *v12.PodGroup, defaultMaxScheTime *time.Duration) time.Duration {
func GetWaitTimeDuration(pg *v1alpha1.PodGroup, defaultMaxScheTime *time.Duration) time.Duration {
waitTime := DefaultWaitTime
if defaultMaxScheTime != nil || *defaultMaxScheTime != 0 {
waitTime = *defaultMaxScheTime
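
GetWaitTimeDuration, shown as context above, falls back from a caller-supplied default to DefaultWaitTime (60s per the comment above). Below is a hedged sketch of that fallback pattern written with an explicit nil-and-nonzero check; the names are stand-ins and the sketch does not reproduce the package's exact logic (which also consults the PodGroup spec).

```go
package main

import (
	"fmt"
	"time"
)

// defaultWaitTime stands in for util.DefaultWaitTime.
const defaultWaitTime = 60 * time.Second

// waitTimeFor returns the caller-supplied default when it is set and non-zero,
// otherwise the package default. The nil check runs first so a nil pointer is
// never dereferenced.
func waitTimeFor(defaultMaxScheTime *time.Duration) time.Duration {
	if defaultMaxScheTime != nil && *defaultMaxScheTime != 0 {
		return *defaultMaxScheTime
	}
	return defaultWaitTime
}

func main() {
	custom := 90 * time.Second
	fmt.Println(waitTimeFor(nil))     // 1m0s
	fmt.Println(waitTimeFor(&custom)) // 1m30s
}
```
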
File renamed without changes.
