Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Unittests for portforwarding ability added in python-base. #1237

Merged
merged 7 commits into from
Sep 9, 2020
194 changes: 194 additions & 0 deletions examples/pod_portforward.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,194 @@
# Copyright 2020 The Kubernetes Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Shows the functionality of portforward streaming using an nginx container.
"""

import select
import socket
import time
import urllib.request

from kubernetes import config
from kubernetes.client import Configuration
from kubernetes.client.api import core_v1_api
from kubernetes.client.rest import ApiException
from kubernetes.stream import portforward

##############################################################################
# Kubernetes pod port forwarding works by directly providing a socket which
# the python application uses to send and receive data on. This is in contrast
# to the go client, which opens a local port that the go application then has
# to open to get a socket to transmit data.
#
# This simplifies the python application, there is not local port to worry
# about if that port number is available. Nor does the python application have
# to then deal with opening this local port. The socket used to transmit data
# is immediately provided to the python application.
#
# Below also is an example of monkey patching the socket.create_connection
# function so that DNS names of the following formats will access kubernetes
# ports:
#
# <pod-name>.<namespace>.kubernetes
# <pod-name>.pod.<namespace>.kubernetes
# <service-name>.svc.<namespace>.kubernetes
# <service-name>.service.<namespace>.kubernetes
#
# These DNS name can be used to interact with pod ports using python libraries,
# such as urllib.request and http.client. For example:
#
# response = urllib.request.urlopen(
# 'https://metrics-server.service.kube-system.kubernetes/'
# )
#
##############################################################################


def portforward_commands(api_instance):
name = 'portforward-example'
resp = None
try:
resp = api_instance.read_namespaced_pod(name=name,
namespace='default')
except ApiException as e:
if e.status != 404:
print("Unknown error: %s" % e)
exit(1)

if not resp:
print("Pod %s does not exist. Creating it..." % name)
pod_manifest = {
'apiVersion': 'v1',
'kind': 'Pod',
'metadata': {
'name': name
},
'spec': {
'containers': [{
'image': 'nginx',
'name': 'nginx',
}]
}
}
api_instance.create_namespaced_pod(body=pod_manifest,
namespace='default')
while True:
resp = api_instance.read_namespaced_pod(name=name,
namespace='default')
if resp.status.phase != 'Pending':
break
time.sleep(1)
print("Done.")

pf = portforward(
api_instance.connect_get_namespaced_pod_portforward,
name, 'default',
ports='80',
)
http = pf.socket(80)
http.setblocking(True)
http.sendall(b'GET / HTTP/1.1\r\n')
http.sendall(b'Host: 127.0.0.1\r\n')
http.sendall(b'Connection: close\r\n')
http.sendall(b'Accept: */*\r\n')
http.sendall(b'\r\n')
response = b''
while True:
select.select([http], [], [])
data = http.recv(1024)
if not data:
break
response += data
http.close()
print(response.decode('utf-8'))
error = pf.error(80)
if error is None:
print("No port forward errors on port 80.")
else:
print("Port 80 has the following error: %s" % error)

# Monkey patch socket.create_connection which is used by http.client and
# urllib.request. The same can be done with urllib3.util.connection.create_connection
# if the "requests" package is used.
socket_create_connection = socket.create_connection
def kubernetes_create_connection(address, *args, **kwargs):
dns_name = address[0]
if isinstance(dns_name, bytes):
dns_name = dns_name.decode()
dns_name = dns_name.split(".")
if dns_name[-1] != 'kubernetes':
return socket_create_connection(address, *args, **kwargs)
if len(dns_name) not in (3, 4):
raise RuntimeError("Unexpected kubernetes DNS name.")
namespace = dns_name[-2]
name = dns_name[0]
port = address[1]
if len(dns_name) == 4:
if dns_name[1] in ('svc', 'service'):
service = api_instance.read_namespaced_service(name, namespace)
for service_port in service.spec.ports:
if service_port.port == port:
port = service_port.target_port
break
else:
raise RuntimeError("Unable to find service port: %s" % port)
label_selector = []
for key, value in service.spec.selector.items():
label_selector.append("%s=%s" % (key, value))
pods = api_instance.list_namespaced_pod(
namespace, label_selector=",".join(label_selector)
)
if not pods.items:
raise RuntimeError("Unable to find service pods.")
name = pods.items[0].metadata.name
if isinstance(port, str):
for container in pods.items[0].spec.containers:
for container_port in container.ports:
if container_port.name == port:
port = container_port.container_port
break
else:
continue
break
else:
raise RuntimeError("Unable to find service port name: %s" % port)
elif dns_name[1] != 'pod':
raise RuntimeError("Unsupported resource type: %s" % dns_name[1])
pf = portforward(api_instance.connect_get_namespaced_pod_portforward,
name, namespace, ports=str(port))
return pf.socket(port)
socket.create_connection = kubernetes_create_connection

# Access the nginx http server using the "<pod-name>.pod.<namespace>.kubernetes" dns name.
response = urllib.request.urlopen('http://%s.pod.default.kubernetes' % name)
html = response.read().decode('utf-8')
response.close()
print('Status:', response.status)
print(html)


def main():
config.load_kube_config()
c = Configuration.get_default_copy()
c.assert_hostname = False
Configuration.set_default(c)
core_v1 = core_v1_api.CoreV1Api()

portforward_commands(core_v1)


if __name__ == '__main__':
main()
146 changes: 145 additions & 1 deletion kubernetes/e2e_test/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,17 @@
# under the License.

import json
import select
import socket
import time
import unittest
import urllib.request
import uuid

from kubernetes.client import api_client
from kubernetes.client.api import core_v1_api
from kubernetes.e2e_test import base
from kubernetes.stream import stream
from kubernetes.stream import stream, portforward
from kubernetes.stream.ws_client import ERROR_CHANNEL


Expand Down Expand Up @@ -119,6 +122,7 @@ def test_pod_apis(self):

resp = api.delete_namespaced_pod(name=name, body={},
namespace='default')

def test_exit_code(self):
client = api_client.ApiClient(configuration=self.config)
api = core_v1_api.CoreV1Api(client)
Expand Down Expand Up @@ -159,6 +163,146 @@ def test_exit_code(self):
resp = api.delete_namespaced_pod(name=name, body={},
namespace='default')

def test_portforward_raw(self):
client = api_client.ApiClient(configuration=self.config)
api = core_v1_api.CoreV1Api(client)

name = 'portforward-raw-' + short_uuid()
pod_manifest = manifest_with_command(
name,
'for port in 1234 1235;do ((while true;do nc -l -p $port -e /bin/cat; done)&);done;sleep 60',
)
resp = api.create_namespaced_pod(body=pod_manifest,
namespace='default')
self.assertEqual(name, resp.metadata.name)
self.assertTrue(resp.status.phase)

while True:
resp = api.read_namespaced_pod(name=name,
namespace='default')
self.assertEqual(name, resp.metadata.name)
self.assertTrue(resp.status.phase)
if resp.status.phase != 'Pending':
break
time.sleep(1)

pf = portforward(api.connect_get_namespaced_pod_portforward,
name, 'default',
ports='1234,1235')
sock1234 = pf.socket(1234)
sock1235 = pf.socket(1235)
sock1234.setblocking(True)
sock1235.setblocking(True)
sent1234 = b'Test port 1234 forwarding...'
sent1235 = b'Test port 1235 forwarding...'
sock1234.sendall(sent1234)
sock1235.sendall(sent1235)
reply1234 = b''
reply1235 = b''
while True:
rlist = []
if sock1234.fileno() != -1:
rlist.append(sock1234)
if sock1235.fileno() != -1:
rlist.append(sock1235)
if not rlist:
break
r, _w, _x = select.select(rlist, [], [], 1)
if not r:
break
if sock1234 in r:
data = sock1234.recv(1024)
if data:
reply1234 += data
else:
assert False, 'Unexpected sock1234 close'
if sock1235 in r:
data = sock1235.recv(1024)
if data:
reply1235 += data
else:
assert False, 'Unexpected sock1235 close'
self.assertEqual(reply1234, sent1234)
self.assertEqual(reply1235, sent1235)
for sock in (sock1234, sock1235):
sent = b'Another test using fileno %s' % str(sock.fileno()).encode()
sock.sendall(sent)
reply = b''
while True:
r, _w, _x = select.select([sock], [], [], 1)
if not r:
break
data = sock.recv(1024)
if data:
reply += data
else:
assert False, 'Unexpected sock close'
self.assertEqual(reply, sent)
sock.close()
self.assertIsNone(pf.error(1234))
self.assertIsNone(pf.error(1235))
yliaog marked this conversation as resolved.
Show resolved Hide resolved

resp = api.delete_namespaced_pod(name=name, body={},
namespace='default')

def test_portforward_http(self):
client = api_client.ApiClient(configuration=self.config)
api = core_v1_api.CoreV1Api(client)

name = 'portforward-http-' + short_uuid()
pod_manifest = {
'apiVersion': 'v1',
'kind': 'Pod',
'metadata': {
'name': name
},
'spec': {
'containers': [{
'name': 'nginx',
'image': 'nginx',
}]
}
}

resp = api.create_namespaced_pod(body=pod_manifest,
namespace='default')
self.assertEqual(name, resp.metadata.name)
self.assertTrue(resp.status.phase)

while True:
resp = api.read_namespaced_pod(name=name,
namespace='default')
self.assertEqual(name, resp.metadata.name)
self.assertTrue(resp.status.phase)
if resp.status.phase != 'Pending':
break
time.sleep(1)

def kubernetes_create_connection(address, *args, **kwargs):
dns_name = address[0]
if isinstance(dns_name, bytes):
dns_name = dns_name.decode()
dns_name = dns_name.split(".")
if len(dns_name) != 3 or dns_name[2] != "kubernetes":
return socket_create_connection(address, *args, **kwargs)
pf = portforward(api.connect_get_namespaced_pod_portforward,
dns_name[0], dns_name[1], ports=str(address[1]))
return pf.socket(address[1])

socket_create_connection = socket.create_connection
try:
socket.create_connection = kubernetes_create_connection
response = urllib.request.urlopen('http://%s.default.kubernetes/' % name)
html = response.read().decode('utf-8')
finally:
socket.create_connection = socket_create_connection

self.assertEqual(response.status, 200)
self.assertTrue('<h1>Welcome to nginx!</h1>' in html)

resp = api.delete_namespaced_pod(name=name, body={},
namespace='default')

def test_service_apis(self):
client = api_client.ApiClient(configuration=self.config)
api = core_v1_api.CoreV1Api(client)
Expand Down