Skip to content

Commit

Permalink
Merge pull request kubernetes-client#36 from lichen2013/keep_watch
Browse files Browse the repository at this point in the history
Keep the watch action working forever
  • Loading branch information
mbohlool committed Jan 10, 2018
2 parents 13aa7cd + aec1c52 commit 6f02e73
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 9 deletions.
28 changes: 19 additions & 9 deletions watch/watch.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ def __init__(self, return_type=None):
self._raw_return_type = return_type
self._stop = False
self._api_client = client.ApiClient()
self.resource_version = 0

def stop(self):
self._stop = True
Expand All @@ -81,6 +82,8 @@ def unmarshal_event(self, data, return_type):
if return_type:
obj = SimpleNamespace(data=json.dumps(js['raw_object']))
js['object'] = self._api_client.deserialize(obj, return_type)
if hasattr(js['object'], 'metadata'):
self.resource_version = js['object'].metadata.resource_version
return js

def stream(self, func, *args, **kwargs):
Expand Down Expand Up @@ -113,12 +116,19 @@ def stream(self, func, *args, **kwargs):
return_type = self.get_return_type(func)
kwargs['watch'] = True
kwargs['_preload_content'] = False
resp = func(*args, **kwargs)
try:
for line in iter_resp_lines(resp):
yield self.unmarshal_event(line, return_type)
if self._stop:
break
finally:
resp.close()
resp.release_conn()

timeouts = ('timeout_seconds' in kwargs)
while True:
resp = func(*args, **kwargs)
try:
for line in iter_resp_lines(resp):
yield self.unmarshal_event(line, return_type)
if self._stop:
break
finally:
kwargs['resource_version'] = self.resource_version
resp.close()
resp.release_conn()

if timeouts or self._stop:
break
32 changes: 32 additions & 0 deletions watch/watch_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,38 @@ def test_watch_stream_twice(self):
fake_resp.close.assert_called_once()
fake_resp.release_conn.assert_called_once()

def test_watch_stream_loop(self):
w = Watch(float)

fake_resp = Mock()
fake_resp.close = Mock()
fake_resp.release_conn = Mock()
fake_resp.read_chunked = Mock(
return_value=['{"type": "ADDED", "object": 1}\n'])

fake_api = Mock()
fake_api.get_namespaces = Mock(return_value=fake_resp)
fake_api.get_namespaces.__doc__ = ':return: V1NamespaceList'

count = 0

# when timeout_seconds is set, auto-exist when timeout reaches
for e in w.stream(fake_api.get_namespaces, timeout_seconds=1):
count = count + 1
self.assertEqual(count, 1)

# when no timeout_seconds, only exist when w.stop() is called
for e in w.stream(fake_api.get_namespaces):
count = count + 1
if count == 2:
w.stop()

self.assertEqual(count, 2)
self.assertEqual(fake_api.get_namespaces.call_count, 2)
self.assertEqual(fake_resp.read_chunked.call_count, 2)
self.assertEqual(fake_resp.close.call_count, 2)
self.assertEqual(fake_resp.release_conn.call_count, 2)

def test_unmarshal_with_float_object(self):
w = Watch()
event = w.unmarshal_event('{"type": "ADDED", "object": 1}', 'float')
Expand Down

0 comments on commit 6f02e73

Please sign in to comment.