From 97936e3ab689e37b1319b9dd2aab0607ade641a8 Mon Sep 17 00:00:00 2001 From: Patrick J McNerthney Date: Fri, 29 Apr 2022 20:36:06 -1000 Subject: [PATCH 1/4] It turns out SSL sockets can buffer data such that the select method is not aware of it. See: https://docs.python.org/3/library/ssl.html#notes-on-non-blocking-sockets https://docs.python.org/3/library/ssl.html#ssl.SSLSocket.pending --- kubernetes/base/stream/ws_client.py | 93 +++++++++--------- kubernetes/e2e_test/port_server.py | 4 +- kubernetes/e2e_test/test_client.py | 142 ++++++++++++++-------------- 3 files changed, 122 insertions(+), 117 deletions(-) diff --git a/kubernetes/base/stream/ws_client.py b/kubernetes/base/stream/ws_client.py index 4d7b8c5c26..3f4c1225a2 100644 --- a/kubernetes/base/stream/ws_client.py +++ b/kubernetes/base/stream/ws_client.py @@ -353,69 +353,76 @@ def _proxy(self): local_all_closed = True for port in self.local_ports.values(): if port.python.fileno() != -1: - if port.error or not self.websocket.connected: + if self.websocket.connected: + rlist.append(port.python) if port.data: wlist.append(port.python) - local_all_closed = False - else: - port.python.close() + local_all_closed = False else: - rlist.append(port.python) if port.data: wlist.append(port.python) - local_all_closed = False + local_all_closed = False + else: + port.python.close() if local_all_closed and not (self.websocket.connected and kubernetes_data): self.websocket.close() return r, w, _ = select.select(rlist, wlist, []) for sock in r: if sock == self.websocket: - opcode, frame = self.websocket.recv_data_frame(True) - if opcode == ABNF.OPCODE_BINARY: - if not frame.data: - raise RuntimeError("Unexpected frame data size") - channel = six.byte2int(frame.data) - if channel >= len(channel_ports): - raise RuntimeError("Unexpected channel number: %s" % channel) - port = channel_ports[channel] - if channel_initialized[channel]: - if channel % 2: - if port.error is None: - port.error = '' - port.error += frame.data[1:].decode() + pending = True + while pending: + opcode, frame = self.websocket.recv_data_frame(True) + if opcode == ABNF.OPCODE_BINARY: + if not frame.data: + raise RuntimeError("Unexpected frame data size") + channel = six.byte2int(frame.data) + if channel >= len(channel_ports): + raise RuntimeError("Unexpected channel number: %s" % channel) + port = channel_ports[channel] + if channel_initialized[channel]: + if channel % 2: + if port.error is None: + port.error = '' + port.error += frame.data[1:].decode() + port.python.close() + else: + port.data += frame.data[1:] else: - port.data += frame.data[1:] - else: - if len(frame.data) != 3: - raise RuntimeError( - "Unexpected initial channel frame data size" - ) - port_number = six.byte2int(frame.data[1:2]) + (six.byte2int(frame.data[2:3]) * 256) - if port_number != port.port_number: - raise RuntimeError( - "Unexpected port number in initial channel frame: %s" % port_number - ) - channel_initialized[channel] = True - elif opcode not in (ABNF.OPCODE_PING, ABNF.OPCODE_PONG, ABNF.OPCODE_CLOSE): - raise RuntimeError("Unexpected websocket opcode: %s" % opcode) + if len(frame.data) != 3: + raise RuntimeError( + "Unexpected initial channel frame data size" + ) + port_number = six.byte2int(frame.data[1:2]) + (six.byte2int(frame.data[2:3]) * 256) + if port_number != port.port_number: + raise RuntimeError( + "Unexpected port number in initial channel frame: %s" % port_number + ) + channel_initialized[channel] = True + elif opcode not in (ABNF.OPCODE_PING, ABNF.OPCODE_PONG, ABNF.OPCODE_CLOSE): + raise RuntimeError("Unexpected websocket opcode: %s" % opcode) + if not (isinstance(self.websocket.sock, ssl.SSLSocket) and self.websocket.sock.pending()): + pending = False else: port = local_ports[sock] - data = port.python.recv(1024 * 1024) - if data: - kubernetes_data += ABNF.create_frame( - port.channel + data, - ABNF.OPCODE_BINARY, - ).format() - else: - port.python.close() + if port.python.fileno() != -1: + data = port.python.recv(1024 * 1024) + if data: + kubernetes_data += ABNF.create_frame( + port.channel + data, + ABNF.OPCODE_BINARY, + ).format() + else: + port.python.close() for sock in w: if sock == self.websocket: sent = self.websocket.sock.send(kubernetes_data) kubernetes_data = kubernetes_data[sent:] else: port = local_ports[sock] - sent = port.python.send(port.data) - port.data = port.data[sent:] + if port.python.fileno() != -1: + sent = port.python.send(port.data) + port.data = port.data[sent:] def get_websocket_url(url, query_params=None): diff --git a/kubernetes/e2e_test/port_server.py b/kubernetes/e2e_test/port_server.py index 75d28528be..2fb5f0ccf5 100644 --- a/kubernetes/e2e_test/port_server.py +++ b/kubernetes/e2e_test/port_server.py @@ -1,5 +1,3 @@ -#!/usr/bin/env python - import select import socketserver import sys @@ -28,6 +26,7 @@ def handler(self, request, address, server): data = request.recv(1024) if not data: break + print(f"{self.port}: {data}\n", end='', flush=True) echo += data if w: echo = echo[request.send(echo):] @@ -38,4 +37,3 @@ def handler(self, request, address, server): for port in sys.argv[1:]: ports.append(PortServer(int(port))) time.sleep(10 * 60) - diff --git a/kubernetes/e2e_test/test_client.py b/kubernetes/e2e_test/test_client.py index 2f3337ba9b..7a75bbde9f 100644 --- a/kubernetes/e2e_test/test_client.py +++ b/kubernetes/e2e_test/test_client.py @@ -230,10 +230,6 @@ def test_exit_code(self): resp = api.delete_namespaced_pod(name=name, body={}, namespace='default') - # Skipping this test as this flakes a lot - # See: https://github.com/kubernetes-client/python/issues/1300 - # Re-enable the test once the flakiness is investigated - @unittest.skip("skipping due to extreme flakiness") def test_portforward_raw(self): client = api_client.ApiClient(configuration=self.config) api = core_v1_api.CoreV1Api(client) @@ -267,7 +263,7 @@ def test_portforward_raw(self): 'name': 'port-server', 'image': 'python', 'command': [ - '/opt/port-server.py', '1234', '1235', + 'python', '-u', '/opt/port-server.py', '1234', '1235', ], 'volumeMounts': [ { @@ -278,17 +274,19 @@ def test_portforward_raw(self): ], 'startupProbe': { 'tcpSocket': { - 'port': 1234, + 'port': 1235, }, + 'periodSeconds': 1, + 'failureThreshold': 30, }, }, ], + 'restartPolicy': 'Never', 'volumes': [ { 'name': 'port-server', 'configMap': { 'name': name, - 'defaultMode': 0o777, }, }, ], @@ -299,77 +297,79 @@ def test_portforward_raw(self): self.assertEqual(name, resp.metadata.name) self.assertTrue(resp.status.phase) + timeout = time.time() + 60 while True: resp = api.read_namespaced_pod(name=name, namespace='default') self.assertEqual(name, resp.metadata.name) - self.assertTrue(resp.status.phase) - if resp.status.phase != 'Pending': - break + if resp.status.phase == 'Running': + if resp.status.container_statuses[0].ready: + break + else: + self.assertEqual(resp.status.phase, 'Pending') + self.assertTrue(time.time() < timeout) time.sleep(1) - self.assertEqual(resp.status.phase, 'Running') - - pf = portforward(api.connect_get_namespaced_pod_portforward, - name, 'default', - ports='1234,1235,1236') - self.assertTrue(pf.connected) - sock1234 = pf.socket(1234) - sock1235 = pf.socket(1235) - sock1234.setblocking(True) - sock1235.setblocking(True) - sent1234 = b'Test port 1234 forwarding...' - sent1235 = b'Test port 1235 forwarding...' - sock1234.sendall(sent1234) - sock1235.sendall(sent1235) - reply1234 = b'' - reply1235 = b'' - while True: - rlist = [] - if sock1234.fileno() != -1: - rlist.append(sock1234) - if sock1235.fileno() != -1: - rlist.append(sock1235) - if not rlist: - break - r, _w, _x = select.select(rlist, [], [], 1) - if not r: - break - if sock1234 in r: - data = sock1234.recv(1024) - self.assertNotEqual(data, b'', "Unexpected socket close") - reply1234 += data - if sock1235 in r: - data = sock1235.recv(1024) - self.assertNotEqual(data, b'', "Unexpected socket close") - reply1235 += data - self.assertEqual(reply1234, sent1234) - self.assertEqual(reply1235, sent1235) - self.assertTrue(pf.connected) - - sock = pf.socket(1236) - self.assertRaises(socket.error, sock.sendall, b'This should fail...') - self.assertIsNotNone(pf.error(1236)) - sock.close() - - for sock in (sock1234, sock1235): + + for ix in range(10): + ix = str(ix + 1).encode() + pf = portforward(api.connect_get_namespaced_pod_portforward, + name, 'default', + ports='1234,1235,1236') self.assertTrue(pf.connected) - sent = b'Another test using fileno %s' % str( - sock.fileno()).encode() - sock.sendall(sent) - reply = b'' - while True: - r, _w, _x = select.select([sock], [], [], 1) - if not r: - break - data = sock.recv(1024) - self.assertNotEqual(data, b'', "Unexpected socket close") - reply += data - self.assertEqual(reply, sent) + sock1234 = pf.socket(1234) + sock1235 = pf.socket(1235) + sock1234.setblocking(True) + sock1235.setblocking(True) + sent1234 = b'Test ' + ix + b' port 1234 forwarding' + sent1235 = b'Test ' + ix + b' port 1235 forwarding' + sock1234.sendall(sent1234) + sock1235.sendall(sent1235) + reply1234 = b'' + reply1235 = b'' + timeout = time.time() + 60 + while reply1234 != sent1234 or reply1235 != sent1235: + self.assertNotEqual(sock1234.fileno(), -1) + self.assertNotEqual(sock1235.fileno(), -1) + self.assertTrue(time.time() < timeout) + r, _w, _x = select.select([sock1234, sock1235], [], [], 1) + if sock1234 in r: + data = sock1234.recv(1024) + self.assertNotEqual(data, b'', 'Unexpected socket close') + reply1234 += data + self.assertTrue(sent1234.startswith(reply1234)) + if sock1235 in r: + data = sock1235.recv(1024) + self.assertNotEqual(data, b'', 'Unexpected socket close') + reply1235 += data + self.assertTrue(sent1235.startswith(reply1235)) + self.assertTrue(pf.connected) + + sock = pf.socket(1236) + sock.setblocking(True) + self.assertEqual(sock.recv(1024), b'') + self.assertIsNotNone(pf.error(1236)) sock.close() - time.sleep(1) - self.assertFalse(pf.connected) - self.assertIsNone(pf.error(1234)) - self.assertIsNone(pf.error(1235)) + + for sock in (sock1234, sock1235): + self.assertTrue(pf.connected) + sent = b'Another test ' + ix + b' using fileno ' + str(sock.fileno()).encode() + sock.sendall(sent) + reply = b'' + timeout = time.time() + 60 + while reply != sent: + self.assertNotEqual(sock.fileno(), -1) + self.assertTrue(time.time() < timeout) + r, _w, _x = select.select([sock], [], [], 1) + if r: + data = sock.recv(1024) + self.assertNotEqual(data, b'', 'Unexpected socket close') + reply += data + self.assertTrue(sent.startswith(reply)) + sock.close() + time.sleep(1) + self.assertFalse(pf.connected) + self.assertIsNone(pf.error(1234)) + self.assertIsNone(pf.error(1235)) resp = api.delete_namespaced_pod(name=name, namespace='default') resp = api.delete_namespaced_config_map(name=name, namespace='default') From b681a1019c0c0c2386a232cad42b8773d0cde7b4 Mon Sep 17 00:00:00 2001 From: Yu Liao Date: Mon, 13 Jun 2022 16:49:21 +0000 Subject: [PATCH 2/4] update changelog with release notes from master branch --- CHANGELOG.md | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 90338b7b57..eacd21ad49 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,10 @@ +# v24.1.0b1 + +Kubernetes API Version: v1.24.1 + +### Uncategorized +- The dynamic client now support the `_request_timeout` parameter to configure connection and request timeouts. (#1732, @philipp-sontag-by) + # v24.1.0a1 Kubernetes API Version: v1.24.1 From 6e8b82b314d66bbf784826955ee04f878610afa7 Mon Sep 17 00:00:00 2001 From: Yu Liao Date: Mon, 13 Jun 2022 16:49:22 +0000 Subject: [PATCH 3/4] update version constants for 24.1.0b1 release --- scripts/constants.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/scripts/constants.py b/scripts/constants.py index 17b331f39f..ba5f3c0fb0 100644 --- a/scripts/constants.py +++ b/scripts/constants.py @@ -18,13 +18,13 @@ KUBERNETES_BRANCH = "release-1.24" # client version for packaging and releasing. -CLIENT_VERSION = "24.1.0a1" +CLIENT_VERSION = "24.1.0b1" # Name of the release package PACKAGE_NAME = "kubernetes" # Stage of development, mainly used in setup.py's classifiers. -DEVELOPMENT_STATUS = "3 - Alpha" +DEVELOPMENT_STATUS = "4 - Beta" # If called directly, return the constant value given From b1deb1477d46ce421385dfd4721beca9f37106b5 Mon Sep 17 00:00:00 2001 From: Yu Liao Date: Mon, 13 Jun 2022 16:49:59 +0000 Subject: [PATCH 4/4] generated client change --- kubernetes/README.md | 2 +- kubernetes/__init__.py | 2 +- kubernetes/client/__init__.py | 2 +- kubernetes/client/api_client.py | 2 +- kubernetes/client/configuration.py | 2 +- setup.py | 4 ++-- 6 files changed, 7 insertions(+), 7 deletions(-) diff --git a/kubernetes/README.md b/kubernetes/README.md index 8d0422dc49..8d9123ed4a 100644 --- a/kubernetes/README.md +++ b/kubernetes/README.md @@ -4,7 +4,7 @@ No description provided (generated by Openapi Generator https://github.com/opena This Python package is automatically generated by the [OpenAPI Generator](https://openapi-generator.tech) project: - API version: release-1.24 -- Package version: 24.1.0a1 +- Package version: 24.1.0b1 - Build package: org.openapitools.codegen.languages.PythonClientCodegen ## Requirements. diff --git a/kubernetes/__init__.py b/kubernetes/__init__.py index c5874350ca..1f4c014b03 100644 --- a/kubernetes/__init__.py +++ b/kubernetes/__init__.py @@ -14,7 +14,7 @@ __project__ = 'kubernetes' # The version is auto-updated. Please do not edit. -__version__ = "24.1.0a1" +__version__ = "24.1.0b1" import kubernetes.client import kubernetes.config diff --git a/kubernetes/client/__init__.py b/kubernetes/client/__init__.py index 32faef6f8c..e5f7713042 100644 --- a/kubernetes/client/__init__.py +++ b/kubernetes/client/__init__.py @@ -14,7 +14,7 @@ from __future__ import absolute_import -__version__ = "24.1.0a1" +__version__ = "24.1.0b1" # import apis into sdk package from kubernetes.client.api.well_known_api import WellKnownApi diff --git a/kubernetes/client/api_client.py b/kubernetes/client/api_client.py index 9bdd451d80..4d12504cf7 100644 --- a/kubernetes/client/api_client.py +++ b/kubernetes/client/api_client.py @@ -78,7 +78,7 @@ def __init__(self, configuration=None, header_name=None, header_value=None, self.default_headers[header_name] = header_value self.cookie = cookie # Set default User-Agent. - self.user_agent = 'OpenAPI-Generator/24.1.0a1/python' + self.user_agent = 'OpenAPI-Generator/24.1.0b1/python' self.client_side_validation = configuration.client_side_validation def __enter__(self): diff --git a/kubernetes/client/configuration.py b/kubernetes/client/configuration.py index 68b0adebb9..eaf4d53649 100644 --- a/kubernetes/client/configuration.py +++ b/kubernetes/client/configuration.py @@ -350,7 +350,7 @@ def to_debug_report(self): "OS: {env}\n"\ "Python Version: {pyversion}\n"\ "Version of the API: release-1.24\n"\ - "SDK Package Version: 24.1.0a1".\ + "SDK Package Version: 24.1.0b1".\ format(env=sys.platform, pyversion=sys.version) def get_host_settings(self): diff --git a/setup.py b/setup.py index 37f0acae68..ee637eb984 100644 --- a/setup.py +++ b/setup.py @@ -16,9 +16,9 @@ # Do not edit these constants. They will be updated automatically # by scripts/update-client.sh. -CLIENT_VERSION = "24.1.0a1" +CLIENT_VERSION = "24.1.0b1" PACKAGE_NAME = "kubernetes" -DEVELOPMENT_STATUS = "3 - Alpha" +DEVELOPMENT_STATUS = "4 - Beta" # To install the library, run the following #