Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

workflow_run_manager: fix workflow stop #250

Merged
merged 3 commits into from
Sep 3, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ before_install:
- pip install -r requirements-dev.txt
- travis_retry pip install --upgrade pip setuptools py
- travis_retry pip install twine wheel coveralls
- travis_retry pip install "mock>=3"

install:
- travis_retry pip install -e .[all]
Expand Down
3 changes: 1 addition & 2 deletions reana_workflow_controller/rest.py
Original file line number Diff line number Diff line change
Expand Up @@ -1829,9 +1829,8 @@ def _stop_workflow(workflow):
"""Stop a given workflow."""
if workflow.status == WorkflowStatus.running:
kwrm = KubernetesWorkflowRunManager(workflow)
job_list = workflow.job_progress.get('running', {}).get('job_ids', [])
workflow.run_stopped_at = datetime.now()
kwrm.stop_batch_workflow_run(job_list)
kwrm.stop_batch_workflow_run()
workflow.status = WorkflowStatus.stopped
current_db_sessions = Session.object_session(workflow)
current_db_sessions.add(workflow)
Expand Down
22 changes: 14 additions & 8 deletions reana_workflow_controller/workflow_run_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
create_cvmfs_storage_class, format_cmd)
from reana_db.config import SQLALCHEMY_DATABASE_URI
from reana_db.database import Session
from reana_db.models import Job

from reana_workflow_controller.errors import (REANAInteractiveSessionError,
REANAWorkflowControllerError)
Expand Down Expand Up @@ -181,6 +182,15 @@ def _workflow_engine_env_vars(self):

return (env_vars)

def get_workflow_running_jobs_as_backend_ids(self):
"""Get all running jobs of a workflow as backend job IDs."""
session = Session.object_session(self.workflow)
job_list = self.workflow.job_progress.get(
'running', {}).get('job_ids', [])
rows = session.query(Job).filter(Job.id_.in_(job_list))
backend_ids = [j.backend_job_id for j in rows.all()]
return backend_ids


class KubernetesWorkflowRunManager(WorkflowRunManager):
"""Implementation of WorkflowRunManager for Kubernetes."""
Expand Down Expand Up @@ -293,15 +303,11 @@ def stop_interactive_session(self):
current_db_sessions.add(self.workflow)
current_db_sessions.commit()

def stop_batch_workflow_run(self, workflow_run_jobs=None):
"""Stop a batch workflow run along with all its dependent jobs.

:param workflow_run_jobs: List of active job id's spawned by the
workflow run.
"""
def stop_batch_workflow_run(self):
"""Stop a batch workflow run along with all its dependent jobs."""
workflow_run_name = self._workflow_run_name_generator('batch')
workflow_run_jobs = workflow_run_jobs or []
to_delete = workflow_run_jobs + [workflow_run_name]
to_delete = self.get_workflow_running_jobs_as_backend_ids() + \
[workflow_run_name]
for job in to_delete:
current_k8s_batchv1_api_client.delete_namespaced_job(
job,
Expand Down
58 changes: 57 additions & 1 deletion tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,10 @@

import os
import shutil
import uuid

import pytest
from reana_db.models import Base, User
from reana_db.models import Base, Job, JobStatus, User
from sqlalchemy_utils import create_database, database_exists, drop_database

from reana_workflow_controller.factory import create_app
Expand All @@ -34,3 +35,58 @@ def base_app(tmp_shared_volume_path):
}
app_ = create_app(config_mapping)
return app_


@pytest.fixture()
def add_kubernetes_jobs_to_workflow(session):
"""Create and add jobs to a workflow.

This fixture provides a callable which takes the workflow to which the
created jobs should belong to. It can be parametrized to customize the
backend of the created jobs and the number of jobs to create.

.. code-block:: python

def test_stop_workflow(workflow, add_kubernetes_jobs_to_workflow):
workflow_jobs = add_kubernetes_jobs_to_workflow(workflow.id_
backend='htcondor',
num_jobs=4)
"""
def add_kubernetes_jobs_to_workflow_callable(workflow, backend=None,
num_jobs=2, status=None):
"""Add Kubernetes jobs to a given workflow.

:param workflow_uuid: Workflow which the jobs should belong to.
:param backend: Backend of the created jobs.
:param num_jobs: Number of jobs to create.
:param status: String representing the status of the created jobs,
by default ``running``.
"""
jobs = []
if status and status not in JobStatus.__members__:
raise ValueError('Unknown status {} use one of {}'.format(
status, JobStatus.__members__))

status = status or JobStatus.running.name
backend = backend or 'kubernetes'
progress_dict = {
'total': {'job_ids': [], 'total': 0},
JobStatus.running.name: {'job_ids': [], 'total': 0},
JobStatus.failed.name: {'job_ids': [], 'total': 0},
JobStatus.finished.name: {'job_ids': [], 'total': 0},
}
for num in range(num_jobs):
reana_job_id = uuid.uuid4()
backend_job_id = uuid.uuid4()
job = Job(id_=reana_job_id,
backend_job_id=str(backend_job_id),
workflow_uuid=workflow.id_)
progress_dict[status]['job_ids'].append(str(job.id_))
progress_dict[status]['total'] += 1
session.add(job)
jobs.append(job)
workflow.job_progress = progress_dict
session.add(workflow)
session.commit()
return jobs
yield add_kubernetes_jobs_to_workflow_callable
29 changes: 26 additions & 3 deletions tests/test_workflow_run_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from kubernetes.client.rest import ApiException
from mock import DEFAULT, Mock, patch
from reana_commons.config import INTERACTIVE_SESSION_TYPES
from reana_db.models import WorkflowStatus

from reana_workflow_controller.errors import REANAInteractiveSessionError
from reana_workflow_controller.workflow_run_manager import \
Expand Down Expand Up @@ -55,9 +56,11 @@ def test_start_interactive_workflow_k8s_failure(sample_serial_workflow_in_db):


def test_atomic_creation_of_interactive_session(sample_serial_workflow_in_db):
"""Test the correct creation of all objects related to an interactive
sesison as well as writing the state to DB, either all should be done
or nothing.."""
"""Test atomic creation of interactive sessions.

All interactive session should be created as well as writing the state
to DB, either all should be done or nothing.
"""
mocked_k8s_client = Mock()
mocked_k8s_client.create_namespaced_deployment =\
Mock(side_effect=ApiException(
Expand All @@ -79,3 +82,23 @@ def test_atomic_creation_of_interactive_session(sample_serial_workflow_in_db):
mocked_k8s_client.delete_namespaced_ingress.assert_called_once()
mocked_k8s_client.delete_namespaced_deployment.assert_called_once()
assert sample_serial_workflow_in_db.interactive_session is None


def test_stop_workflow_backend_only_kubernetes(
sample_serial_workflow_in_db,
add_kubernetes_jobs_to_workflow):
"""Test deletion of workflows with only Kubernetes based jobs."""
workflow = sample_serial_workflow_in_db
workflow.status = WorkflowStatus.running
workflow_jobs = add_kubernetes_jobs_to_workflow(workflow)
backend_job_ids = [job.backend_job_id for job in workflow_jobs]
with patch("reana_workflow_controller.workflow_run_manager."
"current_k8s_batchv1_api_client") as api_client:
kwrm = KubernetesWorkflowRunManager(workflow)
kwrm.stop_batch_workflow_run()
for delete_call in api_client.delete_namespaced_job.call_args_list:
if delete_call.args[0] in backend_job_ids:
del backend_job_ids[backend_job_ids.index(
delete_call.args[0])]

assert not backend_job_ids