Skip to content

Commit

Permalink
Merge pull request #124001 from kerthcet/fix/multi-prifile
Browse files Browse the repository at this point in the history
Revert: enhancement(scheduler): share waitingPods among profiles
  • Loading branch information
k8s-ci-robot committed Mar 20, 2024
2 parents fe9e469 + 84750fe commit a309fad
Show file tree
Hide file tree
Showing 7 changed files with 8 additions and 238 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -364,7 +364,6 @@ func TestPostFilter(t *testing.T) {
frameworkruntime.WithExtenders(extenders),
frameworkruntime.WithSnapshotSharedLister(internalcache.NewSnapshot(tt.pods, tt.nodes)),
frameworkruntime.WithLogger(logger),
frameworkruntime.WithWaitingPods(frameworkruntime.NewWaitingPodsMap()),
)
if err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -1703,8 +1702,6 @@ func TestPreempt(t *testing.T) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()

waitingPods := frameworkruntime.NewWaitingPodsMap()

cache := internalcache.New(ctx, time.Duration(0))
for _, pod := range test.pods {
cache.AddPod(logger, pod)
Expand Down Expand Up @@ -1748,7 +1745,6 @@ func TestPreempt(t *testing.T) {
frameworkruntime.WithPodNominator(internalqueue.NewPodNominator(informerFactory.Core().V1().Pods().Lister())),
frameworkruntime.WithSnapshotSharedLister(internalcache.NewSnapshot(test.pods, nodes)),
frameworkruntime.WithInformerFactory(informerFactory),
frameworkruntime.WithWaitingPods(waitingPods),
frameworkruntime.WithLogger(logger),
)
if err != nil {
Expand Down
10 changes: 1 addition & 9 deletions pkg/scheduler/framework/runtime/framework.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,6 @@ type frameworkOptions struct {
extenders []framework.Extender
captureProfile CaptureProfile
parallelizer parallelize.Parallelizer
waitingPods *waitingPodsMap
logger *klog.Logger
}

Expand Down Expand Up @@ -222,13 +221,6 @@ func WithMetricsRecorder(r *metrics.MetricAsyncRecorder) Option {
}
}

// WithWaitingPods sets waitingPods for the scheduling frameworkImpl.
func WithWaitingPods(wp *waitingPodsMap) Option {
return func(o *frameworkOptions) {
o.waitingPods = wp
}
}

// WithLogger overrides the default logger from k8s.io/klog.
func WithLogger(logger klog.Logger) Option {
return func(o *frameworkOptions) {
Expand Down Expand Up @@ -262,7 +254,7 @@ func NewFramework(ctx context.Context, r Registry, profile *config.KubeScheduler
registry: r,
snapshotSharedLister: options.snapshotSharedLister,
scorePluginWeight: make(map[string]int),
waitingPods: options.waitingPods,
waitingPods: newWaitingPodsMap(),
clientSet: options.clientSet,
kubeConfig: options.kubeConfig,
eventRecorder: options.eventRecorder,
Expand Down
17 changes: 4 additions & 13 deletions pkg/scheduler/framework/runtime/framework_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2808,9 +2808,7 @@ func TestPermitPlugins(t *testing.T) {
profile := config.KubeSchedulerProfile{Plugins: configPlugins}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
f, err := newFrameworkWithQueueSortAndBind(ctx, registry, profile,
WithWaitingPods(NewWaitingPodsMap()),
)
f, err := newFrameworkWithQueueSortAndBind(ctx, registry, profile)
if err != nil {
t.Fatalf("fail to create framework: %s", err)
}
Expand Down Expand Up @@ -2992,10 +2990,7 @@ func TestRecordingMetrics(t *testing.T) {
SchedulerName: testProfileName,
Plugins: plugins,
}
f, err := newFrameworkWithQueueSortAndBind(ctx, r, profile,
withMetricsRecorder(recorder),
WithWaitingPods(NewWaitingPodsMap()),
)
f, err := newFrameworkWithQueueSortAndBind(ctx, r, profile, withMetricsRecorder(recorder))
if err != nil {
cancel()
t.Fatalf("Failed to create framework for testing: %v", err)
Expand Down Expand Up @@ -3165,9 +3160,7 @@ func TestPermitWaitDurationMetric(t *testing.T) {
profile := config.KubeSchedulerProfile{Plugins: plugins}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
f, err := newFrameworkWithQueueSortAndBind(ctx, r, profile,
WithWaitingPods(NewWaitingPodsMap()),
)
f, err := newFrameworkWithQueueSortAndBind(ctx, r, profile)
if err != nil {
t.Fatalf("Failed to create framework for testing: %v", err)
}
Expand Down Expand Up @@ -3223,9 +3216,7 @@ func TestWaitOnPermit(t *testing.T) {
profile := config.KubeSchedulerProfile{Plugins: plugins}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
f, err := newFrameworkWithQueueSortAndBind(ctx, r, profile,
WithWaitingPods(NewWaitingPodsMap()),
)
f, err := newFrameworkWithQueueSortAndBind(ctx, r, profile)
if err != nil {
t.Fatalf("Failed to create framework for testing: %v", err)
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/scheduler/framework/runtime/waiting_pods_map.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ type waitingPodsMap struct {
mu sync.RWMutex
}

// NewWaitingPodsMap returns a new waitingPodsMap.
func NewWaitingPodsMap() *waitingPodsMap {
// newWaitingPodsMap returns a new waitingPodsMap.
func newWaitingPodsMap() *waitingPodsMap {
return &waitingPodsMap{
pods: make(map[types.UID]*waitingPod),
}
Expand Down
6 changes: 1 addition & 5 deletions pkg/scheduler/schedule_one_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -773,9 +773,7 @@ func TestSchedulerScheduleOne(t *testing.T) {
registerPluginFuncs,
testSchedulerName,
frameworkruntime.WithClientSet(client),
frameworkruntime.WithEventRecorder(eventBroadcaster.NewRecorder(scheme.Scheme, testSchedulerName)),
frameworkruntime.WithWaitingPods(frameworkruntime.NewWaitingPodsMap()),
)
frameworkruntime.WithEventRecorder(eventBroadcaster.NewRecorder(scheme.Scheme, testSchedulerName)))
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -3525,7 +3523,6 @@ func setupTestScheduler(ctx context.Context, t *testing.T, queuedPodStore *clien
informerFactory = informers.NewSharedInformerFactory(clientsetfake.NewSimpleClientset(), 0)
}
schedulingQueue := internalqueue.NewTestQueueWithInformerFactory(ctx, nil, informerFactory)
waitingPods := frameworkruntime.NewWaitingPodsMap()

fwk, _ := tf.NewFramework(
ctx,
Expand All @@ -3535,7 +3532,6 @@ func setupTestScheduler(ctx context.Context, t *testing.T, queuedPodStore *clien
frameworkruntime.WithEventRecorder(recorder),
frameworkruntime.WithInformerFactory(informerFactory),
frameworkruntime.WithPodNominator(internalqueue.NewPodNominator(informerFactory.Core().V1().Pods().Lister())),
frameworkruntime.WithWaitingPods(waitingPods),
)

errChan := make(chan error, 1)
Expand Down
3 changes: 0 additions & 3 deletions pkg/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,8 +292,6 @@ func New(ctx context.Context,

snapshot := internalcache.NewEmptySnapshot()
metricsRecorder := metrics.NewMetricsAsyncRecorder(1000, time.Second, stopEverything)
// waitingPods holds all the pods that are in the scheduler and waiting in the permit stage
waitingPods := frameworkruntime.NewWaitingPodsMap()

profiles, err := profile.NewMap(ctx, options.profiles, registry, recorderFactory,
frameworkruntime.WithComponentConfigVersion(options.componentConfigVersion),
Expand All @@ -305,7 +303,6 @@ func New(ctx context.Context,
frameworkruntime.WithParallelism(int(options.parallelism)),
frameworkruntime.WithExtenders(extenders),
frameworkruntime.WithMetricsRecorder(metricsRecorder),
frameworkruntime.WithWaitingPods(waitingPods),
)
if err != nil {
return nil, fmt.Errorf("initializing profiles: %v", err)
Expand Down

0 comments on commit a309fad

Please sign in to comment.