Skip to content

Commit

Permalink
workflow_run_manager: workflow stop race condition
Browse files Browse the repository at this point in the history
* Happens when a user stops a workflow before its first job is created.
  RWC stops only the existing jobs. However, because there is a grace
  period for stopping pods, the RJC sidecar keeps running, submits a new
  job and reports its status, causing the workflow to “revive”.
  To mitigate this, we decrease the grace period to 0 and we don't
  allow stopped workflows to change status
  (closes reanahub/reana-client#395).
  • Loading branch information
Diego Rodriguez committed Apr 9, 2020
1 parent b08d617 commit 8495199
Show file tree
Hide file tree
Showing 4 changed files with 65 additions and 23 deletions.
5 changes: 3 additions & 2 deletions reana_workflow_controller/config.py
Expand Up @@ -11,7 +11,7 @@
import os

from packaging.version import parse
from reana_commons.config import SHARED_VOLUME_PATH
from reana_commons.config import REANA_COMPONENT_PREFIX, SHARED_VOLUME_PATH

from reana_workflow_controller.version import __version__

Expand Down Expand Up @@ -67,7 +67,8 @@
"""Common to all workflow engines environment variables."""

DEBUG_ENV_VARS = ({'name': 'WDB_SOCKET_SERVER',
'value': 'reana-wdb'},
'value': os.getenv('WDB_SOCKET_SERVER',
f'{REANA_COMPONENT_PREFIX}-wdb')},
{'name': 'WDB_NO_BROWSER_AUTO_OPEN',
'value': 'True'},
{'name': 'FLASK_ENV',
Expand Down
46 changes: 26 additions & 20 deletions reana_workflow_controller/consumer.py
Expand Up @@ -31,6 +31,7 @@
from reana_workflow_controller.config import (PROGRESS_STATUSES,
REANA_GITLAB_URL, REANA_URL)
from reana_workflow_controller.errors import REANAWorkflowControllerError
from reana_workflow_controller.workflow_state import can_transition

try:
from urllib import parse as urlparse
Expand All @@ -56,30 +57,35 @@ def on_message(self, body, message):
body_dict = json.loads(body)
workflow_uuid = body_dict.get('workflow_uuid')
if workflow_uuid:
status = body_dict.get('status')
if status:
status = WorkflowStatus(status)
workflow = Session.query(Workflow).filter_by(id_=workflow_uuid)\
.one_or_none()
next_status = body_dict.get('status')
if next_status:
next_status = WorkflowStatus(next_status)
print(" [x] Received workflow_uuid: {0} status: {1}".
format(workflow_uuid, status))
format(workflow_uuid, next_status))
logs = body_dict.get('logs') or ''
_update_workflow_status(workflow_uuid, status, logs)
if 'message' in body_dict and body_dict.get('message'):
msg = body_dict['message']
if 'progress' in msg:
_update_run_progress(workflow_uuid, msg)
_update_job_progress(workflow_uuid, msg)
# Caching: calculate input hash and store in JobCache
if 'caching_info' in msg:
_update_job_cache(msg)
Session.commit()


def _update_workflow_status(workflow_uuid, status, logs):
if can_transition(workflow, next_status):
_update_workflow_status(workflow, next_status, logs)
if 'message' in body_dict and body_dict.get('message'):
msg = body_dict['message']
if 'progress' in msg:
_update_run_progress(workflow_uuid, msg)
_update_job_progress(workflow_uuid, msg)
# Caching: calculate input hash and store in JobCache
if 'caching_info' in msg:
_update_job_cache(msg)
Session.commit()
else:
logging.error(f'Cannot transition workflow {workflow.id_}'
f' from status {workflow.status} to'
f' {next_status}.')


def _update_workflow_status(workflow, status, logs):
"""Update workflow status in DB."""
workflow = Session.query(Workflow).filter_by(id_=workflow_uuid)\
.one_or_none()
if workflow.status != status:
Workflow.update_workflow_status(Session, workflow_uuid,
Workflow.update_workflow_status(Session, workflow.id_,
status, logs, None)
if workflow.git_ref:
_update_commit_status(workflow, status)
Expand Down
4 changes: 3 additions & 1 deletion reana_workflow_controller/workflow_run_manager.py
Expand Up @@ -318,7 +318,9 @@ def stop_batch_workflow_run(self):
current_k8s_batchv1_api_client.delete_namespaced_job(
job,
KubernetesWorkflowRunManager.default_namespace,
body=V1DeleteOptions(propagation_policy='Background'))
body=V1DeleteOptions(
grace_period_seconds=0,
propagation_policy='Background'))
except ApiException:
logging.error(f'Error while trying to stop {self.workflow.id_}'
f': Kubernetes job {job} could not be deleted.',
Expand Down
33 changes: 33 additions & 0 deletions reana_workflow_controller/workflow_state.py
@@ -0,0 +1,33 @@
# -*- coding: utf-8 -*-
#
# This file is part of REANA.
# Copyright (C) 2020 CERN.
#
# REANA is free software; you can redistribute it and/or modify it
# under the terms of the MIT License; see LICENSE file for more details.

from reana_db.models import WorkflowStatus


def can_transition(workflow, to_status):
    """Tell whether a workflow may move to the given status.

    The decision is a pure table lookup: the workflow's current status
    selects the statuses it is allowed to move to, and ``to_status`` must
    be one of them. Statuses without an entry in the table (for instance
    ``deleted``) have no allowed target at all.

    :param workflow: Object exposing the current status as ``status``.
    :param to_status: Status the workflow is asked to move to.
    :return: ``True`` if the move is permitted, ``False`` otherwise.
    """
    # Allowed targets, keyed by the status the workflow is currently in.
    allowed_targets = {
        WorkflowStatus.created: (
            WorkflowStatus.deleted,
            WorkflowStatus.running,
        ),
        WorkflowStatus.running: (
            WorkflowStatus.failed,
            WorkflowStatus.finished,
            WorkflowStatus.stopped,
            # Running workflows may report ``running`` again.
            WorkflowStatus.running,
        ),
        # A stopped workflow can only be deleted.
        WorkflowStatus.stopped: (
            WorkflowStatus.deleted,
        ),
        WorkflowStatus.failed: (
            WorkflowStatus.deleted,
            WorkflowStatus.running,
        ),
        WorkflowStatus.finished: (
            WorkflowStatus.deleted,
            WorkflowStatus.running,
        ),
    }
    return to_status in allowed_targets.get(workflow.status, ())

0 comments on commit 8495199

Please sign in to comment.