Skip to content

Commit

Permalink
consumer: saves workflow engine container logs
Browse files Browse the repository at this point in the history
* Saves workflow engine container logs and updates WF status if it
  is only needed.
  • Loading branch information
Rokas Maciulaitis committed Feb 5, 2020
1 parent 4b77b33 commit 88ea799
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 16 deletions.
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ before_install:
- pip install -r requirements-dev.txt
- travis_retry pip install --upgrade pip setuptools py
- travis_retry pip install twine wheel coveralls
- travis_retry pip install "mock>=3"
- travis_retry pip install "mock>=3,<4"

install:
- travis_retry pip install -e .[all]
Expand Down
40 changes: 26 additions & 14 deletions reana_workflow_controller/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,15 @@
from __future__ import absolute_import

import json
import logging
import uuid
from datetime import datetime

import requests
from kubernetes.client.rest import ApiException
from reana_commons.consumer import BaseConsumer
from reana_commons.k8s.api_client import current_k8s_batchv1_api_client
from reana_commons.k8s.api_client import (current_k8s_batchv1_api_client,
current_k8s_corev1_api_client)
from reana_commons.k8s.secrets import REANAUserSecretsStore
from reana_commons.utils import (calculate_file_access_time,
calculate_hash_of_dir,
Expand Down Expand Up @@ -74,17 +76,19 @@ def on_message(self, body, message):

def _update_workflow_status(workflow_uuid, status, logs):
"""Update workflow status in DB."""
Workflow.update_workflow_status(Session, workflow_uuid,
status, logs, None)
workflow = Session.query(Workflow).filter_by(id_=workflow_uuid)\
.one_or_none()
if workflow.git_ref:
_update_commit_status(workflow, status)
alive_statuses = \
[WorkflowStatus.created, WorkflowStatus.running, WorkflowStatus.queued]
if status not in alive_statuses:
workflow.run_finished_at = datetime.now()
_delete_workflow_engine_pod(workflow_uuid)
if workflow.status != status:
Workflow.update_workflow_status(Session, workflow_uuid,
status, logs, None)
if workflow.git_ref:
_update_commit_status(workflow, status)
alive_statuses = \
[WorkflowStatus.created, WorkflowStatus.running,
WorkflowStatus.queued]
if status not in alive_statuses:
workflow.run_finished_at = datetime.now()
_delete_workflow_engine_pod(workflow)


def _update_commit_status(workflow, status):
Expand Down Expand Up @@ -191,19 +195,27 @@ def _update_job_cache(msg):
Session.add(cached_job)


def _delete_workflow_engine_pod(workflow_uuid):
def _delete_workflow_engine_pod(workflow):
"""Delete workflow engine pod."""
try:
jobs = current_k8s_batchv1_api_client.list_namespaced_job(
jobs = current_k8s_corev1_api_client.list_namespaced_pod(
namespace='default',
)
for job in jobs.items:
if workflow_uuid in job.metadata.name:
if str(workflow.id_) in job.metadata.name:
workflow.logs = \
current_k8s_corev1_api_client.read_namespaced_pod_log(
namespace=job.metadata.namespace,
name=job.metadata.name,
container='workflow-engine')
current_k8s_batchv1_api_client.delete_namespaced_job(
namespace='default',
propagation_policy="Background",
name=job.metadata.name)
name=job.metadata.labels['job-name'])
break
except ApiException as e:
raise REANAWorkflowControllerError(
"Workflow engine pod cound not be deleted {}.".format(e))
except Exception as e:
logging.error(traceback.format_exc())
logging.error("Unexpected error: {}".format(e))
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@
'marshmallow>2.13.0,<=2.20.1',
'packaging>=18.0',
'reana-commons[kubernetes]>=0.7.0.dev20200203,<0.8.0',
'reana-db>=0.7.0.dev20200129,<0.8.0',
'reana-db>=0.7.0.dev20200206,<0.8.0',
'requests==2.20.0',
'sqlalchemy-utils>=0.31.0',
'uwsgi-tools>=1.1.1',
Expand Down

0 comments on commit 88ea799

Please sign in to comment.