Wait till netexec is ready in kubeproxy e2e #18672

Merged (1 commit) on Mar 8, 2016
Changes from all commits
test/e2e/framework.go: 5 additions, 0 deletions
@@ -251,6 +251,11 @@ func (f *Framework) WaitForPodRunning(podName string) error {
return waitForPodRunningInNamespace(f.Client, podName, f.Namespace.Name)
}

// WaitForPodReady waits for the pod to flip to ready in the namespace.
func (f *Framework) WaitForPodReady(podName string) error {
return waitTimeoutForPodReadyInNamespace(f.Client, podName, f.Namespace.Name, podStartTimeout)
}

// WaitForPodRunningSlow waits for the pod to run in the namespace.
// It has a longer timeout than WaitForPodRunning (util.slowPodStartTimeout).
func (f *Framework) WaitForPodRunningSlow(podName string) error {
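For context, the new helper slots into the usual e2e pattern: create the pod, then gate the test on readiness rather than on phase alone. A minimal sketch using the suite's existing idioms (podSpec here is a hypothetical *api.Pod):

	// Create the pod, then block until its Ready condition is True
	// (or podStartTimeout elapses).
	pod, err := f.Client.Pods(f.Namespace.Name).Create(podSpec)
	expectNoError(err)
	expectNoError(f.WaitForPodReady(pod.Name))
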
test/e2e/kubeproxy.go: 27 additions, 5 deletions
@@ -46,12 +46,16 @@ const (
nodeHttpPort = 32080
nodeUdpPort = 32081
loadBalancerHttpPort = 100
- netexecImageName = "gcr.io/google_containers/netexec:1.4"
+ netexecImageName = "gcr.io/google_containers/netexec:1.5"
testPodName = "test-container-pod"
hostTestPodName = "host-test-container-pod"
nodePortServiceName = "node-port-service"
loadBalancerServiceName = "load-balancer-service"
enableLoadBalancerTest = false
hitEndpointRetryDelay = 1 * time.Second
// Number of retries to hit a given set of endpoints. Needs to be high
// because we verify iptables statistical rr loadbalancing.
testTries = 30
)

type KubeProxyTestConfig struct {
@@ -150,7 +154,7 @@ func createHTTPClient(transport *http.Transport) *http.Client {

func (config *KubeProxyTestConfig) hitClusterIP(epCount int) {
clusterIP := config.nodePortService.Spec.ClusterIP
- tries := epCount*epCount + 15 // if epCount == 0
+ tries := epCount*epCount + testTries // if epCount == 0
By("dialing(udp) node1 --> clusterIP:clusterUdpPort")
config.dialFromNode("udp", clusterIP, clusterUdpPort, tries, epCount)
By("dialing(http) node1 --> clusterIP:clusterHttpPort")
@@ -169,7 +173,7 @@ func (config *KubeProxyTestConfig) hitClusterIP(epCount int) {

func (config *KubeProxyTestConfig) hitNodePort(epCount int) {
node1_IP := config.externalAddrs[0]
- tries := epCount*epCount + 15 // if epCount == 0
+ tries := epCount*epCount + testTries // if epCount == 0
By("dialing(udp) node1 --> node1:nodeUdpPort")
config.dialFromNode("udp", node1_IP, nodeUdpPort, tries, epCount)
By("dialing(http) node1 --> node1:nodeHttpPort")
@@ -248,7 +252,10 @@ func (config *KubeProxyTestConfig) dialFromNode(protocol, targetIP string, targe
} else {
cmd = fmt.Sprintf("curl -s --connect-timeout 1 http://%s:%d/hostName", targetIP, targetPort)
}
- forLoop := fmt.Sprintf("for i in $(seq 1 %d); do %s; echo; done | grep -v '^\\s*$' |sort | uniq -c | wc -l", tries, cmd)
+ // TODO: This simply tells us that we can reach the endpoints. Check that
+ // the probability of hitting a specific endpoint is roughly the same as
+ // hitting any other.
+ forLoop := fmt.Sprintf("for i in $(seq 1 %d); do %s; echo; sleep %v; done | grep -v '^\\s*$' |sort | uniq -c | wc -l", tries, cmd, hitEndpointRetryDelay)
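// Note: each response is the serving pod's hostname, so the grep/sort/uniq/wc
// pipeline counts the number of distinct endpoints reached across all tries;
// dialFromNode asserts that count equals expectedCount below.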
By(fmt.Sprintf("Dialing from node. command:%s", forLoop))
stdout := RunHostCmdOrDie(config.f.Namespace.Name, config.hostTestContainerPod.Name, forLoop)
Expect(strconv.Atoi(strings.TrimSpace(stdout))).To(BeNumerically("==", expectedCount))
@@ -262,6 +269,19 @@ func (config *KubeProxyTestConfig) getSelfURL(path string, expected string) {
}

func (config *KubeProxyTestConfig) createNetShellPodSpec(podName string, node string) *api.Pod {
probe := &api.Probe{
InitialDelaySeconds: 10,
TimeoutSeconds: 30,
PeriodSeconds: 10,
SuccessThreshold: 1,
FailureThreshold: 3,
Handler: api.Handler{
HTTPGet: &api.HTTPGetAction{
Path: "/healthz",
Port: intstr.IntOrString{IntVal: endpointHttpPort},
},
},
}
pod := &api.Pod{
TypeMeta: unversioned.TypeMeta{
Kind: "Pod",
@@ -293,6 +313,8 @@ func (config *KubeProxyTestConfig) createNetShellPodSpec(podName string, node st
Protocol: api.ProtocolUDP,
},
},
LivenessProbe: probe,
ReadinessProbe: probe,
},
},
NodeName: node,
@@ -492,7 +514,7 @@ func (config *KubeProxyTestConfig) createNetProxyPods(podName string, selector m
// wait until all of them are up
runningPods := make([]*api.Pod, 0, len(nodes.Items))
for _, p := range createdPods {
- expectNoError(config.f.WaitForPodRunning(p.Name))
+ expectNoError(config.f.WaitForPodReady(p.Name))
rp, err := config.getPodClient().Get(p.Name)
expectNoError(err)
runningPods = append(runningPods, rp)
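A quick sanity check on the retry budget above: with tries = epCount*epCount + testTries, three endpoints get 9 + 30 = 39 attempts per dial. Under uniform iptables round-robin, the probability that one particular endpoint is never hit in 39 tries is (1 - 1/3)^39, about 1.4e-7, and a union bound over three endpoints keeps the chance of an undercount below roughly 4e-7 per dial. The one-second hitEndpointRetryDelay between attempts also keeps the loop from racing endpoint setup.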
test/e2e/util.go: 14 additions, 0 deletions
@@ -864,6 +864,19 @@ func waitTimeoutForPodNoLongerRunningInNamespace(c *client.Client, podName strin
})
}

func waitTimeoutForPodReadyInNamespace(c *client.Client, podName string, namespace string, timeout time.Duration) error {
return waitForPodCondition(c, namespace, podName, "running", timeout, func(pod *api.Pod) (bool, error) {
if pod.Status.Phase == api.PodFailed {
return true, fmt.Errorf("Giving up; pod went into failed status: \n%s", spew.Sprintf("%#v", pod))
}
if pod.Status.Phase == api.PodRunning && podReady(pod) {
Logf("Found ready pod '%s' on node '%s'", podName, pod.Spec.NodeName)
return true, nil
}
return false, nil
})
}

// waitForPodNotPending returns an error if it took too long for the pod to go out of pending state.
func waitForPodNotPending(c *client.Client, ns, podName string) error {
return waitForPodCondition(c, ns, podName, "!pending", podStartTimeout, func(pod *api.Pod) (bool, error) {
@@ -2621,6 +2634,7 @@ func RunHostCmd(ns, name, cmd string) (string, error) {
// RunHostCmdOrDie calls RunHostCmd and dies on error.
func RunHostCmdOrDie(ns, name, cmd string) string {
stdout, err := RunHostCmd(ns, name, cmd)
Logf("stdout: %v", stdout)
expectNoError(err)
return stdout
}
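waitTimeoutForPodReadyInNamespace leans on a podReady helper that predates this diff. A minimal sketch of what it checks, assuming the api types already used above; this is not part of the PR:

	// podReady returns true once the pod reports a Ready condition of True.
	func podReady(pod *api.Pod) bool {
		for _, cond := range pod.Status.Conditions {
			if cond.Type == api.PodReady {
				return cond.Status == api.ConditionTrue
			}
		}
		return false
	}
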
test/images/netexec/Makefile: 1 addition, 1 deletion
@@ -14,7 +14,7 @@

.PHONY: all netexec image push clean

- TAG = 1.4
+ TAG = 1.5
PREFIX = gcr.io/google_containers


test/images/netexec/netexec.go: 43 additions, 3 deletions
@@ -30,15 +30,36 @@ import (
"os/exec"
"strconv"
"strings"
"sync/atomic"
"time"
)

var (
- httpPort  = 8080
- udpPort   = 8081
- shellPath = "/bin/sh"
+ httpPort    = 8080
+ udpPort     = 8081
+ shellPath   = "/bin/sh"
+ serverReady = &atomicBool{0}
)

// atomicBool uses load/store operations on an int32 to simulate an atomic boolean.
type atomicBool struct {
v int32
}

// set sets the int32 to the given boolean.
func (a *atomicBool) set(value bool) {
if value {
atomic.StoreInt32(&a.v, 1)
return
}
atomic.StoreInt32(&a.v, 0)
}

// get returns true if the int32 == 1
func (a *atomicBool) get() bool {
return atomic.LoadInt32(&a.v) == 1
}
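// NOTE: sync/atomic had no boolean type when this was written (atomic.Bool
// only arrived in Go 1.19), hence the int32-backed wrapper above.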

type output struct {
responses []string
errors []string
@@ -91,6 +112,18 @@ func exitHandler(w http.ResponseWriter, r *http.Request) {
func hostnameHandler(w http.ResponseWriter, r *http.Request) {
log.Printf("GET /hostname")
fmt.Fprintf(w, getHostName())
}

// (in the HTTP server startup code, elided above, the new endpoint is
// registered before ListenAndServe)
+ http.HandleFunc("/healthz", healthzHandler)
log.Fatal(http.ListenAndServe(fmt.Sprintf(":%d", httpPort), nil))
}

// healthzHandler responds with a 200 if the UDP server is ready. It also serves
// as a health check of the HTTP server by virtue of being an HTTP handler.
func healthzHandler(w http.ResponseWriter, r *http.Request) {
if serverReady.get() {
w.WriteHeader(200)
return
}
w.WriteHeader(http.StatusPreconditionFailed)
}

func shutdownHandler(w http.ResponseWriter, r *http.Request) {
@@ -318,6 +351,13 @@ func startUDPServer(udpPort int) {
defer serverConn.Close()
buf := make([]byte, 1024)

log.Printf("Started UDP server")
// Start responding to readiness probes.
serverReady.set(true)
defer func() {
log.Printf("UDP server exited")
serverReady.set(false)
}()
for {
n, clientAddress, err := serverConn.ReadFromUDP(buf)
assertNoError(err)
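The /healthz gate depends on startup order: the UDP listener must flip serverReady, and the HTTP server must be serving, before the kubelet's probes can succeed. A sketch of the intended wiring, assuming a main along these lines (the registration shown is illustrative; the file's actual main is not part of this diff):

	func main() {
		flag.Parse()
		// The UDP goroutine sets serverReady once its listener accepts
		// datagrams, so /healthz returns 200 only when both servers are
		// actually usable.
		go startUDPServer(udpPort)
		http.HandleFunc("/healthz", healthzHandler)
		// ... the remaining handlers are registered here ...
		log.Fatal(http.ListenAndServe(fmt.Sprintf(":%d", httpPort), nil))
	}
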
test/images/netexec/pod.yaml: 25 additions, 1 deletion
@@ -7,9 +7,33 @@ metadata:
spec:
  containers:
  - name: netexec
-   image: gcr.io/google_containers/netexec:1.4
+   image: gcr.io/google_containers/netexec:1.5
    ports:
    - containerPort: 8080
      protocol: TCP
    - containerPort: 8081
      protocol: UDP
    # Give this pod the same liveness and readiness probe because we always
    # want the kubelet to restart it if it becomes unready, and at the same
    # time we want to observe readiness as a signal to start testing.
    livenessProbe:
      httpGet:
        path: /healthz
        port: 8080
        scheme: HTTP
      initialDelaySeconds: 10
      timeoutSeconds: 5
      failureThreshold: 3
      periodSeconds: 10
      successThreshold: 1
    readinessProbe:
      httpGet:
        path: /healthz
        port: 8080
        scheme: HTTP
      initialDelaySeconds: 10
      timeoutSeconds: 5
      failureThreshold: 3
      periodSeconds: 10
      successThreshold: 1
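
With liveness and readiness both keyed to /healthz, the kubelet restarts the container if the UDP server wedges, while the pod's Ready condition, the very thing the e2e's new WaitForPodReady polls for, only flips to True once netexec answers 200.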