Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

misc iptables proxy fixes #106030

Merged
183 changes: 84 additions & 99 deletions pkg/proxy/iptables/proxier.go
Original file line number Diff line number Diff line change
Expand Up @@ -955,12 +955,8 @@ func (proxier *Proxier) syncProxyRules() {
// you should always do one of the below:
// slice = slice[:0] // and then append to it
// slice = append(slice[:0], ...)
endpoints := make([]*endpointsInfo, 0)
endpointChains := make([]utiliptables.Chain, 0)
readyEndpoints := make([]*endpointsInfo, 0)
readyEndpointChains := make([]utiliptables.Chain, 0)
localReadyEndpointChains := make([]utiliptables.Chain, 0)
localServingTerminatingEndpointChains := make([]utiliptables.Chain, 0)
localEndpointChains := make([]utiliptables.Chain, 0)

// To avoid growing this slice, we arbitrarily set its size to 64,
// there is never more than that many arguments for a single line.
Expand Down Expand Up @@ -1002,7 +998,82 @@ func (proxier *Proxier) syncProxyRules() {
// Service does not have conflicting configuration such as
// externalTrafficPolicy=Local.
allEndpoints = proxy.FilterEndpoints(allEndpoints, svcInfo, proxier.nodeLabels)
hasEndpoints := len(allEndpoints) > 0

// Scan the endpoints list to see what we have. "hasEndpoints" will be true
// if there are any usable endpoints for this service anywhere in the cluster.
var hasEndpoints, hasLocalReadyEndpoints, hasLocalServingTerminatingEndpoints bool
Copy link
Member

@jayunit100 jayunit100 Nov 5, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(Ignore if not important.) Quick thought: since the hasLocalServingTerminatingEndpoints and hasLocalReadyEndpoints variables are only used to calculate useTerminatingEndpoints, can we move this bit of if/else into another function? Just for understandability. It's not important; I'm just thinking out loud while trying to reverse-engineer the code.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should do some careful refactoring of all this code (I started some in #106158), but we can do that in small follow-ups. Balancing the readability of this code with the desire to DRY is hard.

for _, ep := range allEndpoints {
if ep.IsReady() {
hasEndpoints = true
if ep.GetIsLocal() {
hasLocalReadyEndpoints = true
}
} else if svc.NodeLocalExternal() && utilfeature.DefaultFeatureGate.Enabled(features.ProxyTerminatingEndpoints) {
if ep.IsServing() && ep.IsTerminating() {
hasEndpoints = true
if ep.GetIsLocal() {
hasLocalServingTerminatingEndpoints = true
}
}
}
}
useTerminatingEndpoints := !hasLocalReadyEndpoints && hasLocalServingTerminatingEndpoints

// Generate the per-endpoint chains.
readyEndpointChains = readyEndpointChains[:0]
localEndpointChains = localEndpointChains[:0]
for _, ep := range allEndpoints {
epInfo, ok := ep.(*endpointsInfo)
if !ok {
klog.ErrorS(err, "Failed to cast endpointsInfo", "endpointsInfo", ep)
continue
}
thockin marked this conversation as resolved.
Show resolved Hide resolved

endpointChain := epInfo.endpointChain(svcNameString, protocol)
endpointInUse := false

if epInfo.Ready {
readyEndpointChains = append(readyEndpointChains, endpointChain)
endpointInUse = true
}
if svc.NodeLocalExternal() && epInfo.IsLocal {
if useTerminatingEndpoints {
if epInfo.Serving && epInfo.Terminating {
localEndpointChains = append(localEndpointChains, endpointChain)
endpointInUse = true
}
} else if epInfo.Ready {
localEndpointChains = append(localEndpointChains, endpointChain)
endpointInUse = true
}
}

if !endpointInUse {
continue
}

// Create the endpoint chain, retaining counters if possible.
if chain, ok := existingNATChains[endpointChain]; ok {
utilproxy.WriteBytesLine(proxier.natChains, chain)
} else {
utilproxy.WriteLine(proxier.natChains, utiliptables.MakeChainLine(endpointChain))
}
activeNATChains[endpointChain] = true

args = append(args[:0], "-A", string(endpointChain))
args = proxier.appendServiceCommentLocked(args, svcNameString)
// Handle traffic that loops back to the originator with SNAT.
utilproxy.WriteLine(proxier.natRules, append(args,
"-s", utilproxy.ToCIDR(netutils.ParseIPSloppy(epInfo.IP())),
"-j", string(KubeMarkMasqChain))...)
// Update client-affinity lists.
if svcInfo.SessionAffinityType() == v1.ServiceAffinityClientIP {
args = append(args, "-m", "recent", "--name", string(endpointChain), "--set")
}
// DNAT to final destination.
args = append(args, "-m", protocol, "-p", protocol, "-j", "DNAT", "--to-destination", epInfo.Endpoint)
utilproxy.WriteLine(proxier.natRules, args...)
}

svcChain := svcInfo.servicePortChainName
if hasEndpoints {
Expand Down Expand Up @@ -1319,35 +1390,9 @@ func (proxier *Proxier) syncProxyRules() {
continue
}

// Generate the per-endpoint chains. We do this in multiple passes so we
// can group rules together.
// These two slices parallel each other - keep in sync
endpoints = endpoints[:0]
endpointChains = endpointChains[:0]
var endpointChain utiliptables.Chain
for _, ep := range allEndpoints {
epInfo, ok := ep.(*endpointsInfo)
if !ok {
klog.ErrorS(err, "Failed to cast endpointsInfo", "endpointsInfo", ep)
continue
}

endpoints = append(endpoints, epInfo)
endpointChain = epInfo.endpointChain(svcNameString, protocol)
endpointChains = append(endpointChains, endpointChain)

// Create the endpoint chain, retaining counters if possible.
if chain, ok := existingNATChains[endpointChain]; ok {
utilproxy.WriteBytesLine(proxier.natChains, chain)
} else {
utilproxy.WriteLine(proxier.natChains, utiliptables.MakeChainLine(endpointChain))
}
activeNATChains[endpointChain] = true
}

// First write session affinity rules, if applicable.
if svcInfo.SessionAffinityType() == v1.ServiceAffinityClientIP {
for _, endpointChain := range endpointChains {
for _, endpointChain := range readyEndpointChains {
args = append(args[:0],
"-A", string(svcChain),
)
Expand All @@ -1361,38 +1406,9 @@ func (proxier *Proxier) syncProxyRules() {
}
}

// Firstly, categorize each endpoint into three buckets:
// 1. all endpoints that are ready and NOT terminating.
// 2. all endpoints that are local, ready and NOT terminating, and externalTrafficPolicy=Local
// 3. all endpoints that are local, serving and terminating, and externalTrafficPolicy=Local
readyEndpointChains = readyEndpointChains[:0]
readyEndpoints := readyEndpoints[:0]
localReadyEndpointChains := localReadyEndpointChains[:0]
localServingTerminatingEndpointChains := localServingTerminatingEndpointChains[:0]
for i, endpointChain := range endpointChains {
if endpoints[i].Ready {
readyEndpointChains = append(readyEndpointChains, endpointChain)
readyEndpoints = append(readyEndpoints, endpoints[i])
}

if svc.NodeLocalExternal() && endpoints[i].IsLocal {
if endpoints[i].Ready {
localReadyEndpointChains = append(localReadyEndpointChains, endpointChain)
} else if endpoints[i].Serving && endpoints[i].Terminating {
localServingTerminatingEndpointChains = append(localServingTerminatingEndpointChains, endpointChain)
}
}
}

// Now write loadbalancing & DNAT rules.
// Now write loadbalancing rules
numReadyEndpoints := len(readyEndpointChains)
for i, endpointChain := range readyEndpointChains {
epIP := readyEndpoints[i].IP()
if epIP == "" {
// Error parsing this endpoint has been logged. Skip to next endpoint.
continue
}

// Balancing rules in the per-service chain.
args = append(args[:0], "-A", string(svcChain))
args = proxier.appendServiceCommentLocked(args, svcNameString)
Expand All @@ -1408,31 +1424,6 @@ func (proxier *Proxier) syncProxyRules() {
utilproxy.WriteLine(proxier.natRules, args...)
}

// Every endpoint gets a chain, regardless of its state. This is required later since we may
// want to jump to endpoint chains that are terminating.
for i, endpointChain := range endpointChains {
epIP := endpoints[i].IP()
if epIP == "" {
// Error parsing this endpoint has been logged. Skip to next endpoint.
continue
}

// Rules in the per-endpoint chain.
args = append(args[:0], "-A", string(endpointChain))
args = proxier.appendServiceCommentLocked(args, svcNameString)
// Handle traffic that loops back to the originator with SNAT.
utilproxy.WriteLine(proxier.natRules, append(args,
"-s", utilproxy.ToCIDR(netutils.ParseIPSloppy(epIP)),
"-j", string(KubeMarkMasqChain))...)
// Update client-affinity lists.
if svcInfo.SessionAffinityType() == v1.ServiceAffinityClientIP {
args = append(args, "-m", "recent", "--name", string(endpointChain), "--set")
}
// DNAT to final destination.
args = append(args, "-m", protocol, "-p", protocol, "-j", "DNAT", "--to-destination", endpoints[i].Endpoint)
utilproxy.WriteLine(proxier.natRules, args...)
}

// The logic below this applies only if this service is marked as OnlyLocal
if !svcInfo.NodeLocalExternal() {
continue
Expand Down Expand Up @@ -1461,12 +1452,6 @@ func (proxier *Proxier) syncProxyRules() {
"-m", "comment", "--comment", fmt.Sprintf(`"route LOCAL traffic for %s LB IP to service chain"`, svcNameString),
"-m", "addrtype", "--src-type", "LOCAL", "-j", string(svcChain))...)

// Prefer local ready endpoint chains, but fall back to ready terminating if none exist
localEndpointChains := localReadyEndpointChains
if utilfeature.DefaultFeatureGate.Enabled(features.ProxyTerminatingEndpoints) && len(localEndpointChains) == 0 {
localEndpointChains = localServingTerminatingEndpointChains
}

numLocalEndpoints := len(localEndpointChains)
if numLocalEndpoints == 0 {
// Blackhole all traffic since there are no local endpoints
Expand Down Expand Up @@ -1596,6 +1581,11 @@ func (proxier *Proxier) syncProxyRules() {
"-j", "ACCEPT",
)

numberFilterIptablesRules := utilproxy.CountBytesLines(proxier.filterRules.Bytes())
metrics.IptablesRulesTotal.WithLabelValues(string(utiliptables.TableFilter)).Set(float64(numberFilterIptablesRules))
numberNatIptablesRules := utilproxy.CountBytesLines(proxier.natRules.Bytes())
metrics.IptablesRulesTotal.WithLabelValues(string(utiliptables.TableNAT)).Set(float64(numberNatIptablesRules))

// Write the end-of-table markers.
utilproxy.WriteLine(proxier.filterRules, "COMMIT")
utilproxy.WriteLine(proxier.natRules, "COMMIT")
Expand All @@ -1608,11 +1598,6 @@ func (proxier *Proxier) syncProxyRules() {
proxier.iptablesData.Write(proxier.natChains.Bytes())
proxier.iptablesData.Write(proxier.natRules.Bytes())

numberFilterIptablesRules := utilproxy.CountBytesLines(proxier.filterRules.Bytes())
metrics.IptablesRulesTotal.WithLabelValues(string(utiliptables.TableFilter)).Set(float64(numberFilterIptablesRules))
numberNatIptablesRules := utilproxy.CountBytesLines(proxier.natRules.Bytes())
metrics.IptablesRulesTotal.WithLabelValues(string(utiliptables.TableNAT)).Set(float64(numberNatIptablesRules))

klog.V(5).InfoS("Restoring iptables", "rules", proxier.iptablesData.Bytes())
err = proxier.iptables.RestoreAll(proxier.iptablesData.Bytes(), utiliptables.NoFlushTables, utiliptables.RestoreCounters)
if err != nil {
Expand Down
Loading