Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
132 changes: 80 additions & 52 deletions openshift/helper/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@


class KubernetesObjectHelper(object):

def __init__(self, api_version, kind, debug=False, reset_logfile=True, timeout=20, **auth):
self.api_version = api_version
self.kind = kind
Expand All @@ -70,6 +69,7 @@ def __init__(self, api_version, kind, debug=False, reset_logfile=True, timeout=2
self.base_model_name = self.get_base_model_name(self.model.__name__)
self.base_model_name_snake = self.get_base_model_name_snake(self.base_model_name)
self.timeout = timeout # number of seconds to wait for an API request
self.is_openshift = False

if debug:
self.enable_debug(reset_logfile)
Expand Down Expand Up @@ -108,6 +108,12 @@ def set_client_config(self, **auth):
else:
setattr(self.api_client.config, key, auth[key])

try:
client.OapiApi(api_client=self.api_client).get_api_resources()
self.is_openshift = True
except (ApiException,MaxRetryError):
pass

@staticmethod
def enable_debug(reset_logfile=True):
""" Turn on debugging. If reset_logfile, then remove the existing log file. """
Expand All @@ -129,7 +135,7 @@ def get_object(self, name, namespace=None):
k8s_obj = get_method(name, namespace)
except ApiException as exc:
if exc.status != 404:
if self.base_model_name == 'Project'and exc.status == 403:
if self.base_model_name == 'Project' and exc.status == 403:
pass
else:
msg = json.loads(exc.body).get('message', exc.reason) if exc.body.startswith('{') else exc.body
Expand All @@ -153,7 +159,10 @@ def patch_object(self, name, namespace, k8s_obj):
except ApiException as exc:
msg = json.loads(exc.body).get('message', exc.reason) if exc.body.startswith('{') else exc.body
raise OpenShiftException(msg, status=exc.status)
return_obj = self.__read_stream(w, stream, name)
except MaxRetryError as ex:
raise OpenShiftException(str(ex.reason))

return_obj = self.__read_stream(w, stream, name, 'patch')
if not return_obj:
return_obj = self.__wait_for_response(name, namespace, 'patch')
return return_obj
Expand All @@ -175,7 +184,7 @@ def create_project(self, metadata, display_name=None, description=None):
msg = json.loads(exc.body).get('message', exc.reason) if exc.body.startswith('{') else exc.body
raise OpenShiftException(msg, status=exc.status)

return_obj = self.__read_stream(w, stream, metadata.name)
return_obj = self.__read_stream(w, stream, metadata.name, 'create')
if not return_obj:
return_obj = self.__wait_for_response(metadata.name, None, 'create')

Expand Down Expand Up @@ -212,11 +221,7 @@ def create_object(self, namespace, k8s_obj=None, body=None):
msg = json.loads(exc.body).get('message', exc.reason) if exc.body.startswith('{') else exc.body
raise OpenShiftException(msg, status=exc.status)

return_obj = self.__read_stream(w, stream, name)

# Allow OpenShift annotations to be added to Namespace
#if isinstance(k8s_obj, client.models.V1Namespace):
# time.sleep(1)
return_obj = self.__read_stream(w, stream, name, 'create')

if not return_obj:
return_obj = self.__wait_for_response(name, namespace, 'create')
Expand Down Expand Up @@ -257,8 +262,7 @@ def delete_object(self, name, namespace):
msg += ' status: {}'.format(status_obj)
raise OpenShiftException(msg)

if self.kind in ('project', 'namespace'):
self.__read_stream(w, stream, name)
self.__wait_for_response(name, namespace, 'delete')

def replace_object(self, name, namespace, k8s_obj=None, body=None):
""" Replace an existing object. Pass in a model object or request dict().
Expand Down Expand Up @@ -294,7 +298,7 @@ def replace_object(self, name, namespace, k8s_obj=None, body=None):
msg = json.loads(exc.body).get('message', exc.reason) if exc.body.startswith('{') else exc.body
raise OpenShiftException(msg, status=exc.status)

return_obj = self.__read_stream(w, stream, name)
return_obj = self.__read_stream(w, stream, name, 'replace')
if not return_obj:
return_obj = self.__wait_for_response(name, namespace, 'replace')

Expand Down Expand Up @@ -476,53 +480,58 @@ def __create_stream(self, namespace):
stream = w.stream(list_method, _request_timeout=self.timeout)
return w, stream

def __read_stream(self, watcher, stream, name):
#TODO https://cobe.io/blog/posts/kubernetes-watch-python/ <--- might help?
def __read_stream(self, watcher, stream, name, action):
# TODO https://cobe.io/blog/posts/kubernetes-watch-python/ <--- might help?

return_obj = None

try:
for event in stream:
obj = None
if event.get('object'):
obj_json = json.dumps(event['object'].to_dict())
logger.debug(
"EVENT type: {0} object: {1}".format(event['type'], obj_json)
)
obj = event['object']
if action == 'delete':
event_types = ['DELETED']
else:
logger.debug(repr(event))
event_types = ['ADDED', 'MODIFIED']

if event['object'].metadata.name == name:
if event['type'] == 'DELETED':
# Object was deleted
return_obj = obj
watcher.stop()
break
elif obj is not None:
# Object is either added or modified. Check the status and determine if we
# should continue waiting
if hasattr(obj, 'status'):
status = getattr(obj, 'status')
if hasattr(status, 'phase'):
if status.phase == 'Active':
# TODO other phase values ??
# TODO test namespaces for OpenShift annotations if needed
return_obj = obj
watcher.stop()
break
elif hasattr(status, 'conditions'):
conditions = getattr(status, 'conditions')
if conditions and len(conditions) > 0:
# We know there is a status, but it's up to the user to determine meaning.
return_obj = obj
watcher.stop()
break
elif obj.kind == 'Service' and status is not None:
return_obj = obj
watcher.stop()
break
elif obj.kind == 'Route':
obj = None
if event.get('object'):
obj_json = json.dumps(event['object'].to_dict())
logger.debug(
"EVENT type: {0} object: {1}".format(event['type'], obj_json)
)
obj = event['object']
else:
logger.debug(repr(event))

if event['type'] in event_types:
if event['type'] == 'DELETED':
# Object was deleted
return_obj = obj
watcher.stop()
break
else:
# TODO: better handle modified events to ensure we are returning the right one
# Object is either added or modified. Check the status and determine if we
# should continue waiting
status = getattr(obj, 'status', None)

# if self.kind == 'namespace':
# if self.is_openshift:
# try:
# annotation_keys = obj.metadata.annotations.keys()
# required_annotations = [u'openshift.io/sa.scc.mcs',
# u'openshift.io/sa.scc.supplemental-groups',
# u'openshift.io/sa.scc.uid-range']
# for key in required_annotations:
# if key not in annotation_keys:
# continue
# except AttributeError:
# continue
# if status.phase == 'Active':
# return_obj = obj
# watcher.stop()
# break
if self.kind == 'route':
route_statuses = set()
for route_ingress in status.ingress:
for condition in route_ingress.conditions:
Expand All @@ -531,7 +540,26 @@ def __read_stream(self, watcher, stream, name):
return_obj = obj
watcher.stop()
break

elif self.kind in ['service', 'deployment_config']:
return_obj = obj
watcher.stop()
break
else:
if hasattr(status, 'phase'):
if status.phase == 'Active':
# TODO other phase values ??
# TODO test namespaces for OpenShift annotations if needed
return_obj = obj
watcher.stop()
break
elif hasattr(status, 'conditions'):
# TODO: attempt to handle generic conditions better
conditions = getattr(status, 'conditions')
if conditions and len(conditions) > 0:
# We know there is a status, but it's up to the user to determine meaning.
return_obj = obj
watcher.stop()
break
except Exception as exc:
# A timeout occurred
logger.debug('STREAM FAILED: {}'.format(exc))
Expand Down
4 changes: 2 additions & 2 deletions test/functional/test_v1_namespace.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ def k8s_namespace(admin_ansible_helper, create_params):
try:
admin_ansible_helper.delete_object(name, None)
except OpenShiftException as ex:
# Swallow exception if object is already removed
if ex.value.get('status') != 404:
# Swallow exception if object is already removed or being removed
if ex.value.get('status') not in [404, 409]:
raise


Expand Down
2 changes: 1 addition & 1 deletion tox.ini
Original file line number Diff line number Diff line change
Expand Up @@ -83,4 +83,4 @@ ignore = E123,E124,E127,E128,E201,E202,E203,E211,E221,E222,E225,E226,E228,E227,E
# Matches line length set at github.com/ansible/ansible/tox.ini
max-line-length = 160
# Not worrying about style in the following
exclude = .tox/*,test/*,openshift/test/*,openshift/client/models/*,openshift/client/apis/*,openshift/client/__init__.py,openshift/__init__.py
exclude = .tox/*,test/*,openshift/test/*,openshift/client/models/*,openshift/client/apis/*,openshift/client/__init__.py,openshift/__init__.py,venv/*,_modules/*