diff --git a/.travis.yml b/.travis.yml index 248c050d..272759ee 100644 --- a/.travis.yml +++ b/.travis.yml @@ -24,7 +24,7 @@ before_install: - pip install -r requirements-dev.txt - travis_retry pip install --upgrade pip setuptools py - travis_retry pip install twine wheel coveralls - - travis_retry pip install "mock>=3" + - travis_retry pip install "mock>=3,<4" install: - travis_retry pip install -e .[all] diff --git a/reana_workflow_controller/consumer.py b/reana_workflow_controller/consumer.py index 60082280..b335397c 100644 --- a/reana_workflow_controller/consumer.py +++ b/reana_workflow_controller/consumer.py @@ -11,13 +11,15 @@ from __future__ import absolute_import import json +import logging import uuid from datetime import datetime import requests from kubernetes.client.rest import ApiException from reana_commons.consumer import BaseConsumer -from reana_commons.k8s.api_client import current_k8s_batchv1_api_client +from reana_commons.k8s.api_client import (current_k8s_batchv1_api_client, + current_k8s_corev1_api_client) from reana_commons.k8s.secrets import REANAUserSecretsStore from reana_commons.utils import (calculate_file_access_time, calculate_hash_of_dir, @@ -74,17 +76,19 @@ def on_message(self, body, message): def _update_workflow_status(workflow_uuid, status, logs): """Update workflow status in DB.""" - Workflow.update_workflow_status(Session, workflow_uuid, - status, logs, None) workflow = Session.query(Workflow).filter_by(id_=workflow_uuid)\ .one_or_none() - if workflow.git_ref: - _update_commit_status(workflow, status) - alive_statuses = \ - [WorkflowStatus.created, WorkflowStatus.running, WorkflowStatus.queued] - if status not in alive_statuses: - workflow.run_finished_at = datetime.now() - _delete_workflow_engine_pod(workflow_uuid) + if workflow.status != status: + Workflow.update_workflow_status(Session, workflow_uuid, + status, logs, None) + if workflow.git_ref: + _update_commit_status(workflow, status) + alive_statuses = \ + [WorkflowStatus.created, WorkflowStatus.running, + WorkflowStatus.queued] + if status not in alive_statuses: + workflow.run_finished_at = datetime.now() + _delete_workflow_engine_pod(workflow) def _update_commit_status(workflow, status): @@ -191,19 +195,27 @@ def _update_job_cache(msg): Session.add(cached_job) -def _delete_workflow_engine_pod(workflow_uuid): +def _delete_workflow_engine_pod(workflow): """Delete workflow engine pod.""" try: - jobs = current_k8s_batchv1_api_client.list_namespaced_job( + jobs = current_k8s_corev1_api_client.list_namespaced_pod( namespace='default', ) for job in jobs.items: - if workflow_uuid in job.metadata.name: + if str(workflow.id_) in job.metadata.name: + workflow.logs = \ + current_k8s_corev1_api_client.read_namespaced_pod_log( + namespace=job.metadata.namespace, + name=job.metadata.name, + container='workflow-engine') current_k8s_batchv1_api_client.delete_namespaced_job( namespace='default', propagation_policy="Background", - name=job.metadata.name) + name=job.metadata.labels['job-name']) break except ApiException as e: raise REANAWorkflowControllerError( "Workflow engine pod cound not be deleted {}.".format(e)) + except Exception as e: + logging.error(traceback.format_exc()) + logging.error("Unexpected error: {}".format(e)) diff --git a/setup.py b/setup.py index faef4a9f..dedb322c 100644 --- a/setup.py +++ b/setup.py @@ -52,7 +52,7 @@ 'marshmallow>2.13.0,<=2.20.1', 'packaging>=18.0', 'reana-commons[kubernetes]>=0.7.0.dev20200203,<0.8.0', - 'reana-db>=0.7.0.dev20200129,<0.8.0', + 'reana-db>=0.7.0.dev20200206,<0.8.0', 'requests==2.20.0', 'sqlalchemy-utils>=0.31.0', 'uwsgi-tools>=1.1.1',