diff --git a/kubernetes/base/watch/watch.py b/kubernetes/base/watch/watch.py
index 40e5e75bf..1cd637a56 100644
--- a/kubernetes/base/watch/watch.py
+++ b/kubernetes/base/watch/watch.py
@@ -16,6 +16,8 @@
 import pydoc
 import sys
 
+from urllib3.exceptions import ProtocolError, ReadTimeoutError
+
 from kubernetes import client
 
 PYDOC_RETURN_LABEL = ":return:"
@@ -154,6 +156,14 @@ def stream(self, func, *args, **kwargs):
         :param func: The API function pointer. Any parameter to the function
                      can be passed after this parameter.
 
+        :param int _health_check_interval: Optional. Number of seconds to wait
+            for data before assuming the connection has been silently dropped
+            (e.g., during a control plane upgrade). When set to a value > 0,
+            the watch will automatically detect silent connection drops and
+            reconnect using the last known resource_version. Default is 0
+            (disabled). This is useful for long-running watches that need to
+            survive cluster upgrades.
+
         :return: Event object with these keys:
             'type': The type of event such as "ADDED", "DELETED", etc.
             'raw_object': a dict representing the watched object.
@@ -172,6 +182,11 @@ def stream(self, func, *args, **kwargs):
             ...
             if should_stop:
                 watch.stop()
