
Use endpoints informer for the endpoint controller #47731

Merged
cmd/kube-controller-manager/app/core.go (1 change: 1 addition, 0 deletions)
@@ -178,6 +178,7 @@ func startEndpointController(ctx ControllerContext) (bool, error) {
go endpointcontroller.NewEndpointController(
ctx.InformerFactory.Core().V1().Pods(),
ctx.InformerFactory.Core().V1().Services(),
ctx.InformerFactory.Core().V1().Endpoints(),
Contributor
I didn't realize we weren't watching endpoints before - given that this controller is expected to know and mutate every endpoint, I think this is exactly what we should be doing. It will have a memory impact, but will drop a significant chunk of QPS from the apiservers.

@wojtek-t @gmarek: this may change our kubemark profiles, but I would expect the change to be a positive one.

Contributor
I agree.

Contributor Author
QPS drops substantially in my testing. I also no longer see the spurious updates I was seeing before (during a resync, about 20% of the endpoints would cause a PUT even though nothing had changed; I'm not sure why).

ctx.ClientBuilder.ClientOrDie("endpoint-controller"),
).Run(int(ctx.Options.ConcurrentEndpointSyncs), ctx.Stop)
return true, nil
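To make the tradeoff discussed in the thread above concrete: the savings come from where reads are served. Later in this PR, syncService switches from a per-sync GET to the informer-backed lister. A minimal sketch of the two read paths, borrowing the controller's field names (readCurrentEndpoints is a hypothetical helper, not code from this PR):

func readCurrentEndpoints(e *EndpointController, ns, name string) (*v1.Endpoints, error) {
	// Old path: a live apiserver round trip on every service sync; with
	// thousands of services plus periodic resyncs, this dominates the
	// controller's QPS.
	//   return e.client.Core().Endpoints(ns).Get(name, metav1.GetOptions{})

	// New path: served from the informer's local cache. The memory cost is
	// holding the Endpoints objects in that cache; the apiserver cost drops
	// to the single shared LIST+WATCH that keeps the cache current.
	return e.endpointsLister.Endpoints(ns).Get(name)
}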
pkg/controller/endpoint/BUILD (2 changes: 2 additions, 0 deletions)
@@ -16,6 +16,7 @@ go_library(
],
tags = ["automanaged"],
deps = [
"//pkg/api:go_default_library",
"//pkg/api/v1/endpoints:go_default_library",
"//pkg/api/v1/pod:go_default_library",
"//pkg/client/clientset_generated/clientset:go_default_library",
@@ -53,6 +54,7 @@ go_test(
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/intstr:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//vendor/k8s.io/client-go/rest:go_default_library",
"//vendor/k8s.io/client-go/tools/cache:go_default_library",
"//vendor/k8s.io/client-go/util/testing:go_default_library",
pkg/controller/endpoint/endpoints_controller.go (39 changes: 29 additions, 10 deletions)
@@ -31,6 +31,7 @@ import (
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/v1/endpoints"
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
@@ -69,13 +70,15 @@ var (
)

// NewEndpointController returns a new *EndpointController.
func NewEndpointController(podInformer coreinformers.PodInformer, serviceInformer coreinformers.ServiceInformer, client clientset.Interface) *EndpointController {
func NewEndpointController(podInformer coreinformers.PodInformer, serviceInformer coreinformers.ServiceInformer,
endpointsInformer coreinformers.EndpointsInformer, client clientset.Interface) *EndpointController {
if client != nil && client.Core().RESTClient().GetRateLimiter() != nil {
metrics.RegisterMetricAndTrackRateLimiterUsage("endpoint_controller", client.Core().RESTClient().GetRateLimiter())
}
e := &EndpointController{
client: client,
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "endpoint"),
client: client,
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "endpoint"),
workerLoopPeriod: time.Second,
}

serviceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
@@ -96,6 +99,9 @@ func NewEndpointController(podInformer coreinformers.PodInformer, serviceInformer coreinformers.ServiceInformer,
e.podLister = podInformer.Lister()
e.podsSynced = podInformer.Informer().HasSynced

e.endpointsLister = endpointsInformer.Lister()
e.endpointsSynced = endpointsInformer.Informer().HasSynced

return e
}
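One more piece of context for the new constructor argument: a SharedInformerFactory gives every consumer of Core().V1().Endpoints() the same underlying cache and watch session, so the memory cost noted in the review is paid once per process, not once per controller. A hedged sketch of the wiring (client, resyncPeriod, and stopCh are assumed to exist; this mirrors the cmd/kube-controller-manager change above rather than quoting it):

factory := informers.NewSharedInformerFactory(client, resyncPeriod)
controller := endpointcontroller.NewEndpointController(
	factory.Core().V1().Pods(),
	factory.Core().V1().Services(),
	factory.Core().V1().Endpoints(),
	client,
)
factory.Start(stopCh)        // kicks off the shared LIST+WATCHes that fill the caches
go controller.Run(5, stopCh) // 5 stands in for ConcurrentEndpointSyncs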

@@ -117,12 +123,22 @@ type EndpointController struct {
// Added as a member to the struct to allow injection for testing.
podsSynced cache.InformerSynced

// endpointsLister is able to list/get endpoints and is populated by the shared informer passed to
// NewEndpointController.
endpointsLister corelisters.EndpointsLister
// endpointsSynced returns true if the endpoints shared informer has been synced at least once.
// Added as a member to the struct to allow injection for testing.
endpointsSynced cache.InformerSynced
Contributor
Did you mean to include this in the set supplied to controller.WaitForCacheSync in (*EndpointController).Run at line 150?

Contributor
Yes, please wait for the endpoints cache to sync. You don't want to make decisions based on unintentionally blank data.

Contributor
+1

Contributor Author
Yup, I thought I had done it, will fix.
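The risk being flagged here is concrete: until an informer's initial LIST completes, its lister returns empty results, and checkLeftoverEndpoints (further down) treats an empty list as ground truth when cleaning up orphans. A minimal sketch of the gate, written against client-go's cache.WaitForCacheSync rather than the pkg/controller wrapper that Run actually uses:

// Hypothetical startup gate: block until every informer has finished its
// initial LIST, or give up when the stop channel closes.
if !cache.WaitForCacheSync(stopCh, e.podsSynced, e.servicesSynced, e.endpointsSynced) {
	utilruntime.HandleError(fmt.Errorf("timed out waiting for caches to sync"))
	return
}
// Only past this point do the listers reflect reality, so workers and
// checkLeftoverEndpoints can safely act on what they read.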


// Services that need to be updated. A channel is inappropriate here,
// because it allows services with lots of pods to be serviced much
// more often than services with few pods; it also would cause a
// service that's inserted multiple times to be processed more than
// necessary.
queue workqueue.RateLimitingInterface

// workerLoopPeriod is the time between worker runs. The workers process the queue of service and pod changes.
workerLoopPeriod time.Duration
}
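The queue comment above is doing real work: a workqueue coalesces identical keys, so a service enqueued repeatedly while it waits is processed once, and the rate limiter spaces out retries instead of letting a hot service starve the rest. A small self-contained sketch of those semantics (the key is hypothetical; this is not controller code):

package main

import (
	"fmt"

	"k8s.io/client-go/util/workqueue"
)

func main() {
	// Same constructor the controller uses: a named, rate-limited queue.
	q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "endpoint")

	q.Add("default/my-svc")
	q.Add("default/my-svc") // coalesced with the first Add: one queued item

	key, _ := q.Get() // blocks until an item is available
	fmt.Println(key)  // default/my-svc

	// Mark it done on success; a failure path would call
	// q.AddRateLimited(key) to retry with backoff instead.
	q.Done(key)
}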

// Runs e; will not return until stopCh is closed. workers determines how many endpoints will be handled in parallel.
@@ -134,12 +150,12 @@ func (e *EndpointController) Run(workers int, stopCh <-chan struct{}) {
glog.Infof("Starting endpoint controller")
defer glog.Infof("Shutting down endpoint controller")

if !controller.WaitForCacheSync("endpoint", stopCh, e.podsSynced, e.servicesSynced) {
if !controller.WaitForCacheSync("endpoint", stopCh, e.podsSynced, e.servicesSynced, e.endpointsSynced) {
return
}

for i := 0; i < workers; i++ {
go wait.Until(e.worker, time.Second, stopCh)
go wait.Until(e.worker, e.workerLoopPeriod, stopCh)
}

go func() {
@@ -413,7 +429,7 @@ func (e *EndpointController) syncService(key string) error {
subsets = endpoints.RepackSubsets(subsets)

// See if there's actually an update here.
currentEndpoints, err := e.client.Core().Endpoints(service.Namespace).Get(service.Name, metav1.GetOptions{})
currentEndpoints, err := e.endpointsLister.Endpoints(service.Namespace).Get(service.Name)
Contributor
You want to deep-copy these below before you mutate them.

Contributor
+1. You need to do a deep copy at line 432 (for the case where you didn't enter the if at line 415).
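The hazard behind both comments: a lister hands back pointers into the shared informer cache, not private copies, so mutating the returned object in place corrupts what every other consumer of that cache sees. A hypothetical illustration of the bug and of the fix this PR adopts below (api.Scheme.DeepCopy was the idiom of the day; later Kubernetes generates a typed ep.DeepCopy()):

ep, err := e.endpointsLister.Endpoints(ns).Get(name)
if err != nil {
	return err
}

// WRONG: ep aliases the object inside the informer cache, so this write
// would be visible, half-applied, to every other reader of the cache.
//   ep.Subsets = subsets

// RIGHT: deep-copy first and mutate only the copy.
obj, err := api.Scheme.DeepCopy(ep)
if err != nil {
	return err
}
epCopy := obj.(*v1.Endpoints)
epCopy.Subsets = subsets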

if err != nil {
if errors.IsNotFound(err) {
currentEndpoints = &v1.Endpoints{
@@ -432,7 +448,11 @@ func (e *EndpointController) syncService(key string) error {
glog.V(5).Infof("endpoints are equal for %s/%s, skipping update", service.Namespace, service.Name)
return nil
}
newEndpoints := currentEndpoints
copy, err := api.Scheme.DeepCopy(currentEndpoints)
if err != nil {
return err
}
newEndpoints := copy.(*v1.Endpoints)
newEndpoints.Subsets = subsets
newEndpoints.Labels = service.Labels
if newEndpoints.Annotations == nil {
@@ -468,13 +488,12 @@ func (e *EndpointController) syncService(key string) error {
// some stragglers could have been left behind if the endpoint controller
// reboots).
func (e *EndpointController) checkLeftoverEndpoints() {
list, err := e.client.Core().Endpoints(metav1.NamespaceAll).List(metav1.ListOptions{})
list, err := e.endpointsLister.List(labels.Everything())
if err != nil {
utilruntime.HandleError(fmt.Errorf("Unable to list endpoints (%v); orphaned endpoints will not be cleaned up. (They're pretty harmless, but you can restart this component if you want another attempt made.)", err))
return
}
for i := range list.Items {
ep := &list.Items[i]
for _, ep := range list {
if _, ok := ep.Annotations[resourcelock.LeaderElectionRecordAnnotationKey]; ok {
// when there are multiple controller-manager instances,
// we observe that it will delete leader-election endpoints after 5min