Skip to content

Commit

Permalink
UPSTREAM: <carry>: implement "local-with-fallback" external traffic p…
Browse files Browse the repository at this point in the history
…olicy

If a service has a
"traffic-policy.network.alpha.openshift.io/local-with-fallback"
annotation, then only treat it as "externalTrafficPolicy: Local" when
there are actually running local pods.

That is, if we receive traffic for such a service after the last local
pod terminates, then forward it to a remote pod rather than dropping
it.

Note:
change utilpointer to pointer to conform to upstream
modified to suit changes to asserIPTablesRulesEqual

(cherry picked from commit 9500d08)
(cherry picked from commit e70c952)
(cherry picked from commit 9486899)
  • Loading branch information
danwinship authored and bpickard22 committed Apr 17, 2024
1 parent 0a3f0c3 commit 9d8de2b
Show file tree
Hide file tree
Showing 2 changed files with 137 additions and 2 deletions.
27 changes: 25 additions & 2 deletions pkg/proxy/iptables/proxier.go
Expand Up @@ -102,8 +102,12 @@ type servicePortInfo struct {
localPolicyChainName utiliptables.Chain
firewallChainName utiliptables.Chain
externalChainName utiliptables.Chain

localWithFallback bool
}

const localWithFallbackAnnotation = "traffic-policy.network.alpha.openshift.io/local-with-fallback"

// returns a new proxy.ServicePort which abstracts a serviceInfo
func newServiceInfo(port *v1.ServicePort, service *v1.Service, bsvcPortInfo *proxy.BaseServicePortInfo) proxy.ServicePort {
svcPort := &servicePortInfo{BaseServicePortInfo: bsvcPortInfo}
Expand All @@ -118,6 +122,14 @@ func newServiceInfo(port *v1.ServicePort, service *v1.Service, bsvcPortInfo *pro
svcPort.firewallChainName = serviceFirewallChainName(svcPort.nameString, protocol)
svcPort.externalChainName = serviceExternalChainName(svcPort.nameString, protocol)

if _, set := service.Annotations[localWithFallbackAnnotation]; set {
if svcPort.ExternalPolicyLocal() {
svcPort.localWithFallback = true
} else {
klog.Warningf("Ignoring annotation %q on Service %s which does not have Local ExternalTrafficPolicy", localWithFallbackAnnotation, svcName)
}
}

return svcPort
}

Expand Down Expand Up @@ -986,6 +998,10 @@ func (proxier *Proxier) syncProxyRules() {
}
}

// If "local-with-fallback" is in effect and there are no local endpoints,
// then we will force cluster behavior
localWithFallback := svcInfo.localWithFallback && len(localEndpoints) == 0

