Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] scheduler: migrate from legacy events API to new events API #111345

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions cmd/kube-scheduler/app/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ import (
"k8s.io/client-go/informers"
clientset "k8s.io/client-go/kubernetes"
restclient "k8s.io/client-go/rest"
"k8s.io/client-go/tools/events"
"k8s.io/client-go/tools/leaderelection"
"k8s.io/client-go/tools/record"
kubeschedulerconfig "k8s.io/kubernetes/pkg/scheduler/apis/config"
)

Expand All @@ -46,8 +46,8 @@ type Config struct {
InformerFactory informers.SharedInformerFactory
DynInformerFactory dynamicinformer.DynamicSharedInformerFactory

//nolint:staticcheck // SA1019 this deprecated field still needs to be used for now. It will be removed once the migration is done.
EventBroadcaster events.EventBroadcasterAdapter
EventClient clientset.Interface
EventBroadcaster record.EventBroadcaster

// LeaderElection is optional.
LeaderElection *leaderelection.LeaderElectionConfig
Expand Down
8 changes: 5 additions & 3 deletions cmd/kube-scheduler/app/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,17 @@ import (
"time"

corev1 "k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/uuid"
apiserveroptions "k8s.io/apiserver/pkg/server/options"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/dynamic/dynamicinformer"
clientset "k8s.io/client-go/kubernetes"
clientgokubescheme "k8s.io/client-go/kubernetes/scheme"
restclient "k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/tools/events"
"k8s.io/client-go/tools/leaderelection"
"k8s.io/client-go/tools/leaderelection/resourcelock"
"k8s.io/client-go/tools/record"
Expand Down Expand Up @@ -283,7 +284,7 @@ func (o *Options) Config() (*schedulerappconfig.Config, error) {
return nil, err
}

c.EventBroadcaster = events.NewEventBroadcasterAdapter(eventClient)
c.EventBroadcaster = record.NewBroadcaster()

// Set up leader election if enabled.
var leaderElectionConfig *leaderelection.LeaderElectionConfig
Expand All @@ -293,14 +294,15 @@ func (o *Options) Config() (*schedulerappconfig.Config, error) {
if len(c.ComponentConfig.Profiles) != 0 {
schedulerName = c.ComponentConfig.Profiles[0].SchedulerName
}
coreRecorder := c.EventBroadcaster.DeprecatedNewLegacyRecorder(schedulerName)
coreRecorder := c.EventBroadcaster.NewRecorder(clientgokubescheme.Scheme, v1.EventSource{Component: schedulerName})
leaderElectionConfig, err = makeLeaderElectionConfig(c.ComponentConfig.LeaderElection, kubeConfig, coreRecorder)
if err != nil {
return nil, err
}
}

c.Client = client
c.EventClient = eventClient
c.KubeConfig = kubeConfig
c.InformerFactory = scheduler.NewInformerFactory(client, 0)
dynClient := dynamic.NewForConfigOrDie(kubeConfig)
Expand Down
12 changes: 8 additions & 4 deletions cmd/kube-scheduler/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (

"github.com/spf13/cobra"

v1 "k8s.io/api/core/v1"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apiserver/pkg/authentication/authenticator"
Expand All @@ -40,8 +41,10 @@ import (
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/tools/events"
clientgokubescheme "k8s.io/client-go/kubernetes/scheme"
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/tools/leaderelection"
"k8s.io/client-go/tools/record"
cliflag "k8s.io/component-base/cli/flag"
"k8s.io/component-base/cli/globalflag"
"k8s.io/component-base/configz"
Expand Down Expand Up @@ -157,7 +160,8 @@ func Run(ctx context.Context, cc *schedulerserverconfig.CompletedConfig, sched *
}

// Start events processing pipeline.
cc.EventBroadcaster.StartRecordingToSink(ctx.Done())
cc.EventBroadcaster.StartStructuredLogging(0)
cc.EventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: cc.EventClient.CoreV1().Events("")})
defer cc.EventBroadcaster.Shutdown()

// Setup healthz checks.
Expand Down Expand Up @@ -283,8 +287,8 @@ func newHealthzAndMetricsHandler(config *kubeschedulerconfig.KubeSchedulerConfig
}

func getRecorderFactory(cc *schedulerserverconfig.CompletedConfig) profile.RecorderFactory {
return func(name string) events.EventRecorder {
return cc.EventBroadcaster.NewRecorder(name)
return func(name string) record.EventRecorder {
return cc.EventBroadcaster.NewRecorder(clientgokubescheme.Scheme, v1.EventSource{Component: name})
}
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/scheduler/framework/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import (
"k8s.io/client-go/informers"
clientset "k8s.io/client-go/kubernetes"
restclient "k8s.io/client-go/rest"
"k8s.io/client-go/tools/events"
"k8s.io/client-go/tools/record"
"k8s.io/kubernetes/pkg/scheduler/apis/config"
"k8s.io/kubernetes/pkg/scheduler/framework/parallelize"
)
Expand Down Expand Up @@ -609,7 +609,7 @@ type Handle interface {
KubeConfig() *restclient.Config

// EventRecorder returns an event recorder.
EventRecorder() events.EventRecorder
EventRecorder() record.EventRecorder

SharedInformerFactory() informers.SharedInformerFactory

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ import (
"k8s.io/client-go/informers"
clientsetfake "k8s.io/client-go/kubernetes/fake"
clienttesting "k8s.io/client-go/testing"
"k8s.io/client-go/tools/events"
"k8s.io/client-go/tools/record"
kubeschedulerconfigv1beta2 "k8s.io/kube-scheduler/config/v1beta2"
extenderv1 "k8s.io/kube-scheduler/extender/v1"
"k8s.io/kubernetes/pkg/scheduler/apis/config"
Expand Down Expand Up @@ -353,7 +353,7 @@ func TestPostFilter(t *testing.T) {
}
f, err := st.NewFramework(registeredPlugins, "",
frameworkruntime.WithClientSet(cs),
frameworkruntime.WithEventRecorder(&events.FakeRecorder{}),
frameworkruntime.WithEventRecorder(&record.FakeRecorder{}),
frameworkruntime.WithInformerFactory(informerFactory),
frameworkruntime.WithPodNominator(internalqueue.NewPodNominator(informerFactory.Core().V1().Pods().Lister())),
frameworkruntime.WithExtenders(extenders),
Expand Down Expand Up @@ -1679,7 +1679,7 @@ func TestPreempt(t *testing.T) {
},
"",
frameworkruntime.WithClientSet(client),
frameworkruntime.WithEventRecorder(&events.FakeRecorder{}),
frameworkruntime.WithEventRecorder(&record.FakeRecorder{}),
frameworkruntime.WithExtenders(extenders),
frameworkruntime.WithPodNominator(internalqueue.NewPodNominator(informerFactory.Core().V1().Pods().Lister())),
frameworkruntime.WithSnapshotSharedLister(internalcache.NewSnapshot(test.pods, nodes)),
Expand Down
2 changes: 1 addition & 1 deletion pkg/scheduler/framework/preemption/preemption.go
Original file line number Diff line number Diff line change
Expand Up @@ -340,7 +340,7 @@ func (ev *Evaluator) prepareCandidate(ctx context.Context, c Candidate, pod *v1.
klog.ErrorS(err, "Preempting pod", "pod", klog.KObj(victim), "preemptor", klog.KObj(pod))
return framework.AsStatus(err)
}
fh.EventRecorder().Eventf(victim, pod, v1.EventTypeNormal, "Preempted", "Preempting", "Preempted by %v/%v on node %v",
fh.EventRecorder().Eventf(victim, v1.EventTypeNormal, "Preempted", "Preempted by %v/%v on node %v",
pod.Namespace, pod.Name, c.Name())
}
metrics.PreemptionVictims.Observe(float64(len(c.Victims().Pods)))
Expand Down
10 changes: 5 additions & 5 deletions pkg/scheduler/framework/runtime/framework.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import (
"k8s.io/client-go/informers"
clientset "k8s.io/client-go/kubernetes"
restclient "k8s.io/client-go/rest"
"k8s.io/client-go/tools/events"
"k8s.io/client-go/tools/record"
"k8s.io/component-helpers/scheduling/corev1"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/scheduler/apis/config"
Expand Down Expand Up @@ -89,7 +89,7 @@ type frameworkImpl struct {

clientSet clientset.Interface
kubeConfig *restclient.Config
eventRecorder events.EventRecorder
eventRecorder record.EventRecorder
informerFactory informers.SharedInformerFactory

metricsRecorder *metricsRecorder
Expand Down Expand Up @@ -137,7 +137,7 @@ type frameworkOptions struct {
componentConfigVersion string
clientSet clientset.Interface
kubeConfig *restclient.Config
eventRecorder events.EventRecorder
eventRecorder record.EventRecorder
informerFactory informers.SharedInformerFactory
snapshotSharedLister framework.SharedLister
metricsRecorder *metricsRecorder
Expand Down Expand Up @@ -176,7 +176,7 @@ func WithKubeConfig(kubeConfig *restclient.Config) Option {
}

// WithEventRecorder sets clientSet for the scheduling frameworkImpl.
func WithEventRecorder(recorder events.EventRecorder) Option {
func WithEventRecorder(recorder record.EventRecorder) Option {
return func(o *frameworkOptions) {
o.eventRecorder = recorder
}
Expand Down Expand Up @@ -1293,7 +1293,7 @@ func (f *frameworkImpl) KubeConfig() *restclient.Config {
}

// EventRecorder returns an event recorder.
func (f *frameworkImpl) EventRecorder() events.EventRecorder {
func (f *frameworkImpl) EventRecorder() record.EventRecorder {
return f.eventRecorder
}

Expand Down
11 changes: 6 additions & 5 deletions pkg/scheduler/profile/profile.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,17 @@ import (
"fmt"

"github.com/google/go-cmp/cmp"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/tools/events"
"k8s.io/client-go/tools/record"
"k8s.io/kubernetes/pkg/scheduler/apis/config"
"k8s.io/kubernetes/pkg/scheduler/framework"
frameworkruntime "k8s.io/kubernetes/pkg/scheduler/framework/runtime"
)

// RecorderFactory builds an EventRecorder for a given scheduler name.
type RecorderFactory func(string) events.EventRecorder
type RecorderFactory func(string) record.EventRecorder

// newProfile builds a Profile for the given configuration.
func newProfile(cfg config.KubeSchedulerProfile, r frameworkruntime.Registry, recorderFact RecorderFactory,
Expand Down Expand Up @@ -70,9 +71,9 @@ func (m Map) HandlesSchedulerName(name string) bool {
}

// NewRecorderFactory returns a RecorderFactory for the broadcaster.
func NewRecorderFactory(b events.EventBroadcaster) RecorderFactory {
return func(name string) events.EventRecorder {
return b.NewRecorder(scheme.Scheme, name)
func NewRecorderFactory(b record.EventBroadcaster) RecorderFactory {
return func(name string) record.EventRecorder {
return b.NewRecorder(scheme.Scheme, v1.EventSource{Component: name})
}
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/scheduler/profile/profile_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (

v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/tools/events"
"k8s.io/client-go/tools/record"
"k8s.io/kubernetes/pkg/scheduler/apis/config"
"k8s.io/kubernetes/pkg/scheduler/framework"
frameworkruntime "k8s.io/kubernetes/pkg/scheduler/framework/runtime"
Expand Down Expand Up @@ -284,7 +284,7 @@ func newFakePlugin(name string) func(object runtime.Object, handle framework.Han
}
}

func nilRecorderFactory(_ string) events.EventRecorder {
func nilRecorderFactory(_ string) record.EventRecorder {
return nil
}

Expand Down
6 changes: 3 additions & 3 deletions pkg/scheduler/schedule_one.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,7 @@ func (sched *Scheduler) frameworkForPod(pod *v1.Pod) (framework.Framework, error
func (sched *Scheduler) skipPodSchedule(fwk framework.Framework, pod *v1.Pod) bool {
// Case 1: pod is being deleted.
if pod.DeletionTimestamp != nil {
fwk.EventRecorder().Eventf(pod, nil, v1.EventTypeWarning, "FailedScheduling", "Scheduling", "skip schedule deleting pod: %v/%v", pod.Namespace, pod.Name)
fwk.EventRecorder().Eventf(pod, v1.EventTypeWarning, "FailedScheduling", "skip schedule deleting pod: %v/%v", pod.Namespace, pod.Name)
klog.V(3).InfoS("Skip schedule deleting pod", "pod", klog.KObj(pod))
return true
}
Expand Down Expand Up @@ -796,7 +796,7 @@ func (sched *Scheduler) finishBinding(fwk framework.Framework, assumed *v1.Pod,
return
}

fwk.EventRecorder().Eventf(assumed, nil, v1.EventTypeNormal, "Scheduled", "Binding", "Successfully assigned %v/%v to %v", assumed.Namespace, assumed.Name, targetNode)
fwk.EventRecorder().Eventf(assumed, v1.EventTypeNormal, "Scheduled", "Successfully assigned %v/%v to %v", assumed.Namespace, assumed.Name, targetNode)
}

func getAttemptsLabel(p *framework.QueuedPodInfo) string {
Expand Down Expand Up @@ -873,7 +873,7 @@ func (sched *Scheduler) handleSchedulingFailure(ctx context.Context, fwk framewo
}

msg := truncateMessage(errMsg)
fwk.EventRecorder().Eventf(pod, nil, v1.EventTypeWarning, "FailedScheduling", "Scheduling", msg)
fwk.EventRecorder().Eventf(pod, v1.EventTypeWarning, "FailedScheduling", msg)
if err := updatePod(ctx, sched.client, pod, &v1.PodCondition{
Type: v1.PodScheduled,
Status: v1.ConditionFalse,
Expand Down
Loading