kube-proxy: fix stale detection logic
The logic to detect stale endpoints was not taking endpoint readiness into
account.

We can have stale entries on UDP services for 2 reasons:
- an endpoint was receiving traffic and is removed or replaced
- a service was receiving traffic but not forwarding it, and starts
to forward it.

Add an e2e test to cover the regression
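
For illustration only, here is a condensed sketch of the two rules described above, using simplified types rather than the real kube-proxy interfaces (the actual change lands in pkg/proxy/endpoints.go below): an old endpoint is considered stale only if it was ready, and a service needs its conntrack entries flushed only when it goes from zero to at least one ready endpoint.

```go
package main

import "fmt"

// endpoint is a simplified stand-in for the kube-proxy Endpoint interface:
// only the address string and the readiness condition matter here.
type endpoint struct {
	addr  string
	ready bool
}

// staleUDPEndpoints returns the old endpoints whose conntrack entries should be
// cleared: they were ready (so they may have received traffic) and no equal
// endpoint (same address, same readiness) exists in the new state.
func staleUDPEndpoints(oldEps, newEps []endpoint) []endpoint {
	var stale []endpoint
	for _, o := range oldEps {
		if !o.ready { // never received traffic, nothing to clear
			continue
		}
		found := false
		for _, n := range newEps {
			if n.addr == o.addr && n.ready == o.ready {
				found = true
				break
			}
		}
		if !found {
			stale = append(stale, o)
		}
	}
	return stale
}

// serviceNeedsFlush reports whether the service went from zero ready endpoints
// to at least one: a conntrack entry created while traffic was blackholed has
// to be removed so that new packets can reach the now-ready backend.
func serviceNeedsFlush(oldEps, newEps []endpoint) bool {
	countReady := func(eps []endpoint) int {
		n := 0
		for _, ep := range eps {
			if ep.ready {
				n++
			}
		}
		return n
	}
	return countReady(newEps) > 0 && countReady(oldEps) == 0
}

func main() {
	oldEps := []endpoint{{addr: "10.244.1.7:80", ready: false}}
	newEps := []endpoint{{addr: "10.244.1.7:80", ready: true}}
	fmt.Println(staleUDPEndpoints(oldEps, newEps)) // [] – the endpoint was never ready, nothing stale
	fmt.Println(serviceNeedsFlush(oldEps, newEps)) // true – went from 0 to 1 ready endpoints
}
```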
Antonio Ojea committed Nov 8, 2021
1 parent bd146ab commit 53d4c15
Showing 4 changed files with 131 additions and 4 deletions.
33 changes: 30 additions & 3 deletions pkg/proxy/endpoints.go
@@ -121,7 +121,9 @@ func (info *BaseEndpointInfo) Port() (int, error) {

// Equal is part of proxy.Endpoint interface.
func (info *BaseEndpointInfo) Equal(other Endpoint) bool {
return info.String() == other.String() && info.GetIsLocal() == other.GetIsLocal()
return info.String() == other.String() &&
info.GetIsLocal() == other.GetIsLocal() &&
info.IsReady() == other.IsReady()
}

// GetNodeName returns the NodeName for this endpoint.
@@ -536,13 +538,22 @@ func (em EndpointsMap) getLocalReadyEndpointIPs() map[types.NamespacedName]sets.
// detectStaleConnections modifies <staleEndpoints> and <staleServiceNames> with detected stale connections. <staleServiceNames>
// is used to store stale UDP services in order to clear their UDP conntrack entries later.
func detectStaleConnections(oldEndpointsMap, newEndpointsMap EndpointsMap, staleEndpoints *[]ServiceEndpoint, staleServiceNames *[]ServicePortName) {
// Detect stale endpoints: an endpoint can have stale conntrack entries if it was receiving traffic
// and then becomes unready or changes its IP address.
for svcPortName, epList := range oldEndpointsMap {
if svcPortName.Protocol != v1.ProtocolUDP {
continue
}

for _, ep := range epList {
// if the old endpoint wasn't ready, it is not possible to have stale entries
// since there was no traffic sent to it.
if !ep.IsReady() {
continue
}
stale := true
// Check if the endpoint has changed, including whether it went from ready to not ready.
// If it did change, stale entries for the old endpoint have to be cleared.
for i := range newEndpointsMap[svcPortName] {
if newEndpointsMap[svcPortName][i].Equal(ep) {
stale = false
@@ -556,13 +567,29 @@ func detectStaleConnections(oldEndpointsMap, newEndpointsMap EndpointsMap, stale
}
}

// Detect stale services:
// if a UDP service's backends change from 0 to a non-zero number of ready endpoints,
// there may exist a conntrack entry that could blackhole traffic to the service.
for svcPortName, epList := range newEndpointsMap {
if svcPortName.Protocol != v1.ProtocolUDP {
continue
}

// For udp service, if its backend changes from 0 to non-0. There may exist a conntrack entry that could blackhole traffic to the service.
if len(epList) > 0 && len(oldEndpointsMap[svcPortName]) == 0 {
epReady := 0
for _, ep := range epList {
if ep.IsReady() {
epReady++
}
}

oldEpReady := 0
for _, ep := range oldEndpointsMap[svcPortName] {
if ep.IsReady() {
oldEpReady++
}
}

if epReady > 0 && oldEpReady == 0 {
*staleServiceNames = append(*staleServiceNames, svcPortName)
}
}
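The staleEndpoints and staleServiceNames slices filled in above are later consumed by the proxier to flush the matching UDP conntrack entries. Below is a minimal sketch of that clean-up step, assuming the conntrack CLI from conntrack-tools is available on the node; the real kube-proxy uses its own helpers in pkg/util/conntrack rather than this exact code, so treat the flags and flow as illustrative.

```go
package main

import (
	"fmt"
	"os/exec"
)

// flushServiceEntries deletes UDP conntrack entries whose original destination
// is the service ClusterIP, e.g. entries created while the service had no
// ready endpoints and traffic was being blackholed.
func flushServiceEntries(serviceIP string) error {
	// Note: conntrack exits non-zero when no entries matched; a real
	// implementation tolerates that case instead of treating it as an error.
	out, err := exec.Command("conntrack", "-D", "-p", "udp", "--orig-dst", serviceIP).CombinedOutput()
	if err != nil {
		return fmt.Errorf("flushing conntrack entries for %s: %v: %s", serviceIP, err, out)
	}
	return nil
}

// flushEndpointEntries deletes UDP conntrack entries that were DNAT-ed from the
// service IP to a specific backend that is now stale (removed or no longer ready).
func flushEndpointEntries(serviceIP, endpointIP string) error {
	out, err := exec.Command("conntrack", "-D", "-p", "udp",
		"--orig-dst", serviceIP, "--dst-nat", endpointIP).CombinedOutput()
	if err != nil {
		return fmt.Errorf("flushing conntrack entries for %s->%s: %v: %s", serviceIP, endpointIP, err, out)
	}
	return nil
}

func main() {
	// Example addresses only; in kube-proxy these come from the
	// staleServiceNames / staleEndpoints bookkeeping shown above.
	_ = flushServiceEntries("10.96.0.10")
	_ = flushEndpointEntries("10.96.0.10", "10.244.1.7")
}
```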
3 changes: 2 additions & 1 deletion pkg/proxy/iptables/proxier.go
@@ -164,7 +164,8 @@ func (e *endpointsInfo) Equal(other proxy.Endpoint) bool {
return e.Endpoint == o.Endpoint &&
e.IsLocal == o.IsLocal &&
e.protocol == o.protocol &&
e.chainName == o.chainName
e.chainName == o.chainName &&
e.Ready == o.Ready
}

// Returns the endpoint chain name for a given endpointsInfo.
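To make the effect of the added `e.Ready == o.Ready` term concrete, here is a hypothetical, self-contained mirror of the comparison (not the real endpointsInfo type, which also compares protocol and chain name): two endpoints with the same address now compare unequal as soon as their readiness differs, so a ready-to-unready flip is seen as an endpoint change.

```go
package main

import "fmt"

// fakeEndpointInfo mirrors only the fields relevant to this example.
type fakeEndpointInfo struct {
	endpoint string // "ip:port"
	isLocal  bool
	ready    bool
}

// equal follows the same shape as endpointsInfo.Equal after this commit:
// readiness is part of the endpoint's identity.
func (e fakeEndpointInfo) equal(o fakeEndpointInfo) bool {
	return e.endpoint == o.endpoint &&
		e.isLocal == o.isLocal &&
		e.ready == o.ready
}

func main() {
	before := fakeEndpointInfo{endpoint: "10.244.1.7:80", ready: true}
	after := fakeEndpointInfo{endpoint: "10.244.1.7:80", ready: false}
	// Before this commit the readiness flip was invisible to Equal, so the
	// stale-detection code never saw the endpoint as changed.
	fmt.Println(before.equal(after)) // false
}
```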
26 changes: 26 additions & 0 deletions pkg/proxy/iptables/proxier_test.go
@@ -3232,6 +3232,32 @@ func TestProxierDeleteNodePortStaleUDP(t *testing.T) {
eps.AddressType = discovery.AddressTypeIPv4
eps.Endpoints = []discovery.Endpoint{{
Addresses: []string{epIP},
Conditions: discovery.EndpointConditions{
Ready: utilpointer.Bool(false),
},
}}
eps.Ports = []discovery.EndpointPort{{
Name: utilpointer.StringPtr(svcPortName.Port),
Port: utilpointer.Int32(int32(svcPort)),
Protocol: &udpProtocol,
}}
}),
)

fp.syncProxyRules()

if fexec.CommandCalls != 0 {
t.Fatalf("Updated UDP service with not ready endpoints must not clear UDP entries")
}

populateEndpointSlices(fp,
makeTestEndpointSlice(svcPortName.Namespace, svcPortName.Name, 1, func(eps *discovery.EndpointSlice) {
eps.AddressType = discovery.AddressTypeIPv4
eps.Endpoints = []discovery.Endpoint{{
Addresses: []string{epIP},
Conditions: discovery.EndpointConditions{
Ready: utilpointer.Bool(true),
},
}}
eps.Ports = []discovery.EndpointPort{{
Name: utilpointer.StringPtr(svcPortName.Port),
73 changes: 73 additions & 0 deletions test/e2e/network/conntrack.go
@@ -279,6 +279,79 @@ var _ = common.SIGDescribe("Conntrack", func() {
}
})

// Regression test for #105657
// 1. Create a UDP Service
// 2. Create a client Pod that sends traffic to the UDP service
// 3. Create a UDP server associated with the Service created in 1., with an init container that sleeps for some time
// The init container keeps the server pod not ready; the endpoint slices are still created, it is just
// that the Endpoint condition Ready is false.
// If the kube-proxy conntrack logic doesn't check readiness, it deletes the conntrack entries for the UDP server
// as soon as the endpoint slice is created, but the iptables rules are not installed until at least one
// endpoint is ready. If traffic arrives between kube-proxy clearing the entries (on seeing the endpoint slice)
// and installing the corresponding iptables rules (once the endpoint is ready), a conntrack entry is created
// that blackholes subsequent traffic.
ginkgo.It("should be able to preserve UDP traffic when initial unready endpoints get ready", func() {

// Create a ClusterIP service
udpJig := e2eservice.NewTestJig(cs, ns, serviceName)
ginkgo.By("creating a UDP service " + serviceName + " with type=ClusterIP in " + ns)
udpService, err := udpJig.CreateUDPService(func(svc *v1.Service) {
svc.Spec.Type = v1.ServiceTypeClusterIP
svc.Spec.Ports = []v1.ServicePort{
{Port: 80, Name: "udp", Protocol: v1.ProtocolUDP, TargetPort: intstr.FromInt(80)},
}
})
framework.ExpectNoError(err)

// Create a pod on one node that generates UDP traffic against the ClusterIP service every 5 seconds
ginkgo.By("creating a client pod for probing the service " + serviceName)
clientPod := e2epod.NewAgnhostPod(ns, podClient, nil, nil, nil)
nodeSelection := e2epod.NodeSelection{Name: clientNodeInfo.name}
e2epod.SetNodeSelection(&clientPod.Spec, nodeSelection)
cmd := fmt.Sprintf(`date; for i in $(seq 1 3000); do echo "$(date) Try: ${i}"; echo hostname | nc -u -w 5 -p %d %s %d; echo; done`, srcPort, udpService.Spec.ClusterIP, udpService.Spec.Ports[0].Port)
clientPod.Spec.Containers[0].Command = []string{"/bin/sh", "-c", cmd}
clientPod.Spec.Containers[0].Name = podClient
fr.PodClient().CreateSync(clientPod)

// Read the client pod logs
logs, err := e2epod.GetPodLogs(cs, ns, podClient, podClient)
framework.ExpectNoError(err)
framework.Logf("Pod client logs: %s", logs)

// Add a backend pod to the service in the other node
ginkgo.By("creating a backend pod " + podBackend1 + " for the service " + serviceName)
serverPod1 := e2epod.NewAgnhostPod(ns, podBackend1, nil, nil, nil, "netexec", fmt.Sprintf("--udp-port=%d", 80))
serverPod1.Labels = udpJig.Labels
nodeSelection = e2epod.NodeSelection{Name: serverNodeInfo.name}
// Add an init container to keep the pod from becoming ready for 15 seconds
serverPod1.Spec.InitContainers = []v1.Container{
{
Name: "init",
Image: imageutils.GetE2EImage(imageutils.BusyBox),
Command: []string{"/bin/sh", "-c", "echo Pausing start. && sleep 15"},
},
}
e2epod.SetNodeSelection(&serverPod1.Spec, nodeSelection)
fr.PodClient().CreateSync(serverPod1)

// wait until the endpoints are ready
validateEndpointsPortsOrFail(cs, ns, serviceName, portsByPodName{podBackend1: {80}})

// Note that the fact that the Endpoints object already exists does NOT mean
// that iptables (or whatever else is used) has already been programmed.
// Additionally, take into account that UDP conntrack entries time out after
// 30 seconds by default.
// Based on the above, check that the pod receives the traffic.
ginkgo.By("checking client pod connected to the backend on Node IP " + serverNodeInfo.nodeIP)
if err := wait.PollImmediate(5*time.Second, time.Minute, logContainsFn(podBackend1, podClient)); err != nil {
logs, err = e2epod.GetPodLogs(cs, ns, podClient, podClient)
framework.ExpectNoError(err)
framework.Logf("Pod client logs: %s", logs)
framework.Failf("Failed to connect to backend pod")
}

})

// Regression test for #74839, where:
// Packets considered INVALID by conntrack are now dropped. In particular, this fixes
// a problem where spurious retransmits in a long-running TCP connection to a service
