Skip to content

Commit

Permalink
scheduler: migrate from legacy events API to new events API
Browse files Browse the repository at this point in the history
  • Loading branch information
SataQiu committed Jul 22, 2022
1 parent cab41bd commit 3de74c8
Show file tree
Hide file tree
Showing 12 changed files with 63 additions and 62 deletions.
6 changes: 3 additions & 3 deletions cmd/kube-scheduler/app/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ import (
"k8s.io/client-go/informers"
clientset "k8s.io/client-go/kubernetes"
restclient "k8s.io/client-go/rest"
"k8s.io/client-go/tools/events"
"k8s.io/client-go/tools/leaderelection"
"k8s.io/client-go/tools/record"
kubeschedulerconfig "k8s.io/kubernetes/pkg/scheduler/apis/config"
)

Expand All @@ -46,8 +46,8 @@ type Config struct {
InformerFactory informers.SharedInformerFactory
DynInformerFactory dynamicinformer.DynamicSharedInformerFactory

//nolint:staticcheck // SA1019 this deprecated field still needs to be used for now. It will be removed once the migration is done.
EventBroadcaster events.EventBroadcasterAdapter
EventClient clientset.Interface
EventBroadcaster record.EventBroadcaster

// LeaderElection is optional.
LeaderElection *leaderelection.LeaderElectionConfig
Expand Down
8 changes: 5 additions & 3 deletions cmd/kube-scheduler/app/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,17 @@ import (
"time"

corev1 "k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/uuid"
apiserveroptions "k8s.io/apiserver/pkg/server/options"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/dynamic/dynamicinformer"
clientset "k8s.io/client-go/kubernetes"
clientgokubescheme "k8s.io/client-go/kubernetes/scheme"
restclient "k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/tools/events"
"k8s.io/client-go/tools/leaderelection"
"k8s.io/client-go/tools/leaderelection/resourcelock"
"k8s.io/client-go/tools/record"
Expand Down Expand Up @@ -283,7 +284,7 @@ func (o *Options) Config() (*schedulerappconfig.Config, error) {
return nil, err
}

c.EventBroadcaster = events.NewEventBroadcasterAdapter(eventClient)
c.EventBroadcaster = record.NewBroadcaster()

// Set up leader election if enabled.
var leaderElectionConfig *leaderelection.LeaderElectionConfig
Expand All @@ -293,14 +294,15 @@ func (o *Options) Config() (*schedulerappconfig.Config, error) {
if len(c.ComponentConfig.Profiles) != 0 {
schedulerName = c.ComponentConfig.Profiles[0].SchedulerName
}
coreRecorder := c.EventBroadcaster.DeprecatedNewLegacyRecorder(schedulerName)
coreRecorder := c.EventBroadcaster.NewRecorder(clientgokubescheme.Scheme, v1.EventSource{Component: schedulerName})
leaderElectionConfig, err = makeLeaderElectionConfig(c.ComponentConfig.LeaderElection, kubeConfig, coreRecorder)
if err != nil {
return nil, err
}
}

c.Client = client
c.EventClient = eventClient
c.KubeConfig = kubeConfig
c.InformerFactory = scheduler.NewInformerFactory(client, 0)
dynClient := dynamic.NewForConfigOrDie(kubeConfig)
Expand Down
12 changes: 8 additions & 4 deletions cmd/kube-scheduler/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (

"github.com/spf13/cobra"

v1 "k8s.io/api/core/v1"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apiserver/pkg/authentication/authenticator"
Expand All @@ -40,8 +41,10 @@ import (
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/tools/events"
clientgokubescheme "k8s.io/client-go/kubernetes/scheme"
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/tools/leaderelection"
"k8s.io/client-go/tools/record"
cliflag "k8s.io/component-base/cli/flag"
"k8s.io/component-base/cli/globalflag"
"k8s.io/component-base/configz"
Expand Down Expand Up @@ -157,7 +160,8 @@ func Run(ctx context.Context, cc *schedulerserverconfig.CompletedConfig, sched *
}

// Start events processing pipeline.
cc.EventBroadcaster.StartRecordingToSink(ctx.Done())
cc.EventBroadcaster.StartStructuredLogging(0)
cc.EventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: cc.EventClient.CoreV1().Events("")})
defer cc.EventBroadcaster.Shutdown()

// Setup healthz checks.
Expand Down Expand Up @@ -283,8 +287,8 @@ func newHealthzAndMetricsHandler(config *kubeschedulerconfig.KubeSchedulerConfig
}

func getRecorderFactory(cc *schedulerserverconfig.CompletedConfig) profile.RecorderFactory {
return func(name string) events.EventRecorder {
return cc.EventBroadcaster.NewRecorder(name)
return func(name string) record.EventRecorder {
return cc.EventBroadcaster.NewRecorder(clientgokubescheme.Scheme, v1.EventSource{Component: name})
}
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/scheduler/framework/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import (
"k8s.io/client-go/informers"
clientset "k8s.io/client-go/kubernetes"
restclient "k8s.io/client-go/rest"
"k8s.io/client-go/tools/events"
"k8s.io/client-go/tools/record"
"k8s.io/kubernetes/pkg/scheduler/apis/config"
"k8s.io/kubernetes/pkg/scheduler/framework/parallelize"
)
Expand Down Expand Up @@ -609,7 +609,7 @@ type Handle interface {
KubeConfig() *restclient.Config

// EventRecorder returns an event recorder.
EventRecorder() events.EventRecorder
EventRecorder() record.EventRecorder

SharedInformerFactory() informers.SharedInformerFactory

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ import (
"k8s.io/client-go/informers"
clientsetfake "k8s.io/client-go/kubernetes/fake"
clienttesting "k8s.io/client-go/testing"
"k8s.io/client-go/tools/events"
"k8s.io/client-go/tools/record"
kubeschedulerconfigv1beta2 "k8s.io/kube-scheduler/config/v1beta2"
extenderv1 "k8s.io/kube-scheduler/extender/v1"
"k8s.io/kubernetes/pkg/scheduler/apis/config"
Expand Down Expand Up @@ -353,7 +353,7 @@ func TestPostFilter(t *testing.T) {
}
f, err := st.NewFramework(registeredPlugins, "",
frameworkruntime.WithClientSet(cs),
frameworkruntime.WithEventRecorder(&events.FakeRecorder{}),
frameworkruntime.WithEventRecorder(&record.FakeRecorder{}),
frameworkruntime.WithInformerFactory(informerFactory),
frameworkruntime.WithPodNominator(internalqueue.NewPodNominator(informerFactory.Core().V1().Pods().Lister())),
frameworkruntime.WithExtenders(extenders),
Expand Down Expand Up @@ -1679,7 +1679,7 @@ func TestPreempt(t *testing.T) {
},
"",
frameworkruntime.WithClientSet(client),
frameworkruntime.WithEventRecorder(&events.FakeRecorder{}),
frameworkruntime.WithEventRecorder(&record.FakeRecorder{}),
frameworkruntime.WithExtenders(extenders),
frameworkruntime.WithPodNominator(internalqueue.NewPodNominator(informerFactory.Core().V1().Pods().Lister())),
frameworkruntime.WithSnapshotSharedLister(internalcache.NewSnapshot(test.pods, nodes)),
Expand Down
2 changes: 1 addition & 1 deletion pkg/scheduler/framework/preemption/preemption.go
Original file line number Diff line number Diff line change
Expand Up @@ -340,7 +340,7 @@ func (ev *Evaluator) prepareCandidate(ctx context.Context, c Candidate, pod *v1.
klog.ErrorS(err, "Preempting pod", "pod", klog.KObj(victim), "preemptor", klog.KObj(pod))
return framework.AsStatus(err)
}
fh.EventRecorder().Eventf(victim, pod, v1.EventTypeNormal, "Preempted", "Preempting", "Preempted by %v/%v on node %v",
fh.EventRecorder().Eventf(victim, v1.EventTypeNormal, "Preempted", "Preempted by %v/%v on node %v",
pod.Namespace, pod.Name, c.Name())
}
metrics.PreemptionVictims.Observe(float64(len(c.Victims().Pods)))
Expand Down
10 changes: 5 additions & 5 deletions pkg/scheduler/framework/runtime/framework.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import (
"k8s.io/client-go/informers"
clientset "k8s.io/client-go/kubernetes"
restclient "k8s.io/client-go/rest"
"k8s.io/client-go/tools/events"
"k8s.io/client-go/tools/record"
"k8s.io/component-helpers/scheduling/corev1"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/scheduler/apis/config"
Expand Down Expand Up @@ -89,7 +89,7 @@ type frameworkImpl struct {

clientSet clientset.Interface
kubeConfig *restclient.Config
eventRecorder events.EventRecorder
eventRecorder record.EventRecorder
informerFactory informers.SharedInformerFactory

metricsRecorder *metricsRecorder
Expand Down Expand Up @@ -137,7 +137,7 @@ type frameworkOptions struct {
componentConfigVersion string
clientSet clientset.Interface
kubeConfig *restclient.Config
eventRecorder events.EventRecorder
eventRecorder record.EventRecorder
informerFactory informers.SharedInformerFactory
snapshotSharedLister framework.SharedLister
metricsRecorder *metricsRecorder
Expand Down Expand Up @@ -176,7 +176,7 @@ func WithKubeConfig(kubeConfig *restclient.Config) Option {
}

// WithEventRecorder sets clientSet for the scheduling frameworkImpl.
func WithEventRecorder(recorder events.EventRecorder) Option {
func WithEventRecorder(recorder record.EventRecorder) Option {
return func(o *frameworkOptions) {
o.eventRecorder = recorder
}
Expand Down Expand Up @@ -1293,7 +1293,7 @@ func (f *frameworkImpl) KubeConfig() *restclient.Config {
}

// EventRecorder returns an event recorder.
func (f *frameworkImpl) EventRecorder() events.EventRecorder {
func (f *frameworkImpl) EventRecorder() record.EventRecorder {
return f.eventRecorder
}

Expand Down
11 changes: 6 additions & 5 deletions pkg/scheduler/profile/profile.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,17 @@ import (
"fmt"

"github.com/google/go-cmp/cmp"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/tools/events"
"k8s.io/client-go/tools/record"
"k8s.io/kubernetes/pkg/scheduler/apis/config"
"k8s.io/kubernetes/pkg/scheduler/framework"
frameworkruntime "k8s.io/kubernetes/pkg/scheduler/framework/runtime"
)

// RecorderFactory builds an EventRecorder for a given scheduler name.
type RecorderFactory func(string) events.EventRecorder
type RecorderFactory func(string) record.EventRecorder

// newProfile builds a Profile for the given configuration.
func newProfile(cfg config.KubeSchedulerProfile, r frameworkruntime.Registry, recorderFact RecorderFactory,
Expand Down Expand Up @@ -70,9 +71,9 @@ func (m Map) HandlesSchedulerName(name string) bool {
}

// NewRecorderFactory returns a RecorderFactory for the broadcaster.
func NewRecorderFactory(b events.EventBroadcaster) RecorderFactory {
return func(name string) events.EventRecorder {
return b.NewRecorder(scheme.Scheme, name)
func NewRecorderFactory(b record.EventBroadcaster) RecorderFactory {
return func(name string) record.EventRecorder {
return b.NewRecorder(scheme.Scheme, v1.EventSource{Component: name})
}
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/scheduler/profile/profile_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (

v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/tools/events"
"k8s.io/client-go/tools/record"
"k8s.io/kubernetes/pkg/scheduler/apis/config"
"k8s.io/kubernetes/pkg/scheduler/framework"
frameworkruntime "k8s.io/kubernetes/pkg/scheduler/framework/runtime"
Expand Down Expand Up @@ -284,7 +284,7 @@ func newFakePlugin(name string) func(object runtime.Object, handle framework.Han
}
}

func nilRecorderFactory(_ string) events.EventRecorder {
func nilRecorderFactory(_ string) record.EventRecorder {
return nil
}

Expand Down
6 changes: 3 additions & 3 deletions pkg/scheduler/schedule_one.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,7 @@ func (sched *Scheduler) frameworkForPod(pod *v1.Pod) (framework.Framework, error
func (sched *Scheduler) skipPodSchedule(fwk framework.Framework, pod *v1.Pod) bool {
// Case 1: pod is being deleted.
if pod.DeletionTimestamp != nil {
fwk.EventRecorder().Eventf(pod, nil, v1.EventTypeWarning, "FailedScheduling", "Scheduling", "skip schedule deleting pod: %v/%v", pod.Namespace, pod.Name)
fwk.EventRecorder().Eventf(pod, v1.EventTypeWarning, "FailedScheduling", "skip schedule deleting pod: %v/%v", pod.Namespace, pod.Name)
klog.V(3).InfoS("Skip schedule deleting pod", "pod", klog.KObj(pod))
return true
}
Expand Down Expand Up @@ -796,7 +796,7 @@ func (sched *Scheduler) finishBinding(fwk framework.Framework, assumed *v1.Pod,
return
}

fwk.EventRecorder().Eventf(assumed, nil, v1.EventTypeNormal, "Scheduled", "Binding", "Successfully assigned %v/%v to %v", assumed.Namespace, assumed.Name, targetNode)
fwk.EventRecorder().Eventf(assumed, v1.EventTypeNormal, "Scheduled", "Successfully assigned %v/%v to %v", assumed.Namespace, assumed.Name, targetNode)
}

func getAttemptsLabel(p *framework.QueuedPodInfo) string {
Expand Down Expand Up @@ -873,7 +873,7 @@ func (sched *Scheduler) handleSchedulingFailure(ctx context.Context, fwk framewo
}

msg := truncateMessage(errMsg)
fwk.EventRecorder().Eventf(pod, nil, v1.EventTypeWarning, "FailedScheduling", "Scheduling", msg)
fwk.EventRecorder().Eventf(pod, v1.EventTypeWarning, "FailedScheduling", msg)
if err := updatePod(ctx, sched.client, pod, &v1.PodCondition{
Type: v1.PodScheduled,
Status: v1.ConditionFalse,
Expand Down
Loading

0 comments on commit 3de74c8

Please sign in to comment.