Skip to content

Commit

Permalink
koord-scheduler: coscheduling supports reservation (#1335)
Browse files Browse the repository at this point in the history
Signed-off-by: Joseph <joseph.t.lee@outlook.com>
  • Loading branch information
eahydra committed May 30, 2023
1 parent 89be790 commit 9a87d5a
Show file tree
Hide file tree
Showing 5 changed files with 52 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ import (
pgfake "sigs.k8s.io/scheduler-plugins/pkg/generated/clientset/versioned/fake"
schedinformer "sigs.k8s.io/scheduler-plugins/pkg/generated/informers/externalversions"

koordfake "github.com/koordinator-sh/koordinator/pkg/client/clientset/versioned/fake"
koordinformers "github.com/koordinator-sh/koordinator/pkg/client/informers/externalversions"
"github.com/koordinator-sh/koordinator/pkg/scheduler/apis/config"
"github.com/koordinator-sh/koordinator/pkg/scheduler/plugins/coscheduling/core"
)
Expand Down Expand Up @@ -277,7 +279,11 @@ func setUp(ctx context.Context, podNames []string, pgName string, podPhase v1.Po
podInformer := informerFactory.Core().V1().Pods()
pgInformer := pgInformerFactory.Scheduling().V1alpha1().PodGroups()

pgMgr := core.NewPodGroupManager(pgClient, pgInformerFactory, informerFactory, &config.CoschedulingArgs{DefaultTimeout: &metav1.Duration{Duration: time.Second}})
koordClient := koordfake.NewSimpleClientset()
koordInformerFactory := koordinformers.NewSharedInformerFactory(koordClient, 0)

args := &config.CoschedulingArgs{DefaultTimeout: &metav1.Duration{Duration: time.Second}}
pgMgr := core.NewPodGroupManager(args, pgClient, pgInformerFactory, informerFactory, koordInformerFactory)
ctrl := NewPodGroupController(pgInformer, podInformer, pgClient, pgMgr, 1)
return ctrl, kubeClient, pgClient
}
Expand Down
8 changes: 7 additions & 1 deletion pkg/scheduler/plugins/coscheduling/core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,11 @@ import (
pglister "sigs.k8s.io/scheduler-plugins/pkg/generated/listers/scheduling/v1alpha1"

"github.com/koordinator-sh/koordinator/apis/extension"
koordinatorinformers "github.com/koordinator-sh/koordinator/pkg/client/informers/externalversions"
"github.com/koordinator-sh/koordinator/pkg/scheduler/apis/config"
frameworkexthelper "github.com/koordinator-sh/koordinator/pkg/scheduler/frameworkext/helper"
"github.com/koordinator-sh/koordinator/pkg/scheduler/plugins/coscheduling/util"
reservationutil "github.com/koordinator-sh/koordinator/pkg/util/reservation"
)

type Status string
Expand Down Expand Up @@ -90,10 +92,11 @@ type PodGroupManager struct {

// NewPodGroupManager creates a new operation object.
func NewPodGroupManager(
args *config.CoschedulingArgs,
pgClient pgclientset.Interface,
pgSharedInformerFactory pgformers.SharedInformerFactory,
sharedInformerFactory informers.SharedInformerFactory,
args *config.CoschedulingArgs,
koordSharedInformerFactory koordinatorinformers.SharedInformerFactory,
) *PodGroupManager {
pgInformer := pgSharedInformerFactory.Scheduling().V1alpha1().PodGroups()
podInformer := sharedInformerFactory.Core().V1().Pods()
Expand All @@ -117,6 +120,9 @@ func NewPodGroupManager(
DeleteFunc: gangCache.onPodDelete,
}
frameworkexthelper.ForceSyncFromInformer(context.TODO().Done(), sharedInformerFactory, podInformer.Informer(), podEventHandler)
reservationInformer := koordSharedInformerFactory.Scheduling().V1alpha1().Reservations()
reservationEventHandler := reservationutil.NewReservationToPodEventHandler(podEventHandler)
frameworkexthelper.ForceSyncFromInformer(context.TODO().Done(), koordSharedInformerFactory, reservationInformer.Informer(), reservationEventHandler)
return pgMgr
}

Expand Down
10 changes: 9 additions & 1 deletion pkg/scheduler/plugins/coscheduling/core/core_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ import (
pginformer "sigs.k8s.io/scheduler-plugins/pkg/generated/informers/externalversions/scheduling/v1alpha1"

"github.com/koordinator-sh/koordinator/apis/extension"
koordfake "github.com/koordinator-sh/koordinator/pkg/client/clientset/versioned/fake"
koordinformers "github.com/koordinator-sh/koordinator/pkg/client/informers/externalversions"
"github.com/koordinator-sh/koordinator/pkg/scheduler/apis/config"
"github.com/koordinator-sh/koordinator/pkg/scheduler/plugins/coscheduling/util"
)
Expand All @@ -52,7 +54,13 @@ func NewManagerForTest() *Mgr {

podClient := clientsetfake.NewSimpleClientset()
informerFactory := informers.NewSharedInformerFactory(podClient, 0)
pgManager := NewPodGroupManager(pgClient, pgInformerFactory, informerFactory, &config.CoschedulingArgs{DefaultTimeout: &metav1.Duration{Duration: 300 * time.Second}})

koordClient := koordfake.NewSimpleClientset()
koordInformerFactory := koordinformers.NewSharedInformerFactory(koordClient, 0)

args := &config.CoschedulingArgs{DefaultTimeout: &metav1.Duration{Duration: 300 * time.Second}}

pgManager := NewPodGroupManager(args, pgClient, pgInformerFactory, informerFactory, koordInformerFactory)
return &Mgr{
pgMgr: pgManager,
pgInformer: pgInformer,
Expand Down
6 changes: 5 additions & 1 deletion pkg/scheduler/plugins/coscheduling/coscheduling.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/koordinator-sh/koordinator/apis/extension"
"github.com/koordinator-sh/koordinator/pkg/scheduler/apis/config"
"github.com/koordinator-sh/koordinator/pkg/scheduler/apis/config/validation"
"github.com/koordinator-sh/koordinator/pkg/scheduler/frameworkext"
"github.com/koordinator-sh/koordinator/pkg/scheduler/plugins/coscheduling/core"
"github.com/koordinator-sh/koordinator/pkg/scheduler/plugins/coscheduling/util"
)
Expand Down Expand Up @@ -80,7 +81,10 @@ func New(obj runtime.Object, handle framework.Handle) (framework.Plugin, error)
pgInformerFactory := pgformers.NewSharedInformerFactory(pgClient, 0)
pgInformer := pgInformerFactory.Scheduling().V1alpha1().PodGroups()

pgMgr := core.NewPodGroupManager(pgClient, pgInformerFactory, handle.SharedInformerFactory(), args)
informerFactory := handle.SharedInformerFactory()
extendedHandle := handle.(frameworkext.ExtendedHandle)
koordInformerFactory := extendedHandle.KoordinatorSharedInformerFactory()
pgMgr := core.NewPodGroupManager(args, pgClient, pgInformerFactory, informerFactory, koordInformerFactory)
plugin := &Coscheduling{
args: args,
frameworkHandler: handle,
Expand Down
27 changes: 24 additions & 3 deletions pkg/scheduler/plugins/coscheduling/coscheduling_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,21 +44,42 @@ import (
fakepgclientset "sigs.k8s.io/scheduler-plugins/pkg/generated/clientset/versioned/fake"

"github.com/koordinator-sh/koordinator/apis/extension"
koordfake "github.com/koordinator-sh/koordinator/pkg/client/clientset/versioned/fake"
koordinatorinformers "github.com/koordinator-sh/koordinator/pkg/client/informers/externalversions"
"github.com/koordinator-sh/koordinator/pkg/scheduler/apis/config"
"github.com/koordinator-sh/koordinator/pkg/scheduler/apis/config/v1beta2"
"github.com/koordinator-sh/koordinator/pkg/scheduler/frameworkext"
"github.com/koordinator-sh/koordinator/pkg/scheduler/plugins/coscheduling/util"
)

// gang test used
type PodGroupClientSetAndHandle struct {
framework.Handle
frameworkext.ExtendedHandle
pgclientset.Interface

koordInformerFactory koordinatorinformers.SharedInformerFactory
}

func (h *PodGroupClientSetAndHandle) KoordinatorSharedInformerFactory() koordinatorinformers.SharedInformerFactory {
return h.koordInformerFactory
}

func GangPluginFactoryProxy(clientSet pgclientset.Interface, factoryFn frameworkruntime.PluginFactory, plugin *framework.Plugin) frameworkruntime.PluginFactory {
return func(args apiruntime.Object, handle framework.Handle) (framework.Plugin, error) {
var err error
*plugin, err = factoryFn(args, PodGroupClientSetAndHandle{Handle: handle, Interface: clientSet})
koordClient := koordfake.NewSimpleClientset()
koordInformerFactory := koordinatorinformers.NewSharedInformerFactory(koordClient, 0)
extenderFactory, err := frameworkext.NewFrameworkExtenderFactory(
frameworkext.WithKoordinatorClientSet(koordClient),
frameworkext.WithKoordinatorSharedInformerFactory(koordInformerFactory))
if err != nil {
return nil, err
}
extender := extenderFactory.NewFrameworkExtender(handle.(framework.Framework))
*plugin, err = factoryFn(args, &PodGroupClientSetAndHandle{
ExtendedHandle: extender,
Interface: clientSet,
koordInformerFactory: koordInformerFactory,
})
return *plugin, err
}
}
Expand Down

0 comments on commit 9a87d5a

Please sign in to comment.