Skip to content

Commit

Permalink
add scale test for probes
Browse files Browse the repository at this point in the history
Change-Id: I1e49943531b569b5e02f82369750a9ca899ae726
  • Loading branch information
aojea authored and SergeyKanzhelev committed Jan 26, 2023
1 parent b6429e3 commit d3b0f3f
Showing 1 changed file with 280 additions and 0 deletions.
280 changes: 280 additions & 0 deletions pkg/kubelet/prober/scale_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,280 @@
/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package prober

import (
"context"
"fmt"
"net"
"net/http"
"sync"
"sync/atomic"
"testing"
"time"

v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/tools/record"
"k8s.io/kubernetes/pkg/kubelet/configmap"
kubepod "k8s.io/kubernetes/pkg/kubelet/pod"
podtest "k8s.io/kubernetes/pkg/kubelet/pod/testing"
"k8s.io/kubernetes/pkg/kubelet/prober/results"
"k8s.io/kubernetes/pkg/kubelet/secret"
"k8s.io/kubernetes/pkg/kubelet/status"
statustest "k8s.io/kubernetes/pkg/kubelet/status/testing"
kubeletutil "k8s.io/kubernetes/pkg/kubelet/util"
utilpointer "k8s.io/utils/pointer"
)

// TCP sockets goes through a TIME-WAIT state (default 60 sec) before being freed,
// causing conntrack entries and ephemeral ports to be hold for 60 seconds
// despite the probe may have finished in less than 1 second.
// If the rate of probes is higher than the rate the OS recycles the ports used,
// it can consume a considerable number of ephemeral ports or conntrack entries.
// These tests verify that after certain period the probes keep working, if the probes
// don't close the sockets faster, they will start to fail.
// The test creates a TCP or HTTP server to fake a pod. It creates 1 pod with 600 fake
// containers each and runs one probe for each of these containers (all the probes comes
// from the same process, same as in the Kubelet, and targets the same IP:port to verify
// that the ephemeral port is not exhausted.

// The default port range on a normal Linux system has 28321 free ephemeral ports per
// tuple srcIP,srcPort:dstIP:dstPort:Proto: /proc/sys/net/ipv4/ip_local_port_range 32768 60999
// 1 pods x 600 containers/pod x 1 probes/container x 1 req/sec = 600 req/sec
// 600 req/sec x 59 sec = 35400
// The test should run out of ephemeral ports in less than one minute and start failing connections
// Ref: https://github.com/kubernetes/kubernetes/issues/89898#issuecomment-1383207322

