Skip to content

Commit

Permalink
Merge 0468a5d into 0821a8d
Browse files Browse the repository at this point in the history
  • Loading branch information
Behoston committed May 27, 2019
2 parents 0821a8d + 0468a5d commit ec58118
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 5 deletions.
14 changes: 14 additions & 0 deletions kubepy/api.py
Expand Up @@ -22,6 +22,20 @@ def get(kind, name=None):
return objects


def get_pod_for_job(job_name):
command = [
'kubectl', 'get', 'pod',
'-o', 'yaml',
'--field-selector', 'status.phase=Failed',
'-l', 'job-name={}'.format(job_name),
]
get_process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=sys.stderr)
objects = yaml.safe_load(get_process.stdout)
if get_process.wait() != 0:
raise ApiError
return objects


def logs(pod_name, container_name=None):
command = ['kubectl', 'logs', pod_name]
if container_name:
Expand Down
35 changes: 30 additions & 5 deletions kubepy/appliers.py
@@ -1,28 +1,31 @@
import collections
import copy
import logging
import time

from kubepy import api
from kubepy import definition_manager
from kubepy import definition_transformers

logger = logging.getLogger(__name__)


class InstallError(Exception):
pass


class JobError(InstallError):
pass


class PodError(JobError):
def __init__(self, message='', container_name='', stdout='', stderr=''):
self.container_name = container_name
self.stdout = stdout
self.stderr = stderr
super().__init__(message, container_name, stdout, stderr)


class PodError(JobError):
pass


class DeadlineExceeded(JobError):
pass

Expand Down Expand Up @@ -174,7 +177,20 @@ def raise_if_failed(self):
self._raise_for_failed_condition(condition)
failures = self.status.get('failed', 0)
if self.max_retries and failures > self.max_retries:
raise JobError('Job failed {} times.'.format(failures))
failed_pod = self.get_job_pod()
if failed_pod:
stdout, stderr = self.get_failed_pod_logs(failed_pod)
else:
stdout = ''
stderr = ''
raise JobError('Job failed {} times.'.format(failures), stderr=stderr, stdout=stdout)

def get_failed_pod_logs(self, failed_pod):
for init_container_status in failed_pod['status'].get('initContainerStatuses', []):
if init_container_status['state'].get('terminated', {}).get('reason') == 'ContainerCannotRun':
return 'Init container failed!', init_container_status['state']['terminated']['message']
print(failed_pod['metadata']['name'])
return api.logs(failed_pod['metadata']['name'])

def _raise_for_failed_condition(self, condition):
if condition['reason'] == 'DeadlineExceeded':
Expand All @@ -186,6 +202,15 @@ def _raise_for_failed_condition(self, condition):
def succeeded(self):
return 'completionTime' in self.status

def get_job_pod(self):
pods = api.get_pod_for_job(self.definition_name)['items']
if not pods:
logging.warning('No pod found for job {}!'.format(self.definition_name))
return
elif len(pods) > 1:
logging.info('More than one pod found for job {}, returning first.'.format(self.definition_name))
return pods[0]


ContainerInfo = collections.namedtuple('ContainerInfo', ['name', 'state'])

Expand Down

0 comments on commit ec58118

Please sign in to comment.