Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add e2e test for Source IP preservation (pod to service cluster IP) #30739

Merged
merged 1 commit into from
Sep 5, 2016
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
193 changes: 180 additions & 13 deletions test/e2e/service.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
Copyright 2014 The Kubernetes Authors.
Copyright 2016 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -228,6 +228,58 @@ var _ = framework.KubeDescribe("Services", func() {
validateEndpointsOrFail(c, ns, serviceName, PortsByPodName{})
})

// TODO: verify source IP preservation for LoadBalancer type services when applicable
It("should preserve source pod IP for traffic thru service cluster IP", func() {

serviceName := "sourceip-test"
ns := f.Namespace.Name

By("creating a TCP service " + serviceName + " with type=ClusterIP in namespace " + ns)
jig := NewServiceTestJig(c, serviceName)
servicePort := 8080
tcpService := jig.CreateTCPServiceWithPort(ns, nil, int32(servicePort))
jig.SanityCheckService(tcpService, api.ServiceTypeClusterIP)
defer func() {
framework.Logf("Cleaning up the sourceip test service")
err := c.Services(ns).Delete(serviceName)
Expect(err).NotTo(HaveOccurred())
}()
serviceIp := tcpService.Spec.ClusterIP
framework.Logf("sourceip-test cluster ip: %s", serviceIp)

By("Picking multiple nodes")
nodes := framework.GetReadySchedulableNodesOrDie(f.Client)

if len(nodes.Items) == 1 {
framework.Skipf("The test requires two Ready nodes on %s, but found just one.", framework.TestContext.Provider)
}

node1 := nodes.Items[0]
node2 := nodes.Items[1]

By("Creating a webserver pod be part of the TCP service which echoes back source ip")
serverPodName := "echoserver-sourceip"
jig.launchEchoserverPodOnNode(f, node1.Name, serverPodName)
defer func() {
framework.Logf("Cleaning up the echo server pod")
err := c.Pods(ns).Delete(serverPodName, nil)
Expect(err).NotTo(HaveOccurred())
}()

// Waiting for service to expose endpoint
validateEndpointsOrFail(c, ns, serviceName, PortsByPodName{serverPodName: {servicePort}})

By("Retrieve sourceip from a pod on the same node")
sourceIp1, execPodIp1 := execSourceipTest(f, c, ns, node1.Name, serviceIp, servicePort)
By("Verifying the preserved source ip")
Expect(sourceIp1).To(Equal(execPodIp1))

By("Retrieve sourceip from a pod on a different node")
sourceIp2, execPodIp2 := execSourceipTest(f, c, ns, node2.Name, serviceIp, servicePort)
By("Verifying the preserved source ip")
Expect(sourceIp2).To(Equal(execPodIp2))
})

