Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Switch kube-proxy to informers & save 2/3 of cpu & memory of non-iptables related code. #42108

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/kubemark/hollow_proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func NewHollowProxyOrDie(
}
proxyconfig.NewSourceAPI(
client.Core().RESTClient(),
30*time.Second,
15*time.Minute,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reasoning?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, missed it. Ok.

serviceConfig.Channel("api"),
endpointsConfig.Channel("api"),
)
Expand Down
3 changes: 3 additions & 0 deletions pkg/proxy/config/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ go_library(
"//vendor:k8s.io/apimachinery/pkg/apis/meta/v1",
"//vendor:k8s.io/apimachinery/pkg/fields",
"//vendor:k8s.io/apimachinery/pkg/types",
"//vendor:k8s.io/apimachinery/pkg/util/runtime",
"//vendor:k8s.io/apimachinery/pkg/util/wait",
"//vendor:k8s.io/client-go/tools/cache",
],
)
Expand All @@ -38,6 +40,7 @@ go_test(
"//vendor:k8s.io/apimachinery/pkg/api/equality",
"//vendor:k8s.io/apimachinery/pkg/apis/meta/v1",
"//vendor:k8s.io/apimachinery/pkg/runtime",
"//vendor:k8s.io/apimachinery/pkg/util/wait",
"//vendor:k8s.io/apimachinery/pkg/watch",
"//vendor:k8s.io/client-go/tools/cache",
],
Expand Down
149 changes: 119 additions & 30 deletions pkg/proxy/config/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,59 +17,148 @@ limitations under the License.
package config

import (
"fmt"
"time"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/tools/cache"
"k8s.io/kubernetes/pkg/api"
)

// NewSourceAPI creates config source that watches for changes to the services and endpoints.
func NewSourceAPI(c cache.Getter, period time.Duration, servicesChan chan<- ServiceUpdate, endpointsChan chan<- EndpointsUpdate) {
stopCh := wait.NeverStop

servicesLW := cache.NewListWatchFromClient(c, "services", metav1.NamespaceAll, fields.Everything())
cache.NewReflector(servicesLW, &api.Service{}, NewServiceStore(nil, servicesChan), period).Run()
serviceController := NewServiceController(servicesLW, period, servicesChan)
go serviceController.Run(stopCh)

endpointsLW := cache.NewListWatchFromClient(c, "endpoints", metav1.NamespaceAll, fields.Everything())
cache.NewReflector(endpointsLW, &api.Endpoints{}, NewEndpointsStore(nil, endpointsChan), period).Run()
endpointsController := NewEndpointsController(endpointsLW, period, endpointsChan)
go endpointsController.Run(stopCh)

if !cache.WaitForCacheSync(stopCh, serviceController.HasSynced, endpointsController.HasSynced) {
utilruntime.HandleError(fmt.Errorf("source controllers not synced"))
}
}

