Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revert: enhancement(scheduler): share waitingPods among profiles #124001

Merged
merged 2 commits into from
Mar 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -364,7 +364,6 @@ func TestPostFilter(t *testing.T) {
frameworkruntime.WithExtenders(extenders),
frameworkruntime.WithSnapshotSharedLister(internalcache.NewSnapshot(tt.pods, tt.nodes)),
frameworkruntime.WithLogger(logger),
frameworkruntime.WithWaitingPods(frameworkruntime.NewWaitingPodsMap()),
)
if err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -1703,8 +1702,6 @@ func TestPreempt(t *testing.T) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()

waitingPods := frameworkruntime.NewWaitingPodsMap()

cache := internalcache.New(ctx, time.Duration(0))
for _, pod := range test.pods {
cache.AddPod(logger, pod)
Expand Down Expand Up @@ -1748,7 +1745,6 @@ func TestPreempt(t *testing.T) {
frameworkruntime.WithPodNominator(internalqueue.NewPodNominator(informerFactory.Core().V1().Pods().Lister())),
frameworkruntime.WithSnapshotSharedLister(internalcache.NewSnapshot(test.pods, nodes)),
frameworkruntime.WithInformerFactory(informerFactory),
frameworkruntime.WithWaitingPods(waitingPods),
frameworkruntime.WithLogger(logger),
)
if err != nil {
Expand Down
10 changes: 1 addition & 9 deletions pkg/scheduler/framework/runtime/framework.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,6 @@ type frameworkOptions struct {
extenders []framework.Extender
captureProfile CaptureProfile
parallelizer parallelize.Parallelizer
waitingPods *waitingPodsMap
logger *klog.Logger
}

Expand Down Expand Up @@ -222,13 +221,6 @@ func WithMetricsRecorder(r *metrics.MetricAsyncRecorder) Option {
}
}

// WithWaitingPods sets waitingPods for the scheduling frameworkImpl.
func WithWaitingPods(wp *waitingPodsMap) Option {
return func(o *frameworkOptions) {
o.waitingPods = wp
}
}

// WithLogger overrides the default logger from k8s.io/klog.
func WithLogger(logger klog.Logger) Option {
return func(o *frameworkOptions) {
Expand Down Expand Up @@ -262,7 +254,7 @@ func NewFramework(ctx context.Context, r Registry, profile *config.KubeScheduler
registry: r,
snapshotSharedLister: options.snapshotSharedLister,
scorePluginWeight: make(map[string]int),
waitingPods: options.waitingPods,
waitingPods: newWaitingPodsMap(),
clientSet: options.clientSet,
kubeConfig: options.kubeConfig,
eventRecorder: options.eventRecorder,
Expand Down
17 changes: 4 additions & 13 deletions pkg/scheduler/framework/runtime/framework_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2808,9 +2808,7 @@ func TestPermitPlugins(t *testing.T) {
profile := config.KubeSchedulerProfile{Plugins: configPlugins}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
f, err := newFrameworkWithQueueSortAndBind(ctx, registry, profile,
WithWaitingPods(NewWaitingPodsMap()),
)
f, err := newFrameworkWithQueueSortAndBind(ctx, registry, profile)
if err != nil {
t.Fatalf("fail to create framework: %s", err)
}
Expand Down Expand Up @@ -2992,10 +2990,7 @@ func TestRecordingMetrics(t *testing.T) {
SchedulerName: testProfileName,
Plugins: plugins,
}
f, err := newFrameworkWithQueueSortAndBind(ctx, r, profile,
withMetricsRecorder(recorder),
WithWaitingPods(NewWaitingPodsMap()),
)
f, err := newFrameworkWithQueueSortAndBind(ctx, r, profile, withMetricsRecorder(recorder))
if err != nil {
cancel()
t.Fatalf("Failed to create framework for testing: %v", err)
Expand Down Expand Up @@ -3165,9 +3160,7 @@ func TestPermitWaitDurationMetric(t *testing.T) {
profile := config.KubeSchedulerProfile{Plugins: plugins}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
f, err := newFrameworkWithQueueSortAndBind(ctx, r, profile,
WithWaitingPods(NewWaitingPodsMap()),
)
f, err := newFrameworkWithQueueSortAndBind(ctx, r, profile)
if err != nil {
t.Fatalf("Failed to create framework for testing: %v", err)
}
Expand Down Expand Up @@ -3223,9 +3216,7 @@ func TestWaitOnPermit(t *testing.T) {
profile := config.KubeSchedulerProfile{Plugins: plugins}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
f, err := newFrameworkWithQueueSortAndBind(ctx, r, profile,
WithWaitingPods(NewWaitingPodsMap()),
)
f, err := newFrameworkWithQueueSortAndBind(ctx, r, profile)
if err != nil {
t.Fatalf("Failed to create framework for testing: %v", err)
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/scheduler/framework/runtime/waiting_pods_map.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ type waitingPodsMap struct {
mu sync.RWMutex
}

// NewWaitingPodsMap returns a new waitingPodsMap.
func NewWaitingPodsMap() *waitingPodsMap {
// newWaitingPodsMap returns a new waitingPodsMap.
func newWaitingPodsMap() *waitingPodsMap {
return &waitingPodsMap{
pods: make(map[types.UID]*waitingPod),
}
Expand Down
6 changes: 1 addition & 5 deletions pkg/scheduler/schedule_one_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -773,9 +773,7 @@ func TestSchedulerScheduleOne(t *testing.T) {
registerPluginFuncs,
testSchedulerName,
frameworkruntime.WithClientSet(client),
frameworkruntime.WithEventRecorder(eventBroadcaster.NewRecorder(scheme.Scheme, testSchedulerName)),
frameworkruntime.WithWaitingPods(frameworkruntime.NewWaitingPodsMap()),
)
frameworkruntime.WithEventRecorder(eventBroadcaster.NewRecorder(scheme.Scheme, testSchedulerName)))
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -3525,7 +3523,6 @@ func setupTestScheduler(ctx context.Context, t *testing.T, queuedPodStore *clien
informerFactory = informers.NewSharedInformerFactory(clientsetfake.NewSimpleClientset(), 0)
}
schedulingQueue := internalqueue.NewTestQueueWithInformerFactory(ctx, nil, informerFactory)
waitingPods := frameworkruntime.NewWaitingPodsMap()

fwk, _ := tf.NewFramework(
ctx,
Expand All @@ -3535,7 +3532,6 @@ func setupTestScheduler(ctx context.Context, t *testing.T, queuedPodStore *clien
frameworkruntime.WithEventRecorder(recorder),
frameworkruntime.WithInformerFactory(informerFactory),
frameworkruntime.WithPodNominator(internalqueue.NewPodNominator(informerFactory.Core().V1().Pods().Lister())),
frameworkruntime.WithWaitingPods(waitingPods),
)

errChan := make(chan error, 1)
Expand Down
3 changes: 0 additions & 3 deletions pkg/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,8 +292,6 @@ func New(ctx context.Context,

snapshot := internalcache.NewEmptySnapshot()
metricsRecorder := metrics.NewMetricsAsyncRecorder(1000, time.Second, stopEverything)
// waitingPods holds all the pods that are in the scheduler and waiting in the permit stage
waitingPods := frameworkruntime.NewWaitingPodsMap()

profiles, err := profile.NewMap(ctx, options.profiles, registry, recorderFactory,
frameworkruntime.WithComponentConfigVersion(options.componentConfigVersion),
Expand All @@ -305,7 +303,6 @@ func New(ctx context.Context,
frameworkruntime.WithParallelism(int(options.parallelism)),
frameworkruntime.WithExtenders(extenders),
frameworkruntime.WithMetricsRecorder(metricsRecorder),
frameworkruntime.WithWaitingPods(waitingPods),
)
if err != nil {
return nil, fmt.Errorf("initializing profiles: %v", err)
Expand Down