Skip to content

Commit

Permalink
Migrate kubelet to use v1 Event API
Browse files Browse the repository at this point in the history
  • Loading branch information
shawnhanx committed Feb 26, 2024
1 parent 689dca0 commit 25a9bd0
Show file tree
Hide file tree
Showing 82 changed files with 702 additions and 469 deletions.
8 changes: 4 additions & 4 deletions cmd/kubelet/app/server.go
Expand Up @@ -64,7 +64,7 @@ import (
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
restclient "k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/tools/events"
certutil "k8s.io/client-go/util/cert"
"k8s.io/client-go/util/certificate"
"k8s.io/client-go/util/connrotation"
Expand Down Expand Up @@ -528,12 +528,12 @@ func makeEventRecorder(ctx context.Context, kubeDeps *kubelet.Dependencies, node
if kubeDeps.Recorder != nil {
return
}
eventBroadcaster := record.NewBroadcaster(record.WithContext(ctx))
kubeDeps.Recorder = eventBroadcaster.NewRecorder(legacyscheme.Scheme, v1.EventSource{Component: componentKubelet, Host: string(nodeName)})
eventBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{Interface: kubeDeps.KubeClient.EventsV1()})
kubeDeps.Recorder = eventBroadcaster.NewRecorder(legacyscheme.Scheme, componentKubelet)
eventBroadcaster.StartStructuredLogging(3)
if kubeDeps.EventClient != nil {
klog.V(4).InfoS("Sending events to api server")
eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeDeps.EventClient.Events("")})
eventBroadcaster.StartRecordingToSink(ctx.Done())
} else {
klog.InfoS("No api server defined - no events will be sent to API server")
}
Expand Down
13 changes: 7 additions & 6 deletions pkg/controller/nodeipam/node_ipam_controller.go
Expand Up @@ -25,10 +25,9 @@ import (
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
coreinformers "k8s.io/client-go/informers/core/v1"
clientset "k8s.io/client-go/kubernetes"
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
corelisters "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/tools/events"
cloudprovider "k8s.io/cloud-provider"
controllersmetrics "k8s.io/component-base/metrics/prometheus/controllers"
"k8s.io/klog/v2"
Expand Down Expand Up @@ -63,7 +62,7 @@ type Controller struct {
serviceCIDR *net.IPNet
secondaryServiceCIDR *net.IPNet
kubeClient clientset.Interface
eventBroadcaster record.EventBroadcaster
eventBroadcaster events.EventBroadcaster

nodeLister corelisters.NodeLister
nodeInformerSynced cache.InformerSynced
Expand Down Expand Up @@ -93,6 +92,8 @@ func NewNodeIpamController(
return nil, fmt.Errorf("kubeClient is nil when starting Controller")
}

eventBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{Interface: kubeClient.EventsV1()})

// Cloud CIDR allocator does not rely on clusterCIDR or nodeCIDRMaskSize for allocation.
if allocatorType != ipam.CloudAllocatorType {
if len(clusterCIDRs) == 0 {
Expand All @@ -110,7 +111,7 @@ func NewNodeIpamController(
ic := &Controller{
cloud: cloud,
kubeClient: kubeClient,
eventBroadcaster: record.NewBroadcaster(),
eventBroadcaster: eventBroadcaster,
clusterCIDRs: clusterCIDRs,
serviceCIDR: serviceCIDR,
secondaryServiceCIDR: secondaryServiceCIDR,
Expand Down Expand Up @@ -150,9 +151,9 @@ func NewNodeIpamController(
func (nc *Controller) Run(ctx context.Context) {
defer utilruntime.HandleCrash()

// Start event processing pipeline.
// Start events processing pipeline.
nc.eventBroadcaster.StartStructuredLogging(0)
nc.eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: nc.kubeClient.CoreV1().Events("")})
nc.eventBroadcaster.StartRecordingToSink(ctx.Done())
defer nc.eventBroadcaster.Shutdown()
klog.FromContext(ctx).Info("Starting ipam controller")
defer klog.FromContext(ctx).Info("Shutting down ipam controller")
Expand Down
14 changes: 7 additions & 7 deletions pkg/controller/volume/attachdetach/attach_detach_controller.go
Expand Up @@ -41,11 +41,10 @@ import (
storageinformersv1 "k8s.io/client-go/informers/storage/v1"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
corelisters "k8s.io/client-go/listers/core/v1"
storagelistersv1 "k8s.io/client-go/listers/storage/v1"
kcache "k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/tools/events"
"k8s.io/client-go/util/workqueue"
cloudprovider "k8s.io/cloud-provider"
csitrans "k8s.io/csi-translation-lib"
Expand Down Expand Up @@ -151,8 +150,8 @@ func NewAttachDetachController(
return nil, fmt.Errorf("could not initialize volume plugins for Attach/Detach Controller: %w", err)
}

adc.broadcaster = record.NewBroadcaster()
recorder := adc.broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "attachdetach-controller"})
adc.broadcaster = events.NewBroadcaster(&events.EventSinkImpl{Interface: kubeClient.EventsV1()})
recorder := adc.broadcaster.NewRecorder(scheme.Scheme, "attachdetach-controller")
blkutil := volumepathhandler.NewBlockVolumePathHandler()

adc.desiredStateOfWorld = cache.NewDesiredStateOfWorld(&adc.volumePluginMgr)
Expand All @@ -178,6 +177,7 @@ func NewAttachDetachController(
adc.attacherDetacher,
adc.nodeStatusUpdater,
adc.nodeLister,
adc.pvcLister,
recorder)

csiTranslator := csitrans.New()
Expand Down Expand Up @@ -315,7 +315,7 @@ type attachDetachController struct {
desiredStateOfWorldPopulator populator.DesiredStateOfWorldPopulator

// broadcaster is broadcasting events
broadcaster record.EventBroadcaster
broadcaster events.EventBroadcaster

// pvcQueue is used to queue pvc objects
pvcQueue workqueue.RateLimitingInterface
Expand All @@ -333,7 +333,7 @@ func (adc *attachDetachController) Run(ctx context.Context) {

// Start events processing pipeline.
adc.broadcaster.StartStructuredLogging(0)
adc.broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: adc.kubeClient.CoreV1().Events("")})
adc.broadcaster.StartRecordingToSink(ctx.Done())
defer adc.broadcaster.Shutdown()

logger := klog.FromContext(ctx)
Expand Down Expand Up @@ -905,7 +905,7 @@ func (adc *attachDetachController) GetNodeName() types.NodeName {
return ""
}

func (adc *attachDetachController) GetEventRecorder() record.EventRecorder {
func (adc *attachDetachController) GetEventRecorder() events.EventRecorder {
return nil
}

Expand Down
Expand Up @@ -25,7 +25,7 @@ import (
"fmt"
"sync"

"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
k8stypes "k8s.io/apimachinery/pkg/types"
"k8s.io/kubernetes/pkg/volume"
"k8s.io/kubernetes/pkg/volume/util"
Expand Down
15 changes: 10 additions & 5 deletions pkg/controller/volume/attachdetach/reconciler/reconciler.go
Expand Up @@ -29,7 +29,7 @@ import (
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
corelisters "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/tools/events"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/controller/volume/attachdetach/cache"
"k8s.io/kubernetes/pkg/controller/volume/attachdetach/metrics"
Expand Down Expand Up @@ -75,7 +75,8 @@ func NewReconciler(
attacherDetacher operationexecutor.OperationExecutor,
nodeStatusUpdater statusupdater.NodeStatusUpdater,
nodeLister corelisters.NodeLister,
recorder record.EventRecorder) Reconciler {
pvcLister corelisters.PersistentVolumeClaimLister,
recorder events.EventRecorder) Reconciler {
return &reconciler{
loopPeriod: loopPeriod,
maxWaitForUnmountDuration: maxWaitForUnmountDuration,
Expand All @@ -87,6 +88,7 @@ func NewReconciler(
attacherDetacher: attacherDetacher,
nodeStatusUpdater: nodeStatusUpdater,
nodeLister: nodeLister,
pvcLister: pvcLister,
timeOfLastSync: time.Now(),
recorder: recorder,
}
Expand All @@ -101,10 +103,11 @@ type reconciler struct {
attacherDetacher operationexecutor.OperationExecutor
nodeStatusUpdater statusupdater.NodeStatusUpdater
nodeLister corelisters.NodeLister
pvcLister corelisters.PersistentVolumeClaimLister
timeOfLastSync time.Time
disableReconciliationSync bool
disableForceDetachOnTimeout bool
recorder record.EventRecorder
recorder events.EventRecorder
}

func (rc *reconciler) Run(ctx context.Context) {
Expand Down Expand Up @@ -387,11 +390,13 @@ func (rc *reconciler) reportMultiAttachError(logger klog.Logger, volumeToAttach

// Get list of pods that use the volume on the other nodes.
pods := rc.desiredStateOfWorld.GetVolumePodsOnNodes(otherNodes, volumeToAttach.VolumeName)
pvc := operationexecutor.GetPVCToAttach(volumeToAttach.VolumeToAttach, rc.pvcLister, nil)

if len(pods) == 0 {
// We did not find any pods that requests the volume. The pod must have been deleted already.
simpleMsg, _ := volumeToAttach.GenerateMsg("Multi-Attach error", "Volume is already exclusively attached to one node and can't be attached to another")
for _, pod := range volumeToAttach.ScheduledPods {
rc.recorder.Eventf(pod, v1.EventTypeWarning, kevents.FailedAttachVolume, simpleMsg)
rc.recorder.Eventf(pod, pvc, v1.EventTypeWarning, kevents.FailedAttachVolume, "AttachingVolume", simpleMsg)
}
// Log detailed message to system admin
logger.Info("Multi-Attach error: volume is already exclusively attached and can't be attached to another node", "attachedTo", otherNodesStr, "volume", volumeToAttach)
Expand Down Expand Up @@ -426,7 +431,7 @@ func (rc *reconciler) reportMultiAttachError(logger klog.Logger, volumeToAttach
msg = fmt.Sprintf("Volume is already used by %d pod(s) in different namespaces", otherPods)
}
simpleMsg, _ := volumeToAttach.GenerateMsg("Multi-Attach error", msg)
rc.recorder.Eventf(scheduledPod, v1.EventTypeWarning, kevents.FailedAttachVolume, simpleMsg)
rc.recorder.Eventf(scheduledPod, pvc, v1.EventTypeWarning, kevents.FailedAttachVolume, "AttachingVolume", simpleMsg)
}

// Log all pods for system admin
Expand Down

0 comments on commit 25a9bd0

Please sign in to comment.