Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Edge based services in proxy #44499

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
18 changes: 13 additions & 5 deletions cmd/kube-proxy/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,10 @@ func NewProxyServerDefault(config *options.ProxyServerConfig) (*ProxyServer, err
recorder := eventBroadcaster.NewRecorder(api.Scheme, clientv1.EventSource{Component: "kube-proxy", Host: hostname})

var proxier proxy.ProxyProvider
var servicesHandler proxyconfig.ServiceConfigHandler
var serviceEventHandler proxyconfig.ServiceHandler
// TODO: Migrate all handlers to ServiceHandler types and
// get rid of this one.
var serviceHandler proxyconfig.ServiceConfigHandler
var endpointsEventHandler proxyconfig.EndpointsHandler

proxyMode := getProxyMode(string(config.Mode), client.Core().Nodes(), hostname, iptInterface, iptables.LinuxKernelCompatTester{})
Expand All @@ -246,7 +249,7 @@ func NewProxyServerDefault(config *options.ProxyServerConfig) (*ProxyServer, err
glog.Fatalf("Unable to create proxier: %v", err)
}
proxier = proxierIPTables
servicesHandler = proxierIPTables
serviceEventHandler = proxierIPTables
endpointsEventHandler = proxierIPTables
// No turning back. Remove artifacts that might still exist from the userspace Proxier.
glog.V(0).Info("Tearing down userspace rules.")
Expand All @@ -271,7 +274,7 @@ func NewProxyServerDefault(config *options.ProxyServerConfig) (*ProxyServer, err
if err != nil {
glog.Fatalf("Unable to create proxier: %v", err)
}
servicesHandler = proxierUserspace
serviceHandler = proxierUserspace
proxier = proxierUserspace
} else {
// This is a proxy.LoadBalancer which NewProxier needs but has methods we don't need for
Expand All @@ -292,7 +295,7 @@ func NewProxyServerDefault(config *options.ProxyServerConfig) (*ProxyServer, err
if err != nil {
glog.Fatalf("Unable to create proxier: %v", err)
}
servicesHandler = proxierUserspace
serviceHandler = proxierUserspace
proxier = proxierUserspace
}
// Remove artifacts from the pure-iptables Proxier, if not on Windows.
Expand All @@ -314,7 +317,12 @@ func NewProxyServerDefault(config *options.ProxyServerConfig) (*ProxyServer, err
// only notify on changes, and the initial update (on process start) may be lost if no handlers
// are registered yet.
serviceConfig := proxyconfig.NewServiceConfig(informerFactory.Core().InternalVersion().Services(), config.ConfigSyncPeriod)
serviceConfig.RegisterHandler(servicesHandler)
if serviceHandler != nil {
serviceConfig.RegisterHandler(serviceHandler)
}
if serviceEventHandler != nil {
serviceConfig.RegisterEventHandler(serviceEventHandler)
}
go serviceConfig.Run(wait.NeverStop)

endpointsConfig := proxyconfig.NewEndpointsConfig(informerFactory.Core().InternalVersion().Endpoints(), config.ConfigSyncPeriod)
Expand Down
1 change: 0 additions & 1 deletion pkg/proxy/config/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ go_library(
"//pkg/client/informers/informers_generated/internalversion/core/internalversion:go_default_library",
"//pkg/client/listers/core/internalversion:go_default_library",
"//pkg/controller:go_default_library",
"//pkg/util/config:go_default_library",
"//vendor/github.com/golang/glog:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/labels:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
Expand Down
48 changes: 18 additions & 30 deletions pkg/proxy/config/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ package config

import (
"reflect"
"sort"
"sync"
"testing"
"time"
Expand Down Expand Up @@ -51,40 +50,34 @@ func TestNewServicesSourceApi_UpdatesAndMultipleServices(t *testing.T) {
stopCh := make(chan struct{})
defer close(stopCh)

ch := make(chan struct{})
handler := newSvcHandler(t, nil, func() { ch <- struct{}{} })
handler := NewServiceHandlerMock()

sharedInformers := informers.NewSharedInformerFactory(client, time.Minute)

serviceConfig := NewServiceConfig(sharedInformers.Core().InternalVersion().Services(), time.Minute)
serviceConfig.RegisterHandler(handler)
serviceConfig.RegisterEventHandler(handler)
go sharedInformers.Start(stopCh)
go serviceConfig.Run(stopCh)

// Add the first service
handler.expected = []*api.Service{service1v1}
fakeWatch.Add(service1v1)
<-ch
handler.ValidateServices(t, []*api.Service{service1v1})

// Add another service
handler.expected = []*api.Service{service1v1, service2}
fakeWatch.Add(service2)
<-ch
handler.ValidateServices(t, []*api.Service{service1v1, service2})

// Modify service1
handler.expected = []*api.Service{service1v2, service2}
fakeWatch.Modify(service1v2)
<-ch
handler.ValidateServices(t, []*api.Service{service1v2, service2})

// Delete service1
handler.expected = []*api.Service{service2}
fakeWatch.Delete(service1v2)
<-ch
handler.ValidateServices(t, []*api.Service{service2})

// Delete service2
handler.expected = []*api.Service{}
fakeWatch.Delete(service2)
<-ch
handler.ValidateServices(t, []*api.Service{})
}

func TestNewEndpointsSourceApi_UpdatesAndMultipleEndpoints(t *testing.T) {
Expand Down Expand Up @@ -155,22 +148,17 @@ func TestNewEndpointsSourceApi_UpdatesAndMultipleEndpoints(t *testing.T) {
handler.ValidateEndpoints(t, []*api.Endpoints{})
}

type svcHandler struct {
t *testing.T
expected []*api.Service
done func()
}

func newSvcHandler(t *testing.T, svcs []*api.Service, done func()) *svcHandler {
return &svcHandler{t: t, expected: svcs, done: done}
}

func (s *svcHandler) OnServiceUpdate(services []*api.Service) {
defer s.done()
sort.Sort(sortedServices(services))
if !reflect.DeepEqual(s.expected, services) {
s.t.Errorf("Unexpected services: %#v, expected: %#v", services, s.expected)
func newSvcHandler(t *testing.T, svcs []*api.Service, done func()) ServiceHandler {
shm := &ServiceHandlerMock{
state: make(map[types.NamespacedName]*api.Service),
}
shm.process = func(services []*api.Service) {
defer done()
if !reflect.DeepEqual(services, svcs) {
t.Errorf("Unexpected services: %#v, expected: %#v", services, svcs)
}
}
return shm
}

func newEpsHandler(t *testing.T, eps []*api.Endpoints, done func()) EndpointsHandler {
Expand Down Expand Up @@ -213,7 +201,7 @@ func TestInitialSync(t *testing.T) {
svcConfig := NewServiceConfig(sharedInformers.Core().InternalVersion().Services(), 0)
epsConfig := NewEndpointsConfig(sharedInformers.Core().InternalVersion().Endpoints(), 0)
svcHandler := newSvcHandler(t, []*api.Service{svc2, svc1}, wg.Done)
svcConfig.RegisterHandler(svcHandler)
svcConfig.RegisterEventHandler(svcHandler)
epsHandler := newEpsHandler(t, []*api.Endpoints{eps2, eps1}, wg.Done)
epsConfig.RegisterEventHandler(epsHandler)

Expand Down
102 changes: 79 additions & 23 deletions pkg/proxy/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,10 @@ import (
coreinformers "k8s.io/kubernetes/pkg/client/informers/informers_generated/internalversion/core/internalversion"
listers "k8s.io/kubernetes/pkg/client/listers/core/internalversion"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/util/config"
)

// ServiceConfigHandler is an abstract interface of objects which receive update notifications for the set of services.
// DEPRECATED: Use ServiceHandler instead - this will be removed soon.
type ServiceConfigHandler interface {
// OnServiceUpdate gets called when a service is created, removed or changed
// on any of the configuration sources. An example is when a new service
Expand All @@ -46,7 +46,24 @@ type ServiceConfigHandler interface {
OnServiceUpdate(services []*api.Service)
}

// EndpointsHandler is an abstract interface o objects which receive
// ServiceHandler is an abstract interface of objects which receive
// notifications about service object changes.
type ServiceHandler interface {
// OnServiceAdd is called whenever creation of new service object
// is observed.
OnServiceAdd(service *api.Service)
// OnServiceUpdate is called whenever modification of an existing
// service object is observed.
OnServiceUpdate(oldService, service *api.Service)
// OnServiceDelete is called whenever deletion of an existing service
// object is observed.
OnServiceDelete(service *api.Service)
// OnServiceSynced is called once all the initial even handlers were
// called and the state is fully propagated to local cache.
OnServiceSynced()
}

// EndpointsHandler is an abstract interface of objects which receive
// notifications about endpoints object changes.
type EndpointsHandler interface {
// OnEndpointsAdd is called whenever creation of new endpoints object
Expand Down Expand Up @@ -157,17 +174,19 @@ func (c *EndpointsConfig) handleDeleteEndpoints(obj interface{}) {
}
}
for i := range c.eventHandlers {
glog.V(4).Infof("Calling handler.OnEndpointsUpdate")
glog.V(4).Infof("Calling handler.OnEndpointsDelete")
c.eventHandlers[i].OnEndpointsDelete(endpoints)
}
}

// ServiceConfig tracks a set of service configurations.
// It accepts "set", "add" and "remove" operations of services via channels, and invokes registered handlers on change.
type ServiceConfig struct {
lister listers.ServiceLister
listerSynced cache.InformerSynced
handlers []ServiceConfigHandler
lister listers.ServiceLister
listerSynced cache.InformerSynced
eventHandlers []ServiceHandler
// TODO: Remove as soon as we migrate everything to event handlers.
handlers []ServiceConfigHandler
// updates channel is used to trigger registered handlers
updates chan struct{}
stop chan struct{}
Expand Down Expand Up @@ -199,10 +218,16 @@ func NewServiceConfig(serviceInformer coreinformers.ServiceInformer, resyncPerio
}

// RegisterHandler registers a handler which is called on every services change.
// DEPRECATED: Use RegisterEventHandler instead - this will be removed soon.
func (c *ServiceConfig) RegisterHandler(handler ServiceConfigHandler) {
c.handlers = append(c.handlers, handler)
}

// RegisterEventHandler registers a handler which is called on every service change.
func (c *ServiceConfig) RegisterEventHandler(handler ServiceHandler) {
c.eventHandlers = append(c.eventHandlers, handler)
}

// Run starts the goroutine responsible for calling
// registered handlers.
func (c *ServiceConfig) Run(stopCh <-chan struct{}) {
Expand All @@ -217,6 +242,10 @@ func (c *ServiceConfig) Run(stopCh <-chan struct{}) {

// We have synced informers. Now we can start delivering updates
// to the registered handler.
for i := range c.eventHandlers {
glog.V(3).Infof("Calling handler.OnServiceSynced()")
c.eventHandlers[i].OnServiceSynced()
}
go func() {
defer utilruntime.HandleCrash()
for {
Expand All @@ -241,24 +270,60 @@ func (c *ServiceConfig) Run(stopCh <-chan struct{}) {
}
}
}()
// Close updates channel when stopCh is closed.
go func() {
<-stopCh
close(c.stop)
}()

// Close updates channel when stopCh is closed.
<-stopCh
close(c.stop)
}

func (c *ServiceConfig) handleAddService(_ interface{}) {
func (c *ServiceConfig) handleAddService(obj interface{}) {
service, ok := obj.(*api.Service)
if !ok {
utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", obj))
return
}
for i := range c.eventHandlers {
glog.V(4).Infof("Calling handler.OnServiceAdd")
c.eventHandlers[i].OnServiceAdd(service)
}
c.dispatchUpdate()
}

func (c *ServiceConfig) handleUpdateService(_, _ interface{}) {
func (c *ServiceConfig) handleUpdateService(oldObj, newObj interface{}) {
oldService, ok := oldObj.(*api.Service)
if !ok {
utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", oldObj))
return
}
service, ok := newObj.(*api.Service)
if !ok {
utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", newObj))
return
}
for i := range c.eventHandlers {
glog.V(4).Infof("Calling handler.OnServiceUpdate")
c.eventHandlers[i].OnServiceUpdate(oldService, service)
}
c.dispatchUpdate()
}

func (c *ServiceConfig) handleDeleteService(_ interface{}) {
func (c *ServiceConfig) handleDeleteService(obj interface{}) {
service, ok := obj.(*api.Service)
if !ok {
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
if !ok {
utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", obj))
return
}
if service, ok = tombstone.Obj.(*api.Service); !ok {
utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", obj))
return
}
}
for i := range c.eventHandlers {
glog.V(4).Infof("Calling handler.OnServiceDelete")
c.eventHandlers[i].OnServiceDelete(service)
}
c.dispatchUpdate()
}

Expand All @@ -272,12 +337,3 @@ func (c *ServiceConfig) dispatchUpdate() {
glog.V(4).Infof("Service handler already has a pending interrupt.")
}
}

// watchForUpdates invokes bcaster.Notify() with the latest version of an object
// when changes occur.
func watchForUpdates(bcaster *config.Broadcaster, accessor config.Accessor, updates <-chan struct{}) {
for true {
<-updates
bcaster.Notify(accessor.MergedState())
}
}