Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Unittests for portforwarding ability added in python-base. #1237

Merged
merged 7 commits into from
Sep 9, 2020
123 changes: 123 additions & 0 deletions examples/pod_portforward.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
# Copyright 2020 The Kubernetes Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Shows the functionality of portforward streaming using an nginx container.
"""

import socket
import time
import urllib.request

from kubernetes import config
from kubernetes.client import Configuration
from kubernetes.client.api import core_v1_api
from kubernetes.client.rest import ApiException
from kubernetes.stream import portforward


def portforward_commands(api_instance):
name = 'portforward-example'
resp = None
try:
resp = api_instance.read_namespaced_pod(name=name,
namespace='default')
except ApiException as e:
if e.status != 404:
print("Unknown error: %s" % e)
exit(1)

if not resp:
print("Pod %s does not exist. Creating it..." % name)
pod_manifest = {
'apiVersion': 'v1',
'kind': 'Pod',
'metadata': {
'name': name
},
'spec': {
'containers': [{
'image': 'nginx',
'name': 'nginx',
}]
}
}
resp = api_instance.create_namespaced_pod(body=pod_manifest,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

resp is not checked

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was copied and pasted from the pod_exec.py example, where it also does not check it. If there is an error, an exception would be thrown. I removed the assignment in this example.

namespace='default')
while True:
resp = api_instance.read_namespaced_pod(name=name,
namespace='default')
if resp.status.phase != 'Pending':
break
time.sleep(1)
print("Done.")

pf = portforward(api_instance.connect_get_namespaced_pod_portforward,
name, 'default',
ports='80,8080:80')
for port in (80, 8080):
http = pf.socket(port)
http.settimeout(1)
http.sendall(b'GET / HTTP/1.1\r\n')
http.sendall(b'Host: 127.0.0.1\r\n')
http.sendall(b'Accept: */*\r\n')
http.sendall(b'\r\n')
response = b''
while True:
try:
response += http.recv(1024)
except socket.timeout:
break
print(response.decode('utf-8'))
http.close()

# Monkey patch socket.create_connection which is used by http.client and
# urllib.request. The same can be done with urllib3.util.connection.create_connection
# if the "requests" package is used.
def kubernetes_create_connection(address, *args, **kwargs):
dns_name = address[0]
if isinstance(dns_name, bytes):
dns_name = dns_name.decode()
# Look for "<pod-name>.<namspace>.kubernetes" dns names and if found
# provide a socket that is port forwarded to the kuberntest pod.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s/kuberntest/kubernetes/

dns_name = dns_name.split(".")
if len(dns_name) != 3 or dns_name[2] != "kubernetes":
return socket_create_connection(address, *args, **kwargs)
pf = portforward(api_instance.connect_get_namespaced_pod_portforward,
dns_name[0], dns_name[1], ports=str(address[1]))
return pf.socket(address[1])

socket_create_connection = socket.create_connection
socket.create_connection = kubernetes_create_connection

# Access the nginx http server using the "<pod-name>.<namespace>.kubernetes" dns name.
response = urllib.request.urlopen('http://%s.default.kubernetes' % name)
html = response.read().decode('utf-8')
response.close()
print('Status:', response.status)
print(html)


def main():
config.load_kube_config()
c = Configuration()
c.assert_hostname = False
#Configuration.set_default(c)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

instead of commenting it out, the following should work:

c = Configuration.get_default_copy()
c.assert_hostname = False
Configuration.set_default(c)

core_v1 = core_v1_api.CoreV1Api()

portforward_commands(core_v1)


if __name__ == '__main__':
main()
120 changes: 119 additions & 1 deletion kubernetes/e2e_test/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,16 @@
# under the License.

import json
import socket
import time
import unittest
import urllib.request
import uuid

from kubernetes.client import api_client
from kubernetes.client.api import core_v1_api
from kubernetes.e2e_test import base
from kubernetes.stream import stream
from kubernetes.stream import stream, portforward
from kubernetes.stream.ws_client import ERROR_CHANNEL


Expand Down Expand Up @@ -119,6 +121,7 @@ def test_pod_apis(self):

resp = api.delete_namespaced_pod(name=name, body={},
namespace='default')

def test_exit_code(self):
client = api_client.ApiClient(configuration=self.config)
api = core_v1_api.CoreV1Api(client)
Expand Down Expand Up @@ -159,6 +162,121 @@ def test_exit_code(self):
resp = api.delete_namespaced_pod(name=name, body={},
namespace='default')

def test_portforward_raw(self):
client = api_client.ApiClient(configuration=self.config)
api = core_v1_api.CoreV1Api(client)

name = 'portforward-raw-' + short_uuid()
pod_manifest = manifest_with_command(name, "while true;do nc -l -p 1234 -e /bin/cat; done")
resp = api.create_namespaced_pod(body=pod_manifest,
namespace='default')
self.assertEqual(name, resp.metadata.name)
self.assertTrue(resp.status.phase)

while True:
resp = api.read_namespaced_pod(name=name,
namespace='default')
self.assertEqual(name, resp.metadata.name)
self.assertTrue(resp.status.phase)
if resp.status.phase != 'Pending':
break
time.sleep(1)

pf1234 = portforward(api.connect_get_namespaced_pod_portforward,
name, 'default',
ports='1234')
sock1234 = pf1234.socket(1234)
sock1234.settimeout(1)
sent1234 = b'Test port 1234 forwarding...'
sock1234.sendall(sent1234)
reply1234 = b''
while True:
try:
reply1234 += sock1234.recv(1024)
except socket.timeout:
break
sock1234.close()
self.assertEqual(reply1234, sent1234)
self.assertIsNone(pf1234.error(1234))

pf9999 = portforward(api.connect_get_namespaced_pod_portforward,
name, 'default',
ports='9999:1234')
sock9999 = pf9999.socket(9999)
sock9999.settimeout(1)
sent9999 = b'Test port 9999 forwarding...'
sock9999.sendall(sent9999)
reply9999 = b''
while True:
try:
reply9999 += sock9999.recv(1024)
except socket.timeout:
break
self.assertEqual(reply9999, sent9999)
sock9999.close()
self.assertIsNone(pf9999.error(9999))

resp = api.delete_namespaced_pod(name=name, body={},
namespace='default')

def test_portforward_http(self):
client = api_client.ApiClient(configuration=self.config)
api = core_v1_api.CoreV1Api(client)

name = 'portforward-http-' + short_uuid()
pod_manifest = {
'apiVersion': 'v1',
'kind': 'Pod',
'metadata': {
'name': name
},
'spec': {
'containers': [{
'name': 'nginx',
'image': 'nginx',
}]
}
}

resp = api.create_namespaced_pod(body=pod_manifest,
namespace='default')
self.assertEqual(name, resp.metadata.name)
self.assertTrue(resp.status.phase)

while True:
resp = api.read_namespaced_pod(name=name,
namespace='default')
self.assertEqual(name, resp.metadata.name)
self.assertTrue(resp.status.phase)
if resp.status.phase != 'Pending':
break
time.sleep(1)

def kubernetes_create_connection(address, *args, **kwargs):
dns_name = address[0]
if isinstance(dns_name, bytes):
dns_name = dns_name.decode()
dns_name = dns_name.split(".")
if len(dns_name) != 3 or dns_name[2] != "kubernetes":
return socket_create_connection(address, *args, **kwargs)
pf = portforward(api.connect_get_namespaced_pod_portforward,
dns_name[0], dns_name[1], ports=str(address[1]))
return pf.socket(address[1])

socket_create_connection = socket.create_connection
try:
socket.create_connection = kubernetes_create_connection
response = urllib.request.urlopen('http://%s.default.kubernetes/' % name)
html = response.read().decode('utf-8')
finally:
socket.create_connection = socket_create_connection

self.assertEqual(response.status, 200)
self.assertTrue('<h1>Welcome to nginx!</h1>' in html)

resp = api.delete_namespaced_pod(name=name, body={},
namespace='default')

def test_service_apis(self):
client = api_client.ApiClient(configuration=self.config)
api = core_v1_api.CoreV1Api(client)
Expand Down