// NewServiceStore creates an undelta store that expands updates to the store into
// ServiceUpdate events on the channel. If no store is passed, a default store will
// be initialized. Allows reuse of a cache store across multiple components.
func NewServiceStore(store cache.Store, ch chan<- ServiceUpdate) cache.Store {
fn := func(objs []interface{}) {
var services []api.Service
for _, o := range objs {
services = append(services, *(o.(*api.Service)))
func sendAddService(servicesChan chan<- ServiceUpdate) func(obj interface{}) {
return func(obj interface{}) {
service, ok := obj.(*api.Service)
if !ok {
utilruntime.HandleError(fmt.Errorf("cannot convert to *api.Service: %v", obj))
return
}
ch <- ServiceUpdate{Op: SET, Services: services}
servicesChan <- ServiceUpdate{Op: ADD, Service: service}
}
if store == nil {
store = cache.NewStore(cache.MetaNamespaceKeyFunc)
}

func sendUpdateService(servicesChan chan<- ServiceUpdate) func(oldObj, newObj interface{}) {
return func(_, newObj interface{}) {
service, ok := newObj.(*api.Service)
if !ok {
utilruntime.HandleError(fmt.Errorf("cannot convert to *api.Service: %v", newObj))
return
}
servicesChan <- ServiceUpdate{Op: UPDATE, Service: service}
}
return &cache.UndeltaStore{
Store: store,
PushFunc: fn,
}

func sendDeleteService(servicesChan chan<- ServiceUpdate) func(obj interface{}) {
return func(obj interface{}) {
var service *api.Service
switch t := obj.(type) {
case *api.Service:
service = t
case cache.DeletedFinalStateUnknown:
var ok bool
service, ok = t.Obj.(*api.Service)
if !ok {
utilruntime.HandleError(fmt.Errorf("cannot convert to *api.Service: %v", t.Obj))
return
}
default:
utilruntime.HandleError(fmt.Errorf("cannot convert to *api.Service: %v", t))
return
}
servicesChan <- ServiceUpdate{Op: REMOVE, Service: service}
}
}

// NewEndpointsStore creates an undelta store that expands updates to the store into
// EndpointsUpdate events on the channel. If no store is passed, a default store will
// be initialized. Allows reuse of a cache store across multiple components.
func NewEndpointsStore(store cache.Store, ch chan<- EndpointsUpdate) cache.Store {
fn := func(objs []interface{}) {
var endpoints []api.Endpoints
for _, o := range objs {
endpoints = append(endpoints, *(o.(*api.Endpoints)))
func sendAddEndpoints(endpointsChan chan<- EndpointsUpdate) func(obj interface{}) {
return func(obj interface{}) {
endpoints, ok := obj.(*api.Endpoints)
if !ok {
utilruntime.HandleError(fmt.Errorf("cannot convert to *api.Endpoints: %v", obj))
return
}
ch <- EndpointsUpdate{Op: SET, Endpoints: endpoints}
endpointsChan <- EndpointsUpdate{Op: ADD, Endpoints: endpoints}
}
if store == nil {
store = cache.NewStore(cache.MetaNamespaceKeyFunc)
}

func sendUpdateEndpoints(endpointsChan chan<- EndpointsUpdate) func(oldObj, newObj interface{}) {
return func(_, newObj interface{}) {
endpoints, ok := newObj.(*api.Endpoints)
if !ok {
utilruntime.HandleError(fmt.Errorf("cannot convert to *api.Endpoints: %v", newObj))
return
}
endpointsChan <- EndpointsUpdate{Op: UPDATE, Endpoints: endpoints}
}
return &cache.UndeltaStore{
Store: store,
PushFunc: fn,
}

func sendDeleteEndpoints(endpointsChan chan<- EndpointsUpdate) func(obj interface{}) {
return func(obj interface{}) {
var endpoints *api.Endpoints
switch t := obj.(type) {
case *api.Endpoints:
endpoints = t
case cache.DeletedFinalStateUnknown:
var ok bool
endpoints, ok = t.Obj.(*api.Endpoints)
if !ok {
utilruntime.HandleError(fmt.Errorf("cannot convert to *api.Endpoints: %v", t.Obj))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it our belief that this does not happen?

Because we do have the Key, which does have the Name & Namespace, which is all we need. We even have SplitMetaNamespaceKey which splits them (and a TODO to replace the whole string-packing in go2)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

DeltaFIFO is not required to return a valid object on delete.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So this code should cover both paths as Justin noted

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't understand. I think the only it can return is the cache.DeletedFinalStateUnknown.

I'm 100% sure that if this is not true, then we have at least 10+ places in the code which are not handling this one correctly. And a bunch of other different controllers won't be working correctly.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So what I wanted to say is basically - the case of unknown final state this is exactly:
"cache.DeletedFinalStateUnknown" what it returned here and it contains Obj which is a correct api.Service (or whatever we are watching), contains correct Name and Namespace etc., but may potentially contain state state of the object.

Which means that I'm handling this case (exactly the same is it is handled in other places).

return
}
default:
utilruntime.HandleError(fmt.Errorf("cannot convert to *api.Endpoints: %v", obj))
return
}
endpointsChan <- EndpointsUpdate{Op: REMOVE, Endpoints: endpoints}
}
}

// NewServiceController creates a controller that is watching services and sending
// updates into ServiceUpdate channel.
func NewServiceController(lw cache.ListerWatcher, period time.Duration, ch chan<- ServiceUpdate) cache.Controller {
_, serviceController := cache.NewInformer(
lw,
&api.Service{},
period,
cache.ResourceEventHandlerFuncs{
AddFunc: sendAddService(ch),
UpdateFunc: sendUpdateService(ch),
DeleteFunc: sendDeleteService(ch),
},
)
return serviceController
}

// NewEndpointsController creates a controller that is watching endpoints and sending
// updates into EndpointsUpdate channel.
func NewEndpointsController(lw cache.ListerWatcher, period time.Duration, ch chan<- EndpointsUpdate) cache.Controller {
_, endpointsController := cache.NewInformer(
lw,
&api.Endpoints{},
period,
cache.ResourceEventHandlerFuncs{
AddFunc: sendAddEndpoints(ch),
UpdateFunc: sendUpdateEndpoints(ch),
DeleteFunc: sendDeleteEndpoints(ch),
},
)
return endpointsController
}
69 changes: 25 additions & 44 deletions pkg/proxy/config/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
apiequality "k8s.io/apimachinery/pkg/api/equality"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/tools/cache"
"k8s.io/kubernetes/pkg/api"
Expand Down Expand Up @@ -63,24 +64,16 @@ func TestNewServicesSourceApi_UpdatesAndMultipleServices(t *testing.T) {

ch := make(chan ServiceUpdate)

cache.NewReflector(lw, &api.Service{}, NewServiceStore(nil, ch), 30*time.Second).Run()

got, ok := <-ch
if !ok {
t.Errorf("Unable to read from channel when expected")
}
expected := ServiceUpdate{Op: SET, Services: []api.Service{}}
if !apiequality.Semantic.DeepEqual(expected, got) {
t.Errorf("Expected %#v; Got %#v", expected, got)
}
serviceController := NewServiceController(lw, 30*time.Second, ch)
go serviceController.Run(wait.NeverStop)

// Add the first service
fakeWatch.Add(service1v1)
got, ok = <-ch
got, ok := <-ch
if !ok {
t.Errorf("Unable to read from channel when expected")
}
expected = ServiceUpdate{Op: SET, Services: []api.Service{*service1v1}}
expected := ServiceUpdate{Op: ADD, Service: service1v1}
if !apiequality.Semantic.DeepEqual(expected, got) {
t.Errorf("Expected %#v; Got %#v", expected, got)
}
Expand All @@ -92,11 +85,10 @@ func TestNewServicesSourceApi_UpdatesAndMultipleServices(t *testing.T) {
t.Errorf("Unable to read from channel when expected")
}
// Could be sorted either of these two ways:
expectedA := ServiceUpdate{Op: SET, Services: []api.Service{*service1v1, *service2}}
expectedB := ServiceUpdate{Op: SET, Services: []api.Service{*service2, *service1v1}}
expected = ServiceUpdate{Op: ADD, Service: service2}

if !apiequality.Semantic.DeepEqual(expectedA, got) && !apiequality.Semantic.DeepEqual(expectedB, got) {
t.Errorf("Expected %#v or %#v, Got %#v", expectedA, expectedB, got)
if !apiequality.Semantic.DeepEqual(expected, got) {
t.Errorf("Expected %#v, Got %#v", expected, got)
}

// Modify service1
Expand All @@ -105,11 +97,10 @@ func TestNewServicesSourceApi_UpdatesAndMultipleServices(t *testing.T) {
if !ok {
t.Errorf("Unable to read from channel when expected")
}
expectedA = ServiceUpdate{Op: SET, Services: []api.Service{*service1v2, *service2}}
expectedB = ServiceUpdate{Op: SET, Services: []api.Service{*service2, *service1v2}}
expected = ServiceUpdate{Op: UPDATE, Service: service1v2}

if !apiequality.Semantic.DeepEqual(expectedA, got) && !apiequality.Semantic.DeepEqual(expectedB, got) {
t.Errorf("Expected %#v or %#v, Got %#v", expectedA, expectedB, got)
if !apiequality.Semantic.DeepEqual(expected, got) {
t.Errorf("Expected %#v, Got %#v", expected, got)
}

// Delete service1
Expand All @@ -118,7 +109,7 @@ func TestNewServicesSourceApi_UpdatesAndMultipleServices(t *testing.T) {
if !ok {
t.Errorf("Unable to read from channel when expected")
}
expected = ServiceUpdate{Op: SET, Services: []api.Service{*service2}}
expected = ServiceUpdate{Op: REMOVE, Service: service1v2}
if !apiequality.Semantic.DeepEqual(expected, got) {
t.Errorf("Expected %#v, Got %#v", expected, got)
}
Expand All @@ -129,7 +120,7 @@ func TestNewServicesSourceApi_UpdatesAndMultipleServices(t *testing.T) {
if !ok {
t.Errorf("Unable to read from channel when expected")
}
expected = ServiceUpdate{Op: SET, Services: []api.Service{}}
expected = ServiceUpdate{Op: REMOVE, Service: service2}
if !apiequality.Semantic.DeepEqual(expected, got) {
t.Errorf("Expected %#v, Got %#v", expected, got)
}
Expand Down Expand Up @@ -174,24 +165,16 @@ func TestNewEndpointsSourceApi_UpdatesAndMultipleEndpoints(t *testing.T) {

ch := make(chan EndpointsUpdate)

cache.NewReflector(lw, &api.Endpoints{}, NewEndpointsStore(nil, ch), 30*time.Second).Run()

got, ok := <-ch
if !ok {
t.Errorf("Unable to read from channel when expected")
}
expected := EndpointsUpdate{Op: SET, Endpoints: []api.Endpoints{}}
if !apiequality.Semantic.DeepEqual(expected, got) {
t.Errorf("Expected %#v; Got %#v", expected, got)
}
endpointsController := NewEndpointsController(lw, 30*time.Second, ch)
go endpointsController.Run(wait.NeverStop)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So this used to be meaningful, in that the first update was a "clear" if there were no endpoints. It would be good to verify that we have at least one test in this group that covers - "start, have empty state, get endpoints, delete endpoints, have empty state" as well as "have state, resync, add / remove endpoints, have same state as before, but never have empty state". Is there a test like that already?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So this used to be meaningful, in that the first update was a "clear" if there were no endpoints

Sorry - don't understand this. Can you please clarify?

But anyway, I agree that previously "SET" was handled differently than "ADD", because:

  • ADD was used only in unit tests (i.e. production code was never issuing ADD) - it was always taking all contents from the underlying store and calling "SET" (meaning "replace") from those contents (no matter if it was add, update or remove).

However, with the controller framework, "REPLACE" is not an external concept.
If we are doing relist, controller internally is handling it - it is computing the difference and sending appropriate Add, Update and Remove calls if needed to match the desired state.
The same for resync (though since resync is called from in-memory state, there are pretty much only update handlers called).

That means, that since those are internal details of controller framework, we don't have a good was for forcing it. Also, those are already tested at the level of Informer, so testing framework internals here doesn't sound like a good idea to me.

But if you don't agree, can you please clarify what exactly you want to be tested, since I'm not sure I understood it correctly.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I want to be sure that

  1. Kube-proxy doesn't do anything incorrect until initial sync
  2. Resync doesn't reset state of the proxy to "no services"

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think by #1 here you mean "Make sure that the List has fully completed before we do anything with the state", and a big +1 on that... I think we've seen that bug in a few controllers. But ... did this bug also exist previously (if not, can someone point me to magic where it was avoided?)

(Also in general, if anyone has a good doc on Informers vs Reflectors vs direct-watching. I prefer direct-watching, but I guess that means I have all sort of edge cases in my code.)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree with 1 - (though us @justinsb wrote, I think it also wasn't handled correctly before).
Will add on Monday morning.

Regarding 2 - this is covered by Informer (DeltaFifo) tests.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So regarding one - it seems it's not easy to handle (and this is not the regression).
Basically, both before and now, with the initial "LIST" operation (of the underlying reflector), we are sending the ADD events from the results of it one by one, and then passing to proxier.

We need a better way to handle it, but since this is not a regression, I would prefer fixing it in a separate PR (I can try sending it later today).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's fine


// Add the first endpoints
fakeWatch.Add(endpoints1v1)
got, ok = <-ch
got, ok := <-ch
if !ok {
t.Errorf("Unable to read from channel when expected")
}
expected = EndpointsUpdate{Op: SET, Endpoints: []api.Endpoints{*endpoints1v1}}
expected := EndpointsUpdate{Op: ADD, Endpoints: endpoints1v1}
if !apiequality.Semantic.DeepEqual(expected, got) {
t.Errorf("Expected %#v; Got %#v", expected, got)
}
Expand All @@ -203,11 +186,10 @@ func TestNewEndpointsSourceApi_UpdatesAndMultipleEndpoints(t *testing.T) {
t.Errorf("Unable to read from channel when expected")
}
// Could be sorted either of these two ways:
expectedA := EndpointsUpdate{Op: SET, Endpoints: []api.Endpoints{*endpoints1v1, *endpoints2}}
expectedB := EndpointsUpdate{Op: SET, Endpoints: []api.Endpoints{*endpoints2, *endpoints1v1}}
expected = EndpointsUpdate{Op: ADD, Endpoints: endpoints2}

if !apiequality.Semantic.DeepEqual(expectedA, got) && !apiequality.Semantic.DeepEqual(expectedB, got) {
t.Errorf("Expected %#v or %#v, Got %#v", expectedA, expectedB, got)
if !apiequality.Semantic.DeepEqual(expected, got) {
t.Errorf("Expected %#v, Got %#v", expected, got)
}

// Modify endpoints1
Expand All @@ -216,11 +198,10 @@ func TestNewEndpointsSourceApi_UpdatesAndMultipleEndpoints(t *testing.T) {
if !ok {
t.Errorf("Unable to read from channel when expected")
}
expectedA = EndpointsUpdate{Op: SET, Endpoints: []api.Endpoints{*endpoints1v2, *endpoints2}}
expectedB = EndpointsUpdate{Op: SET, Endpoints: []api.Endpoints{*endpoints2, *endpoints1v2}}
expected = EndpointsUpdate{Op: UPDATE, Endpoints: endpoints1v2}

if !apiequality.Semantic.DeepEqual(expectedA, got) && !apiequality.Semantic.DeepEqual(expectedB, got) {
t.Errorf("Expected %#v or %#v, Got %#v", expectedA, expectedB, got)
if !apiequality.Semantic.DeepEqual(expected, got) {
t.Errorf("Expected %#v, Got %#v", expected, got)
}

// Delete endpoints1
Expand All @@ -229,7 +210,7 @@ func TestNewEndpointsSourceApi_UpdatesAndMultipleEndpoints(t *testing.T) {
if !ok {
t.Errorf("Unable to read from channel when expected")
}
expected = EndpointsUpdate{Op: SET, Endpoints: []api.Endpoints{*endpoints2}}
expected = EndpointsUpdate{Op: REMOVE, Endpoints: endpoints1v2}
if !apiequality.Semantic.DeepEqual(expected, got) {
t.Errorf("Expected %#v, Got %#v", expected, got)
}
Expand All @@ -240,7 +221,7 @@ func TestNewEndpointsSourceApi_UpdatesAndMultipleEndpoints(t *testing.T) {
if !ok {
t.Errorf("Unable to read from channel when expected")
}
expected = EndpointsUpdate{Op: SET, Endpoints: []api.Endpoints{}}
expected = EndpointsUpdate{Op: REMOVE, Endpoints: endpoints2}
if !apiequality.Semantic.DeepEqual(expected, got) {
t.Errorf("Expected %#v, Got %#v", expected, got)
}
Expand Down