Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Edge-based userspace LB in kube-proxy #43702

Merged
merged 3 commits into from
Apr 12, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
16 changes: 12 additions & 4 deletions cmd/kube-proxy/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,10 @@ func NewProxyServerDefault(config *options.ProxyServerConfig) (*ProxyServer, err

var proxier proxy.ProxyProvider
var servicesHandler proxyconfig.ServiceConfigHandler
// TODO: Migrate all handlers to EndpointsHandler type and
// get rid of this one.
var endpointsHandler proxyconfig.EndpointsConfigHandler
var endpointsEventHandler proxyconfig.EndpointsHandler

proxyMode := getProxyMode(string(config.Mode), client.Core().Nodes(), hostname, iptInterface, iptables.LinuxKernelCompatTester{})
if proxyMode == proxyModeIPTables {
Expand All @@ -247,7 +250,7 @@ func NewProxyServerDefault(config *options.ProxyServerConfig) (*ProxyServer, err
}
proxier = proxierIPTables
servicesHandler = proxierIPTables
endpointsHandler = proxierIPTables
endpointsEventHandler = proxierIPTables
// No turning back. Remove artifacts that might still exist from the userspace Proxier.
glog.V(0).Info("Tearing down userspace rules.")
userspace.CleanupLeftovers(iptInterface)
Expand All @@ -257,7 +260,7 @@ func NewProxyServerDefault(config *options.ProxyServerConfig) (*ProxyServer, err
// This is a proxy.LoadBalancer which NewProxier needs but has methods we don't need for
// our config.EndpointsConfigHandler.
loadBalancer := winuserspace.NewLoadBalancerRR()
// set EndpointsConfigHandler to our loadBalancer
// set EndpointsConfigHandler to our loadBalancer
endpointsHandler = loadBalancer
proxierUserspace, err := winuserspace.NewProxier(
loadBalancer,
Expand All @@ -278,7 +281,7 @@ func NewProxyServerDefault(config *options.ProxyServerConfig) (*ProxyServer, err
// our config.EndpointsHandler.
loadBalancer := userspace.NewLoadBalancerRR()
// set EndpointsHandler to our loadBalancer
endpointsHandler = loadBalancer
endpointsEventHandler = loadBalancer
proxierUserspace, err := userspace.NewProxier(
loadBalancer,
net.ParseIP(config.BindAddress),
Expand Down Expand Up @@ -318,7 +321,12 @@ func NewProxyServerDefault(config *options.ProxyServerConfig) (*ProxyServer, err
go serviceConfig.Run(wait.NeverStop)

endpointsConfig := proxyconfig.NewEndpointsConfig(informerFactory.Core().InternalVersion().Endpoints(), config.ConfigSyncPeriod)
endpointsConfig.RegisterHandler(endpointsHandler)
if endpointsHandler != nil {
endpointsConfig.RegisterHandler(endpointsHandler)
}
if endpointsEventHandler != nil {
endpointsConfig.RegisterEventHandler(endpointsEventHandler)
}
go endpointsConfig.Run(wait.NeverStop)

// This has to start after the calls to NewServiceConfig and NewEndpointsConfig because those
Expand Down
79 changes: 73 additions & 6 deletions pkg/proxy/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,12 +60,31 @@ type EndpointsConfigHandler interface {
OnEndpointsUpdate(endpoints []*api.Endpoints)
}

// EndpointsHandler is an abstract interface of objects which receive
// notifications about endpoints object changes.
type EndpointsHandler interface {
// OnEndpointsAdd is called whenever creation of a new endpoints object
// is observed.
OnEndpointsAdd(endpoints *api.Endpoints)
// OnEndpointsUpdate is called whenever modification of an existing
// endpoints object is observed. oldEndpoints is the previous version of
// the object and endpoints is the new one.
OnEndpointsUpdate(oldEndpoints, endpoints *api.Endpoints)
// OnEndpointsDelete is called whenever deletion of an existing endpoints
// object is observed.
OnEndpointsDelete(endpoints *api.Endpoints)
// OnEndpointsSynced is called once all the initial event handlers were
// called and the state is fully propagated to local cache.
OnEndpointsSynced()
}

// EndpointsConfig tracks a set of endpoints configurations.
// It accepts "set", "add" and "remove" operations of endpoints via channels, and invokes registered handlers on change.
type EndpointsConfig struct {
lister listers.EndpointsLister
listerSynced cache.InformerSynced
handlers []EndpointsConfigHandler
lister listers.EndpointsLister
listerSynced cache.InformerSynced
eventHandlers []EndpointsHandler
// TODO: Remove handlers by switching them to eventHandlers.
handlers []EndpointsConfigHandler
// updates channel is used to trigger registered handlers.
updates chan struct{}
stop chan struct{}
Expand Down Expand Up @@ -101,6 +120,11 @@ func (c *EndpointsConfig) RegisterHandler(handler EndpointsConfigHandler) {
c.handlers = append(c.handlers, handler)
}

// RegisterEventHandler appends handler to the list of event handlers kept by
// this EndpointsConfig. Registered event handlers are invoked for every
// observed endpoints add, update and delete.
func (c *EndpointsConfig) RegisterEventHandler(handler EndpointsHandler) {
	registered := append(c.eventHandlers, handler)
	c.eventHandlers = registered
}

// Run starts the goroutine responsible for calling registered handlers.
func (c *EndpointsConfig) Run(stopCh <-chan struct{}) {
if !cache.WaitForCacheSync(stopCh, c.listerSynced) {
Expand All @@ -111,6 +135,10 @@ func (c *EndpointsConfig) Run(stopCh <-chan struct{}) {
// We have synced informers. Now we can start delivering updates
// to the registered handler.
go func() {
for i := range c.eventHandlers {
glog.V(3).Infof("Calling handler.OnEndpointsSynced()")
c.eventHandlers[i].OnEndpointsSynced()
}
for {
select {
case <-c.updates:
Expand Down Expand Up @@ -140,15 +168,54 @@ func (c *EndpointsConfig) Run(stopCh <-chan struct{}) {
}()
}

func (c *EndpointsConfig) handleAddEndpoints(_ interface{}) {
// handleAddEndpoints processes an "add" notification. obj must be an
// *api.Endpoints; any other type is reported through utilruntime.HandleError
// and dropped. On success every registered event handler receives
// OnEndpointsAdd, after which dispatchUpdate is called.
func (c *EndpointsConfig) handleAddEndpoints(obj interface{}) {
	added, isEndpoints := obj.(*api.Endpoints)
	if !isEndpoints {
		utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", obj))
		return
	}
	for _, handler := range c.eventHandlers {
		glog.V(4).Infof("Calling handler.OnEndpointsAdd")
		handler.OnEndpointsAdd(added)
	}
	c.dispatchUpdate()
}

func (c *EndpointsConfig) handleUpdateEndpoints(_, _ interface{}) {
// handleUpdateEndpoints processes an "update" notification. Both oldObj and
// newObj must be *api.Endpoints; if either is not, the offending object is
// reported through utilruntime.HandleError and the notification is dropped.
// On success every registered event handler receives OnEndpointsUpdate with
// the previous and the new object, after which dispatchUpdate is called.
func (c *EndpointsConfig) handleUpdateEndpoints(oldObj, newObj interface{}) {
	previous, isEndpoints := oldObj.(*api.Endpoints)
	if !isEndpoints {
		utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", oldObj))
		return
	}
	current, isEndpoints := newObj.(*api.Endpoints)
	if !isEndpoints {
		utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", newObj))
		return
	}
	for _, handler := range c.eventHandlers {
		glog.V(4).Infof("Calling handler.OnEndpointsUpdate")
		handler.OnEndpointsUpdate(previous, current)
	}
	c.dispatchUpdate()
}

func (c *EndpointsConfig) handleDeleteEndpoints(_ interface{}) {
// handleDeleteEndpoints processes a "delete" notification. obj is either an
// *api.Endpoints or a cache.DeletedFinalStateUnknown tombstone wrapping one;
// anything else is reported through utilruntime.HandleError and dropped.
// On success every registered event handler receives OnEndpointsDelete,
// after which dispatchUpdate is called.
func (c *EndpointsConfig) handleDeleteEndpoints(obj interface{}) {
	endpoints, ok := obj.(*api.Endpoints)
	if !ok {
		// Not a plain endpoints object: it may be a tombstone carrying the
		// last known state of the deleted object.
		tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
		if !ok {
			utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", obj))
			return
		}
		if endpoints, ok = tombstone.Obj.(*api.Endpoints); !ok {
			utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", obj))
			return
		}
	}
	for i := range c.eventHandlers {
		// Log the method that is actually invoked; this previously said
		// OnEndpointsUpdate, which made delete events look like updates.
		glog.V(4).Infof("Calling handler.OnEndpointsDelete")
		c.eventHandlers[i].OnEndpointsDelete(endpoints)
	}
	c.dispatchUpdate()
}

Expand Down
58 changes: 43 additions & 15 deletions pkg/proxy/iptables/proxier.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,8 +195,8 @@ func newServiceInfo(serviceName proxy.ServicePortName, port *api.ServicePort, se
return info
}

type endpointsMap map[types.NamespacedName]*api.Endpoints
type proxyServiceMap map[proxy.ServicePortName]*serviceInfo

type proxyEndpointMap map[proxy.ServicePortName][]*endpointsInfo

// Proxier is an iptables based proxy for connections between a localhost:lport
Expand All @@ -210,9 +210,14 @@ type Proxier struct {
// pointers are shared with higher layers of kube-proxy. They are guaranteed
// to not be modified in the meantime, but also require to be not modified
// by Proxier.
// nil until we have seen an On*Update event.
allServices []*api.Service
allEndpoints []*api.Endpoints
allEndpoints endpointsMap
// allServices is nil until we have seen an OnServiceUpdate event.
allServices []*api.Service

// endpointsSynced is set to true when endpoints are synced after startup.
// This is used to avoid updating iptables with some partial data after
// kube-proxy restart.
endpointsSynced bool

throttle flowcontrol.RateLimiter

Expand Down Expand Up @@ -327,6 +332,7 @@ func NewProxier(ipt utiliptables.Interface,
serviceMap: make(proxyServiceMap),
endpointsMap: make(proxyEndpointMap),
portsMap: make(map[localPort]closeable),
allEndpoints: make(endpointsMap),
syncPeriod: syncPeriod,
minSyncPeriod: minSyncPeriod,
throttle: throttle,
Expand Down Expand Up @@ -531,28 +537,51 @@ func (proxier *Proxier) OnServiceUpdate(allServices []*api.Service) {
proxier.syncProxyRules(syncReasonServices)
}

// OnEndpointsUpdate takes in a slice of updated endpoints.
func (proxier *Proxier) OnEndpointsUpdate(allEndpoints []*api.Endpoints) {
func (proxier *Proxier) OnEndpointsAdd(endpoints *api.Endpoints) {
namespacedName := types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name}

proxier.mu.Lock()
defer proxier.mu.Unlock()
if proxier.allEndpoints == nil {
glog.V(2).Info("Received first Endpoints update")
}
proxier.allEndpoints = allEndpoints
proxier.allEndpoints[namespacedName] = endpoints
proxier.syncProxyRules(syncReasonEndpoints)
}

// OnEndpointsUpdate stores the new version of an endpoints object in
// proxier.allEndpoints, keyed by its namespace and name, and then calls
// syncProxyRules. The previous version of the object is ignored.
func (proxier *Proxier) OnEndpointsUpdate(_, endpoints *api.Endpoints) {
	key := types.NamespacedName{
		Namespace: endpoints.Namespace,
		Name:      endpoints.Name,
	}

	// The map write and the rule sync both run under proxier.mu.
	proxier.mu.Lock()
	defer proxier.mu.Unlock()

	proxier.allEndpoints[key] = endpoints
	proxier.syncProxyRules(syncReasonEndpoints)
}

// OnEndpointsDelete removes the given endpoints object from
// proxier.allEndpoints, keyed by its namespace and name, and then calls
// syncProxyRules.
func (proxier *Proxier) OnEndpointsDelete(endpoints *api.Endpoints) {
	key := types.NamespacedName{
		Namespace: endpoints.Namespace,
		Name:      endpoints.Name,
	}

	// The map removal and the rule sync both run under proxier.mu.
	proxier.mu.Lock()
	defer proxier.mu.Unlock()

	delete(proxier.allEndpoints, key)
	proxier.syncProxyRules(syncReasonEndpoints)
}

// OnEndpointsSynced records that the endpoints state has been synced and
// then calls syncProxyRules. Both steps run while holding proxier.mu.
func (proxier *Proxier) OnEndpointsSynced() {
proxier.mu.Lock()
defer proxier.mu.Unlock()
// syncProxyRules returns early while endpointsSynced is false, so the flag
// must be set before the sync is requested.
proxier.endpointsSynced = true
proxier.syncProxyRules(syncReasonEndpoints)
}

// Convert a slice of api.Endpoints objects into a map of service-port -> endpoints.
func buildNewEndpointsMap(allEndpoints []*api.Endpoints, curMap proxyEndpointMap, hostname string) (newMap proxyEndpointMap, hcEndpoints map[types.NamespacedName]int, staleSet map[endpointServicePair]bool) {
func buildNewEndpointsMap(allEndpoints endpointsMap, curMap proxyEndpointMap, hostname string) (newMap proxyEndpointMap, hcEndpoints map[types.NamespacedName]int, staleSet map[endpointServicePair]bool) {

// return values
newMap = make(proxyEndpointMap)
hcEndpoints = make(map[types.NamespacedName]int)
staleSet = make(map[endpointServicePair]bool)

// Update endpoints for services.
for i := range allEndpoints {
accumulateEndpointsMap(allEndpoints[i], hostname, &newMap)
for _, endpoints := range allEndpoints {
accumulateEndpointsMap(endpoints, hostname, &newMap)
}
// Check stale connections against endpoints missing from the update.
// TODO: we should really only mark a connection stale if the proto was UDP
Expand Down Expand Up @@ -607,7 +636,6 @@ func buildNewEndpointsMap(allEndpoints []*api.Endpoints, curMap proxyEndpointMap
// NOTE: endpoints object should NOT be modified.
//
// TODO: this could be simplified:
// - hostPortInfo and endpointsInfo overlap too much
// - the test for this is overlapped by the test for buildNewEndpointsMap
// - naming is poor and responsibilities are muddled
func accumulateEndpointsMap(endpoints *api.Endpoints, hostname string, newEndpoints *proxyEndpointMap) {
Expand Down Expand Up @@ -732,7 +760,7 @@ func (proxier *Proxier) syncProxyRules(reason syncReason) {
glog.V(4).Infof("syncProxyRules(%s) took %v", reason, time.Since(start))
}()
// don't sync rules till we've received services and endpoints
if proxier.allEndpoints == nil || proxier.allServices == nil {
if !proxier.endpointsSynced || proxier.allServices == nil {
glog.V(2).Info("Not syncing iptables until Services and Endpoints have been received from master")
return
}
Expand Down