Fix event broadcaster shutdown in multiple controllers
wojtek-t committed May 17, 2022
1 parent 81261d4 commit 11b679c
Showing 7 changed files with 61 additions and 40 deletions.
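All seven files follow the same pattern: the constructor still creates the event broadcaster and recorder, but it no longer starts the events pipeline; instead, each controller's Run() starts structured logging and the API-server sink and defers Shutdown(), so the broadcaster's goroutines stop when the controller stops. Previously the pipeline was started in the constructor with no way to shut it down, which could leak goroutines in tests and benchmarks (see the TODO removed from replica_set.go below). A minimal sketch of the pattern, using a hypothetical MyController rather than code from this commit:

package main

import (
    "context"

    v1 "k8s.io/api/core/v1"
    clientset "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/kubernetes/scheme"
    v1core "k8s.io/client-go/kubernetes/typed/core/v1"
    "k8s.io/client-go/tools/record"
)

type MyController struct {
    kubeClient clientset.Interface

    // Kept on the struct so Run can start and stop the events pipeline.
    eventBroadcaster record.EventBroadcaster
    eventRecorder    record.EventRecorder
}

func NewMyController(kubeClient clientset.Interface) *MyController {
    // Create but do not start the broadcaster: starting it here would spawn
    // goroutines (and write Events to the API server) with no way to stop
    // them if Run is never called.
    eventBroadcaster := record.NewBroadcaster()
    return &MyController{
        kubeClient:       kubeClient,
        eventBroadcaster: eventBroadcaster,
        eventRecorder:    eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "my-controller"}),
    }
}

func (c *MyController) Run(ctx context.Context) {
    // Start events processing pipeline, and tear it down when Run returns.
    c.eventBroadcaster.StartStructuredLogging(0)
    c.eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: c.kubeClient.CoreV1().Events("")})
    defer c.eventBroadcaster.Shutdown()

    // ... start informers and workers, then block until cancelled ...
    <-ctx.Done()
}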
23 changes: 15 additions & 8 deletions pkg/controller/daemon/daemon_controller.go
@@ -84,10 +84,13 @@ var controllerKind = apps.SchemeGroupVersion.WithKind("DaemonSet")
 // DaemonSetsController is responsible for synchronizing DaemonSet objects stored
 // in the system with actual running pods.
 type DaemonSetsController struct {
-    kubeClient    clientset.Interface
-    eventRecorder record.EventRecorder
-    podControl    controller.PodControlInterface
-    crControl     controller.ControllerRevisionControlInterface
+    kubeClient clientset.Interface
+
+    eventBroadcaster record.EventBroadcaster
+    eventRecorder    record.EventRecorder
+
+    podControl controller.PodControlInterface
+    crControl  controller.ControllerRevisionControlInterface
 
     // An dsc is temporarily suspended after creating/deleting these many replicas.
     // It resumes normal action after observing the watch events for them.
@@ -138,17 +141,16 @@ func NewDaemonSetsController(
     failedPodsBackoff *flowcontrol.Backoff,
 ) (*DaemonSetsController, error) {
     eventBroadcaster := record.NewBroadcaster()
-    eventBroadcaster.StartStructuredLogging(0)
-    eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")})
 
     if kubeClient != nil && kubeClient.CoreV1().RESTClient().GetRateLimiter() != nil {
         if err := ratelimiter.RegisterMetricAndTrackRateLimiterUsage("daemon_controller", kubeClient.CoreV1().RESTClient().GetRateLimiter()); err != nil {
             return nil, err
         }
     }
     dsc := &DaemonSetsController{
-        kubeClient:    kubeClient,
-        eventRecorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "daemonset-controller"}),
+        kubeClient:       kubeClient,
+        eventBroadcaster: eventBroadcaster,
+        eventRecorder:    eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "daemonset-controller"}),
         podControl: controller.RealPodControl{
             KubeClient: kubeClient,
             Recorder:   eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "daemonset-controller"}),
@@ -279,6 +281,11 @@ func (dsc *DaemonSetsController) deleteDaemonset(obj interface{}) {
 // Run begins watching and syncing daemon sets.
 func (dsc *DaemonSetsController) Run(ctx context.Context, workers int) {
     defer utilruntime.HandleCrash()
+
+    dsc.eventBroadcaster.StartStructuredLogging(0)
+    dsc.eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: dsc.kubeClient.CoreV1().Events("")})
+    defer dsc.eventBroadcaster.Shutdown()
+
     defer dsc.queue.ShutDown()
 
     klog.Infof("Starting daemon sets controller")
23 changes: 15 additions & 8 deletions pkg/controller/deployment/deployment_controller.go
@@ -67,9 +67,11 @@ var controllerKind = apps.SchemeGroupVersion.WithKind("Deployment")
 // in the system with actual running replica sets and pods.
 type DeploymentController struct {
     // rsControl is used for adopting/releasing replica sets.
-    rsControl     controller.RSControlInterface
-    client        clientset.Interface
-    eventRecorder record.EventRecorder
+    rsControl controller.RSControlInterface
+    client    clientset.Interface
+
+    eventBroadcaster record.EventBroadcaster
+    eventRecorder    record.EventRecorder
 
     // To allow injection of syncDeployment for testing.
     syncHandler func(ctx context.Context, dKey string) error
@@ -100,18 +102,17 @@ type DeploymentController struct {
 // NewDeploymentController creates a new DeploymentController.
 func NewDeploymentController(dInformer appsinformers.DeploymentInformer, rsInformer appsinformers.ReplicaSetInformer, podInformer coreinformers.PodInformer, client clientset.Interface) (*DeploymentController, error) {
     eventBroadcaster := record.NewBroadcaster()
-    eventBroadcaster.StartStructuredLogging(0)
-    eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: client.CoreV1().Events("")})
 
     if client != nil && client.CoreV1().RESTClient().GetRateLimiter() != nil {
         if err := ratelimiter.RegisterMetricAndTrackRateLimiterUsage("deployment_controller", client.CoreV1().RESTClient().GetRateLimiter()); err != nil {
             return nil, err
         }
     }
     dc := &DeploymentController{
-        client:        client,
-        eventRecorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "deployment-controller"}),
-        queue:         workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "deployment"),
+        client:           client,
+        eventBroadcaster: eventBroadcaster,
+        eventRecorder:    eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "deployment-controller"}),
+        queue:            workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "deployment"),
     }
     dc.rsControl = controller.RealRSControl{
         KubeClient: client,
@@ -148,6 +149,12 @@ func NewDeploymentController(dInformer appsinformers.DeploymentInformer, rsInformer appsinformers.ReplicaSetInformer, podInformer coreinformers.PodInformer, client clientset.Interface) (*DeploymentController, error) {
 // Run begins watching and syncing.
 func (dc *DeploymentController) Run(ctx context.Context, workers int) {
     defer utilruntime.HandleCrash()
+
+    // Start events processing pipeline.
+    dc.eventBroadcaster.StartStructuredLogging(0)
+    dc.eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: dc.client.CoreV1().Events("")})
+    defer dc.eventBroadcaster.Shutdown()
+
     defer dc.queue.ShutDown()
 
     klog.InfoS("Starting controller", "controller", "deployment")
12 changes: 9 additions & 3 deletions pkg/controller/job/job_controller.go
@@ -114,7 +114,8 @@ type Controller struct {
     // Orphan deleted pods that still have a Job tracking finalizer to be removed
     orphanQueue workqueue.RateLimitingInterface
 
-    recorder record.EventRecorder
+    broadcaster record.EventBroadcaster
+    recorder    record.EventRecorder
 
     podUpdateBatchPeriod time.Duration
 }
@@ -123,8 +124,6 @@ type Controller struct {
 // in sync with their corresponding Job objects.
 func NewController(podInformer coreinformers.PodInformer, jobInformer batchinformers.JobInformer, kubeClient clientset.Interface) *Controller {
     eventBroadcaster := record.NewBroadcaster()
-    eventBroadcaster.StartStructuredLogging(0)
-    eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")})
 
     if kubeClient != nil && kubeClient.CoreV1().RESTClient().GetRateLimiter() != nil {
         ratelimiter.RegisterMetricAndTrackRateLimiterUsage("job_controller", kubeClient.CoreV1().RESTClient().GetRateLimiter())
@@ -140,6 +139,7 @@ func NewController(podInformer coreinformers.PodInformer, jobInformer batchinformers.JobInformer, kubeClient clientset.Interface) *Controller {
         finalizerExpectations: newUIDTrackingExpectations(),
         queue:                 workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(DefaultJobBackOff, MaxJobBackOff), "job"),
         orphanQueue:           workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(DefaultJobBackOff, MaxJobBackOff), "job_orphan_pod"),
+        broadcaster:           eventBroadcaster,
         recorder:              eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "job-controller"}),
     }
     if feature.DefaultFeatureGate.Enabled(features.JobReadyPods) {
@@ -178,6 +178,12 @@ func NewController(podInformer coreinformers.PodInformer, jobInformer batchinformers.JobInformer, kubeClient clientset.Interface) *Controller {
 // Run the main goroutine responsible for watching and syncing jobs.
 func (jm *Controller) Run(ctx context.Context, workers int) {
     defer utilruntime.HandleCrash()
+
+    // Start events processing pipeline.
+    jm.broadcaster.StartStructuredLogging(0)
+    jm.broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: jm.kubeClient.CoreV1().Events("")})
+    defer jm.broadcaster.Shutdown()
+
     defer jm.queue.ShutDown()
     defer jm.orphanQueue.ShutDown()
 
22 changes: 11 additions & 11 deletions pkg/controller/replicaset/replica_set.go
@@ -88,6 +88,8 @@ type ReplicaSetController struct {
     kubeClient clientset.Interface
     podControl controller.PodControlInterface
 
+    eventBroadcaster record.EventBroadcaster
+
     // A ReplicaSet is temporarily suspended after creating/deleting these many replicas.
     // It resumes normal action after observing the watch events for them.
     burstReplicas int
@@ -117,8 +119,6 @@ type ReplicaSetController struct {
 // NewReplicaSetController configures a replica set controller with the specified event recorder
 func NewReplicaSetController(rsInformer appsinformers.ReplicaSetInformer, podInformer coreinformers.PodInformer, kubeClient clientset.Interface, burstReplicas int) *ReplicaSetController {
     eventBroadcaster := record.NewBroadcaster()
-    eventBroadcaster.StartStructuredLogging(0)
-    eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")})
     if err := metrics.Register(legacyregistry.Register); err != nil {
         klog.ErrorS(err, "unable to register metrics")
     }
@@ -130,13 +130,14 @@ func NewReplicaSetController(rsInformer appsinformers.ReplicaSetInformer, podInformer coreinformers.PodInformer, kubeClient clientset.Interface, burstReplicas int) *ReplicaSetController {
             KubeClient: kubeClient,
             Recorder:   eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "replicaset-controller"}),
         },
+        eventBroadcaster,
     )
 }
 
 // NewBaseController is the implementation of NewReplicaSetController with additional injected
 // parameters so that it can also serve as the implementation of NewReplicationController.
 func NewBaseController(rsInformer appsinformers.ReplicaSetInformer, podInformer coreinformers.PodInformer, kubeClient clientset.Interface, burstReplicas int,
-    gvk schema.GroupVersionKind, metricOwnerName, queueName string, podControl controller.PodControlInterface) *ReplicaSetController {
+    gvk schema.GroupVersionKind, metricOwnerName, queueName string, podControl controller.PodControlInterface, eventBroadcaster record.EventBroadcaster) *ReplicaSetController {
     if kubeClient != nil && kubeClient.CoreV1().RESTClient().GetRateLimiter() != nil {
         ratelimiter.RegisterMetricAndTrackRateLimiterUsage(metricOwnerName, kubeClient.CoreV1().RESTClient().GetRateLimiter())
     }
@@ -145,6 +146,7 @@ func NewBaseController(rsInformer appsinformers.ReplicaSetInformer, podInformer coreinformers.PodInformer, kubeClient clientset.Interface, burstReplicas int,
         GroupVersionKind: gvk,
         kubeClient:       kubeClient,
         podControl:       podControl,
+        eventBroadcaster: eventBroadcaster,
         burstReplicas:    burstReplicas,
         expectations:     controller.NewUIDTrackingControllerExpectations(controller.NewControllerExpectations()),
         queue:            workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), queueName),
@@ -188,17 +190,15 @@ func NewBaseController(rsInformer appsinformers.ReplicaSetInformer, podInformer coreinformers.PodInformer, kubeClient clientset.Interface, burstReplicas int,
     return rsc
 }
 
-// SetEventRecorder replaces the event recorder used by the ReplicaSetController
-// with the given recorder. Only used for testing.
-func (rsc *ReplicaSetController) SetEventRecorder(recorder record.EventRecorder) {
-    // TODO: Hack. We can't cleanly shutdown the event recorder, so benchmarks
-    // need to pass in a fake.
-    rsc.podControl = controller.RealPodControl{KubeClient: rsc.kubeClient, Recorder: recorder}
-}
-
 // Run begins watching and syncing.
 func (rsc *ReplicaSetController) Run(ctx context.Context, workers int) {
     defer utilruntime.HandleCrash()
+
+    // Start events processing pipeline.
+    rsc.eventBroadcaster.StartStructuredLogging(0)
+    rsc.eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: rsc.kubeClient.CoreV1().Events("")})
+    defer rsc.eventBroadcaster.Shutdown()
+
     defer rsc.queue.ShutDown()
 
     controllerName := strings.ToLower(rsc.Kind)
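With the broadcaster injected through NewBaseController and its lifecycle owned by Run, the SetEventRecorder hack removed above is no longer needed: the quota integration tests below simply drop their rm.SetEventRecorder(&record.FakeRecorder{}) calls, and callers that want to keep events out of the API server can inject their own broadcaster at construction time. A sketch of what such a caller might look like (illustrative only; the fixture names rsInformer, podInformer, and client are assumed, not part of this commit):

// Inject the broadcaster explicitly; pair it with a fake recorder in
// podControl if the test should not emit real Events.
eventBroadcaster := record.NewBroadcaster()
rsc := NewBaseController(rsInformer, podInformer, client, BurstReplicas,
    apps.SchemeGroupVersion.WithKind("ReplicaSet"), "replicaset_controller", "replicaset",
    controller.RealPodControl{
        KubeClient: client,
        Recorder:   record.NewFakeRecorder(100), // buffered fake, safe without a sink
    },
    eventBroadcaster,
)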
4 changes: 1 addition & 3 deletions pkg/controller/replication/replication_controller.go
@@ -30,7 +30,6 @@ import (
     coreinformers "k8s.io/client-go/informers/core/v1"
     clientset "k8s.io/client-go/kubernetes"
     "k8s.io/client-go/kubernetes/scheme"
-    v1core "k8s.io/client-go/kubernetes/typed/core/v1"
     "k8s.io/client-go/tools/record"
     "k8s.io/kubernetes/pkg/controller"
     "k8s.io/kubernetes/pkg/controller/replicaset"
@@ -50,8 +49,6 @@ type ReplicationManager struct {
 // NewReplicationManager configures a replication manager with the specified event recorder
 func NewReplicationManager(podInformer coreinformers.PodInformer, rcInformer coreinformers.ReplicationControllerInformer, kubeClient clientset.Interface, burstReplicas int) *ReplicationManager {
     eventBroadcaster := record.NewBroadcaster()
-    eventBroadcaster.StartStructuredLogging(0)
-    eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")})
     return &ReplicationManager{
         *replicaset.NewBaseController(informerAdapter{rcInformer}, podInformer, clientsetAdapter{kubeClient}, burstReplicas,
             v1.SchemeGroupVersion.WithKind("ReplicationController"),
@@ -61,6 +58,7 @@ func NewReplicationManager(podInformer coreinformers.PodInformer, rcInformer coreinformers.ReplicationControllerInformer, kubeClient clientset.Interface, burstReplicas int) *ReplicationManager {
                 KubeClient: kubeClient,
                 Recorder:   eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "replication-controller"}),
             }},
+            eventBroadcaster,
         ),
     }
 }
13 changes: 10 additions & 3 deletions pkg/controller/statefulset/stateful_set.go
@@ -74,6 +74,8 @@ type StatefulSetController struct {
     revListerSynced cache.InformerSynced
     // StatefulSets that need to be synced.
     queue workqueue.RateLimitingInterface
+    // eventBroadcaster is the core of event processing pipeline.
+    eventBroadcaster record.EventBroadcaster
 }
 
 // NewStatefulSetController creates a new statefulset controller.
@@ -85,8 +87,6 @@ func NewStatefulSetController(
     kubeClient clientset.Interface,
 ) *StatefulSetController {
     eventBroadcaster := record.NewBroadcaster()
-    eventBroadcaster.StartStructuredLogging(0)
-    eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")})
     recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "statefulset-controller"})
     ssc := &StatefulSetController{
         kubeClient: kubeClient,
@@ -101,10 +101,11 @@ func NewStatefulSetController(
             recorder,
         ),
         pvcListerSynced: pvcInformer.Informer().HasSynced,
-        revListerSynced: revInformer.Informer().HasSynced,
         queue:           workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "statefulset"),
         podControl:      controller.RealPodControl{KubeClient: kubeClient, Recorder: recorder},
+
+        revListerSynced:  revInformer.Informer().HasSynced,
+        eventBroadcaster: eventBroadcaster,
     }
 
     podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
@@ -142,6 +143,12 @@ func NewStatefulSetController(
 // Run runs the statefulset controller.
 func (ssc *StatefulSetController) Run(ctx context.Context, workers int) {
     defer utilruntime.HandleCrash()
+
+    // Start events processing pipeline.
+    ssc.eventBroadcaster.StartStructuredLogging(0)
+    ssc.eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: ssc.kubeClient.CoreV1().Events("")})
+    defer ssc.eventBroadcaster.Shutdown()
+
     defer ssc.queue.ShutDown()
 
     klog.Infof("Starting stateful set controller")
4 changes: 0 additions & 4 deletions test/integration/quota/quota_test.go
@@ -41,7 +41,6 @@ import (
     "k8s.io/client-go/informers"
     clientset "k8s.io/client-go/kubernetes"
     restclient "k8s.io/client-go/rest"
-    "k8s.io/client-go/tools/record"
     watchtools "k8s.io/client-go/tools/watch"
     "k8s.io/kubernetes/pkg/controller"
     replicationcontroller "k8s.io/kubernetes/pkg/controller/replication"
@@ -102,7 +101,6 @@ func TestQuota(t *testing.T) {
         clientset,
         replicationcontroller.BurstReplicas,
     )
-    rm.SetEventRecorder(&record.FakeRecorder{})
     go rm.Run(ctx, 3)
 
     discoveryFunc := clientset.Discovery().ServerPreferredNamespacedResources
@@ -333,7 +331,6 @@ func TestQuotaLimitedResourceDenial(t *testing.T) {
         clientset,
         replicationcontroller.BurstReplicas,
     )
-    rm.SetEventRecorder(&record.FakeRecorder{})
    go rm.Run(ctx, 3)
 
     discoveryFunc := clientset.Discovery().ServerPreferredNamespacedResources
@@ -462,7 +459,6 @@ func TestQuotaLimitService(t *testing.T) {
         clientset,
         replicationcontroller.BurstReplicas,
     )
-    rm.SetEventRecorder(&record.FakeRecorder{})
     go rm.Run(ctx, 3)
 
     discoveryFunc := clientset.Discovery().ServerPreferredNamespacedResources