It("should be able to up and down services", func() {
// TODO: use the ServiceTestJig here
// this test uses framework.NodeSSHHosts that does not work if a Node only reports LegacyHostIP
Expand Down Expand Up @@ -1232,11 +1284,8 @@ func validateEndpointsOrFail(c *client.Client, namespace, serviceName string, ex
framework.Failf("Timed out waiting for service %s in namespace %s to expose endpoints %v (%v elapsed)", serviceName, namespace, expectedEndpoints, framework.ServiceStartTimeout)
}

// createExecPodOrFail creates a simple busybox pod in a sleep loop used as a
// vessel for kubectl exec commands.
// Returns the name of the created pod.
func createExecPodOrFail(c *client.Client, ns, generateName string) string {
framework.Logf("Creating new exec pod")
// newExecPodSpec returns the pod spec of exec pod
func newExecPodSpec(ns, generateName string) *api.Pod {
immediate := int64(0)
pod := &api.Pod{
ObjectMeta: api.ObjectMeta{
Expand All @@ -1254,10 +1303,38 @@ func createExecPodOrFail(c *client.Client, ns, generateName string) string {
},
},
}
created, err := c.Pods(ns).Create(pod)
return pod
}

// createExecPodOrFail creates a simple busybox pod in a sleep loop used as a
// vessel for kubectl exec commands.
// Returns the name of the created pod.
func createExecPodOrFail(client *client.Client, ns, generateName string) string {
framework.Logf("Creating new exec pod")
execPod := newExecPodSpec(ns, generateName)
created, err := client.Pods(ns).Create(execPod)
Expect(err).NotTo(HaveOccurred())
err = wait.PollImmediate(framework.Poll, 5*time.Minute, func() (bool, error) {
retrievedPod, err := client.Pods(execPod.Namespace).Get(created.Name)
if err != nil {
return false, nil
}
return retrievedPod.Status.Phase == api.PodRunning, nil
})
Expect(err).NotTo(HaveOccurred())
return created.Name
}

// createExecPodOnNode launches a exec pod in the given namespace and node
// waits until it's Running, created pod name would be returned
func createExecPodOnNode(client *client.Client, ns, nodeName, generateName string) string {
framework.Logf("Creating exec pod %q in namespace %q", generateName, ns)
execPod := newExecPodSpec(ns, generateName)
execPod.Spec.NodeName = nodeName
created, err := client.Pods(ns).Create(execPod)
Expect(err).NotTo(HaveOccurred())
err = wait.PollImmediate(framework.Poll, 5*time.Minute, func() (bool, error) {
retrievedPod, err := c.Pods(pod.Namespace).Get(created.Name)
retrievedPod, err := client.Pods(execPod.Namespace).Get(created.Name)
if err != nil {
return false, nil
}
Expand Down Expand Up @@ -1712,8 +1789,8 @@ func NewServiceTestJig(client *client.Client, name string) *ServiceTestJig {

// newServiceTemplate returns the default api.Service template for this jig, but
// does not actually create the Service. The default Service has the same name
// as the jig and exposes port 80.
func (j *ServiceTestJig) newServiceTemplate(namespace string, proto api.Protocol) *api.Service {
// as the jig and exposes the given port.
func (j *ServiceTestJig) newServiceTemplate(namespace string, proto api.Protocol, port int32) *api.Service {
service := &api.Service{
ObjectMeta: api.ObjectMeta{
Namespace: namespace,
Expand All @@ -1725,19 +1802,34 @@ func (j *ServiceTestJig) newServiceTemplate(namespace string, proto api.Protocol
Ports: []api.ServicePort{
{
Protocol: proto,
Port: 80,
Port: port,
},
},
},
}
return service
}

// CreateTCPServiceWithPort creates a new TCP Service with given port based on the
// jig's defaults. Callers can provide a function to tweak the Service object before
// it is created.
func (j *ServiceTestJig) CreateTCPServiceWithPort(namespace string, tweak func(svc *api.Service), port int32) *api.Service {
svc := j.newServiceTemplate(namespace, api.ProtocolTCP, port)
if tweak != nil {
tweak(svc)
}
result, err := j.Client.Services(namespace).Create(svc)
if err != nil {
framework.Failf("Failed to create TCP Service %q: %v", svc.Name, err)
}
return result
}

// CreateTCPServiceOrFail creates a new TCP Service based on the jig's
// defaults. Callers can provide a function to tweak the Service object before
// it is created.
func (j *ServiceTestJig) CreateTCPServiceOrFail(namespace string, tweak func(svc *api.Service)) *api.Service {
svc := j.newServiceTemplate(namespace, api.ProtocolTCP)
svc := j.newServiceTemplate(namespace, api.ProtocolTCP, 80)
if tweak != nil {
tweak(svc)
}
Expand All @@ -1752,7 +1844,7 @@ func (j *ServiceTestJig) CreateTCPServiceOrFail(namespace string, tweak func(svc
// defaults. Callers can provide a function to tweak the Service object before
// it is created.
func (j *ServiceTestJig) CreateUDPServiceOrFail(namespace string, tweak func(svc *api.Service)) *api.Service {
svc := j.newServiceTemplate(namespace, api.ProtocolUDP)
svc := j.newServiceTemplate(namespace, api.ProtocolUDP, 80)
if tweak != nil {
tweak(svc)
}
Expand Down Expand Up @@ -2181,3 +2273,78 @@ func (t *ServiceTestFixture) Cleanup() []error {

return errs
}

// newEchoServerPodSpec returns the pod spec of echo server pod
func newEchoServerPodSpec(podName string) *api.Pod {
port := 8080
pod := &api.Pod{
ObjectMeta: api.ObjectMeta{
Name: podName,
},
Spec: api.PodSpec{
Containers: []api.Container{
{
Name: "echoserver",
Image: "gcr.io/google_containers/echoserver:1.4",
Ports: []api.ContainerPort{{ContainerPort: int32(port)}},
},
},
RestartPolicy: api.RestartPolicyNever,
},
}
return pod
}

// launchEchoserverPodOnNode launches a pod serving http on port 8080 to act
// as the target for source IP preservation test. The client's source ip would
// be echoed back by the web server.
func (j *ServiceTestJig) launchEchoserverPodOnNode(f *framework.Framework, nodeName, podName string) {
framework.Logf("Creating echo server pod %q in namespace %q", podName, f.Namespace.Name)
pod := newEchoServerPodSpec(podName)
pod.Spec.NodeName = nodeName
pod.ObjectMeta.Labels = j.Labels
podClient := f.Client.Pods(f.Namespace.Name)
_, err := podClient.Create(pod)
framework.ExpectNoError(err)
framework.ExpectNoError(f.WaitForPodRunning(podName))
framework.Logf("Echo server pod %q in namespace %q running", pod.Name, f.Namespace.Name)
}

func execSourceipTest(f *framework.Framework, c *client.Client, ns, nodeName, serviceIp string, servicePort int) (string, string) {
framework.Logf("Creating an exec pod on the same node")
execPodName := createExecPodOnNode(f.Client, ns, nodeName, fmt.Sprintf("execpod-sourceip-%s", nodeName))
defer func() {
framework.Logf("Cleaning up the exec pod")
err := c.Pods(ns).Delete(execPodName, nil)
Expect(err).NotTo(HaveOccurred())
}()
podClient := f.Client.Pods(ns)
execPod, err := podClient.Get(execPodName)
ExpectNoError(err)
execPodIp := execPod.Status.PodIP
framework.Logf("Exec pod ip: %s", execPodIp)

framework.Logf("Getting echo response from service")
var stdout string
timeout := 2 * time.Minute
framework.Logf("Waiting up to %v for sourceIp test to be executed", timeout)
cmd := fmt.Sprintf(`wget -T 30 -qO- %s:%d | grep client_address`, serviceIp, servicePort)
// need timeout mechanism because it may takes more times for iptables to be populated
for start := time.Now(); time.Since(start) < timeout; time.Sleep(2) {
stdout, err = framework.RunHostCmd(execPod.Namespace, execPod.Name, cmd)
if err != nil {
framework.Logf("got err: %v, retry until timeout", err)
continue
}
break
}

ExpectNoError(err)

// the stdout return from RunHostCmd seems to come with "\n", so TrimSpace is needed
// desired stdout in this format: client_address=x.x.x.x
outputs := strings.Split(strings.TrimSpace(stdout), "=")
sourceIp := outputs[1]

return execPodIp, sourceIp
}