+
+        Example with health check (survives control plane upgrades):
+            for e in watch.stream(v1.list_namespace, _health_check_interval=30):
+                # If no data received for 30s, watch reconnects automatically
+                print(e['type'], e['object'].metadata.name)
         """
 
         self._stop = False
@@ -187,6 +202,20 @@ def stream(self, func, *args, **kwargs):
         disable_retries = ('timeout_seconds' in kwargs)
         retry_after_410 = False
         deserialize = kwargs.pop('deserialize', True)
+
+        # Health check interval: when > 0, sets a read timeout on the
+        # HTTP connection so that silent connection drops (e.g., during
+        # control plane upgrades) are detected and the watch reconnects.
+        # A value of 0 (default) disables this feature.
+        health_check_interval = kwargs.pop('_health_check_interval', 0)
+
+        # If health check is enabled and user hasn't set an explicit
+        # request timeout, use the health check interval as the read
+        # timeout. This causes urllib3 to raise ReadTimeoutError if no
+        # data arrives within the interval, which we catch below.
+        if health_check_interval > 0 and '_request_timeout' not in kwargs:
+            kwargs['_request_timeout'] = health_check_interval
+
         while True:
             resp = func(*args, **kwargs)
             try:
@@ -217,12 +246,20 @@ def stream(self, func, *args, **kwargs):
                             retry_after_410 = False
                             yield event
                     else:
                         if line:
                             yield line  # Normal non-empty line
                         else:
                             yield ''  # Only yield one empty line
                     if self._stop:
                         break
+            except (ReadTimeoutError, ProtocolError):
+                # If health check is enabled, treat a read timeout as a
+                # silent connection drop and allow the outer while loop
+                # to reconnect using the last known resource_version.
+                if health_check_interval > 0:
+                    pass  # Fall through to retry logic below
+                else:
+                    raise
             finally:
                 resp.close()
                 resp.release_conn()
diff --git a/kubernetes/base/watch/watch_test.py b/kubernetes/base/watch/watch_test.py
index d872020b4..16665ac8e 100644
--- a/kubernetes/base/watch/watch_test.py
+++ b/kubernetes/base/watch/watch_test.py
@@ -17,6 +17,8 @@
 import unittest
 from unittest.mock import Mock, call
 
+from urllib3.exceptions import ReadTimeoutError
+
 from kubernetes import client, config
 from kubernetes.client import ApiException
 
@@ -626,5 +628,201 @@ def test_pod_log_empty_lines(self):
 #         ])
 
 
+    def test_health_check_detects_silent_drop_and_reconnects(self):
+        """Test that _health_check_interval detects a silent connection drop
+        (simulated by ReadTimeoutError) and reconnects automatically,
+        continuing to yield events from the new connection."""
+
+        # First response: yields one event, then raises ReadTimeoutError
+        # (simulating a silent connection drop during control plane upgrade)
+        fake_resp_1 = Mock()
+        fake_resp_1.close = Mock()
+        fake_resp_1.release_conn = Mock()
+
+        def stream_then_timeout(*args, **kwargs):
+            yield '{"type": "ADDED", "object": {"metadata": {"name": "test1", "resourceVersion": "1"}}}\n'
+            raise ReadTimeoutError(pool=None, url=None, message="Read timed out")
+
+        fake_resp_1.stream = Mock(side_effect=stream_then_timeout)
+
+        # Second response: yields another event (after reconnect)
+        fake_resp_2 = Mock()
+        fake_resp_2.close = Mock()
+        fake_resp_2.release_conn = Mock()
+        fake_resp_2.stream = Mock(
+            return_value=[
+                '{"type": "ADDED", "object": {"metadata": {"name": "test2", "resourceVersion": "2"}}}\n'
+            ])
+
+        fake_api = Mock()
+        # First call returns the stream that will timeout,
+        # second call returns the stream with new events
+        fake_api.get_namespaces = Mock(side_effect=[fake_resp_1, fake_resp_2])
+        fake_api.get_namespaces.__doc__ = ':return: V1NamespaceList'
+
+        w = Watch()
+        events = []
+        # Note: we do NOT pass timeout_seconds here because that disables
+        # retries. The watch should reconnect after the ReadTimeoutError
+        # and then stop naturally when the second stream ends (resource_version
+        # is set from the first event, so the finally block won't set _stop).
+        for e in w.stream(fake_api.get_namespaces,
+                          _health_check_interval=5):
+            events.append(e)
+            # Stop after collecting events from both connections
+            if len(events) == 2:
+                w.stop()
+
+        # Should have received events from both connections
+        self.assertEqual(2, len(events))
+        self.assertEqual("test1", events[0]['object'].metadata.name)
+        self.assertEqual("test2", events[1]['object'].metadata.name)
+
+        # Verify the API was called twice (initial + reconnect)
+        self.assertEqual(2, fake_api.get_namespaces.call_count)
+
+        # Verify both responses were properly closed
+        fake_resp_1.close.assert_called_once()
+        fake_resp_1.release_conn.assert_called_once()
+        fake_resp_2.close.assert_called_once()
+        fake_resp_2.release_conn.assert_called_once()
+
+    def test_health_check_disabled_by_default(self):
+        """Test that without _health_check_interval, a ReadTimeoutError
+        propagates to the caller (backward compatibility)."""
+
+        fake_resp = Mock()
+        fake_resp.close = Mock()
+        fake_resp.release_conn = Mock()
+
+        def stream_then_timeout(*args, **kwargs):
+            yield '{"type": "ADDED", "object": {"metadata": {"name": "test1", "resourceVersion": "1"}}}\n'
+            raise ReadTimeoutError(pool=None, url=None, message="Read timed out")
+
+        fake_resp.stream = Mock(side_effect=stream_then_timeout)
+
+        fake_api = Mock()
+        fake_api.get_namespaces = Mock(return_value=fake_resp)
+        fake_api.get_namespaces.__doc__ = ':return: V1NamespaceList'
+
+        w = Watch()
+        events = []
+        with self.assertRaises(ReadTimeoutError):
+            for e in w.stream(fake_api.get_namespaces):
+                events.append(e)
+
+        # Should have received the one event before the timeout
+        self.assertEqual(1, len(events))
+        self.assertEqual("test1", events[0]['object'].metadata.name)
+
+        # Verify the response was properly closed even after exception
+        fake_resp.close.assert_called_once()
+        fake_resp.release_conn.assert_called_once()
+
+    def test_health_check_sets_request_timeout(self):
+        """Test that _health_check_interval sets _request_timeout on the
+        API call when no explicit _request_timeout is provided."""
+
+        fake_resp = Mock()
+        fake_resp.close = Mock()
+        fake_resp.release_conn = Mock()
+        fake_resp.stream = Mock(
+            return_value=[
+                '{"type": "ADDED", "object": {"metadata": {"name": "test1", "resourceVersion": "1"}}}\n'
+            ])
+
+        fake_api = Mock()
+        fake_api.get_namespaces = Mock(return_value=fake_resp)
+        fake_api.get_namespaces.__doc__ = ':return: V1NamespaceList'
+
+        w = Watch()
+        for e in w.stream(fake_api.get_namespaces,
+                          _health_check_interval=30,
+                          timeout_seconds=10):
+            pass
+
+        # Verify _request_timeout was set to the health check interval
+        fake_api.get_namespaces.assert_called_once_with(
+            _preload_content=False, watch=True,
+            timeout_seconds=10, _request_timeout=30)
+
+    def test_health_check_preserves_explicit_request_timeout(self):
+        """Test that _health_check_interval does NOT override an explicit
+        _request_timeout provided by the user."""
+
+        fake_resp = Mock()
+        fake_resp.close = Mock()
+        fake_resp.release_conn = Mock()
+        fake_resp.stream = Mock(
+            return_value=[
+                '{"type": "ADDED", "object": {"metadata": {"name": "test1", "resourceVersion": "1"}}}\n'
+            ])
+
+        fake_api = Mock()
+        fake_api.get_namespaces = Mock(return_value=fake_resp)
+        fake_api.get_namespaces.__doc__ = ':return: V1NamespaceList'
+
+        w = Watch()
+        for e in w.stream(fake_api.get_namespaces,
+                          _health_check_interval=30,
+                          _request_timeout=60,
+                          timeout_seconds=10):
+            pass
+
+        # Verify the user's _request_timeout (60) was preserved, not overridden
+        fake_api.get_namespaces.assert_called_once_with(
+            _preload_content=False, watch=True,
+            timeout_seconds=10, _request_timeout=60)
+
+    def test_health_check_reconnects_with_resource_version(self):
+        """Test that after a silent drop, the reconnect uses the last known
+        resource_version so no events are missed."""
+
+        # First response: yields events with resource versions, then times out
+        fake_resp_1 = Mock()
+        fake_resp_1.close = Mock()
+        fake_resp_1.release_conn = Mock()
+
+        def stream_then_timeout(*args, **kwargs):
+            yield '{"type": "ADDED", "object": {"metadata": {"name": "test1", "resourceVersion": "100"}}}\n'
+            yield '{"type": "MODIFIED", "object": {"metadata": {"name": "test1", "resourceVersion": "101"}}}\n'
+            raise ReadTimeoutError(pool=None, url=None, message="Read timed out")
+
+        fake_resp_1.stream = Mock(side_effect=stream_then_timeout)
+
+        # Second response: yields more events
+        fake_resp_2 = Mock()
+        fake_resp_2.close = Mock()
+        fake_resp_2.release_conn = Mock()
+        fake_resp_2.stream = Mock(
+            return_value=[
+                '{"type": "ADDED", "object": {"metadata": {"name": "test2", "resourceVersion": "102"}}}\n'
+            ])
+
+        fake_api = Mock()
+        fake_api.get_namespaces = Mock(side_effect=[fake_resp_1, fake_resp_2])
+        fake_api.get_namespaces.__doc__ = ':return: V1NamespaceList'
+
+        w = Watch()
+        events = []
+        # Note: no timeout_seconds so retries are enabled
+        for e in w.stream(fake_api.get_namespaces,
+                          _health_check_interval=5):
+            events.append(e)
+            if len(events) == 3:
+                w.stop()
+
+        self.assertEqual(3, len(events))
+
+        # Verify the second call used the last resource_version from the
+        # first connection (101) so no events are missed
+        calls = fake_api.get_namespaces.call_args_list
+        self.assertEqual(2, len(calls))
+        # First call: no resource_version
+        self.assertNotIn('resource_version', calls[0].kwargs)
+        # Second call: should have resource_version=101
+        self.assertEqual('101', calls[1].kwargs['resource_version'])
+
+
 if __name__ == '__main__':
     unittest.main()