diff --git a/examples/pod_portforward.py b/examples/pod_portforward.py new file mode 100644 index 0000000000..438bf57fad --- /dev/null +++ b/examples/pod_portforward.py @@ -0,0 +1,123 @@ +# Copyright 2020 The Kubernetes Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Shows the functionality of portforward streaming using an nginx container. +""" + +import socket +import time +import urllib.request + +from kubernetes import config +from kubernetes.client import Configuration +from kubernetes.client.api import core_v1_api +from kubernetes.client.rest import ApiException +from kubernetes.stream import portforward + + +def portforward_commands(api_instance): + name = 'portforward-example' + resp = None + try: + resp = api_instance.read_namespaced_pod(name=name, + namespace='default') + except ApiException as e: + if e.status != 404: + print("Unknown error: %s" % e) + exit(1) + + if not resp: + print("Pod %s does not exist. Creating it..." % name) + pod_manifest = { + 'apiVersion': 'v1', + 'kind': 'Pod', + 'metadata': { + 'name': name + }, + 'spec': { + 'containers': [{ + 'image': 'nginx', + 'name': 'nginx', + }] + } + } + resp = api_instance.create_namespaced_pod(body=pod_manifest, + namespace='default') + while True: + resp = api_instance.read_namespaced_pod(name=name, + namespace='default') + if resp.status.phase != 'Pending': + break + time.sleep(1) + print("Done.") + + pf = portforward(api_instance.connect_get_namespaced_pod_portforward, + name, 'default', + ports='80,8080:80') + for port in (80, 8080): + http = pf.socket(port) + http.settimeout(1) + http.sendall(b'GET / HTTP/1.1\r\n') + http.sendall(b'Host: 127.0.0.1\r\n') + http.sendall(b'Accept: */*\r\n') + http.sendall(b'\r\n') + response = b'' + while True: + try: + response += http.recv(1024) + except socket.timeout: + break + print(response.decode('utf-8')) + http.close() + + # Monkey patch socket.create_connection which is used by http.client and + # urllib.request. The same can be done with urllib3.util.connection.create_connection + # if the "requests" package is used. + def kubernetes_create_connection(address, *args, **kwargs): + dns_name = address[0] + if isinstance(dns_name, bytes): + dns_name = dns_name.decode() + # Look for "..kubernetes" dns names and if found + # provide a socket that is port forwarded to the kuberntest pod. + dns_name = dns_name.split(".") + if len(dns_name) != 3 or dns_name[2] != "kubernetes": + return socket_create_connection(address, *args, **kwargs) + pf = portforward(api_instance.connect_get_namespaced_pod_portforward, + dns_name[0], dns_name[1], ports=str(address[1])) + return pf.socket(address[1]) + + socket_create_connection = socket.create_connection + socket.create_connection = kubernetes_create_connection + + # Access the nginx http server using the "..kubernetes" dns name. + response = urllib.request.urlopen('http://%s.default.kubernetes' % name) + html = response.read().decode('utf-8') + response.close() + print('Status:', response.status) + print(html) + + +def main(): + config.load_kube_config() + c = Configuration() + c.assert_hostname = False + #Configuration.set_default(c) + core_v1 = core_v1_api.CoreV1Api() + + portforward_commands(core_v1) + + +if __name__ == '__main__': + main() diff --git a/kubernetes/e2e_test/test_client.py b/kubernetes/e2e_test/test_client.py index 5fd1b5e64b..480e6928a5 100644 --- a/kubernetes/e2e_test/test_client.py +++ b/kubernetes/e2e_test/test_client.py @@ -13,14 +13,16 @@ # under the License. import json +import socket import time import unittest +import urllib.request import uuid from kubernetes.client import api_client from kubernetes.client.api import core_v1_api from kubernetes.e2e_test import base -from kubernetes.stream import stream +from kubernetes.stream import stream, portforward from kubernetes.stream.ws_client import ERROR_CHANNEL @@ -119,6 +121,7 @@ def test_pod_apis(self): resp = api.delete_namespaced_pod(name=name, body={}, namespace='default') + def test_exit_code(self): client = api_client.ApiClient(configuration=self.config) api = core_v1_api.CoreV1Api(client) @@ -159,6 +162,121 @@ def test_exit_code(self): resp = api.delete_namespaced_pod(name=name, body={}, namespace='default') + def test_portforward_raw(self): + client = api_client.ApiClient(configuration=self.config) + api = core_v1_api.CoreV1Api(client) + + name = 'portforward-raw-' + short_uuid() + pod_manifest = manifest_with_command(name, "while true;do nc -l -p 1234 -e /bin/cat; done") + resp = api.create_namespaced_pod(body=pod_manifest, + namespace='default') + self.assertEqual(name, resp.metadata.name) + self.assertTrue(resp.status.phase) + + while True: + resp = api.read_namespaced_pod(name=name, + namespace='default') + self.assertEqual(name, resp.metadata.name) + self.assertTrue(resp.status.phase) + if resp.status.phase != 'Pending': + break + time.sleep(1) + + pf1234 = portforward(api.connect_get_namespaced_pod_portforward, + name, 'default', + ports='1234') + sock1234 = pf1234.socket(1234) + sock1234.settimeout(1) + sent1234 = b'Test port 1234 forwarding...' + sock1234.sendall(sent1234) + reply1234 = b'' + while True: + try: + reply1234 += sock1234.recv(1024) + except socket.timeout: + break + sock1234.close() + self.assertEqual(reply1234, sent1234) + self.assertIsNone(pf1234.error(1234)) + + pf9999 = portforward(api.connect_get_namespaced_pod_portforward, + name, 'default', + ports='9999:1234') + sock9999 = pf9999.socket(9999) + sock9999.settimeout(1) + sent9999 = b'Test port 9999 forwarding...' + sock9999.sendall(sent9999) + reply9999 = b'' + while True: + try: + reply9999 += sock9999.recv(1024) + except socket.timeout: + break + self.assertEqual(reply9999, sent9999) + sock9999.close() + self.assertIsNone(pf9999.error(9999)) + + resp = api.delete_namespaced_pod(name=name, body={}, + namespace='default') + + def test_portforward_http(self): + client = api_client.ApiClient(configuration=self.config) + api = core_v1_api.CoreV1Api(client) + + name = 'portforward-http-' + short_uuid() + pod_manifest = { + 'apiVersion': 'v1', + 'kind': 'Pod', + 'metadata': { + 'name': name + }, + 'spec': { + 'containers': [{ + 'name': 'nginx', + 'image': 'nginx', + }] + } + } + + resp = api.create_namespaced_pod(body=pod_manifest, + namespace='default') + self.assertEqual(name, resp.metadata.name) + self.assertTrue(resp.status.phase) + + while True: + resp = api.read_namespaced_pod(name=name, + namespace='default') + self.assertEqual(name, resp.metadata.name) + self.assertTrue(resp.status.phase) + if resp.status.phase != 'Pending': + break + time.sleep(1) + + def kubernetes_create_connection(address, *args, **kwargs): + dns_name = address[0] + if isinstance(dns_name, bytes): + dns_name = dns_name.decode() + dns_name = dns_name.split(".") + if len(dns_name) != 3 or dns_name[2] != "kubernetes": + return socket_create_connection(address, *args, **kwargs) + pf = portforward(api.connect_get_namespaced_pod_portforward, + dns_name[0], dns_name[1], ports=str(address[1])) + return pf.socket(address[1]) + + socket_create_connection = socket.create_connection + try: + socket.create_connection = kubernetes_create_connection + response = urllib.request.urlopen('http://%s.default.kubernetes/' % name) + html = response.read().decode('utf-8') + finally: + socket.create_connection = socket_create_connection + + self.assertEqual(response.status, 200) + self.assertTrue('

Welcome to nginx!

' in html) + + resp = api.delete_namespaced_pod(name=name, body={}, + namespace='default') + def test_service_apis(self): client = api_client.ApiClient(configuration=self.config) api = core_v1_api.CoreV1Api(client)