Skip to content

Commit

Permalink
Merge cb606d5 into 67c6cda
Browse files Browse the repository at this point in the history
  • Loading branch information
Diego committed Jan 21, 2020
2 parents 67c6cda + cb606d5 commit ce76002
Show file tree
Hide file tree
Showing 8 changed files with 115 additions and 14 deletions.
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ services:

before_install:
- pip install -r requirements-dev.txt
- travis_retry pip install --upgrade pip setuptools py
- travis_retry pip install --upgrade pip==19.3.1 setuptools py
- travis_retry pip install twine wheel coveralls
- travis_retry pip install "mock>=3"

Expand Down
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ RUN apt-get update && \
apt-get install -y \
gcc \
vim-tiny && \
pip install --upgrade pip
pip install --upgrade pip==19.3.1

RUN apt-get update && \
apt-get upgrade -y && \
Expand Down
10 changes: 9 additions & 1 deletion docs/openapi.json
Original file line number Diff line number Diff line change
Expand Up @@ -958,6 +958,14 @@
"message": "Status resume is not supported yet."
}
}
},
"502": {
"description": "Request failed. Connection to a third party system has failed.",
"examples": {
"application/json": {
"message": "Connection to database timed out, please retry."
}
}
}
},
"summary": "Set workflow status."
Expand Down Expand Up @@ -1231,4 +1239,4 @@
},
"swagger": "2.0",
"tags": []
}
}
4 changes: 4 additions & 0 deletions reana_workflow_controller/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,7 @@ class REANAWorkflowDeletionError(Exception):

class REANAInteractiveSessionError(Exception):
"""Error when trying to create an interactive session."""


class REANAExternalCallError(Exception):
"""Error when trying to create an interactive session."""
41 changes: 32 additions & 9 deletions reana_workflow_controller/rest/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,38 +25,61 @@
from reana_commons.utils import get_workflow_status_change_verb
from reana_db.database import Session
from reana_db.models import Job, JobCache, Workflow, WorkflowStatus
from sqlalchemy.exc import SQLAlchemyError
from kubernetes.client.rest import ApiException

from reana_workflow_controller.config import (REANA_GITLAB_HOST,
WORKFLOW_TIME_FORMAT)
from reana_workflow_controller.errors import (REANAWorkflowControllerError,
from reana_workflow_controller.errors import (REANAExternalCallError,
REANAWorkflowControllerError,
REANAWorkflowDeletionError)
from reana_workflow_controller.workflow_run_manager import \
KubernetesWorkflowRunManager


def start_workflow(workflow, parameters):
"""Start a workflow."""
if workflow.status in [WorkflowStatus.created, WorkflowStatus.queued]:
current_db_sessions = Session.object_session(workflow)
kwrm = KubernetesWorkflowRunManager(workflow)

def _start_workflow_db(workflow, parameters):
workflow.run_started_at = datetime.now()
workflow.status = WorkflowStatus.running
if parameters:
workflow.input_parameters = parameters.get('input_parameters')
workflow.operational_options = \
parameters.get('operational_options')
current_db_sessions = Session.object_session(workflow)
current_db_sessions.add(workflow)
current_db_sessions.commit()
kwrm = KubernetesWorkflowRunManager(workflow)
kwrm.start_batch_workflow_run()
else:

if workflow.status not in [WorkflowStatus.created, WorkflowStatus.queued]:
message = \
("Workflow {id_} could not be started because it {verb}"
" already {status}.").format(
id_=workflow.id_,
verb=get_workflow_status_change_verb(workflow.status.name),
status=str(workflow.status.name))
id_=workflow.id_,
verb=get_workflow_status_change_verb(workflow.status.name),
status=str(workflow.status.name))
raise REANAWorkflowControllerError(message)

try:
kwrm.start_batch_workflow_run()
_start_workflow_db(workflow, parameters)
except SQLAlchemyError as e:
message = \
"Database connection failed, please retry."
logging.error(f"Error while creating {workflow.id_}: {message}\n{e}",
exc_info=True)
logging.error(f"Stopping Kubernetes job for {workflow.id_} ...")
# Roll back creation of batch workflow job in Kubernetes
kwrm.stop_batch_workflow_run()
raise REANAExternalCallError(message)
except ApiException as e:
message = \
"Kubernetes connection failed, please retry."
logging.error(f"Error while creating {workflow.id_}: {message}\n{e}",
exc_info=True)
raise REANAExternalCallError(message)


