From 8f7b49008696e2bbbf0c373cf60bb528dcf9a4d9 Mon Sep 17 00:00:00 2001 From: Chen Li Date: Tue, 17 Oct 2017 01:19:34 -0500 Subject: [PATCH 1/2] Add flag to enable keep the watch action working all the time Fixes issue: https://github.com/kubernetes-incubator/client-python/issues/124 --- watch/watch.py | 27 +++++++++++++++++---------- watch/watch_test.py | 30 ++++++++++++++++++++++++++++++ 2 files changed, 47 insertions(+), 10 deletions(-) diff --git a/watch/watch.py b/watch/watch.py index 7e7e2cb7ef..f62a31b334 100644 --- a/watch/watch.py +++ b/watch/watch.py @@ -83,12 +83,14 @@ def unmarshal_event(self, data, return_type): js['object'] = self._api_client.deserialize(obj, return_type) return js - def stream(self, func, *args, **kwargs): + def stream(self, func, keep=False, *args, **kwargs): """Watch an API resource and stream the result back via a generator. :param func: The API function pointer. Any parameter to the function can be passed after this parameter. + :param keep: Flag to keep the watch work all the time. + :return: Event object with these keys: 'type': The type of event such as "ADDED", "DELETED", etc. 'raw_object': a dict representing the watched object. @@ -113,12 +115,17 @@ def stream(self, func, *args, **kwargs): return_type = self.get_return_type(func) kwargs['watch'] = True kwargs['_preload_content'] = False - resp = func(*args, **kwargs) - try: - for line in iter_resp_lines(resp): - yield self.unmarshal_event(line, return_type) - if self._stop: - break - finally: - resp.close() - resp.release_conn() + + while True: + resp = func(*args, **kwargs) + try: + for line in iter_resp_lines(resp): + yield self.unmarshal_event(line, return_type) + if self._stop: + break + finally: + resp.close() + resp.release_conn() + + if not keep or self._stop: + break diff --git a/watch/watch_test.py b/watch/watch_test.py index 64b5835fe4..c314a43592 100644 --- a/watch/watch_test.py +++ b/watch/watch_test.py @@ -85,6 +85,36 @@ def test_watch_stream_twice(self): fake_resp.close.assert_called_once() fake_resp.release_conn.assert_called_once() + def test_watch_stream_keep(self): + w = Watch(float) + + fake_resp = Mock() + fake_resp.close = Mock() + fake_resp.release_conn = Mock() + fake_resp.read_chunked = Mock( + return_value=['{"type": "ADDED", "object": 1}\n']) + + fake_api = Mock() + fake_api.get_namespaces = Mock(return_value=fake_resp) + fake_api.get_namespaces.__doc__ = ':return: V1NamespaceList' + + count = 0 + for e in w.stream(fake_api.get_namespaces): + count = count + 1 + + self.assertEqual(count, 1) + + for e in w.stream(fake_api.get_namespaces, True): + count = count + 1 + if count == 2: + w.stop() + + self.assertEqual(count, 2) + self.assertEqual(fake_api.get_namespaces.call_count, 2) + self.assertEqual(fake_resp.read_chunked.call_count, 2) + self.assertEqual(fake_resp.close.call_count, 2) + self.assertEqual(fake_resp.release_conn.call_count, 2) + def test_unmarshal_with_float_object(self): w = Watch() event = w.unmarshal_event('{"type": "ADDED", "object": 1}', 'float') From aec1c5259aa71f1476b397e804d6c396900ff606 Mon Sep 17 00:00:00 2001 From: Chen Li Date: Thu, 26 Oct 2017 03:15:50 -0500 Subject: [PATCH 2/2] Update continue the watch with resource_version --- watch/watch.py | 11 +++++++---- watch/watch_test.py | 10 ++++++---- 2 files changed, 13 insertions(+), 8 deletions(-) diff --git a/watch/watch.py b/watch/watch.py index f62a31b334..6f10e7b521 100644 --- a/watch/watch.py +++ b/watch/watch.py @@ -63,6 +63,7 @@ def __init__(self, return_type=None): self._raw_return_type = return_type self._stop = False self._api_client = client.ApiClient() + self.resource_version = 0 def stop(self): self._stop = True @@ -81,16 +82,16 @@ def unmarshal_event(self, data, return_type): if return_type: obj = SimpleNamespace(data=json.dumps(js['raw_object'])) js['object'] = self._api_client.deserialize(obj, return_type) + if hasattr(js['object'], 'metadata'): + self.resource_version = js['object'].metadata.resource_version return js - def stream(self, func, keep=False, *args, **kwargs): + def stream(self, func, *args, **kwargs): """Watch an API resource and stream the result back via a generator. :param func: The API function pointer. Any parameter to the function can be passed after this parameter. - :param keep: Flag to keep the watch work all the time. - :return: Event object with these keys: 'type': The type of event such as "ADDED", "DELETED", etc. 'raw_object': a dict representing the watched object. @@ -116,6 +117,7 @@ def stream(self, func, keep=False, *args, **kwargs): kwargs['watch'] = True kwargs['_preload_content'] = False + timeouts = ('timeout_seconds' in kwargs) while True: resp = func(*args, **kwargs) try: @@ -124,8 +126,9 @@ def stream(self, func, keep=False, *args, **kwargs): if self._stop: break finally: + kwargs['resource_version'] = self.resource_version resp.close() resp.release_conn() - if not keep or self._stop: + if timeouts or self._stop: break diff --git a/watch/watch_test.py b/watch/watch_test.py index c314a43592..73bcc9410f 100644 --- a/watch/watch_test.py +++ b/watch/watch_test.py @@ -85,7 +85,7 @@ def test_watch_stream_twice(self): fake_resp.close.assert_called_once() fake_resp.release_conn.assert_called_once() - def test_watch_stream_keep(self): + def test_watch_stream_loop(self): w = Watch(float) fake_resp = Mock() @@ -99,12 +99,14 @@ def test_watch_stream_keep(self): fake_api.get_namespaces.__doc__ = ':return: V1NamespaceList' count = 0 - for e in w.stream(fake_api.get_namespaces): - count = count + 1 + # when timeout_seconds is set, auto-exist when timeout reaches + for e in w.stream(fake_api.get_namespaces, timeout_seconds=1): + count = count + 1 self.assertEqual(count, 1) - for e in w.stream(fake_api.get_namespaces, True): + # when no timeout_seconds, only exist when w.stop() is called + for e in w.stream(fake_api.get_namespaces): count = count + 1 if count == 2: w.stop()