Replace ssh with net=host hostexec pod and kubectl exec in e2e tests where possible #15777

Merged
2 changes: 2 additions & 0 deletions cluster/mesos/docker/docker-compose.yml
@@ -86,6 +86,7 @@ apiserver:
--service-node-port-range=30000-32767
Contributor:

Please squash all commits

--cloud-provider=mesos
--cloud-config=/opt/mesos-cloud.conf
--allow-privileged
Contributor:

is this a leftover from testing, or required to allow remote exec?

Contributor Author:

This is default in upstream now. There are even e2e tests for this (which are fixed in this PR).

Contributor Author:

To answer your specific question: it's not needed for remote exec, only for the PrivilegePod e2e tests.
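For context: the PrivilegedPod e2e test creates a container whose security context requests privileged mode, which the apiserver rejects at admission unless started with --allow-privileged (and the kubelet needs the flag as well to actually run it). A minimal sketch using the internal API types imported in the diffs below; the names and image are illustrative, not the test's exact code:

// Rejected at admission unless the apiserver runs with --allow-privileged;
// the kubelet additionally needs the flag to start the container.
privileged := true
pod := &api.Pod{
	ObjectMeta: api.ObjectMeta{Name: "privileged-pod"},
	Spec: api.PodSpec{
		Containers: []api.Container{{
			Name:  "privileged-container",
			Image: netexecImageName, // illustrative image
			SecurityContext: &api.SecurityContext{
				Privileged: &privileged,
			},
		}},
	},
}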

--tls-cert-file=/var/run/kubernetes/auth/apiserver.crt
--tls-private-key-file=/var/run/kubernetes/auth/apiserver.key
--runtime-config=experimental/v1alpha1
@@ -138,6 +139,7 @@ scheduler:
--mesos-user=root
--api-servers=http://apiserver:8888
--mesos-master=mesosmaster1:5050
--allow-privileged
--cluster-dns=10.10.10.10
--cluster-domain=cluster.local
--mesos-executor-cpus=1.0
2 changes: 1 addition & 1 deletion contrib/mesos/pkg/scheduler/service/service.go
@@ -228,7 +228,7 @@ func (s *SchedulerServer) addCoreFlags(fs *pflag.FlagSet) {
fs.StringVar(&s.authPath, "auth-path", s.authPath, "Path to .kubernetes_auth file, specifying how to authenticate to API server.")
fs.StringSliceVar(&s.etcdServerList, "etcd-servers", s.etcdServerList, "List of etcd servers to watch (http://ip:port), comma separated. Mutually exclusive with --etcd-config")
fs.StringVar(&s.etcdConfigFile, "etcd-config", s.etcdConfigFile, "The config file for the etcd client. Mutually exclusive with --etcd-servers.")
fs.BoolVar(&s.allowPrivileged, "allow-privileged", s.allowPrivileged, "If true, allow privileged containers.")
fs.BoolVar(&s.allowPrivileged, "allow-privileged", s.allowPrivileged, "Enable privileged containers in the kubelet (compare the same flag in the apiserver).")
fs.StringVar(&s.clusterDomain, "cluster-domain", s.clusterDomain, "Domain for this cluster. If set, kubelet will configure all containers to search this domain in addition to the host's search domains")
fs.IPVar(&s.clusterDNS, "cluster-dns", s.clusterDNS, "IP address for a cluster DNS server. If set, kubelet will configure all containers to use this for DNS resolution in addition to the host's DNS servers")
fs.StringVar(&s.staticPodsConfigPath, "static-pods-config", s.staticPodsConfigPath, "Path for specification of static pods. Path should point to dir containing the staticPods configuration files. Defaults to none.")
125 changes: 71 additions & 54 deletions test/e2e/kubeproxy.go
@@ -31,6 +31,7 @@ import (
"k8s.io/kubernetes/pkg/api/latest"
"k8s.io/kubernetes/pkg/api/unversioned"
client "k8s.io/kubernetes/pkg/client/unversioned"
"k8s.io/kubernetes/pkg/fields"
"k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/util"
"k8s.io/kubernetes/pkg/util/intstr"
@@ -40,7 +41,6 @@ import (
const (
endpointHttpPort = 8080
endpointUdpPort = 8081
endpointHostPort = 8082
testContainerHttpPort = 8080
clusterHttpPort = 80
clusterUdpPort = 90
@@ -49,19 +49,21 @@ const (
loadBalancerHttpPort = 100
netexecImageName = "gcr.io/google_containers/netexec:1.0"
testPodName = "test-container-pod"
hostTestPodName = "host-test-container-pod"
nodePortServiceName = "node-port-service"
loadBalancerServiceName = "load-balancer-service"
enableLoadBalancerTest = false
)

type KubeProxyTestConfig struct {
testContainerPod *api.Pod
testHostPod *api.Pod
endpointPods []*api.Pod
f *Framework
nodePortService *api.Service
loadBalancerService *api.Service
nodes []string
testContainerPod *api.Pod
hostTestContainerPod *api.Pod
endpointPods []*api.Pod
f *Framework
nodePortService *api.Service
loadBalancerService *api.Service
externalAddrs []string
nodes []api.Node
}

var _ = Describe("KubeProxy", func() {
@@ -71,8 +73,6 @@ var _ = Describe("KubeProxy", func() {
}

It("should test kube-proxy", func() {
SkipUnlessProviderIs(providersWithSSH...)

By("cleaning up any pre-existing namespaces used by this test")
config.cleanup()

Expand Down Expand Up @@ -168,7 +168,7 @@ func (config *KubeProxyTestConfig) hitClusterIP(epCount int) {
}

func (config *KubeProxyTestConfig) hitNodePort(epCount int) {
node1_IP := config.nodes[0]
node1_IP := config.externalAddrs[0]
tries := epCount*epCount + 5 // + 10 if epCount == 0
By("dialing(udp) node1 --> node1:nodeUdpPort")
config.dialFromNode("udp", node1_IP, nodeUdpPort, tries, epCount)
@@ -177,7 +177,7 @@ func (config *KubeProxyTestConfig) hitNodePort(epCount int) {

By("dialing(udp) test container --> node1:nodeUdpPort")
config.dialFromTestContainer("udp", node1_IP, nodeUdpPort, tries, epCount)
By("dialing(http) container --> node1:nodeHttpPort")
By("dialing(http) test container --> node1:nodeHttpPort")
config.dialFromTestContainer("http", node1_IP, nodeHttpPort, tries, epCount)

By("dialing(udp) endpoint container --> node1:nodeUdpPort")
@@ -192,7 +192,7 @@ func (config *KubeProxyTestConfig) hitNodePort(epCount int) {
By("Test disabled. dialing(http) node --> 127.0.0.1:nodeHttpPort")
//config.dialFromNode("http", "127.0.0.1", nodeHttpPort, tries, epCount)

node2_IP := config.nodes[1]
node2_IP := config.externalAddrs[1]
By("dialing(udp) node1 --> node2:nodeUdpPort")
config.dialFromNode("udp", node2_IP, nodeUdpPort, tries, epCount)
By("dialing(http) node1 --> node2:nodeHttpPort")
@@ -231,7 +231,7 @@ func (config *KubeProxyTestConfig) dialFromContainer(protocol, containerIP, targ
tries)

By(fmt.Sprintf("Dialing from container. Running command:%s", cmd))
stdout := config.ssh(cmd)
stdout := RunHostCmdOrDie(config.f.Namespace.Name, config.hostTestContainerPod.Name, cmd)
var output map[string][]string
err := json.Unmarshal([]byte(stdout), &output)
Expect(err).NotTo(HaveOccurred(), fmt.Sprintf("Could not unmarshal curl response: %s", stdout))
@@ -242,24 +242,17 @@ func (config *KubeProxyTestConfig) dialFromContainer(protocol, containerIP, targ
func (config *KubeProxyTestConfig) dialFromNode(protocol, targetIP string, targetPort, tries, expectedCount int) {
var cmd string
if protocol == "udp" {
cmd = fmt.Sprintf("echo 'hostName' | nc -w 1 -u %s %d", targetIP, targetPort)
cmd = fmt.Sprintf("echo 'hostName' | timeout -t 3 nc -w 1 -u %s %d", targetIP, targetPort)
Contributor:

is that really a long enough timeout? worried about introducing flakes

Contributor Author:

It should be bigger than (or equal to) that of nc. I guess we can decrease it to 1 sec (like the -w 1).

Contributor:

i was actually worried about it being long enough, not that it might be too long. if nc is timing out after 1s then timeout -t 3 is probably fine as-is

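To make the layering concrete, assuming busybox's "timeout -t SECS" syntax inside the netexec image:

// Inner: "nc -w 1" gives up after 1s of UDP silence.
// Outer: "timeout -t 3" kills nc after 3s if it hangs outright.
// The only hard constraint is outer >= inner; 3s vs 1s leaves slack
// without slowing the test when nc behaves.
cmd = fmt.Sprintf("echo 'hostName' | timeout -t 3 nc -w 1 -u %s %d", targetIP, targetPort)
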
} else {
cmd = fmt.Sprintf("curl -s --connect-timeout 1 http://%s:%d/hostName", targetIP, targetPort)
}
forLoop := fmt.Sprintf("for i in $(seq 1 %d); do %s; echo; done | grep -v '^\\s*$' |sort | uniq -c | wc -l", tries, cmd)
By(fmt.Sprintf("Dialing from node. command:%s", forLoop))
stdout := config.ssh(forLoop)
stdout := RunHostCmdOrDie(config.f.Namespace.Name, config.hostTestContainerPod.Name, forLoop)
Expect(strconv.Atoi(strings.TrimSpace(stdout))).To(BeNumerically("==", expectedCount))
}

func (config *KubeProxyTestConfig) ssh(cmd string) string {
stdout, _, code, err := SSH(cmd, config.nodes[0]+":22", testContext.Provider)
Expect(err).NotTo(HaveOccurred(), "error while SSH-ing to node: %v (code %v)", err, code)
Expect(code).Should(BeZero(), "command exited with non-zero code %v. cmd:%s", code, cmd)
return stdout
}

func (config *KubeProxyTestConfig) createNetShellPodSpec(podName string) *api.Pod {
func (config *KubeProxyTestConfig) createNetShellPodSpec(podName string, node string) *api.Pod {
pod := &api.Pod{
TypeMeta: unversioned.TypeMeta{
Kind: "Pod",
@@ -288,15 +281,12 @@ func (config *KubeProxyTestConfig) createNetShellPodSpec(podName string) *api.Po
{
Name: "udp",
ContainerPort: endpointUdpPort,
},
{
Name: "host",
ContainerPort: endpointHttpPort,
HostPort: endpointHostPort,
Protocol: api.ProtocolUDP,
},
},
},
},
NodeName: node,
},
}
return pod
@@ -344,8 +334,8 @@ func (config *KubeProxyTestConfig) createNodePortService(selector map[string]str
Spec: api.ServiceSpec{
Type: api.ServiceTypeNodePort,
Ports: []api.ServicePort{
{Port: clusterHttpPort, Name: "http", Protocol: "TCP", NodePort: nodeHttpPort, TargetPort: intstr.FromInt(endpointHttpPort)},
{Port: clusterUdpPort, Name: "udp", Protocol: "UDP", NodePort: nodeUdpPort, TargetPort: intstr.FromInt(endpointUdpPort)},
{Port: clusterHttpPort, Name: "http", Protocol: api.ProtocolTCP, NodePort: nodeHttpPort, TargetPort: intstr.FromInt(endpointHttpPort)},
{Port: clusterUdpPort, Name: "udp", Protocol: api.ProtocolUDP, NodePort: nodeUdpPort, TargetPort: intstr.FromInt(endpointUdpPort)},
},
Selector: selector,
},
@@ -397,9 +387,26 @@ func (config *KubeProxyTestConfig) waitForLoadBalancerIngressSetup() {
config.loadBalancerService, _ = config.getServiceClient().Get(loadBalancerServiceName)
}

func (config *KubeProxyTestConfig) createTestPod() {
func (config *KubeProxyTestConfig) createTestPods() {
testContainerPod := config.createTestPodSpec()
config.testContainerPod = config.createPod(testContainerPod)
hostTestContainerPod := NewHostExecPodSpec(config.f.Namespace.Name, hostTestPodName)

config.createPod(testContainerPod)
config.createPod(hostTestContainerPod)

expectNoError(config.f.WaitForPodRunning(testContainerPod.Name))
expectNoError(config.f.WaitForPodRunning(hostTestContainerPod.Name))

var err error
config.testContainerPod, err = config.getPodClient().Get(testContainerPod.Name)
if err != nil {
Failf("Failed to retrieve %s pod: %v", testContainerPod.Name, err)
}

config.hostTestContainerPod, err = config.getPodClient().Get(hostTestContainerPod.Name)
if err != nil {
Failf("Failed to retrieve %s pod: %v", hostTestContainerPod.Name, err)
}
}
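NewHostExecPodSpec and RunHostCmdOrDie come from the e2e util package and are not shown in this diff. A plausible sketch of their shape, with the image tag and the kubectl invocation as assumptions rather than the merged code (imports: "fmt", "os/exec"):

// NewHostExecPodSpec returns a pod that joins the host's network
// namespace, so a command run inside it (via kubectl exec) sees the
// node's own interfaces and sockets. That property is what lets
// kubectl exec replace SSH throughout these tests.
func NewHostExecPodSpec(ns, name string) *api.Pod {
	return &api.Pod{
		ObjectMeta: api.ObjectMeta{Name: name, Namespace: ns},
		Spec: api.PodSpec{
			Containers: []api.Container{{
				Name:            "hostexec",
				Image:           "gcr.io/google_containers/hostexec:1.0", // assumed tag
				ImagePullPolicy: api.PullIfNotPresent,
			}},
			SecurityContext: &api.PodSecurityContext{HostNetwork: true},
		},
	}
}

// RunHostCmd shells cmd into the pod; the real helper presumably routes
// through the suite's kubectl wrapper rather than raw exec.Command.
func RunHostCmd(ns, name, cmd string) (string, error) {
	out, err := exec.Command("kubectl", "exec",
		fmt.Sprintf("--namespace=%v", ns), name,
		"--", "/bin/sh", "-c", cmd).CombinedOutput()
	return string(out), err
}

// RunHostCmdOrDie fails the spec instead of returning the error.
func RunHostCmdOrDie(ns, name, cmd string) string {
	stdout, err := RunHostCmd(ns, name, cmd)
	expectNoError(err)
	return stdout
}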

func (config *KubeProxyTestConfig) createService(serviceSpec *api.Service) *api.Service {
Expand All @@ -422,13 +429,16 @@ func (config *KubeProxyTestConfig) setup() {
selectorName: "true",
}

By("Getting ssh-able hosts")
hosts, err := NodeSSHHosts(config.f.Client)
Expect(err).NotTo(HaveOccurred())
config.nodes = make([]string, 0, len(hosts))
for _, h := range hosts {
config.nodes = append(config.nodes, strings.TrimSuffix(h, ":22"))
By("Getting two nodes")
nodeList, err := config.f.Client.Nodes().List(labels.Everything(), fields.Everything())
Expect(err).NotTo(HaveOccurred(), fmt.Sprintf("Failed to get node list: %v", err))
config.externalAddrs = NodeAddresses(nodeList, api.NodeExternalIP)
if len(config.externalAddrs) < 2 {
Contributor:

assuming that this is what allows the test to run on k8s-mesos?

Contributor Author:

Yeah, ssh only worked on gce, gke, aws.
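The NodeAddresses helper used in this hunk is likewise defined in the e2e util package; a sketch of the shape it plausibly has, collecting every address of the requested type across the node list:

// NodeAddresses (sketch) gathers all addresses of the given type, so the
// caller can try ExternalIP first and fall back to LegacyHostIP on
// providers (like k8s-mesos) where nodes report no external address.
func NodeAddresses(nodeList *api.NodeList, addrType api.NodeAddressType) []string {
	addrs := []string{}
	for _, node := range nodeList.Items {
		for _, addr := range node.Status.Addresses {
			if addr.Type == addrType {
				addrs = append(addrs, addr.Address)
			}
		}
	}
	return addrs
}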

// fall back to legacy IPs
config.externalAddrs = NodeAddresses(nodeList, api.NodeLegacyHostIP)
}
Expect(len(config.externalAddrs)).To(BeNumerically(">=", 2), fmt.Sprintf("At least two nodes necessary with an external or LegacyHostIP"))
config.nodes = nodeList.Items

if enableLoadBalancerTest {
By("Creating the LoadBalancer Service on top of the pods in kubernetes")
@@ -437,13 +447,13 @@ func (config *KubeProxyTestConfig) setup() {

By("Creating the service pods in kubernetes")
podName := "netserver"
config.endpointPods = config.createNetProxyPods(podName, serviceSelector, testContext.CloudConfig.NumNodes)
config.endpointPods = config.createNetProxyPods(podName, serviceSelector)

By("Creating the service on top of the pods in kubernetes")
config.createNodePortService(serviceSelector)

By("Creating test pods")
config.createTestPod()
config.createTestPods()
}

func (config *KubeProxyTestConfig) cleanup() {
Expand All @@ -458,23 +468,35 @@ func (config *KubeProxyTestConfig) cleanup() {
}
}

func (config *KubeProxyTestConfig) createNetProxyPods(podName string, selector map[string]string, nodeCount int) []*api.Pod {
//testContext.CloudConfig.NumNodes
pods := make([]*api.Pod, 0)
func (config *KubeProxyTestConfig) createNetProxyPods(podName string, selector map[string]string) []*api.Pod {
nodes, err := config.f.Client.Nodes().List(labels.Everything(), fields.Everything())
Expect(err).NotTo(HaveOccurred())

for i := 0; i < nodeCount; i++ {
// create pods, one for each node
createdPods := make([]*api.Pod, 0, len(nodes.Items))
for i, n := range nodes.Items {
podName := fmt.Sprintf("%s-%d", podName, i)
pod := config.createNetShellPodSpec(podName)
pod := config.createNetShellPodSpec(podName, n.Name)
pod.ObjectMeta.Labels = selector
createdPod := config.createPod(pod)
pods = append(pods, createdPod)
createdPods = append(createdPods, createdPod)
}
return pods

// wait that all of them are up
runningPods := make([]*api.Pod, 0, len(nodes.Items))
for _, p := range createdPods {
expectNoError(config.f.WaitForPodRunning(p.Name))
rp, err := config.getPodClient().Get(p.Name)
expectNoError(err)
runningPods = append(runningPods, rp)
}

return runningPods
}

func (config *KubeProxyTestConfig) deleteNetProxyPod() {
pod := config.endpointPods[0]
config.getPodClient().Delete(pod.Name, nil)
config.getPodClient().Delete(pod.Name, api.NewDeleteOptions(0))
Contributor:

why is the force-delete needed here?

Contributor Author:

Why not? The tests are slower without it.

Contributor:

no specific reason as to "why not" -- just wondering why the change was made at all

config.endpointPods = config.endpointPods[1:]
// wait for pod being deleted.
err := waitForPodToDisappear(config.f.Client, config.f.Namespace.Name, pod.Name, labels.Everything(), time.Second, util.ForeverTestTimeout)
@@ -495,11 +517,6 @@ func (config *KubeProxyTestConfig) createPod(pod *api.Pod) *api.Pod {
if err != nil {
Contributor:

why is this an if statement, vs Expect(err).NotTo(HaveOccurred())? just wondering..

Contributor Author:

Seems like this style is used in the whole file.
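Both styles in question abort the spec on a non-nil error; the explicit form is simply house style in this file:

// Explicit form used throughout this file:
if err != nil {
	Failf("Failed to create %s pod: %v", pod.Name, err)
}
// Equivalent Gomega form:
Expect(err).NotTo(HaveOccurred())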

Failf("Failed to create %s pod: %v", pod.Name, err)
}
expectNoError(config.f.WaitForPodRunning(pod.Name))
createdPod, err = config.getPodClient().Get(pod.Name)
if err != nil {
Failf("Failed to retrieve %s pod: %v", pod.Name, err)
}
return createdPod
}

25 changes: 6 additions & 19 deletions test/e2e/privileged.go
@@ -43,7 +43,7 @@ const (
type PrivilegedPodTestConfig struct {
privilegedPod *api.Pod
f *Framework
nodes []string
hostExecPod *api.Pod
}

var _ = Describe("PrivilegedPod", func() {
Expand All @@ -52,15 +52,10 @@ var _ = Describe("PrivilegedPod", func() {
f: f,
}
It("should test privileged pod", func() {
SkipUnlessProviderIs(providersWithSSH...)

By("Getting ssh-able hosts")
hosts, err := NodeSSHHosts(config.f.Client)
Expect(err).NotTo(HaveOccurred())
if len(hosts) == 0 {
Failf("No ssh-able nodes")
}
config.nodes = hosts
hostExecPod := NewHostExecPodSpec(f.Namespace.Name, "hostexec")
pod, err := config.getPodClient().Create(hostExecPod)
expectNoError(err)
config.hostExecPod = pod

By("Creating a privileged pod")
config.createPrivilegedPod()
@@ -96,8 +91,7 @@ func (config *PrivilegedPodTestConfig) dialFromContainer(containerIP string, con
v.Encode())

By(fmt.Sprintf("Exec-ing into container over http. Running command:%s", cmd))
stdout := config.ssh(cmd)
Logf("Output is %q", stdout)
stdout := RunHostCmdOrDie(config.hostExecPod.Namespace, config.hostExecPod.Name, cmd)
var output map[string]string
err := json.Unmarshal([]byte(stdout), &output)
Expect(err).NotTo(HaveOccurred(), fmt.Sprintf("Could not unmarshal curl response: %s", stdout))
@@ -172,10 +166,3 @@ func (config *PrivilegedPodTestConfig) getPodClient() client.PodInterface {
func (config *PrivilegedPodTestConfig) getNamespaceClient() client.NamespaceInterface {
return config.f.Client.Namespaces()
}

func (config *PrivilegedPodTestConfig) ssh(cmd string) string {
stdout, _, code, err := SSH(cmd, config.nodes[0], testContext.Provider)
Expect(err).NotTo(HaveOccurred(), "error while SSH-ing to node: %v (code %v)", err, code)
Expect(code).Should(BeZero(), "command exited with non-zero code %v. cmd:%s", code, cmd)
return stdout
}
32 changes: 10 additions & 22 deletions test/e2e/service.go
@@ -394,17 +394,11 @@ var _ = Describe("Services", func() {
ip := pickNodeIP(c)
testReachable(ip, nodePort)

// this test uses NodeSSHHosts that does not work if a Node only reports LegacyHostIP
if providerIs(providersWithSSH...) {
hosts, err := NodeSSHHosts(c)
if err != nil {
Expect(err).NotTo(HaveOccurred())
}
cmd := fmt.Sprintf(`test -n "$(ss -ant46 'sport = :%d' | tail -n +2 | grep LISTEN)"`, nodePort)
_, _, code, err := SSH(cmd, hosts[0], testContext.Provider)
if code != 0 {
Failf("expected node port (%d) to be in use", nodePort)
}
hostExec := LaunchHostExecPod(f.Client, f.Namespace.Name, "hostexec")
cmd := fmt.Sprintf(`ss -ant46 'sport = :%d' | tail -n +2 | grep LISTEN`, nodePort)
stdout, err := RunHostCmd(hostExec.Namespace, hostExec.Name, cmd)
if err != nil {
Failf("expected node port (%d) to be in use, stdout: %v", nodePort, stdout)
}
})

@@ -759,17 +753,11 @@ var _ = Describe("Services", func() {
err = t.DeleteService(serviceName)
Expect(err).NotTo(HaveOccurred())

// this test uses NodeSSHHosts that does not work if a Node only reports LegacyHostIP
if providerIs(providersWithSSH...) {
hosts, err := NodeSSHHosts(c)
if err != nil {
Expect(err).NotTo(HaveOccurred())
}
cmd := fmt.Sprintf(`test -n "$(ss -ant46 'sport = :%d' | tail -n +2 | grep LISTEN)"`, nodePort)
_, _, code, err := SSH(cmd, hosts[0], testContext.Provider)
if code == 0 {
Failf("expected node port (%d) to not be in use", nodePort)
}
hostExec := LaunchHostExecPod(f.Client, f.Namespace.Name, "hostexec")
cmd := fmt.Sprintf(`! ss -ant46 'sport = :%d' | tail -n +2 | grep LISTEN`, nodePort)
stdout, err := RunHostCmd(hostExec.Namespace, hostExec.Name, cmd)
if err != nil {
Failf("expected node port (%d) to not be in use, stdout: %v", nodePort, stdout)
}
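Note the leading "!" in this variant: it inverts the pipeline's exit status, which is what turns "port still listening" into a non-nil error from RunHostCmd. A sketch of the semantics:

// grep exits 0 only when it finds a LISTEN line, so with "!" prepended:
//   port free  -> grep fails    -> "!" succeeds -> RunHostCmd err == nil
//   port bound -> grep succeeds -> "!" fails    -> RunHostCmd err != nil
cmd := fmt.Sprintf(`! ss -ant46 'sport = :%d' | tail -n +2 | grep LISTEN`, nodePort)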

By(fmt.Sprintf("creating service "+serviceName+" with same NodePort %d", nodePort))