Skip to content

Commit

Permalink
Fix client IP preservation for NodePort service with protocol SCTP
Browse files Browse the repository at this point in the history
The iptables rule that matches kubeNodePortLocalSetSCTP must be inserted
before the one matches kubeNodePortSetSCTP, otherwise all SCTP traffic
would be masqueraded regardless of whether its ExternalTrafficPolicy is
Local or not.

To cover the case in tests, the patch adds rule order validation to
checkIptables.
  • Loading branch information
tnqn committed May 26, 2022
1 parent a347c19 commit cc6a56e
Show file tree
Hide file tree
Showing 2 changed files with 175 additions and 6 deletions.
2 changes: 1 addition & 1 deletion pkg/proxy/ipvs/proxier.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,8 +191,8 @@ var ipsetWithIptablesChain = []struct {
{kubeNodePortSetTCP, string(KubeNodePortChain), string(KubeMarkMasqChain), "dst", utilipset.ProtocolTCP},
{kubeNodePortLocalSetUDP, string(KubeNodePortChain), "RETURN", "dst", utilipset.ProtocolUDP},
{kubeNodePortSetUDP, string(KubeNodePortChain), string(KubeMarkMasqChain), "dst", utilipset.ProtocolUDP},
{kubeNodePortSetSCTP, string(KubeNodePortChain), string(KubeMarkMasqChain), "dst,dst", utilipset.ProtocolSCTP},
{kubeNodePortLocalSetSCTP, string(KubeNodePortChain), "RETURN", "dst,dst", utilipset.ProtocolSCTP},
{kubeNodePortSetSCTP, string(KubeNodePortChain), string(KubeMarkMasqChain), "dst,dst", utilipset.ProtocolSCTP},
}

// In IPVS proxy mode, the following flags need to be set
Expand Down
179 changes: 174 additions & 5 deletions pkg/proxy/ipvs/proxier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -633,9 +633,15 @@ func TestNodePortIPv4(t *testing.T) {
expectedIptablesChains: netlinktest.ExpectedIptablesChain{
string(KubeNodePortChain): {{
JumpChain: string(KubeMarkMasqChain), MatchSet: kubeNodePortSetUDP,
}, {
JumpChain: "ACCEPT", MatchSet: kubeHealthCheckNodePortSet,
}},
string(kubeServicesChain): {{
JumpChain: string(KubeMarkMasqChain), MatchSet: kubeClusterIPSet,
}, {
JumpChain: string(KubeNodePortChain), MatchSet: "",
}, {
JumpChain: "ACCEPT", MatchSet: kubeClusterIPSet,
}},
},
},
Expand Down Expand Up @@ -845,6 +851,124 @@ func TestNodePortIPv4(t *testing.T) {
},
},
},
{
name: "node port service with protocol sctp and externalTrafficPolicy local",
services: []*v1.Service{
makeTestService("ns1", "svc1", func(svc *v1.Service) {
svc.Spec.Type = "NodePort"
svc.Spec.ClusterIP = "10.20.30.41"
svc.Spec.Ports = []v1.ServicePort{{
Name: "p80",
Port: int32(80),
Protocol: v1.ProtocolSCTP,
NodePort: int32(3001),
}}
svc.Spec.ExternalTrafficPolicy = v1.ServiceExternalTrafficPolicyTypeLocal
}),
},
endpoints: []*v1.Endpoints{
makeTestEndpoints("ns1", "svc1", func(ept *v1.Endpoints) {
ept.Subsets = []v1.EndpointSubset{{
Addresses: []v1.EndpointAddress{{
IP: "10.180.0.1",
NodeName: utilpointer.StringPtr(testHostname),
}, {
IP: "10.180.1.1",
NodeName: utilpointer.StringPtr("otherHost"),
}},
Ports: []v1.EndpointPort{{
Name: "p80",
Port: int32(80),
Protocol: v1.ProtocolSCTP,
}},
}}
}),
},
nodeIPs: []net.IP{
net.ParseIP("100.101.102.103"),
},
nodePortAddresses: []string{},
expectedIPVS: &ipvstest.FakeIPVS{
Services: map[ipvstest.ServiceKey]*utilipvs.VirtualServer{
{
IP: "10.20.30.41",
Port: 80,
Protocol: "SCTP",
}: {
Address: net.ParseIP("10.20.30.41"),
Protocol: "SCTP",
Port: uint16(80),
Scheduler: "rr",
},
{
IP: "100.101.102.103",
Port: 3001,
Protocol: "SCTP",
}: {
Address: net.ParseIP("100.101.102.103"),
Protocol: "SCTP",
Port: uint16(3001),
Scheduler: "rr",
},
},
Destinations: map[ipvstest.ServiceKey][]*utilipvs.RealServer{
{
IP: "10.20.30.41",
Port: 80,
Protocol: "SCTP",
}: {
{
Address: net.ParseIP("10.180.0.1"),
Port: uint16(80),
Weight: 1,
},
{
Address: net.ParseIP("10.180.1.1"),
Port: uint16(80),
Weight: 1,
},
},
{
IP: "100.101.102.103",
Port: 3001,
Protocol: "SCTP",
}: {
{
Address: net.ParseIP("10.180.0.1"),
Port: uint16(80),
Weight: 1,
},
},
},
},
expectedIPSets: netlinktest.ExpectedIPSet{
kubeNodePortSetSCTP: {
{
IP: "100.101.102.103",
Port: 3001,
Protocol: strings.ToLower(string(v1.ProtocolSCTP)),
SetType: utilipset.HashIPPort,
},
},
kubeNodePortLocalSetSCTP: {
{
IP: "100.101.102.103",
Port: 3001,
Protocol: strings.ToLower(string(v1.ProtocolSCTP)),
SetType: utilipset.HashIPPort,
},
},
},
expectedIptablesChains: netlinktest.ExpectedIptablesChain{
string(KubeNodePortChain): {{
JumpChain: "RETURN", MatchSet: kubeNodePortLocalSetSCTP,
}, {
JumpChain: string(KubeMarkMasqChain), MatchSet: kubeNodePortSetSCTP,
}, {
JumpChain: "ACCEPT", MatchSet: kubeHealthCheckNodePortSet,
}},
},
},
}