// clusterPolicyChain contains the endpoints used with "Cluster" traffic policy
clusterPolicyChain := svcInfo.clusterPolicyChainName
usesClusterPolicyChain := len(clusterEndpoints) > 0 && svcInfo.UsesClusterEndpoints()
Expand Down Expand Up @@ -1026,7 +1042,7 @@ func (proxier *Proxier) syncProxyRules() {
// local traffic are set up.)
externalPolicyChain := clusterPolicyChain
hasExternalEndpoints := hasEndpoints
if svcInfo.ExternalPolicyLocal() {
if svcInfo.ExternalPolicyLocal() && !localWithFallback {
externalPolicyChain = localPolicyChain
if len(localEndpoints) == 0 {
hasExternalEndpoints = false
Expand Down Expand Up @@ -1261,7 +1277,14 @@ func (proxier *Proxier) syncProxyRules() {
if usesExternalTrafficChain {
natChains.Write(utiliptables.MakeChainLine(externalTrafficChain))

if !svcInfo.ExternalPolicyLocal() {
if localWithFallback {
// Masquerade external traffic but not internal
proxier.natRules.Write(
"-A", string(externalTrafficChain),
"-m", "comment", "--comment", fmt.Sprintf(`"masquerade traffic for %s external destinations"`, svcPortNameString),
proxier.localDetector.IfNotLocal(),
"-j", string(kubeMarkMasqChain))
} else if !svcInfo.ExternalPolicyLocal() {
// If we are using non-local endpoints we need to masquerade,
// in case we cross nodes.
natRules.Write(
Expand Down
112 changes: 112 additions & 0 deletions pkg/proxy/iptables/proxier_test.go
Expand Up @@ -58,6 +58,7 @@ import (
"k8s.io/utils/exec"
fakeexec "k8s.io/utils/exec/testing"
netutils "k8s.io/utils/net"
"k8s.io/utils/pointer"
"k8s.io/utils/ptr"
)

Expand Down Expand Up @@ -3292,6 +3293,117 @@ func TestExternalTrafficPolicyCluster(t *testing.T) {
masq: true,
},
})

}

func TestOnlyLocalWithFallback(t *testing.T) {
ipt := iptablestest.NewFake()
fp := NewFakeProxier(ipt)
svcIP := "172.30.0.41"
svcPort := 80
svcNodePort := 3001
svcHealthCheckNodePort := 30000
svcLBIP := "1.2.3.4"
svcPortName := proxy.ServicePortName{
NamespacedName: makeNSN("ns1", "svc1"),
Port: "p80",
Protocol: v1.ProtocolTCP,
}
svcSessionAffinityTimeout := int32(10800)

makeServiceMap(fp,
makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *v1.Service) {
svc.ObjectMeta.Annotations = map[string]string{
localWithFallbackAnnotation: "",
}
svc.Spec.Type = "LoadBalancer"
svc.Spec.ClusterIP = svcIP
svc.Spec.Ports = []v1.ServicePort{{
Name: svcPortName.Port,
Port: int32(svcPort),
Protocol: v1.ProtocolTCP,
NodePort: int32(svcNodePort),
}}
svc.Spec.HealthCheckNodePort = int32(svcHealthCheckNodePort)
svc.Status.LoadBalancer.Ingress = []v1.LoadBalancerIngress{{
IP: svcLBIP,
}}
svc.Spec.ExternalTrafficPolicy = v1.ServiceExternalTrafficPolicyTypeLocal
svc.Spec.SessionAffinity = v1.ServiceAffinityClientIP
svc.Spec.SessionAffinityConfig = &v1.SessionAffinityConfig{
ClientIP: &v1.ClientIPConfig{TimeoutSeconds: &svcSessionAffinityTimeout},
}
}),
)

epIP1 := "10.180.0.1"
epIP2 := "10.180.2.1"
tcpProtocol := v1.ProtocolTCP
populateEndpointSlices(fp,
makeTestEndpointSlice(svcPortName.Namespace, svcPortName.Name, 1, func(eps *discovery.EndpointSlice) {
eps.AddressType = discovery.AddressTypeIPv4
eps.Endpoints = []discovery.Endpoint{{
Addresses: []string{epIP1},
NodeName: pointer.StringPtr("host1"),
}, {
Addresses: []string{epIP2},
NodeName: pointer.StringPtr("host2"),
}}
eps.Ports = []discovery.EndpointPort{{
Name: pointer.StringPtr(svcPortName.Port),
Port: pointer.Int32(int32(svcPort)),
Protocol: &tcpProtocol,
}}
}),
)

fp.syncProxyRules()

expected := dedent.Dedent(`
*filter
:KUBE-NODEPORTS - [0:0]
:KUBE-SERVICES - [0:0]
:KUBE-EXTERNAL-SERVICES - [0:0]
:KUBE-FIREWALL - [0:0]
:KUBE-FORWARD - [0:0]
:KUBE-PROXY-FIREWALL - [0:0]
-A KUBE-NODEPORTS -m comment --comment "ns1/svc1:p80 health check node port" -m tcp -p tcp --dport 30000 -j ACCEPT
-A KUBE-FIREWALL -m comment --comment "block incoming localnet connections" -d 127.0.0.0/8 ! -s 127.0.0.0/8 -m conntrack ! --ctstate RELATED,ESTABLISHED,DNAT -j DROP
-A KUBE-FORWARD -m conntrack --ctstate INVALID -j DROP
-A KUBE-FORWARD -m comment --comment "kubernetes forwarding rules" -m mark --mark 0x4000/0x4000 -j ACCEPT
-A KUBE-FORWARD -m comment --comment "kubernetes forwarding conntrack rule" -m conntrack --ctstate RELATED,ESTABLISHED -j ACCEPT
COMMIT
*nat
:KUBE-NODEPORTS - [0:0]
:KUBE-SERVICES - [0:0]
:KUBE-EXT-XPGD46QRK7WJZT7O - [0:0]
:KUBE-MARK-MASQ - [0:0]
:KUBE-POSTROUTING - [0:0]
:KUBE-SEP-SXIVWICOYRO3J4NJ - [0:0]
:KUBE-SEP-ZX7GRIZKSNUQ3LAJ - [0:0]
:KUBE-SVC-XPGD46QRK7WJZT7O - [0:0]
-A KUBE-NODEPORTS -m comment --comment ns1/svc1:p80 -m tcp -p tcp --dport 3001 -j KUBE-EXT-XPGD46QRK7WJZT7O
-A KUBE-SERVICES -m comment --comment "ns1/svc1:p80 cluster IP" -m tcp -p tcp -d 172.30.0.41 --dport 80 -j KUBE-SVC-XPGD46QRK7WJZT7O
-A KUBE-SERVICES -m comment --comment "ns1/svc1:p80 loadbalancer IP" -m tcp -p tcp -d 1.2.3.4 --dport 80 -j KUBE-EXT-XPGD46QRK7WJZT7O
-A KUBE-SERVICES -m comment --comment "kubernetes service nodeports; NOTE: this must be the last rule in this chain" -m addrtype --dst-type LOCAL -j KUBE-NODEPORTS
-A KUBE-EXT-XPGD46QRK7WJZT7O -m comment --comment "masquerade traffic for ns1/svc1:p80 external destinations" ! -s 10.0.0.0/8 -j KUBE-MARK-MASQ
-A KUBE-EXT-XPGD46QRK7WJZT7O -j KUBE-SVC-XPGD46QRK7WJZT7O
-A KUBE-MARK-MASQ -j MARK --or-mark 0x4000
-A KUBE-POSTROUTING -m mark ! --mark 0x4000/0x4000 -j RETURN
-A KUBE-POSTROUTING -j MARK --xor-mark 0x4000
-A KUBE-POSTROUTING -m comment --comment "kubernetes service traffic requiring SNAT" -j MASQUERADE
-A KUBE-SEP-SXIVWICOYRO3J4NJ -m comment --comment ns1/svc1:p80 -s 10.180.0.1 -j KUBE-MARK-MASQ
-A KUBE-SEP-SXIVWICOYRO3J4NJ -m comment --comment ns1/svc1:p80 -m recent --name KUBE-SEP-SXIVWICOYRO3J4NJ --set -m tcp -p tcp -j DNAT --to-destination 10.180.0.1:80
-A KUBE-SEP-ZX7GRIZKSNUQ3LAJ -m comment --comment ns1/svc1:p80 -s 10.180.2.1 -j KUBE-MARK-MASQ
-A KUBE-SEP-ZX7GRIZKSNUQ3LAJ -m comment --comment ns1/svc1:p80 -m recent --name KUBE-SEP-ZX7GRIZKSNUQ3LAJ --set -m tcp -p tcp -j DNAT --to-destination 10.180.2.1:80
-A KUBE-SVC-XPGD46QRK7WJZT7O -m comment --comment "ns1/svc1:p80 cluster IP" -m tcp -p tcp -d 172.30.0.41 --dport 80 ! -s 10.0.0.0/8 -j KUBE-MARK-MASQ
-A KUBE-SVC-XPGD46QRK7WJZT7O -m comment --comment "ns1/svc1:p80 -> 10.180.0.1:80" -m recent --name KUBE-SEP-SXIVWICOYRO3J4NJ --rcheck --seconds 10800 --reap -j KUBE-SEP-SXIVWICOYRO3J4NJ
-A KUBE-SVC-XPGD46QRK7WJZT7O -m comment --comment "ns1/svc1:p80 -> 10.180.2.1:80" -m recent --name KUBE-SEP-ZX7GRIZKSNUQ3LAJ --rcheck --seconds 10800 --reap -j KUBE-SEP-ZX7GRIZKSNUQ3LAJ
-A KUBE-SVC-XPGD46QRK7WJZT7O -m comment --comment "ns1/svc1:p80 -> 10.180.0.1:80" -m statistic --mode random --probability 0.5000000000 -j KUBE-SEP-SXIVWICOYRO3J4NJ
-A KUBE-SVC-XPGD46QRK7WJZT7O -m comment --comment "ns1/svc1:p80 -> 10.180.2.1:80" -j KUBE-SEP-ZX7GRIZKSNUQ3LAJ
COMMIT
`)
assertIPTablesRulesEqual(t, getLine(), true, expected, fp.iptablesData.String())
}

func TestComputeProbability(t *testing.T) {
Expand Down

0 comments on commit 9d8de2b

Please sign in to comment.