func TestTCPPortExhaustion(t *testing.T) {
// This test creates a considereable number of connections in a short time
// and flakes on constrained environments, thus it is skipped by default.
// The test is left for manual verification or experimentation with new
// changes on the probes.
t.Skip("skipping TCP port exhaustion tests")

const (
numTestPods = 1
numContainers = 600
)

if testing.Short() {
t.Skip("skipping TCP port exhaustion in short mode")
}

tests := []struct {
name string
http bool // it can be tcp or http
}{
{"TCP", false},
{"HTTP", true},
}
for _, tt := range tests {
t.Run(fmt.Sprintf(tt.name), func(t *testing.T) {
podManager := kubepod.NewBasicPodManager(podtest.NewFakeMirrorClient(), secret.NewFakeManager(), configmap.NewFakeManager())
podStartupLatencyTracker := kubeletutil.NewPodStartupLatencyTracker()
m := NewManager(
status.NewManager(&fake.Clientset{}, podManager, &statustest.FakePodDeletionSafetyProvider{}, podStartupLatencyTracker),
results.NewManager(),
results.NewManager(),
results.NewManager(),
nil, // runner
&record.FakeRecorder{},
).(*manager)
defer cleanup(t, m)

now := time.Now()
fakePods := make([]*fakePod, numTestPods)
for i := 0; i < numTestPods; i++ {
fake, err := newFakePod(tt.http)
if err != nil {
t.Fatalf("unexpected error creating fake pod: %v", err)
}
defer fake.stop()
handler := fake.probeHandler()
fakePods[i] = fake

pod := v1.Pod{
ObjectMeta: metav1.ObjectMeta{
UID: types.UID(fmt.Sprintf("pod%d", i)),
Name: fmt.Sprintf("pod%d", i),
Namespace: "test",
},
Spec: v1.PodSpec{},
Status: v1.PodStatus{
Phase: v1.PodPhase(v1.PodReady),
PodIPs: []v1.PodIP{{IP: "127.0.0.1"}},
},
}
for j := 0; j < numContainers; j++ {
// use only liveness probes for simplicity, initial state is success for them
pod.Spec.Containers = append(pod.Spec.Containers, v1.Container{
Name: fmt.Sprintf("container%d", j),
LivenessProbe: newProbe(handler),
})
pod.Status.ContainerStatuses = append(pod.Status.ContainerStatuses, v1.ContainerStatus{
Name: fmt.Sprintf("container%d", j),
ContainerID: fmt.Sprintf("pod%d://container%d", i, j),
State: v1.ContainerState{
Running: &v1.ContainerStateRunning{
StartedAt: metav1.Now(),
},
},
Started: utilpointer.Bool(true),
})
}
podManager.AddPod(&pod)
m.statusManager.SetPodStatus(&pod, pod.Status)
m.AddPod(&pod)
}
t.Logf("Adding %d pods with %d containers each in %v", numTestPods, numContainers, time.Since(now))

ctx, cancel := context.WithTimeout(context.Background(), 59*time.Second)
defer cancel()
var wg sync.WaitGroup

wg.Add(1)
go func() {
defer wg.Done()
for {
var result results.Update
var probeType string
select {
case result = <-m.startupManager.Updates():
probeType = "startup"
case result = <-m.livenessManager.Updates():
probeType = "liveness"
case result = <-m.readinessManager.Updates():
probeType = "readiness"
case <-ctx.Done():
return
}
switch result.Result.String() {
// The test will fail if any of the probes fails
case "Failure":
t.Errorf("Failure %s on contantinerID: %v Pod %v", probeType, result.ContainerID, result.PodUID)
case "UNKNOWN": // startup probes
t.Logf("UNKNOWN state for %v", result)
default:
}
}
}()
wg.Wait()

// log the number of connections received in each pod for debugging test failures.
for _, pod := range fakePods {
n := pod.connections()
t.Logf("Number of connections %d", n)
}

})
}

}

func newProbe(handler v1.ProbeHandler) *v1.Probe {
return &v1.Probe{
ProbeHandler: handler,
TimeoutSeconds: 1,
PeriodSeconds: 1,
SuccessThreshold: 1,
FailureThreshold: 3,
}
}

// newFakePod runs a server (TCP or HTTP) in a random port
func newFakePod(httpServer bool) (*fakePod, error) {
ln, err := net.Listen("tcp", "127.0.0.1:0")
if err != nil {
return nil, fmt.Errorf("failed to bind: %v", err)
}
f := &fakePod{ln: ln, http: httpServer}

// spawn an http server or a TCP server that counts the number of connections received
if httpServer {
var mu sync.Mutex
visitors := map[string]struct{}{}
go http.Serve(ln, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
mu.Lock()
defer mu.Unlock()
if _, ok := visitors[r.RemoteAddr]; !ok {
atomic.AddInt64(&f.numConnection, 1)
visitors[r.RemoteAddr] = struct{}{}
}
}))
} else {
go func() {
for {
conn, err := ln.Accept()
if err != nil {
// exit when the listener is closed
return
}
atomic.AddInt64(&f.numConnection, 1)
// handle request but not block
go func(c net.Conn) {
defer c.Close()
// read but swallow the errors since the probe doesn't send data
buffer := make([]byte, 1024)
c.Read(buffer)
// respond
conn.Write([]byte("Hi back!\n"))
}(conn)

}
}()
}
return f, nil

}

type fakePod struct {
ln net.Listener
numConnection int64
http bool
}

func (f *fakePod) probeHandler() v1.ProbeHandler {
port := f.ln.Addr().(*net.TCPAddr).Port
var handler v1.ProbeHandler
if f.http {
handler = v1.ProbeHandler{
HTTPGet: &v1.HTTPGetAction{
Host: "127.0.0.1",
Port: intstr.FromInt(port),
},
}
} else {
handler = v1.ProbeHandler{
TCPSocket: &v1.TCPSocketAction{
Host: "127.0.0.1",
Port: intstr.FromInt(port),
},
}
}
return handler
}

func (f *fakePod) stop() {
f.ln.Close()
}

func (f *fakePod) connections() int {
return int(atomic.LoadInt64(&f.numConnection))
}

0 comments on commit d3b0f3f

Please sign in to comment.