Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Don't rebuild endpoints map in iptables kube-proxy all the time. #44494

Merged
merged 1 commit into from
Apr 20, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions pkg/proxy/iptables/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ go_test(
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/types:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/intstr:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library",
],
)

Expand Down
214 changes: 136 additions & 78 deletions pkg/proxy/iptables/proxier.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,24 +195,44 @@ func newServiceInfo(serviceName proxy.ServicePortName, port *api.ServicePort, se
return info
}

type endpointsMap map[types.NamespacedName]*api.Endpoints
type endpointsChange struct {
previous *api.Endpoints
current *api.Endpoints
}
type endpointsChangeMap map[types.NamespacedName]*endpointsChange
type serviceMap map[types.NamespacedName]*api.Service
type proxyServiceMap map[proxy.ServicePortName]*serviceInfo
type proxyEndpointMap map[proxy.ServicePortName][]*endpointsInfo
type proxyEndpointsMap map[proxy.ServicePortName][]*endpointsInfo

// merge copies every entry of other into em. When a service port is present
// in both maps, the endpoint list from other replaces the one held by em.
// The slices themselves are shared with other, not copied.
func (em proxyEndpointsMap) merge(other proxyEndpointsMap) {
	for svcPort, eps := range other {
		em[svcPort] = eps
	}
}

// unmerge removes from em every service port that appears as a key in other.
// Only the keys of other are consulted; its endpoint lists are ignored.
func (em proxyEndpointsMap) unmerge(other proxyEndpointsMap) {
	for key := range other {
		delete(em, key)
	}
}

// Proxier is an iptables based proxy for connections between a localhost:lport
// and services that provide the actual backends.
type Proxier struct {
mu sync.Mutex // protects the following fields
serviceMap proxyServiceMap
endpointsMap proxyEndpointMap
portsMap map[localPort]closeable
// allServices and allEndpoints should never be modified by proxier - the
endpointsMap proxyEndpointsMap
// endpointsChanges contains all changes to endpoints that happened since
// last syncProxyRules call. For a single object, changes are accumulated,
// i.e. previous is state from before all of them, current is state after
// applying all of those.
endpointsChanges endpointsChangeMap
portsMap map[localPort]closeable
// allServices should never be modified by proxier - the
// pointers are shared with higher layers of kube-proxy. They are guaranteed
// not to be modified in the meantime, but must also not be modified
// by Proxier.
allEndpoints endpointsMap
allServices serviceMap
allServices serviceMap

// endpointsSynced and servicesSynced are set to true when corresponding
// objects are synced after startup. This is used to avoid updating iptables
Expand Down Expand Up @@ -330,24 +350,24 @@ func NewProxier(ipt utiliptables.Interface,
}

return &Proxier{
serviceMap: make(proxyServiceMap),
endpointsMap: make(proxyEndpointMap),
portsMap: make(map[localPort]closeable),
allEndpoints: make(endpointsMap),
allServices: make(serviceMap),
syncPeriod: syncPeriod,
minSyncPeriod: minSyncPeriod,
throttle: throttle,
iptables: ipt,
masqueradeAll: masqueradeAll,
masqueradeMark: masqueradeMark,
exec: exec,
clusterCIDR: clusterCIDR,
hostname: hostname,
nodeIP: nodeIP,
portMapper: &listenPortOpener{},
recorder: recorder,
healthChecker: healthChecker,
serviceMap: make(proxyServiceMap),
endpointsMap: make(proxyEndpointsMap),
endpointsChanges: make(endpointsChangeMap),
portsMap: make(map[localPort]closeable),
allServices: make(serviceMap),
syncPeriod: syncPeriod,
minSyncPeriod: minSyncPeriod,
throttle: throttle,
iptables: ipt,
masqueradeAll: masqueradeAll,
masqueradeMark: masqueradeMark,
exec: exec,
clusterCIDR: clusterCIDR,
hostname: hostname,
nodeIP: nodeIP,
portMapper: &listenPortOpener{},
recorder: recorder,
healthChecker: healthChecker,
}, nil
}

Expand Down Expand Up @@ -566,16 +586,32 @@ func (proxier *Proxier) OnEndpointsAdd(endpoints *api.Endpoints) {

proxier.mu.Lock()
defer proxier.mu.Unlock()
proxier.allEndpoints[namespacedName] = endpoints

change, exists := proxier.endpointsChanges[namespacedName]
if !exists {
change = &endpointsChange{}
change.previous = nil
proxier.endpointsChanges[namespacedName] = change
}
change.current = endpoints

proxier.syncProxyRules(syncReasonEndpoints)
}

func (proxier *Proxier) OnEndpointsUpdate(_, endpoints *api.Endpoints) {
func (proxier *Proxier) OnEndpointsUpdate(oldEndpoints, endpoints *api.Endpoints) {
namespacedName := types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name}

proxier.mu.Lock()
defer proxier.mu.Unlock()
proxier.allEndpoints[namespacedName] = endpoints

change, exists := proxier.endpointsChanges[namespacedName]
if !exists {
change = &endpointsChange{}
change.previous = oldEndpoints
proxier.endpointsChanges[namespacedName] = change
}
change.current = endpoints

proxier.syncProxyRules(syncReasonEndpoints)
}

