Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Controller Manager] Fix certificates controller hotloop and use utilruntime.HandleError to replace glog.Errorf #32664

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
45 changes: 27 additions & 18 deletions pkg/controller/certificates/controller.go
Expand Up @@ -58,7 +58,7 @@ type CertificateController struct {

signer *local.Signer

queue *workqueue.Type
queue workqueue.RateLimitingInterface
}

func NewCertificateController(kubeClient clientset.Interface, syncPeriod time.Duration, caCertFile, caKeyFile string, approveAllKubeletCSRsForGroup string) (*CertificateController, error) {
Expand All @@ -79,7 +79,7 @@ func NewCertificateController(kubeClient clientset.Interface, syncPeriod time.Du

cc := &CertificateController{
kubeClient: kubeClient,
queue: workqueue.NewNamed("certificate"),
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "certificate"),
signer: ca,
approveAllKubeletCSRsForGroup: approveAllKubeletCSRsForGroup,
}
Expand Down Expand Up @@ -121,37 +121,47 @@ func NewCertificateController(kubeClient clientset.Interface, syncPeriod time.Du
// Run the main goroutine responsible for watching and syncing jobs.
func (cc *CertificateController) Run(workers int, stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
defer cc.queue.ShutDown()

go cc.csrController.Run(stopCh)

glog.Infof("Starting certificate controller manager")
for i := 0; i < workers; i++ {
go wait.Until(cc.worker, time.Second, stopCh)
}
<-stopCh
glog.Infof("Shutting down certificate controller")
cc.queue.ShutDown()
}

// worker runs a thread that dequeues CSRs, handles them, and marks them done.
func (cc *CertificateController) worker() {
for {
func() {
key, quit := cc.queue.Get()
if quit {
return
}
defer cc.queue.Done(key)
err := cc.syncHandler(key.(string))
if err != nil {
glog.Errorf("Error syncing CSR: %v", err)
}
}()
for cc.processNextWorkItem() {
}
}

// processNextWorkItem deals with one key off the queue. It returns false when it's time to quit.
func (cc *CertificateController) processNextWorkItem() bool {
cKey, quit := cc.queue.Get()
if quit {
return false
}
defer cc.queue.Done(cKey)

err := cc.syncHandler(cKey.(string))
if err == nil {
cc.queue.Forget(cKey)
return true
}

cc.queue.AddRateLimited(cKey)
utilruntime.HandleError(fmt.Errorf("Sync %v failed with : %v", cKey, err))
return true
}

func (cc *CertificateController) enqueueCertificateRequest(obj interface{}) {
key, err := controller.KeyFunc(obj)
if err != nil {
glog.Errorf("Couldn't get key for object %+v: %v", obj, err)
utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %+v: %v", obj, err))
return
}
cc.queue.Add(key)
Expand Down Expand Up @@ -180,7 +190,6 @@ func (cc *CertificateController) maybeSignCertificate(key string) error {
}()
obj, exists, err := cc.csrStore.Store.GetByKey(key)
if err != nil {
cc.queue.Add(key)
return err
}
if !exists {
Expand Down Expand Up @@ -235,7 +244,7 @@ func (cc *CertificateController) maybeAutoApproveCSR(csr *certificates.Certifica

x509cr, err := utilcertificates.ParseCertificateRequestObject(csr)
if err != nil {
glog.Errorf("unable to parse csr %q: %v", csr.ObjectMeta.Name, err)
utilruntime.HandleError(fmt.Errorf("unable to parse csr %q: %v", csr.Name, err))
return csr, nil
}
if !reflect.DeepEqual([]string{"system:nodes"}, x509cr.Subject.Organization) {
Expand Down