Skip to content

Commit

Permalink
proxy/iptables: fix all-vs-ready endpoints a bit
Browse files Browse the repository at this point in the history
Filter the allEndpoints list into readyEndpoints sooner, and set
"hasEndpoints" based (mostly) on readyEndpoints, not allEndpoints (so
that, eg, we correctly generate REJECT rules for services with no
_functioning_ endpoints, even if they have unusable terminating
endpoints).

Also, write out the endpoint chains at the top of the loop when we
iterate the endpoints for the first time, rather than copying some of
the data to another set of variables and then writing them out later.
And don't write out endpoint chains that won't be used

Also, generate affinity rules only for readyEndpoints rather than
allEndpoints, so affinity gets broken correctly when an endpoint
becomes unready.
  • Loading branch information
danwinship authored and Antonio Ojea committed Nov 12, 2021
1 parent 5e0795b commit 13a4262
Show file tree
Hide file tree
Showing 2 changed files with 217 additions and 127 deletions.
161 changes: 79 additions & 82 deletions pkg/proxy/iptables/proxier.go
Original file line number Diff line number Diff line change
Expand Up @@ -974,12 +974,8 @@ func (proxier *Proxier) syncProxyRules() {
// you should always do one of the below:
// slice = slice[:0] // and then append to it
// slice = append(slice[:0], ...)
endpoints := make([]*endpointsInfo, 0)
endpointChains := make([]utiliptables.Chain, 0)
readyEndpoints := make([]*endpointsInfo, 0)
readyEndpointChains := make([]utiliptables.Chain, 0)
localReadyEndpointChains := make([]utiliptables.Chain, 0)
localServingTerminatingEndpointChains := make([]utiliptables.Chain, 0)
localEndpointChains := make([]utiliptables.Chain, 0)

// To avoid growing this slice, we arbitrarily set its size to 64,
// there is never more than that many arguments for a single line.
Expand Down Expand Up @@ -1021,7 +1017,82 @@ func (proxier *Proxier) syncProxyRules() {
// Service does not have conflicting configuration such as
// externalTrafficPolicy=Local.
allEndpoints = proxy.FilterEndpoints(allEndpoints, svcInfo, proxier.nodeLabels)
hasEndpoints := len(allEndpoints) > 0

// Scan the endpoints list to see what we have. "hasEndpoints" will be true
// if there are any usable endpoints for this service anywhere in the cluster.
var hasEndpoints, hasLocalReadyEndpoints, hasLocalServingTerminatingEndpoints bool
for _, ep := range allEndpoints {
if ep.IsReady() {
hasEndpoints = true
if ep.GetIsLocal() {
hasLocalReadyEndpoints = true
}
} else if svc.NodeLocalExternal() && utilfeature.DefaultFeatureGate.Enabled(features.ProxyTerminatingEndpoints) {
if ep.IsServing() && ep.IsTerminating() {
hasEndpoints = true
if ep.GetIsLocal() {
hasLocalServingTerminatingEndpoints = true
}
}
}
}
useTerminatingEndpoints := !hasLocalReadyEndpoints && hasLocalServingTerminatingEndpoints

// Generate the per-endpoint chains.
readyEndpointChains = readyEndpointChains[:0]
localEndpointChains = localEndpointChains[:0]
for _, ep := range allEndpoints {
epInfo, ok := ep.(*endpointsInfo)
if !ok {
klog.ErrorS(err, "Failed to cast endpointsInfo", "endpointsInfo", ep)
continue
}

endpointChain := epInfo.endpointChain(svcNameString, protocol)
endpointInUse := false

if epInfo.Ready {
readyEndpointChains = append(readyEndpointChains, endpointChain)
endpointInUse = true
}
if svc.NodeLocalExternal() && epInfo.IsLocal {
if useTerminatingEndpoints {
if epInfo.Serving && epInfo.Terminating {
localEndpointChains = append(localEndpointChains, endpointChain)
endpointInUse = true
}
} else if epInfo.Ready {
localEndpointChains = append(localEndpointChains, endpointChain)
endpointInUse = true
}
}

if !endpointInUse {
continue
}

// Create the endpoint chain, retaining counters if possible.
if chain, ok := existingNATChains[endpointChain]; ok {
utilproxy.WriteBytesLine(proxier.natChains, chain)
} else {
utilproxy.WriteLine(proxier.natChains, utiliptables.MakeChainLine(endpointChain))
}
activeNATChains[endpointChain] = true

args = append(args[:0], "-A", string(endpointChain))
args = proxier.appendServiceCommentLocked(args, svcNameString)
// Handle traffic that loops back to the originator with SNAT.
utilproxy.WriteLine(proxier.natRules, append(args,
"-s", utilproxy.ToCIDR(net.ParseIP(epInfo.IP())),
"-j", string(KubeMarkMasqChain))...)
// Update client-affinity lists.
if svcInfo.SessionAffinityType() == v1.ServiceAffinityClientIP {
args = append(args, "-m", "recent", "--name", string(endpointChain), "--set")
}
// DNAT to final destination.
args = append(args, "-m", protocol, "-p", protocol, "-j", "DNAT", "--to-destination", epInfo.Endpoint)
utilproxy.WriteLine(proxier.natRules, args...)
}

svcChain := svcInfo.servicePortChainName
if hasEndpoints {
Expand Down Expand Up @@ -1338,35 +1409,9 @@ func (proxier *Proxier) syncProxyRules() {
continue
}

// Generate the per-endpoint chains. We do this in multiple passes so we
// can group rules together.
// These two slices parallel each other - keep in sync
endpoints = endpoints[:0]
endpointChains = endpointChains[:0]
var endpointChain utiliptables.Chain
for _, ep := range allEndpoints {
epInfo, ok := ep.(*endpointsInfo)
if !ok {
klog.ErrorS(err, "Failed to cast endpointsInfo", "endpointsInfo", ep.String())
continue
}

endpoints = append(endpoints, epInfo)
endpointChain = epInfo.endpointChain(svcNameString, protocol)
endpointChains = append(endpointChains, endpointChain)

// Create the endpoint chain, retaining counters if possible.
if chain, ok := existingNATChains[endpointChain]; ok {
utilproxy.WriteBytesLine(proxier.natChains, chain)
} else {
utilproxy.WriteLine(proxier.natChains, utiliptables.MakeChainLine(endpointChain))
}
activeNATChains[endpointChain] = true
}

// First write session affinity rules, if applicable.
if svcInfo.SessionAffinityType() == v1.ServiceAffinityClientIP {
for _, endpointChain := range endpointChains {
for _, endpointChain := range readyEndpointChains {
args = append(args[:0],
"-A", string(svcChain),
)
Expand All @@ -1380,30 +1425,7 @@ func (proxier *Proxier) syncProxyRules() {
}
}

// Firstly, categorize each endpoint into three buckets:
// 1. all endpoints that are ready and NOT terminating.
// 2. all endpoints that are local, ready and NOT terminating, and externalTrafficPolicy=Local
// 3. all endpoints that are local, serving and terminating, and externalTrafficPolicy=Local
readyEndpointChains = readyEndpointChains[:0]
readyEndpoints := readyEndpoints[:0]
localReadyEndpointChains := localReadyEndpointChains[:0]
localServingTerminatingEndpointChains := localServingTerminatingEndpointChains[:0]
for i, endpointChain := range endpointChains {
if endpoints[i].Ready {
readyEndpointChains = append(readyEndpointChains, endpointChain)
readyEndpoints = append(readyEndpoints, endpoints[i])
}

if svc.NodeLocalExternal() && endpoints[i].IsLocal {
if endpoints[i].Ready {
localReadyEndpointChains = append(localReadyEndpointChains, endpointChain)
} else if endpoints[i].Serving && endpoints[i].Terminating {
localServingTerminatingEndpointChains = append(localServingTerminatingEndpointChains, endpointChain)
}
}
}

// Now write loadbalancing & DNAT rules.
// Now write loadbalancing rules
numReadyEndpoints := len(readyEndpointChains)
for i, endpointChain := range readyEndpointChains {
// Balancing rules in the per-service chain.
Expand All @@ -1421,25 +1443,6 @@ func (proxier *Proxier) syncProxyRules() {
utilproxy.WriteLine(proxier.natRules, args...)
}

// Every endpoint gets a chain, regardless of its state. This is required later since we may
// want to jump to endpoint chains that are terminating.
for i, endpointChain := range endpointChains {
// Rules in the per-endpoint chain.
args = append(args[:0], "-A", string(endpointChain))
args = proxier.appendServiceCommentLocked(args, svcNameString)
// Handle traffic that loops back to the originator with SNAT.
utilproxy.WriteLine(proxier.natRules, append(args,
"-s", utilproxy.ToCIDR(net.ParseIP(endpoints[i].IP())),
"-j", string(KubeMarkMasqChain))...)
// Update client-affinity lists.
if svcInfo.SessionAffinityType() == v1.ServiceAffinityClientIP {
args = append(args, "-m", "recent", "--name", string(endpointChain), "--set")
}
// DNAT to final destination.
args = append(args, "-m", protocol, "-p", protocol, "-j", "DNAT", "--to-destination", endpoints[i].Endpoint)
utilproxy.WriteLine(proxier.natRules, args...)
}

// The logic below this applies only if this service is marked as OnlyLocal
if !svcInfo.NodeLocalExternal() {
continue
Expand Down Expand Up @@ -1468,12 +1471,6 @@ func (proxier *Proxier) syncProxyRules() {
"-m", "comment", "--comment", fmt.Sprintf(`"route LOCAL traffic for %s LB IP to service chain"`, svcNameString),
"-m", "addrtype", "--src-type", "LOCAL", "-j", string(svcChain))...)

// Prefer local ready endpoint chains, but fall back to ready terminating if none exist
localEndpointChains := localReadyEndpointChains
if utilfeature.DefaultFeatureGate.Enabled(features.ProxyTerminatingEndpoints) && len(localEndpointChains) == 0 {
localEndpointChains = localServingTerminatingEndpointChains
}

numLocalEndpoints := len(localEndpointChains)
if numLocalEndpoints == 0 {
// Blackhole all traffic since there are no local endpoints
Expand Down

0 comments on commit 13a4262

Please sign in to comment.