From 1cb66c26c3b30410fb236feb5bfca24501d544fc Mon Sep 17 00:00:00 2001 From: Diego Rodriguez Date: Tue, 7 Apr 2020 10:42:37 +0200 Subject: [PATCH] workflow_run_manager: workflow stop race condition MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Happening when user stops a workflow before its first job is created. RWC stops only the existing jobs. However, because there is a grace period for stopping pods, RJC sidecar still runs, submitting a new job, and reporting its status, causing the workflow to “revive”. To mitigate this, we decrease the grace period to 0 and we don't allow stopped workflows to change status (closes reanahub/reana-client#395). --- reana_workflow_controller/config.py | 5 +- reana_workflow_controller/consumer.py | 46 +++++++++++-------- .../workflow_run_manager.py | 4 +- reana_workflow_controller/workflow_state.py | 33 +++++++++++++ 4 files changed, 65 insertions(+), 23 deletions(-) create mode 100644 reana_workflow_controller/workflow_state.py diff --git a/reana_workflow_controller/config.py b/reana_workflow_controller/config.py index 33c88b47..128de77e 100644 --- a/reana_workflow_controller/config.py +++ b/reana_workflow_controller/config.py @@ -11,7 +11,7 @@ import os from packaging.version import parse -from reana_commons.config import SHARED_VOLUME_PATH +from reana_commons.config import REANA_COMPONENT_PREFIX, SHARED_VOLUME_PATH from reana_workflow_controller.version import __version__ @@ -67,7 +67,8 @@ """Common to all workflow engines environment variables.""" DEBUG_ENV_VARS = ({'name': 'WDB_SOCKET_SERVER', - 'value': 'reana-wdb'}, + 'value': os.getenv('WDB_SOCKET_SERVER', + f'{REANA_COMPONENT_PREFIX}-wdb')}, {'name': 'WDB_NO_BROWSER_AUTO_OPEN', 'value': 'True'}, {'name': 'FLASK_ENV', diff --git a/reana_workflow_controller/consumer.py b/reana_workflow_controller/consumer.py index e9be565e..afbcad0c 100644 --- a/reana_workflow_controller/consumer.py +++ b/reana_workflow_controller/consumer.py @@ 
-31,6 +31,7 @@ from reana_workflow_controller.config import (PROGRESS_STATUSES, REANA_GITLAB_URL, REANA_URL) from reana_workflow_controller.errors import REANAWorkflowControllerError +from reana_workflow_controller.workflow_state import can_transition try: from urllib import parse as urlparse @@ -56,30 +57,35 @@ def on_message(self, body, message): body_dict = json.loads(body) workflow_uuid = body_dict.get('workflow_uuid') if workflow_uuid: - status = body_dict.get('status') - if status: - status = WorkflowStatus(status) + workflow = Session.query(Workflow).filter_by(id_=workflow_uuid)\ + .one_or_none() + next_status = body_dict.get('status') + if next_status: + next_status = WorkflowStatus(next_status) print(" [x] Received workflow_uuid: {0} status: {1}". - format(workflow_uuid, status)) + format(workflow_uuid, next_status)) logs = body_dict.get('logs') or '' - _update_workflow_status(workflow_uuid, status, logs) - if 'message' in body_dict and body_dict.get('message'): - msg = body_dict['message'] - if 'progress' in msg: - _update_run_progress(workflow_uuid, msg) - _update_job_progress(workflow_uuid, msg) - # Caching: calculate input hash and store in JobCache - if 'caching_info' in msg: - _update_job_cache(msg) - Session.commit() - - -def _update_workflow_status(workflow_uuid, status, logs): + if can_transition(workflow, next_status): + _update_workflow_status(workflow, next_status, logs) + if 'message' in body_dict and body_dict.get('message'): + msg = body_dict['message'] + if 'progress' in msg: + _update_run_progress(workflow_uuid, msg) + _update_job_progress(workflow_uuid, msg) + # Caching: calculate input hash and store in JobCache + if 'caching_info' in msg: + _update_job_cache(msg) + Session.commit() + else: + logging.error(f'Cannot transition workflow {workflow.id_}' + f' from status {workflow.status} to' + f' {next_status}.') + + +def _update_workflow_status(workflow, status, logs): """Update workflow status in DB.""" - workflow = 
Session.query(Workflow).filter_by(id_=workflow_uuid)\ - .one_or_none() if workflow.status != status: - Workflow.update_workflow_status(Session, workflow_uuid, + Workflow.update_workflow_status(Session, workflow.id_, status, logs, None) if workflow.git_ref: _update_commit_status(workflow, status) diff --git a/reana_workflow_controller/workflow_run_manager.py b/reana_workflow_controller/workflow_run_manager.py index 5e0be765..1e53c4f2 100644 --- a/reana_workflow_controller/workflow_run_manager.py +++ b/reana_workflow_controller/workflow_run_manager.py @@ -337,7 +337,9 @@ def stop_batch_workflow_run(self): current_k8s_batchv1_api_client.delete_namespaced_job( job, KubernetesWorkflowRunManager.default_namespace, - body=V1DeleteOptions(propagation_policy='Background')) + body=V1DeleteOptions( + grace_period_seconds=0, + propagation_policy='Background')) except ApiException: logging.error(f'Error while trying to stop {self.workflow.id_}' f': Kubernetes job {job} could not be deleted.', diff --git a/reana_workflow_controller/workflow_state.py b/reana_workflow_controller/workflow_state.py new file mode 100644 index 00000000..33051366 --- /dev/null +++ b/reana_workflow_controller/workflow_state.py @@ -0,0 +1,33 @@ +# -*- coding: utf-8 -*- +# +# This file is part of REANA. +# Copyright (C) 2020 CERN. +# +# REANA is free software; you can redistribute it and/or modify it +# under the terms of the MIT License; see LICENSE file for more details. 
+
+from reana_db.models import WorkflowStatus
+
+
+# Allowed ``(current status, next status)`` pairs. Defined once at import
+# time and kept immutable so it is not rebuilt on every call.
+_ALLOWED_TRANSITIONS = (
+    # Creation
+    (WorkflowStatus.created, WorkflowStatus.deleted),
+    (WorkflowStatus.created, WorkflowStatus.running),
+    # Running
+    (WorkflowStatus.running, WorkflowStatus.failed),
+    (WorkflowStatus.running, WorkflowStatus.finished),
+    (WorkflowStatus.running, WorkflowStatus.stopped),
+    (WorkflowStatus.running, WorkflowStatus.running),
+    # Stopped: a stopped workflow can only be deleted, never revived.
+    (WorkflowStatus.stopped, WorkflowStatus.deleted),
+    # Failed
+    (WorkflowStatus.failed, WorkflowStatus.deleted),
+    (WorkflowStatus.failed, WorkflowStatus.running),
+    # Finished
+    (WorkflowStatus.finished, WorkflowStatus.deleted),
+    (WorkflowStatus.finished, WorkflowStatus.running),
+)
+
+
+def can_transition(workflow, to_status):
+    """Tell whether a workflow may move from its current status to another.
+
+    :param workflow: Object exposing a ``status`` attribute, or ``None``
+        when no workflow was found.
+    :param to_status: Target ``WorkflowStatus`` (may be ``None``).
+    :returns: ``True`` if ``(workflow.status, to_status)`` is one of the
+        allowed transitions; ``False`` otherwise, including when
+        ``workflow`` is ``None``.
+    """
+    if workflow is None:
+        # The status consumer looks the workflow up with ``one_or_none()``,
+        # so a missing workflow arrives here as ``None``; report that it
+        # cannot transition instead of failing on ``None.status``.
+        return False
+    return (workflow.status, to_status) in _ALLOWED_TRANSITIONS