Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 40 additions & 3 deletions kubernetes/base/watch/watch.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
import pydoc
import sys

from urllib3.exceptions import ProtocolError, ReadTimeoutError

from kubernetes import client

PYDOC_RETURN_LABEL = ":return:"
Expand Down Expand Up @@ -154,6 +156,14 @@ def stream(self, func, *args, **kwargs):
:param func: The API function pointer. Any parameter to the function
can be passed after this parameter.

:param int _health_check_interval: Optional. Number of seconds to wait
for data before assuming the connection has been silently dropped
(e.g., during a control plane upgrade). When set to a value > 0,
the watch will automatically detect silent connection drops and
reconnect using the last known resource_version. Default is 0
(disabled). This is useful for long-running watches that need to
survive cluster upgrades.

:return: Event object with these keys:
'type': The type of event such as "ADDED", "DELETED", etc.
'raw_object': a dict representing the watched object.
Expand All @@ -172,6 +182,11 @@ def stream(self, func, *args, **kwargs):
...
if should_stop:
watch.stop()

Example with health check (survives control plane upgrades):
for e in watch.stream(v1.list_namespace, _health_check_interval=30):
# If no data received for 30s, watch reconnects automatically
print(e['type'], e['object'].metadata.name)
"""

self._stop = False
Expand All @@ -187,6 +202,20 @@ def stream(self, func, *args, **kwargs):
disable_retries = ('timeout_seconds' in kwargs)
retry_after_410 = False
deserialize = kwargs.pop('deserialize', True)

# Health check interval: when > 0, sets a read timeout on the
# HTTP connection so that silent connection drops (e.g., during
# control plane upgrades) are detected and the watch reconnects.
# A value of 0 (default) disables this feature.
health_check_interval = kwargs.pop('_health_check_interval', 0)

# If health check is enabled and user hasn't set an explicit
# request timeout, use the health check interval as the read
# timeout. This causes urllib3 to raise ReadTimeoutError if no
# data arrives within the interval, which we catch below.
if health_check_interval > 0 and '_request_timeout' not in kwargs:
kwargs['_request_timeout'] = health_check_interval
Comment on lines +216 to +217

while True:
resp = func(*args, **kwargs)
try:
Expand Down Expand Up @@ -217,12 +246,20 @@ def stream(self, func, *args, **kwargs):
retry_after_410 = False
yield event
else:
if line:
if line:
yield line # Normal non-empty line
else:
yield '' # Only yield one empty line
else:
yield '' # Only yield one empty line
if self._stop:
break
except (ReadTimeoutError, ProtocolError) as e:
# If health check is enabled, treat a read timeout as a
# silent connection drop and allow the outer while loop
# to reconnect using the last known resource_version.
if health_check_interval > 0:
pass # Fall through to retry logic below
Comment on lines +256 to +260
else:
raise
Comment on lines +255 to +262
finally:
resp.close()
resp.release_conn()
Expand Down
198 changes: 198 additions & 0 deletions kubernetes/base/watch/watch_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
import unittest
from unittest.mock import Mock, call

from urllib3.exceptions import ReadTimeoutError

from kubernetes import client, config
from kubernetes.client import ApiException

Expand Down Expand Up @@ -626,5 +628,201 @@ def test_pod_log_empty_lines(self):
# ])


def test_health_check_detects_silent_drop_and_reconnects(self):
"""Test that _health_check_interval detects a silent connection drop
(simulated by ReadTimeoutError) and reconnects automatically,
continuing to yield events from the new connection."""

# First response: yields one event, then raises ReadTimeoutError
# (simulating a silent connection drop during control plane upgrade)
fake_resp_1 = Mock()
fake_resp_1.close = Mock()
fake_resp_1.release_conn = Mock()

def stream_then_timeout(*args, **kwargs):
yield '{"type": "ADDED", "object": {"metadata": {"name": "test1", "resourceVersion": "1"}}}\n'
raise ReadTimeoutError(pool=None, url=None, message="Read timed out")

fake_resp_1.stream = Mock(side_effect=stream_then_timeout)

# Second response: yields another event (after reconnect)
fake_resp_2 = Mock()
fake_resp_2.close = Mock()
fake_resp_2.release_conn = Mock()
fake_resp_2.stream = Mock(
return_value=[
'{"type": "ADDED", "object": {"metadata": {"name": "test2", "resourceVersion": "2"}}}\n'
])

fake_api = Mock()
# First call returns the stream that will timeout,
# second call returns the stream with new events
fake_api.get_namespaces = Mock(side_effect=[fake_resp_1, fake_resp_2])
fake_api.get_namespaces.__doc__ = ':return: V1NamespaceList'

w = Watch()
events = []
# Note: we do NOT pass timeout_seconds here because that disables
# retries. The watch should reconnect after the ReadTimeoutError
# and then stop naturally when the second stream ends (resource_version
# is set from the first event, so the finally block won't set _stop).
for e in w.stream(fake_api.get_namespaces,
_health_check_interval=5):
events.append(e)
# Stop after collecting events from both connections
if len(events) == 2:
w.stop()

# Should have received events from both connections
self.assertEqual(2, len(events))
self.assertEqual("test1", events[0]['object'].metadata.name)
self.assertEqual("test2", events[1]['object'].metadata.name)

# Verify the API was called twice (initial + reconnect)
self.assertEqual(2, fake_api.get_namespaces.call_count)

# Verify both responses were properly closed
fake_resp_1.close.assert_called_once()
fake_resp_1.release_conn.assert_called_once()
fake_resp_2.close.assert_called_once()
fake_resp_2.release_conn.assert_called_once()

def test_health_check_disabled_by_default(self):
"""Test that without _health_check_interval, a ReadTimeoutError
propagates to the caller (backward compatibility)."""

fake_resp = Mock()
fake_resp.close = Mock()
fake_resp.release_conn = Mock()

def stream_then_timeout(*args, **kwargs):
yield '{"type": "ADDED", "object": {"metadata": {"name": "test1", "resourceVersion": "1"}}}\n'
raise ReadTimeoutError(pool=None, url=None, message="Read timed out")

fake_resp.stream = Mock(side_effect=stream_then_timeout)

fake_api = Mock()
fake_api.get_namespaces = Mock(return_value=fake_resp)
fake_api.get_namespaces.__doc__ = ':return: V1NamespaceList'

w = Watch()
events = []
with self.assertRaises(ReadTimeoutError):
for e in w.stream(fake_api.get_namespaces):
events.append(e)

# Should have received the one event before the timeout
self.assertEqual(1, len(events))
self.assertEqual("test1", events[0]['object'].metadata.name)

# Verify the response was properly closed even after exception
fake_resp.close.assert_called_once()
fake_resp.release_conn.assert_called_once()

def test_health_check_sets_request_timeout(self):
"""Test that _health_check_interval sets _request_timeout on the
API call when no explicit _request_timeout is provided."""

fake_resp = Mock()
fake_resp.close = Mock()
fake_resp.release_conn = Mock()
fake_resp.stream = Mock(
return_value=[
'{"type": "ADDED", "object": {"metadata": {"name": "test1", "resourceVersion": "1"}}}\n'
])

fake_api = Mock()
fake_api.get_namespaces = Mock(return_value=fake_resp)
fake_api.get_namespaces.__doc__ = ':return: V1NamespaceList'

w = Watch()
for e in w.stream(fake_api.get_namespaces,
_health_check_interval=30,
timeout_seconds=10):
pass

# Verify _request_timeout was set to the health check interval
fake_api.get_namespaces.assert_called_once_with(
_preload_content=False, watch=True,
timeout_seconds=10, _request_timeout=30)

def test_health_check_preserves_explicit_request_timeout(self):
"""Test that _health_check_interval does NOT override an explicit
_request_timeout provided by the user."""

fake_resp = Mock()
fake_resp.close = Mock()
fake_resp.release_conn = Mock()
fake_resp.stream = Mock(
return_value=[
'{"type": "ADDED", "object": {"metadata": {"name": "test1", "resourceVersion": "1"}}}\n'
])

fake_api = Mock()
fake_api.get_namespaces = Mock(return_value=fake_resp)
fake_api.get_namespaces.__doc__ = ':return: V1NamespaceList'

w = Watch()
for e in w.stream(fake_api.get_namespaces,
_health_check_interval=30,
_request_timeout=60,
timeout_seconds=10):
pass

# Verify the user's _request_timeout (60) was preserved, not overridden
fake_api.get_namespaces.assert_called_once_with(
_preload_content=False, watch=True,
timeout_seconds=10, _request_timeout=60)
Comment on lines +772 to +775

def test_health_check_reconnects_with_resource_version(self):
"""Test that after a silent drop, the reconnect uses the last known
resource_version so no events are missed."""

# First response: yields events with resource versions, then times out
fake_resp_1 = Mock()
fake_resp_1.close = Mock()
fake_resp_1.release_conn = Mock()

def stream_then_timeout(*args, **kwargs):
yield '{"type": "ADDED", "object": {"metadata": {"name": "test1", "resourceVersion": "100"}}}\n'
yield '{"type": "MODIFIED", "object": {"metadata": {"name": "test1", "resourceVersion": "101"}}}\n'
raise ReadTimeoutError(pool=None, url=None, message="Read timed out")

fake_resp_1.stream = Mock(side_effect=stream_then_timeout)

# Second response: yields more events
fake_resp_2 = Mock()
fake_resp_2.close = Mock()
fake_resp_2.release_conn = Mock()
fake_resp_2.stream = Mock(
return_value=[
'{"type": "ADDED", "object": {"metadata": {"name": "test2", "resourceVersion": "102"}}}\n'
])

fake_api = Mock()
fake_api.get_namespaces = Mock(side_effect=[fake_resp_1, fake_resp_2])
fake_api.get_namespaces.__doc__ = ':return: V1NamespaceList'

w = Watch()
events = []
# Note: no timeout_seconds so retries are enabled
for e in w.stream(fake_api.get_namespaces,
_health_check_interval=5):
events.append(e)
if len(events) == 3:
w.stop()

self.assertEqual(3, len(events))

# Verify the second call used the last resource_version from the
# first connection (101) so no events are missed
calls = fake_api.get_namespaces.call_args_list
self.assertEqual(2, len(calls))
# First call: no resource_version
self.assertNotIn('resource_version', calls[0].kwargs)
# Second call: should have resource_version=101
self.assertEqual('101', calls[1].kwargs['resource_version'])


if __name__ == '__main__':
unittest.main()
Loading