Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Edge based winuserspace proxy #45404

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
20 changes: 4 additions & 16 deletions cmd/kube-proxy/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,11 +302,8 @@ type ProxyServer struct {
ResourceContainer string
ConfigSyncPeriod time.Duration
ServiceEventHandler proxyconfig.ServiceHandler
// TODO: Migrate all handlers to ServiceHandler types and
// get rid of this one.
ServiceHandler proxyconfig.ServiceConfigHandler
EndpointsEventHandler proxyconfig.EndpointsHandler
HealthzServer *healthcheck.HealthzServer
EndpointsEventHandler proxyconfig.EndpointsHandler
HealthzServer *healthcheck.HealthzServer
}

// createClients creates a kube client and an event client from the given config and masterOverride.
Expand Down Expand Up @@ -397,9 +394,6 @@ func NewProxyServer(config *componentconfig.KubeProxyConfiguration, cleanupAndEx

var proxier proxy.ProxyProvider
var serviceEventHandler proxyconfig.ServiceHandler
// TODO: Migrate all handlers to ServiceHandler types and
// get rid of this one.
var serviceHandler proxyconfig.ServiceConfigHandler
var endpointsEventHandler proxyconfig.EndpointsHandler

proxyMode := getProxyMode(string(config.Mode), iptInterface, iptables.LinuxKernelCompatTester{})
Expand Down Expand Up @@ -455,7 +449,7 @@ func NewProxyServer(config *componentconfig.KubeProxyConfiguration, cleanupAndEx
if err != nil {
return nil, fmt.Errorf("unable to create proxier: %v", err)
}
serviceHandler = proxierUserspace
serviceEventHandler = proxierUserspace
proxier = proxierUserspace
} else {
// This is a proxy.LoadBalancer which NewProxier needs but has methods we don't need for
Expand Down Expand Up @@ -517,7 +511,6 @@ func NewProxyServer(config *componentconfig.KubeProxyConfiguration, cleanupAndEx
ResourceContainer: config.ResourceContainer,
ConfigSyncPeriod: config.ConfigSyncPeriod.Duration,
ServiceEventHandler: serviceEventHandler,
ServiceHandler: serviceHandler,
EndpointsEventHandler: endpointsEventHandler,
HealthzServer: healthzServer,
}, nil
Expand Down Expand Up @@ -621,12 +614,7 @@ func (s *ProxyServer) Run() error {
// only notify on changes, and the initial update (on process start) may be lost if no handlers
// are registered yet.
serviceConfig := proxyconfig.NewServiceConfig(informerFactory.Core().InternalVersion().Services(), s.ConfigSyncPeriod)
if s.ServiceHandler != nil {
serviceConfig.RegisterHandler(s.ServiceHandler)
}
if s.ServiceEventHandler != nil {
serviceConfig.RegisterEventHandler(s.ServiceEventHandler)
}
serviceConfig.RegisterEventHandler(s.ServiceEventHandler)
go serviceConfig.Run(wait.NeverStop)

endpointsConfig := proxyconfig.NewEndpointsConfig(informerFactory.Core().InternalVersion().Endpoints(), s.ConfigSyncPeriod)
Expand Down
1 change: 0 additions & 1 deletion pkg/proxy/config/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ go_library(
"//pkg/client/listers/core/internalversion:go_default_library",
"//pkg/controller:go_default_library",
"//vendor/github.com/golang/glog:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/labels:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
"//vendor/k8s.io/client-go/tools/cache:go_default_library",
],
Expand Down
76 changes: 0 additions & 76 deletions pkg/proxy/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"time"

"github.com/golang/glog"
"k8s.io/apimachinery/pkg/labels"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/tools/cache"
"k8s.io/kubernetes/pkg/api"
Expand All @@ -30,22 +29,6 @@ import (
"k8s.io/kubernetes/pkg/controller"
)

// ServiceConfigHandler is an abstract interface of objects which receive update notifications for the set of services.
// DEPRECATED: Use ServiceHandler instead - this will be removed soon.
type ServiceConfigHandler interface {
// OnServiceUpdate gets called when a service is created, removed or changed
// on any of the configuration sources. An example is when a new service
// comes up.
//
// NOTE: For efficiency, services are being passed by reference, thus,
// OnServiceUpdate should NOT modify pointers of a given slice.
// Those service objects are shared with other layers of the system and
// are guaranteed to be immutable with the assumption that are also
// not mutated by those handlers. Make a deep copy if you need to modify
// them in your code.
OnServiceUpdate(services []*api.Service)
}

// ServiceHandler is an abstract interface of objects which receive
// notifications about service object changes.
type ServiceHandler interface {
Expand Down Expand Up @@ -185,24 +168,13 @@ type ServiceConfig struct {
lister listers.ServiceLister
listerSynced cache.InformerSynced
eventHandlers []ServiceHandler
// TODO: Remove as soon as we migrate everything to event handlers.
handlers []ServiceConfigHandler
// updates channel is used to trigger registered handlers
updates chan struct{}
stop chan struct{}
}

// NewServiceConfig creates a new ServiceConfig.
func NewServiceConfig(serviceInformer coreinformers.ServiceInformer, resyncPeriod time.Duration) *ServiceConfig {
result := &ServiceConfig{
lister: serviceInformer.Lister(),
listerSynced: serviceInformer.Informer().HasSynced,
// The updates channel is used to send interrupts to the Services handler.
// It's buffered because we never want to block for as long as there is a
// pending interrupt, but don't want to drop them if the handler is doing
// work.
updates: make(chan struct{}, 1),
stop: make(chan struct{}),
}

serviceInformer.Informer().AddEventHandlerWithResyncPeriod(
Expand All @@ -217,12 +189,6 @@ func NewServiceConfig(serviceInformer coreinformers.ServiceInformer, resyncPerio
return result
}

// RegisterHandler registers a handler which is called on every services change.
// DEPRECATED: Use RegisterEventHandler instead - this will be removed soon.
func (c *ServiceConfig) RegisterHandler(handler ServiceConfigHandler) {
c.handlers = append(c.handlers, handler)
}

// RegisterEventHandler registers a handler which is called on every service change.
func (c *ServiceConfig) RegisterEventHandler(handler ServiceHandler) {
c.eventHandlers = append(c.eventHandlers, handler)
Expand All @@ -240,40 +206,12 @@ func (c *ServiceConfig) Run(stopCh <-chan struct{}) {
return
}

// We have synced informers. Now we can start delivering updates
// to the registered handler.
for i := range c.eventHandlers {
glog.V(3).Infof("Calling handler.OnServiceSynced()")
c.eventHandlers[i].OnServiceSynced()
}
go func() {
defer utilruntime.HandleCrash()
for {
select {
case <-c.updates:
services, err := c.lister.List(labels.Everything())
if err != nil {
glog.Errorf("Error while listing services from cache: %v", err)
// This will cause a retry (if there isnt' any other trigger in-flight).
c.dispatchUpdate()
continue
}
if services == nil {
services = []*api.Service{}
}
for i := range c.handlers {
glog.V(3).Infof("Calling handler.OnServiceUpdate()")
c.handlers[i].OnServiceUpdate(services)
}
case <-c.stop:
return
}
}
}()

// Close updates channel when stopCh is closed.
<-stopCh
close(c.stop)
}

func (c *ServiceConfig) handleAddService(obj interface{}) {
Expand All @@ -286,7 +224,6 @@ func (c *ServiceConfig) handleAddService(obj interface{}) {
glog.V(4).Infof("Calling handler.OnServiceAdd")
c.eventHandlers[i].OnServiceAdd(service)
}
c.dispatchUpdate()
}

func (c *ServiceConfig) handleUpdateService(oldObj, newObj interface{}) {
Expand All @@ -304,7 +241,6 @@ func (c *ServiceConfig) handleUpdateService(oldObj, newObj interface{}) {
glog.V(4).Infof("Calling handler.OnServiceUpdate")
c.eventHandlers[i].OnServiceUpdate(oldService, service)
}
c.dispatchUpdate()
}

func (c *ServiceConfig) handleDeleteService(obj interface{}) {
Expand All @@ -324,16 +260,4 @@ func (c *ServiceConfig) handleDeleteService(obj interface{}) {
glog.V(4).Infof("Calling handler.OnServiceDelete")
c.eventHandlers[i].OnServiceDelete(service)
}
c.dispatchUpdate()
}

func (c *ServiceConfig) dispatchUpdate() {
select {
case c.updates <- struct{}{}:
// Work enqueued successfully
case <-c.stop:
// We're shut down / avoid logging the message below
default:
glog.V(4).Infof("Service handler already has a pending interrupt.")
}
}