Skip to content

Commit

Permalink
Merge 0c227dc into 0821a8d
Browse files Browse the repository at this point in the history
  • Loading branch information
Behoston committed Jun 27, 2019
2 parents 0821a8d + 0c227dc commit 8cff7b1
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 0 deletions.
14 changes: 14 additions & 0 deletions kubepy/api.py
Expand Up @@ -22,6 +22,20 @@ def get(kind, name=None):
return objects


def get_failed_pod_for_job(job_name):
command = [
'kubectl', 'get', 'pod',
'-o', 'yaml',
'--field-selector', 'status.phase=Failed',
'-l', 'job-name={}'.format(job_name),
]
get_process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=sys.stderr)
objects = yaml.safe_load(get_process.stdout)
if get_process.wait() != 0:
raise ApiError
return objects


def logs(pod_name, container_name=None):
command = ['kubectl', 'logs', pod_name]
if container_name:
Expand Down
25 changes: 25 additions & 0 deletions kubepy/appliers.py
@@ -1,11 +1,14 @@
import collections
import copy
import logging
import time

from kubepy import api
from kubepy import definition_manager
from kubepy import definition_transformers

logger = logging.getLogger(__name__)


class InstallError(Exception):
pass
Expand Down Expand Up @@ -174,8 +177,21 @@ def raise_if_failed(self):
self._raise_for_failed_condition(condition)
failures = self.status.get('failed', 0)
if self.max_retries and failures > self.max_retries:
failed_pod = self.get_job_pod()
if failed_pod:
stdout = self.get_failed_pod_logs(failed_pod)
print(stdout)
raise JobError('Job failed {} times.'.format(failures))

def get_failed_pod_logs(self, failed_pod):
for init_container_status in failed_pod['status'].get('initContainerStatuses', []):
if init_container_status['state'].get('terminated', {}).get('reason') == 'ContainerCannotRun':
return 'Init container failed! ' + str(init_container_status['state']['terminated']['message'])
for container_status in failed_pod['status'].get('containerStatuses', []):
if container_status['state'].get('terminated', {}).get('reason') == 'ContainerCannotRun':
return 'Container failed! ' + str(container_status['state']['terminated']['message'])
return api.logs(failed_pod['metadata']['name'])[0].decode()

def _raise_for_failed_condition(self, condition):
if condition['reason'] == 'DeadlineExceeded':
raise DeadlineExceeded(condition['message'])
Expand All @@ -186,6 +202,15 @@ def _raise_for_failed_condition(self, condition):
def succeeded(self):
return 'completionTime' in self.status

def get_job_pod(self):
pods = api.get_failed_pod_for_job(self.definition_name)['items']
if not pods:
logging.warning('No pod found for job {}!'.format(self.definition_name))
return
elif len(pods) > 1:
logging.info('More than one pod found for job {}, returning first.'.format(self.definition_name))
return pods[0]


ContainerInfo = collections.namedtuple('ContainerInfo', ['name', 'state'])

Expand Down

0 comments on commit 8cff7b1

Please sign in to comment.