Merge pull request kubernetes#106239 from aojea/automated-cherry-pick-of-#106163-upstream-release-1.22

Automated cherry pick of kubernetes#106163: kube-proxy: fix stale detection logic
k8s-ci-robot committed Nov 12, 2021
2 parents 2456f3f + 53d4c15 commit e8768d7
Showing 4 changed files with 131 additions and 4 deletions.
33 changes: 30 additions & 3 deletions pkg/proxy/endpoints.go
@@ -121,7 +121,9 @@ func (info *BaseEndpointInfo) Port() (int, error) {

// Equal is part of proxy.Endpoint interface.
func (info *BaseEndpointInfo) Equal(other Endpoint) bool {
-return info.String() == other.String() && info.GetIsLocal() == other.GetIsLocal()
+return info.String() == other.String() &&
+info.GetIsLocal() == other.GetIsLocal() &&
+info.IsReady() == other.IsReady()
}
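For context, a minimal sketch of why readiness has to be part of the equality check (a simplified stand-in type, not the real BaseEndpointInfo): under the old comparison an endpoint that keeps its address but flips from ready to not-ready looks unchanged, so its conntrack entries are never treated as stale.

```go
package main

import "fmt"

// endpoint is a simplified stand-in for a proxy endpoint (illustrative only).
type endpoint struct {
	addr    string
	isLocal bool
	ready   bool
}

// equalOld mirrors the pre-fix comparison: address and locality only.
func equalOld(a, b endpoint) bool {
	return a.addr == b.addr && a.isLocal == b.isLocal
}

// equalNew mirrors the post-fix comparison: readiness is included.
func equalNew(a, b endpoint) bool {
	return a.addr == b.addr && a.isLocal == b.isLocal && a.ready == b.ready
}

func main() {
	prev := endpoint{addr: "10.0.0.5:80", ready: true}
	curr := endpoint{addr: "10.0.0.5:80", ready: false} // same address, went unready

	fmt.Println(equalOld(prev, curr)) // true  -> transition missed, stale UDP conntrack entries survive
	fmt.Println(equalNew(prev, curr)) // false -> transition detected, entries can be cleaned up
}
```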

// GetNodeName returns the NodeName for this endpoint.
@@ -536,13 +538,22 @@ func (em EndpointsMap) getLocalReadyEndpointIPs() map[types.NamespacedName]sets.
// detectStaleConnections modifies <staleEndpoints> and <staleServiceNames> with detected stale connections.
// <staleServiceNames> is used to store stale UDP services in order to clear their UDP conntrack entries later.
func detectStaleConnections(oldEndpointsMap, newEndpointsMap EndpointsMap, staleEndpoints *[]ServiceEndpoint, staleServiceNames *[]ServicePortName) {
// Detect stale endpoints: an endpoint can have stale conntrack entries if it was receiving traffic
// and then goes unready or changes its IP address.
for svcPortName, epList := range oldEndpointsMap {
if svcPortName.Protocol != v1.ProtocolUDP {
continue
}

for _, ep := range epList {
// If the old endpoint wasn't ready, it is not possible to have stale entries
// since no traffic was sent to it.
if !ep.IsReady() {
continue
}
stale := true
// Check if the endpoint has changed, including whether it went from ready to not ready.
// If it did change, the stale entries for the old endpoint have to be cleared.
for i := range newEndpointsMap[svcPortName] {
if newEndpointsMap[svcPortName][i].Equal(ep) {
stale = false
@@ -556,13 +567,29 @@ func detectStaleConnections(oldEndpointsMap, newEndpointsMap EndpointsMap, stale
}
}

// Detect stale services:
// for a UDP service whose ready backends change from 0 to non-zero,
// there may exist a conntrack entry that could blackhole traffic to the service.
for svcPortName, epList := range newEndpointsMap {
if svcPortName.Protocol != v1.ProtocolUDP {
continue
}

-// For udp service, if its backend changes from 0 to non-0. There may exist a conntrack entry that could blackhole traffic to the service.
-if len(epList) > 0 && len(oldEndpointsMap[svcPortName]) == 0 {
epReady := 0
for _, ep := range epList {
if ep.IsReady() {
epReady++
}
}

oldEpReady := 0
for _, ep := range oldEndpointsMap[svcPortName] {
if ep.IsReady() {
oldEpReady++
}
}

if epReady > 0 && oldEpReady == 0 {
*staleServiceNames = append(*staleServiceNames, svcPortName)
}
}
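Putting the two loops above together, a condensed, self-contained sketch of the detection logic (simplified types and names, not the real proxy package): stale endpoint entries are only possible for old endpoints that were ready, and a UDP service only needs its conntrack entries flushed when its number of ready endpoints goes from zero to non-zero.

```go
package main

import "fmt"

type ep struct {
	addr  string
	ready bool
}

func countReady(eps []ep) int {
	n := 0
	for _, e := range eps {
		if e.ready {
			n++
		}
	}
	return n
}

// detectStale mimics detectStaleConnections for UDP services only.
func detectStale(oldEps, newEps map[string][]ep) (staleEndpoints, staleServices []string) {
	// Stale endpoints: the old endpoint was ready (so it may have received
	// traffic) and no identical endpoint (same address and readiness) exists
	// in the new map.
	for svc, eps := range oldEps {
		for _, old := range eps {
			if !old.ready {
				continue
			}
			stale := true
			for _, cur := range newEps[svc] {
				if cur == old {
					stale = false
					break
				}
			}
			if stale {
				staleEndpoints = append(staleEndpoints, svc+"/"+old.addr)
			}
		}
	}

	// Stale services: the ready-endpoint count went from 0 to >0, so a
	// blackholing conntrack entry for the ClusterIP may exist and must be
	// cleared.
	for svc, eps := range newEps {
		if countReady(eps) > 0 && countReady(oldEps[svc]) == 0 {
			staleServices = append(staleServices, svc)
		}
	}
	return staleEndpoints, staleServices
}

func main() {
	oldEps := map[string][]ep{"ns/svc:udp": {{addr: "10.0.0.5:80", ready: false}}}
	newEps := map[string][]ep{"ns/svc:udp": {{addr: "10.0.0.5:80", ready: true}}}

	se, ss := detectStale(oldEps, newEps)
	fmt.Println(se) // []           - the old endpoint was never ready, nothing to flush for it
	fmt.Println(ss) // [ns/svc:udp] - the service just gained its first ready endpoint
}
```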
3 changes: 2 additions & 1 deletion pkg/proxy/iptables/proxier.go
@@ -164,7 +164,8 @@ func (e *endpointsInfo) Equal(other proxy.Endpoint) bool {
return e.Endpoint == o.Endpoint &&
e.IsLocal == o.IsLocal &&
e.protocol == o.protocol &&
-e.chainName == o.chainName
+e.chainName == o.chainName &&
+e.Ready == o.Ready
}

// Returns the endpoint chain name for a given endpointsInfo.
26 changes: 26 additions & 0 deletions pkg/proxy/iptables/proxier_test.go
@@ -3232,6 +3232,32 @@ func TestProxierDeleteNodePortStaleUDP(t *testing.T) {
eps.AddressType = discovery.AddressTypeIPv4
eps.Endpoints = []discovery.Endpoint{{
Addresses: []string{epIP},
Conditions: discovery.EndpointConditions{
Ready: utilpointer.Bool(false),
},
}}
eps.Ports = []discovery.EndpointPort{{
Name: utilpointer.StringPtr(svcPortName.Port),
Port: utilpointer.Int32(int32(svcPort)),
Protocol: &udpProtocol,
}}
}),
)

fp.syncProxyRules()

if fexec.CommandCalls != 0 {
t.Fatalf("Updated UDP service with not ready endpoints must not clear UDP entries")
}

populateEndpointSlices(fp,
makeTestEndpointSlice(svcPortName.Namespace, svcPortName.Name, 1, func(eps *discovery.EndpointSlice) {
eps.AddressType = discovery.AddressTypeIPv4
eps.Endpoints = []discovery.Endpoint{{
Addresses: []string{epIP},
Conditions: discovery.EndpointConditions{
Ready: utilpointer.Bool(true),
},
}}
eps.Ports = []discovery.EndpointPort{{
Name: utilpointer.StringPtr(svcPortName.Port),
73 changes: 73 additions & 0 deletions test/e2e/network/conntrack.go
@@ -279,6 +279,79 @@ var _ = common.SIGDescribe("Conntrack", func() {
}
})

// Regression test for #105657
// 1. Create a UDP Service
// 2. Client Pod sends traffic to the UDP service
// 3. Create a UDP server associated with the Service created in 1, with an init container that sleeps for some time
// The init container keeps the server pod not ready; the endpoint slices are still created, it is just
// that the Endpoint condition Ready is false.
// If the kube-proxy conntrack logic doesn't check readiness, it will delete the conntrack entries for the UDP server
// as soon as the endpoint slice has been created; however, the iptables rules will not be installed until at least one
// endpoint is ready. If some traffic arrives between the time kube-proxy clears the entries (sees the endpoint slice)
// and the time it installs the corresponding iptables rules (the endpoint becomes ready), a conntrack entry will be
// generated, blackholing subsequent traffic.
ginkgo.It("should be able to preserve UDP traffic when initial unready endpoints get ready", func() {

// Create a ClusterIP service
udpJig := e2eservice.NewTestJig(cs, ns, serviceName)
ginkgo.By("creating a UDP service " + serviceName + " with type=ClusterIP in " + ns)
udpService, err := udpJig.CreateUDPService(func(svc *v1.Service) {
svc.Spec.Type = v1.ServiceTypeClusterIP
svc.Spec.Ports = []v1.ServicePort{
{Port: 80, Name: "udp", Protocol: v1.ProtocolUDP, TargetPort: intstr.FromInt(80)},
}
})
framework.ExpectNoError(err)

// Create a pod on one node to generate UDP traffic against the ClusterIP service every 5 seconds
ginkgo.By("creating a client pod for probing the service " + serviceName)
clientPod := e2epod.NewAgnhostPod(ns, podClient, nil, nil, nil)
nodeSelection := e2epod.NodeSelection{Name: clientNodeInfo.name}
e2epod.SetNodeSelection(&clientPod.Spec, nodeSelection)
cmd := fmt.Sprintf(`date; for i in $(seq 1 3000); do echo "$(date) Try: ${i}"; echo hostname | nc -u -w 5 -p %d %s %d; echo; done`, srcPort, udpService.Spec.ClusterIP, udpService.Spec.Ports[0].Port)
clientPod.Spec.Containers[0].Command = []string{"/bin/sh", "-c", cmd}
clientPod.Spec.Containers[0].Name = podClient
fr.PodClient().CreateSync(clientPod)

// Read the client pod logs
logs, err := e2epod.GetPodLogs(cs, ns, podClient, podClient)
framework.ExpectNoError(err)
framework.Logf("Pod client logs: %s", logs)

// Add a backend pod to the service on the other node
ginkgo.By("creating a backend pod " + podBackend1 + " for the service " + serviceName)
serverPod1 := e2epod.NewAgnhostPod(ns, podBackend1, nil, nil, nil, "netexec", fmt.Sprintf("--udp-port=%d", 80))
serverPod1.Labels = udpJig.Labels
nodeSelection = e2epod.NodeSelection{Name: serverNodeInfo.name}
// Add an init container to keep the pod from becoming ready for 15 seconds
serverPod1.Spec.InitContainers = []v1.Container{
{
Name: "init",
Image: imageutils.GetE2EImage(imageutils.BusyBox),
Command: []string{"/bin/sh", "-c", "echo Pausing start. && sleep 15"},
},
}
e2epod.SetNodeSelection(&serverPod1.Spec, nodeSelection)
fr.PodClient().CreateSync(serverPod1)

// wait until the endpoints are ready
validateEndpointsPortsOrFail(cs, ns, serviceName, portsByPodName{podBackend1: {80}})

// Note that the fact that the Endpoints object already exists does NOT mean
// that iptables (or whatever else is used) was already programmed.
// Additionally, take into account that the UDP conntrack entry timeout is
// 30 seconds by default.
// Based on the above, check whether the pod receives the traffic.
ginkgo.By("checking client pod connected to the backend on Node IP " + serverNodeInfo.nodeIP)
if err := wait.PollImmediate(5*time.Second, time.Minute, logContainsFn(podBackend1, podClient)); err != nil {
logs, err = e2epod.GetPodLogs(cs, ns, podClient, podClient)
framework.ExpectNoError(err)
framework.Logf("Pod client logs: %s", logs)
framework.Failf("Failed to connect to backend pod")
}

})
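To make the race described in the comment above concrete, a toy model (illustration only; the step data and helper names are made up, but the two rules mirror the old length-based check and the new ready-count check in pkg/proxy/endpoints.go) of when each version of kube-proxy would flush the service's UDP conntrack entries:

```go
package main

import "fmt"

// step models one endpoint update seen by kube-proxy for the UDP service:
// how many endpoints exist and how many of them are ready.
type step struct {
	when      string
	endpoints int
	ready     int
}

// flushPreFix reproduces the old rule: flush as soon as the service goes from
// zero endpoints to some endpoints, ready or not.
func flushPreFix(prev, cur step) bool { return cur.endpoints > 0 && prev.endpoints == 0 }

// flushPostFix reproduces the new rule: flush only when the first ready
// endpoint appears.
func flushPostFix(prev, cur step) bool { return cur.ready > 0 && prev.ready == 0 }

func main() {
	steps := []step{
		{when: "t=-1s (no endpoint slice yet)", endpoints: 0, ready: 0},
		{when: "t=0s  (slice created, pod in 15s init sleep)", endpoints: 1, ready: 0},
		{when: "t=15s (endpoint becomes ready)", endpoints: 1, ready: 1},
	}
	for i := 1; i < len(steps); i++ {
		prev, cur := steps[i-1], steps[i]
		fmt.Printf("%-45s pre-fix flush=%-5v post-fix flush=%v\n",
			cur.when, flushPreFix(prev, cur), flushPostFix(prev, cur))
	}
	// Pre-fix: the flush happens at t=0s, before any DNAT rule is installed,
	// so the next client datagram creates a conntrack entry that blackholes
	// traffic even after the endpoint becomes ready (until the ~30s UDP
	// conntrack timeout expires).
	// Post-fix: the flush happens at t=15s, together with the DNAT rule, so
	// any blackholing entry is removed exactly when the backend is reachable.
}
```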

// Regression test for #74839, where:
// Packets considered INVALID by conntrack are now dropped. In particular, this fixes
// a problem where spurious retransmits in a long-running TCP connection to a service