def stop_workflow(workflow):
"""Stop a given workflow."""
Expand Down
13 changes: 12 additions & 1 deletion reana_workflow_controller/rest/workflows_status.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@
from reana_db.utils import _get_workflow_with_uuid_or_name

from reana_workflow_controller.config import WORKFLOW_TIME_FORMAT
from reana_workflow_controller.errors import REANAWorkflowControllerError
from reana_workflow_controller.errors import (REANAExternalCallError,
REANAWorkflowControllerError)
from reana_workflow_controller.rest.utils import (build_workflow_logs,
delete_workflow,
get_current_job_progress,
Expand Down Expand Up @@ -425,6 +426,14 @@ def set_workflow_status(workflow_id_or_name): # noqa
{
"message": "Status resume is not supported yet."
}
502:
description: >-
Request failed. Connection to a third party system has failed.
examples:
application/json:
{
"message": "Connection to database timed out, please retry."
}
"""

try:
Expand Down Expand Up @@ -478,5 +487,7 @@ def set_workflow_status(workflow_id_or_name): # noqa
return jsonify({"message": str(e)}), 400
except NotImplementedError as e:
return jsonify({"message": str(e)}), 501
except REANAExternalCallError as e:
return jsonify({"message": str(e)}), 502
except Exception as e:
return jsonify({"message": str(e)}), 500
2 changes: 1 addition & 1 deletion reana_workflow_controller/workflow_run_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ def start_batch_workflow_run(self):
msg = 'Workflow engine/job controller pod ' \
'creation failed {}'.format(e)
logging.error(msg, exc_info=True)
raise REANAWorkflowControllerError(e)
raise e

def start_interactive_session(self, interactive_session_type, **kwargs):
"""Start an interactive workflow run.
Expand Down
55 changes: 55 additions & 0 deletions tests/test_views.py
Original file line number Diff line number Diff line change
Expand Up @@ -787,6 +787,61 @@ def test_start_input_parameters(app, session, default_user, user_secrets,
parameters['input_parameters']


def test_start_workflow_db_failure(app, session, default_user, user_secrets,
corev1_api_client_with_user_secrets,
sample_serial_workflow_in_db):
"""Test starting workflow with a DB failure."""
mock_session_cls = mock.Mock()
mock_session = mock.Mock()
mock_session_cls.object_session.return_value = mock_session
from sqlalchemy.exc import SQLAlchemyError
mock_session.commit = mock.Mock(
side_effect=SQLAlchemyError('Could not connect to the server.'))
mock_k8s_run_manager_cls = mock.Mock()
k8s_workflow_run_manager = mock.Mock()
mock_k8s_run_manager_cls.return_value = k8s_workflow_run_manager
with mock.patch.multiple(
'reana_workflow_controller.rest.utils',
Session=mock_session_cls,
KubernetesWorkflowRunManager=mock_k8s_run_manager_cls):
with app.test_client() as client:
res = client.put(
url_for('statuses.set_workflow_status',
workflow_id_or_name=sample_serial_workflow_in_db.id_),
query_string={"user": default_user.id_,
"status": "start"},
content_type='application/json',
data=json.dumps({}))
assert res.status_code == 502
# assert that Kubernetes job creation failes
k8s_workflow_run_manager.stop_batch_workflow_run.assert_called_once()


def test_start_workflow_kubernetes_failure(
app, session, default_user, user_secrets,
corev1_api_client_with_user_secrets, sample_serial_workflow_in_db):
"""Test starting workflow with a Kubernetes failure when creating jobs."""
mock_k8s_run_manager_cls = mock.Mock()
k8s_workflow_run_manager = mock.Mock()
from kubernetes.client.rest import ApiException
k8s_workflow_run_manager.start_batch_workflow_run = mock.Mock(
side_effect=ApiException('Could not connect to Kubernetes.'))
mock_k8s_run_manager_cls.return_value = k8s_workflow_run_manager
with mock.patch.multiple(
'reana_workflow_controller.rest.utils',
KubernetesWorkflowRunManager=mock_k8s_run_manager_cls):
with app.test_client() as client:
res = client.put(
url_for('statuses.set_workflow_status',
workflow_id_or_name=sample_serial_workflow_in_db.id_),
query_string={"user": default_user.id_,
"status": "start"},
content_type='application/json',
data=json.dumps({}))
assert res.status_code == 502
assert sample_serial_workflow_in_db.status == WorkflowStatus.created


@pytest.mark.parametrize("status", [WorkflowStatus.created,
WorkflowStatus.failed,
WorkflowStatus.finished,
Expand Down

0 comments on commit ce76002

Please sign in to comment.