Expand All @@ -584,7 +620,15 @@ func (proxier *Proxier) OnEndpointsDelete(endpoints *api.Endpoints) {

proxier.mu.Lock()
defer proxier.mu.Unlock()
delete(proxier.allEndpoints, namespacedName)

change, exists := proxier.endpointsChanges[namespacedName]
if !exists {
change = &endpointsChange{}
change.previous = endpoints
proxier.endpointsChanges[namespacedName] = change
}
change.current = nil

proxier.syncProxyRules(syncReasonEndpoints)
}

Expand All @@ -595,45 +639,65 @@ func (proxier *Proxier) OnEndpointsSynced() {
proxier.syncProxyRules(syncReasonEndpoints)
}

// Convert a slice of api.Endpoints objects into a map of service-port -> endpoints.
func buildNewEndpointsMap(allEndpoints endpointsMap, curMap proxyEndpointMap, hostname string) (newMap proxyEndpointMap, hcEndpoints map[types.NamespacedName]int, staleSet map[endpointServicePair]bool) {

// return values
newMap = make(proxyEndpointMap)
hcEndpoints = make(map[types.NamespacedName]int)
// <endpointsMap> is updated by this function (based on the given changes).
// <changes> map is cleared after applying them.
func updateEndpointsMap(
endpointsMap proxyEndpointsMap,
changes *endpointsChangeMap,
hostname string) (syncRequired bool, hcEndpoints map[types.NamespacedName]int, staleSet map[endpointServicePair]bool) {
syncRequired = false
staleSet = make(map[endpointServicePair]bool)
for _, change := range *changes {
oldEndpointsMap := endpointsToEndpointsMap(change.previous, hostname)
newEndpointsMap := endpointsToEndpointsMap(change.current, hostname)
if !reflect.DeepEqual(oldEndpointsMap, newEndpointsMap) {
endpointsMap.unmerge(oldEndpointsMap)
endpointsMap.merge(newEndpointsMap)
detectStaleConnections(oldEndpointsMap, newEndpointsMap, staleSet)
syncRequired = true
}
}
*changes = make(endpointsChangeMap)

if !utilfeature.DefaultFeatureGate.Enabled(features.ExternalTrafficLocalOnly) {
return
}

// Update endpoints for services.
for _, endpoints := range allEndpoints {
accumulateEndpointsMap(endpoints, hostname, &newMap)
// TODO: If this turns out to be computationally expensive, consider
// computing it incrementally, similarly to endpointsMap.
hcEndpoints = make(map[types.NamespacedName]int)
localIPs := getLocalIPs(endpointsMap)
for nsn, ips := range localIPs {
hcEndpoints[nsn] = len(ips)
}
// Check stale connections against endpoints missing from the update.
// TODO: we should really only mark a connection stale if the proto was UDP
// and the (ip, port, proto) was removed from the endpoints.
for svcPort, epList := range curMap {

return syncRequired, hcEndpoints, staleSet
}

// <staleEndpoints> are modified by this function with detected stale
// connections.
func detectStaleConnections(oldEndpointsMap, newEndpointsMap proxyEndpointsMap, staleEndpoints map[endpointServicePair]bool) {
for svcPort, epList := range oldEndpointsMap {
for _, ep := range epList {
stale := true
for i := range newMap[svcPort] {
if *newMap[svcPort][i] == *ep {
for i := range newEndpointsMap[svcPort] {
if *newEndpointsMap[svcPort][i] == *ep {
stale = false
break
}
}
if stale {
glog.V(4).Infof("Stale endpoint %v -> %v", svcPort, ep.endpoint)
staleSet[endpointServicePair{endpoint: ep.endpoint, servicePortName: svcPort}] = true
staleEndpoints[endpointServicePair{endpoint: ep.endpoint, servicePortName: svcPort}] = true
}
}
}
}

if !utilfeature.DefaultFeatureGate.Enabled(features.ExternalTrafficLocalOnly) {
return
}

// accumulate local IPs per service, ignoring ports
localIPs := map[types.NamespacedName]sets.String{}
for svcPort := range newMap {
for _, ep := range newMap[svcPort] {
func getLocalIPs(endpointsMap proxyEndpointsMap) map[types.NamespacedName]sets.String {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This could grow a test now that it is a function.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

localIPs := make(map[types.NamespacedName]sets.String)
for svcPort := range endpointsMap {
for _, ep := range endpointsMap[svcPort] {
if ep.isLocal {
nsn := svcPort.NamespacedName
if localIPs[nsn] == nil {
Expand All @@ -644,25 +708,19 @@ func buildNewEndpointsMap(allEndpoints endpointsMap, curMap proxyEndpointMap, ho
}
}
}
// produce a count per service
for nsn, ips := range localIPs {
hcEndpoints[nsn] = len(ips)
}

return newMap, hcEndpoints, staleSet
return localIPs
}

// Gather information about all the endpoint state for a given api.Endpoints.
// This can not report complete info on stale connections because it has limited
// scope - it only knows one Endpoints, but sees the whole current map. That
// cleanup has to be done above.
// Translates single Endpoints object to proxyEndpointsMap.
// This function is used for incremental updates of endpointsMap.
//
// NOTE: endpoints object should NOT be modified.
//
// TODO: this could be simplified:
// - the test for this is overlapped by the test for buildNewEndpointsMap
// - naming is poor and responsibilities are muddled
func accumulateEndpointsMap(endpoints *api.Endpoints, hostname string, newEndpoints *proxyEndpointMap) {
func endpointsToEndpointsMap(endpoints *api.Endpoints, hostname string) proxyEndpointsMap {
if endpoints == nil {
return nil
}

endpointsMap := make(proxyEndpointsMap)
// We need to build a map of portname -> all ip:ports for that
// portname. Explode Endpoints.Subsets[*] into this structure.
for i := range endpoints.Subsets {
Expand All @@ -687,17 +745,18 @@ func accumulateEndpointsMap(endpoints *api.Endpoints, hostname string, newEndpoi
endpoint: net.JoinHostPort(addr.IP, strconv.Itoa(int(port.Port))),
isLocal: addr.NodeName != nil && *addr.NodeName == hostname,
}
(*newEndpoints)[svcPort] = append((*newEndpoints)[svcPort], epInfo)
endpointsMap[svcPort] = append(endpointsMap[svcPort], epInfo)
}
if glog.V(3) {
newEPList := []string{}
for _, ep := range (*newEndpoints)[svcPort] {
for _, ep := range endpointsMap[svcPort] {
newEPList = append(newEPList, ep.endpoint)
}
glog.Infof("Setting endpoints for %q to %+v", svcPort, newEPList)
}
}
}
return endpointsMap
}

// portProtoHash takes the ServicePortName and protocol for a service
Expand Down Expand Up @@ -784,7 +843,7 @@ func (proxier *Proxier) syncProxyRules(reason syncReason) {
glog.V(4).Infof("syncProxyRules(%s) took %v", reason, time.Since(start))
}()
// don't sync rules till we've received services and endpoints
if !proxier.endpointsSynced || proxier.allServices == nil {
if !proxier.endpointsSynced || !proxier.servicesSynced {
glog.V(2).Info("Not syncing iptables until Services and Endpoints have been received from master")
return
}
Expand All @@ -798,11 +857,11 @@ func (proxier *Proxier) syncProxyRules(reason syncReason) {
return
}

// Figure out the new endpoints we need to activate.
newEndpoints, hcEndpoints, staleEndpoints := buildNewEndpointsMap(proxier.allEndpoints, proxier.endpointsMap, proxier.hostname)
endpointsSyncRequired, hcEndpoints, staleEndpoints := updateEndpointsMap(
proxier.endpointsMap, &proxier.endpointsChanges, proxier.hostname)

// If this was called because of an endpoints update, but nothing actionable has changed, skip it.
if reason == syncReasonEndpoints && reflect.DeepEqual(newEndpoints, proxier.endpointsMap) {
if reason == syncReasonEndpoints && !endpointsSyncRequired {
glog.V(3).Infof("Skipping iptables sync because nothing changed")
return
}
Expand Down Expand Up @@ -1157,7 +1216,7 @@ func (proxier *Proxier) syncProxyRules(reason syncReason) {
// table doesn't currently have the same per-service structure that
// the nat table does, so we just stick this into the kube-services
// chain.
if len(newEndpoints[svcName]) == 0 {
if len(proxier.endpointsMap[svcName]) == 0 {
writeLine(filterRules,
"-A", string(kubeServicesChain),
"-m", "comment", "--comment", fmt.Sprintf(`"%s has no endpoints"`, svcNameString),
Expand All @@ -1170,7 +1229,7 @@ func (proxier *Proxier) syncProxyRules(reason syncReason) {
}

// If the service has no endpoints then reject packets.
if len(newEndpoints[svcName]) == 0 {
if len(proxier.endpointsMap[svcName]) == 0 {
writeLine(filterRules,
"-A", string(kubeServicesChain),
"-m", "comment", "--comment", fmt.Sprintf(`"%s has no endpoints"`, svcNameString),
Expand All @@ -1189,7 +1248,7 @@ func (proxier *Proxier) syncProxyRules(reason syncReason) {
// These two slices parallel each other - keep in sync
endpoints := make([]*endpointsInfo, 0)
endpointChains := make([]utiliptables.Chain, 0)
for _, ep := range newEndpoints[svcName] {
for _, ep := range proxier.endpointsMap[svcName] {
endpoints = append(endpoints, ep)
endpointChain := servicePortEndpointChainName(svcNameString, protocol, ep.endpoint)
endpointChains = append(endpointChains, endpointChain)
Expand Down Expand Up @@ -1379,7 +1438,6 @@ func (proxier *Proxier) syncProxyRules(reason syncReason) {

// Finish housekeeping.
proxier.serviceMap = newServices
proxier.endpointsMap = newEndpoints

// TODO: these and clearUDPConntrackForPort() could be made more consistent.
utilproxy.DeleteServiceConnections(proxier.exec, staleServices.List())
Expand Down