Migrate kubelet to use v1 Event API
shawnhanx committed Mar 4, 2024
1 parent 6d2ee13 commit d53413d
Showing 81 changed files with 554 additions and 463 deletions.
8 changes: 4 additions & 4 deletions cmd/kubelet/app/server.go
@@ -64,7 +64,7 @@ import (
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
restclient "k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
-"k8s.io/client-go/tools/record"
+"k8s.io/client-go/tools/events"
certutil "k8s.io/client-go/util/cert"
"k8s.io/client-go/util/certificate"
"k8s.io/client-go/util/connrotation"
@@ -528,12 +528,12 @@ func makeEventRecorder(ctx context.Context, kubeDeps *kubelet.Dependencies, node
if kubeDeps.Recorder != nil {
return
}
-eventBroadcaster := record.NewBroadcaster(record.WithContext(ctx))
-kubeDeps.Recorder = eventBroadcaster.NewRecorder(legacyscheme.Scheme, v1.EventSource{Component: componentKubelet, Host: string(nodeName)})
+eventBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{Interface: kubeDeps.KubeClient.EventsV1()})
+kubeDeps.Recorder = eventBroadcaster.NewRecorder(legacyscheme.Scheme, componentKubelet)
eventBroadcaster.StartStructuredLogging(3)
if kubeDeps.EventClient != nil {
klog.V(4).InfoS("Sending events to api server")
-eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeDeps.EventClient.Events("")})
+eventBroadcaster.StartRecordingToSink(ctx.Done())
} else {
klog.InfoS("No api server defined - no events will be sent to API server")
}
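For context, a minimal sketch of the wiring this file switches to; it is not code from this commit, and the helper name, client, and verbosity value are illustrative. The events.k8s.io/v1 broadcaster is bound to its sink at construction, NewRecorder takes a reporting-controller name instead of a v1.EventSource, and StartRecordingToSink takes a stop channel rather than a sink:

package example

import (
    "context"

    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/kubernetes/scheme"
    "k8s.io/client-go/tools/events"
)

// newRecorder is an illustrative helper, not code from this commit: it wires
// up an events.k8s.io/v1 recorder roughly the way makeEventRecorder above does.
func newRecorder(ctx context.Context, client kubernetes.Interface) events.EventRecorder {
    // The broadcaster is bound to its sink at construction time.
    broadcaster := events.NewBroadcaster(&events.EventSinkImpl{Interface: client.EventsV1()})
    // NewRecorder takes a reporting-controller name, not a v1.EventSource.
    recorder := broadcaster.NewRecorder(scheme.Scheme, "kubelet")

    broadcaster.StartStructuredLogging(3)
    // StartRecordingToSink now takes a stop channel instead of a sink.
    broadcaster.StartRecordingToSink(ctx.Done())
    return recorder
}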
13 changes: 7 additions & 6 deletions pkg/controller/nodeipam/node_ipam_controller.go
@@ -25,10 +25,9 @@ import (
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
coreinformers "k8s.io/client-go/informers/core/v1"
clientset "k8s.io/client-go/kubernetes"
-v1core "k8s.io/client-go/kubernetes/typed/core/v1"
corelisters "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
-"k8s.io/client-go/tools/record"
+"k8s.io/client-go/tools/events"
cloudprovider "k8s.io/cloud-provider"
controllersmetrics "k8s.io/component-base/metrics/prometheus/controllers"
"k8s.io/klog/v2"
@@ -63,7 +62,7 @@ type Controller struct {
serviceCIDR *net.IPNet
secondaryServiceCIDR *net.IPNet
kubeClient clientset.Interface
-eventBroadcaster record.EventBroadcaster
+eventBroadcaster events.EventBroadcaster

nodeLister corelisters.NodeLister
nodeInformerSynced cache.InformerSynced
@@ -92,6 +91,8 @@ func NewNodeIpamController(
return nil, fmt.Errorf("kubeClient is nil when starting Controller")
}

+eventBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{Interface: kubeClient.EventsV1()})
+
// Cloud CIDR allocator does not rely on clusterCIDR or nodeCIDRMaskSize for allocation.
if allocatorType != ipam.CloudAllocatorType {
if len(clusterCIDRs) == 0 {
@@ -109,7 +110,7 @@ func NewNodeIpamController(
ic := &Controller{
cloud: cloud,
kubeClient: kubeClient,
-eventBroadcaster: record.NewBroadcaster(record.WithContext(ctx)),
+eventBroadcaster: eventBroadcaster,
clusterCIDRs: clusterCIDRs,
serviceCIDR: serviceCIDR,
secondaryServiceCIDR: secondaryServiceCIDR,
@@ -149,9 +150,9 @@ func NewNodeIpamController(
func (nc *Controller) Run(ctx context.Context) {
defer utilruntime.HandleCrash()

-// Start event processing pipeline.
+// Start events processing pipeline.
nc.eventBroadcaster.StartStructuredLogging(3)
-nc.eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: nc.kubeClient.CoreV1().Events("")})
+nc.eventBroadcaster.StartRecordingToSink(ctx.Done())
defer nc.eventBroadcaster.Shutdown()
klog.FromContext(ctx).Info("Starting ipam controller")
defer klog.FromContext(ctx).Info("Shutting down ipam controller")
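The controllers in this commit keep the same run-time lifecycle: the broadcaster is constructed with its sink up front, and Run only starts structured logging, starts recording, and defers Shutdown. A sketch of that pattern under assumed names (the controller type and field below are illustrative, not from this commit):

package example

import (
    "context"
    "time"

    utilruntime "k8s.io/apimachinery/pkg/util/runtime"
    "k8s.io/apimachinery/pkg/util/wait"
    "k8s.io/client-go/tools/events"
)

// exampleController stands in for the controllers touched by this commit;
// only the broadcaster field matters for the sketch.
type exampleController struct {
    eventBroadcaster events.EventBroadcaster
}

func (c *exampleController) Run(ctx context.Context) {
    defer utilruntime.HandleCrash()

    // Start the events processing pipeline and tear it down when Run returns.
    c.eventBroadcaster.StartStructuredLogging(3)
    c.eventBroadcaster.StartRecordingToSink(ctx.Done())
    defer c.eventBroadcaster.Shutdown()

    // No-op worker loop standing in for the controller's real work; it
    // returns when ctx is cancelled.
    wait.UntilWithContext(ctx, func(ctx context.Context) {}, time.Second)
}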
14 changes: 7 additions & 7 deletions pkg/controller/volume/attachdetach/attach_detach_controller.go
@@ -41,11 +41,10 @@ import (
storageinformersv1 "k8s.io/client-go/informers/storage/v1"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
-v1core "k8s.io/client-go/kubernetes/typed/core/v1"
corelisters "k8s.io/client-go/listers/core/v1"
storagelistersv1 "k8s.io/client-go/listers/storage/v1"
kcache "k8s.io/client-go/tools/cache"
-"k8s.io/client-go/tools/record"
+"k8s.io/client-go/tools/events"
"k8s.io/client-go/util/workqueue"
cloudprovider "k8s.io/cloud-provider"
csitrans "k8s.io/csi-translation-lib"
@@ -153,8 +152,8 @@ func NewAttachDetachController(
return nil, fmt.Errorf("could not initialize volume plugins for Attach/Detach Controller: %w", err)
}

-adc.broadcaster = record.NewBroadcaster(record.WithContext(ctx))
-recorder := adc.broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "attachdetach-controller"})
+adc.broadcaster = events.NewBroadcaster(&events.EventSinkImpl{Interface: kubeClient.EventsV1()})
+recorder := adc.broadcaster.NewRecorder(scheme.Scheme, "attachdetach-controller")
blkutil := volumepathhandler.NewBlockVolumePathHandler()

adc.desiredStateOfWorld = cache.NewDesiredStateOfWorld(&adc.volumePluginMgr)
@@ -180,6 +179,7 @@ func NewAttachDetachController(
adc.attacherDetacher,
adc.nodeStatusUpdater,
adc.nodeLister,
+adc.pvcLister,
recorder)

csiTranslator := csitrans.New()
@@ -317,7 +317,7 @@ type attachDetachController struct {
desiredStateOfWorldPopulator populator.DesiredStateOfWorldPopulator

// broadcaster is broadcasting events
-broadcaster record.EventBroadcaster
+broadcaster events.EventBroadcaster

// pvcQueue is used to queue pvc objects
pvcQueue workqueue.RateLimitingInterface
@@ -335,7 +335,7 @@ func (adc *attachDetachController) Run(ctx context.Context) {

// Start events processing pipeline.
adc.broadcaster.StartStructuredLogging(3)
-adc.broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: adc.kubeClient.CoreV1().Events("")})
+adc.broadcaster.StartRecordingToSink(ctx.Done())
defer adc.broadcaster.Shutdown()

logger := klog.FromContext(ctx)
@@ -907,7 +907,7 @@ func (adc *attachDetachController) GetNodeName() types.NodeName {
return ""
}

-func (adc *attachDetachController) GetEventRecorder() record.EventRecorder {
+func (adc *attachDetachController) GetEventRecorder() events.EventRecorder {
return nil
}

@@ -25,7 +25,7 @@ import (
"fmt"
"sync"

-"k8s.io/api/core/v1"
+v1 "k8s.io/api/core/v1"
k8stypes "k8s.io/apimachinery/pkg/types"
"k8s.io/kubernetes/pkg/volume"
"k8s.io/kubernetes/pkg/volume/util"
15 changes: 10 additions & 5 deletions pkg/controller/volume/attachdetach/reconciler/reconciler.go
@@ -29,7 +29,7 @@ import (
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
corelisters "k8s.io/client-go/listers/core/v1"
-"k8s.io/client-go/tools/record"
+"k8s.io/client-go/tools/events"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/controller/volume/attachdetach/cache"
"k8s.io/kubernetes/pkg/controller/volume/attachdetach/metrics"
@@ -75,7 +75,8 @@ func NewReconciler(
attacherDetacher operationexecutor.OperationExecutor,
nodeStatusUpdater statusupdater.NodeStatusUpdater,
nodeLister corelisters.NodeLister,
-recorder record.EventRecorder) Reconciler {
+pvcLister corelisters.PersistentVolumeClaimLister,
+recorder events.EventRecorder) Reconciler {
return &reconciler{
loopPeriod: loopPeriod,
maxWaitForUnmountDuration: maxWaitForUnmountDuration,
@@ -87,6 +88,7 @@ func NewReconciler(
attacherDetacher: attacherDetacher,
nodeStatusUpdater: nodeStatusUpdater,
nodeLister: nodeLister,
+pvcLister: pvcLister,
timeOfLastSync: time.Now(),
recorder: recorder,
}
@@ -101,10 +103,11 @@ type reconciler struct {
attacherDetacher operationexecutor.OperationExecutor
nodeStatusUpdater statusupdater.NodeStatusUpdater
nodeLister corelisters.NodeLister
+pvcLister corelisters.PersistentVolumeClaimLister
timeOfLastSync time.Time
disableReconciliationSync bool
disableForceDetachOnTimeout bool
-recorder record.EventRecorder
+recorder events.EventRecorder
}

func (rc *reconciler) Run(ctx context.Context) {
@@ -387,11 +390,13 @@ func (rc *reconciler) reportMultiAttachError(logger klog.Logger, volumeToAttach

// Get list of pods that use the volume on the other nodes.
pods := rc.desiredStateOfWorld.GetVolumePodsOnNodes(otherNodes, volumeToAttach.VolumeName)
+pvc := operationexecutor.GetPVCToAttach(volumeToAttach.VolumeToAttach, rc.pvcLister, nil)
+
if len(pods) == 0 {
// We did not find any pods that requests the volume. The pod must have been deleted already.
simpleMsg, _ := volumeToAttach.GenerateMsg("Multi-Attach error", "Volume is already exclusively attached to one node and can't be attached to another")
for _, pod := range volumeToAttach.ScheduledPods {
-rc.recorder.Eventf(pod, v1.EventTypeWarning, kevents.FailedAttachVolume, simpleMsg)
+rc.recorder.Eventf(pod, pvc, v1.EventTypeWarning, kevents.FailedAttachVolume, "AttachingVolume", simpleMsg)
}
// Log detailed message to system admin
logger.Info("Multi-Attach error: volume is already exclusively attached and can't be attached to another node", "attachedTo", otherNodesStr, "volume", volumeToAttach)
@@ -426,7 +431,7 @@ func (rc *reconciler) reportMultiAttachError(logger klog.Logger, volumeToAttach
msg = fmt.Sprintf("Volume is already used by %d pod(s) in different namespaces", otherPods)
}
simpleMsg, _ := volumeToAttach.GenerateMsg("Multi-Attach error", msg)
-rc.recorder.Eventf(scheduledPod, v1.EventTypeWarning, kevents.FailedAttachVolume, simpleMsg)
+rc.recorder.Eventf(scheduledPod, pvc, v1.EventTypeWarning, kevents.FailedAttachVolume, "AttachingVolume", simpleMsg)
}

// Log all pods for system admin
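The recorder call itself changes shape: events.EventRecorder.Eventf takes a regarding object, a related object, an event type, a reason, an action, and a note, which is why the reconciler now carries a pvcLister and passes the PVC alongside the pod. A hedged sketch of that call shape (the helper name and the literal reason and action strings are illustrative, not from this commit):

package example

import (
    v1 "k8s.io/api/core/v1"
    "k8s.io/client-go/tools/events"
)

// reportFailedAttach is an illustrative helper, not code from this commit.
// events.EventRecorder.Eventf takes: regarding, related, type, reason, action, note.
func reportFailedAttach(recorder events.EventRecorder, pod *v1.Pod, pvc *v1.PersistentVolumeClaim, msg string) {
    // Old core/v1 style:  recorder.Eventf(pod, v1.EventTypeWarning, "FailedAttachVolume", msg)
    // New events/v1 style: the PVC rides along as the related object, plus an action string.
    recorder.Eventf(pod, pvc, v1.EventTypeWarning, "FailedAttachVolume", "AttachingVolume", msg)
}

The related argument may also be left nil when there is no secondary object to report.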