for _, test := range tests {
Expand Down Expand Up @@ -1798,6 +1922,14 @@ func TestLoadBalancer(t *testing.T) {
epIpt := netlinktest.ExpectedIptablesChain{
string(kubeServicesChain): {{
JumpChain: string(KubeLoadBalancerChain), MatchSet: kubeLoadBalancerSet,
}, {
JumpChain: string(KubeMarkMasqChain), MatchSet: kubeClusterIPSet,
}, {
JumpChain: string(KubeNodePortChain), MatchSet: "",
}, {
JumpChain: "ACCEPT", MatchSet: kubeClusterIPSet,
}, {
JumpChain: "ACCEPT", MatchSet: kubeLoadBalancerSet,
}},
string(kubeLoadBalancerSet): {{
JumpChain: string(KubeMarkMasqChain), MatchSet: "",
Expand Down Expand Up @@ -1897,12 +2029,18 @@ func TestOnlyLocalNodePorts(t *testing.T) {
// Check iptables chain and rules
epIpt := netlinktest.ExpectedIptablesChain{
string(kubeServicesChain): {{
JumpChain: string(KubeMarkMasqChain), MatchSet: kubeClusterIPSet,
}, {
JumpChain: string(KubeNodePortChain), MatchSet: "",
}, {
JumpChain: "ACCEPT", MatchSet: kubeClusterIPSet,
}},
string(KubeNodePortChain): {{
JumpChain: "RETURN", MatchSet: kubeNodePortLocalSetTCP,
}, {
JumpChain: string(KubeMarkMasqChain), MatchSet: kubeNodePortSetTCP,
}, {
JumpChain: "ACCEPT", MatchSet: kubeHealthCheckNodePortSet,
}},
}
checkIptables(t, ipt, epIpt)
Expand Down Expand Up @@ -1971,6 +2109,10 @@ func TestHealthCheckNodePort(t *testing.T) {
// Check iptables chain and rules
epIpt := netlinktest.ExpectedIptablesChain{
string(KubeNodePortChain): {{
JumpChain: "RETURN", MatchSet: kubeNodePortLocalSetTCP,
}, {
JumpChain: string(KubeMarkMasqChain), MatchSet: kubeNodePortSetTCP,
}, {
JumpChain: "ACCEPT", MatchSet: kubeHealthCheckNodePortSet,
}},
}
Expand Down Expand Up @@ -2061,6 +2203,14 @@ func TestLoadBalanceSourceRanges(t *testing.T) {
epIpt := netlinktest.ExpectedIptablesChain{
string(kubeServicesChain): {{
JumpChain: string(KubeLoadBalancerChain), MatchSet: kubeLoadBalancerSet,
}, {
JumpChain: string(KubeMarkMasqChain), MatchSet: kubeClusterIPSet,
}, {
JumpChain: string(KubeNodePortChain), MatchSet: "",
}, {
JumpChain: "ACCEPT", MatchSet: kubeClusterIPSet,
}, {
JumpChain: "ACCEPT", MatchSet: kubeLoadBalancerSet,
}},
string(KubeLoadBalancerChain): {{
JumpChain: string(KubeFireWallChain), MatchSet: kubeLoadbalancerFWSet,
Expand Down Expand Up @@ -2133,9 +2283,14 @@ func TestAcceptIPVSTraffic(t *testing.T) {
// Check iptables chain and rules
epIpt := netlinktest.ExpectedIptablesChain{
string(kubeServicesChain): {
{JumpChain: string(KubeLoadBalancerChain), MatchSet: kubeLoadBalancerSet},
{JumpChain: string(KubeMarkMasqChain), MatchSet: kubeClusterIPSet},
{JumpChain: string(KubeMarkMasqChain), MatchSet: kubeExternalIPSet},
{JumpChain: "ACCEPT", MatchSet: kubeExternalIPSet}, // With externalTrafficOnlyArgs
{JumpChain: "ACCEPT", MatchSet: kubeExternalIPSet}, // With dstLocalOnlyArgs
{JumpChain: string(KubeNodePortChain), MatchSet: ""},
{JumpChain: "ACCEPT", MatchSet: kubeClusterIPSet},
{JumpChain: "ACCEPT", MatchSet: kubeLoadBalancerSet},
{JumpChain: "ACCEPT", MatchSet: kubeExternalIPSet},
},
}
checkIptables(t, ipt, epIpt)
Expand Down Expand Up @@ -2233,6 +2388,14 @@ func TestOnlyLocalLoadBalancing(t *testing.T) {
epIpt := netlinktest.ExpectedIptablesChain{
string(kubeServicesChain): {{
JumpChain: string(KubeLoadBalancerChain), MatchSet: kubeLoadBalancerSet,
}, {
JumpChain: string(KubeMarkMasqChain), MatchSet: kubeClusterIPSet,
}, {
JumpChain: string(KubeNodePortChain), MatchSet: "",
}, {
JumpChain: "ACCEPT", MatchSet: kubeClusterIPSet,
}, {
JumpChain: "ACCEPT", MatchSet: kubeLoadBalancerSet,
}},
string(KubeLoadBalancerChain): {{
JumpChain: "RETURN", MatchSet: kubeLoadBalancerLocalSet,
Expand Down Expand Up @@ -3743,13 +3906,19 @@ func hasMasqRandomFully(rules []iptablestest.Rule) bool {
return false
}

// checkIptabless to check expected iptables chain and rules
// checkIptables to check expected iptables chain and rules. The got rules must have same number and order as the
// expected rules.
func checkIptables(t *testing.T, ipt *iptablestest.FakeIPTables, epIpt netlinktest.ExpectedIptablesChain) {
for epChain, epRules := range epIpt {
rules := ipt.GetRules(epChain)
for _, epRule := range epRules {
if !hasJump(rules, epRule.JumpChain, epRule.MatchSet) {
t.Errorf("Didn't find jump from chain %v match set %v to %v", epChain, epRule.MatchSet, epRule.JumpChain)
if len(rules) != len(epRules) {
t.Errorf("Expected %d iptables rule in chain %s, got %d", len(epRules), epChain, len(rules))
continue
}
for i, epRule := range epRules {
rule := rules[i]
if rule[iptablestest.Jump] != epRule.JumpChain || !strings.Contains(rule[iptablestest.MatchSet], epRule.MatchSet) {
t.Errorf("Expected MatchSet=%s JumpChain=%s, got MatchSet=%s JumpChain=%s", epRule.MatchSet, epRule.JumpChain, rule[iptablestest.MatchSet], rule[iptablestest.Jump])
}
}
}
Expand Down

0 comments on commit cc6a56e

Please sign in to comment.