Skip to content

Commit

Permalink
job_monitor: fixes job pod deletion and logs capturing
Browse files Browse the repository at this point in the history
* Closes #232
  • Loading branch information
Rokas Maciulaitis committed Feb 24, 2020
1 parent aa1d036 commit ca7c6f4
Showing 1 changed file with 9 additions and 6 deletions.
15 changes: 9 additions & 6 deletions reana_job_controller/job_monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,30 +54,32 @@ def __init__(self, app=None):
thread_name='kubernetes_job_monitor'
)

def get_container_logs(self, last_spawned_pod):
def get_container_logs(self, job_id):
"""Get job pod's containers' logs."""
try:
pod_logs = ''
pod = current_k8s_corev1_api_client.read_namespaced_pod(
namespace=last_spawned_pod.metadata.namespace,
name=last_spawned_pod.metadata.name)
namespace='default',
name=job_id)
containers = pod.spec.init_containers + pod.spec.containers \
if pod.spec.init_containers else pod.spec.containers
for container in containers:
container_log = \
current_k8s_corev1_api_client.read_namespaced_pod_log(
namespace=last_spawned_pod.metadata.namespace,
name=last_spawned_pod.metadata.name,
namespace='default',
name=job_id,
container=container.name)
pod_logs += '{}: \n {} \n'.format(
container.name, container_log)
return pod_logs
except client.rest.ApiException as e:
logging.error(
"Error while connecting to Kubernetes API: {}".format(e))
return None
except Exception as e:
logging.error(traceback.format_exc())
logging.error("Unexpected error: {}".format(e))
return None

def watch_jobs(self, job_db, app=None):
"""Open stream connection to k8s apiserver to watch all jobs status.
Expand Down Expand Up @@ -111,6 +113,7 @@ def watch_jobs(self, job_db, app=None):
continue
job_id = remaining_jobs[job.metadata.labels['job-name']]
kubernetes_job_id = job.metadata.labels['job-name']
kubernetes_pod_job_id = job.metadata.name
if job.status.phase == 'Succeeded':
logging.info(
'Job job_id: {}, kubernetes_job_id: {}'
Expand Down Expand Up @@ -151,7 +154,7 @@ def watch_jobs(self, job_db, app=None):
logging.info('Grabbing pod {} logs...'.format(
job_id))
job_db[job_id]['log'] = \
self.get_container_logs(job_id) or \
self.get_container_logs(kubernetes_pod_job_id) or \
job.status.container_statuses[0].state \
.waiting.message
store_logs(job_id=job_id, logs=job_db[job_id]['log'])
Expand Down

0 comments on commit ca7c6f4

Please sign in to comment.