Skip to content

Commit

Permalink
workflow_run_manager: simplify stop interface
Browse files Browse the repository at this point in the history
* WorkflowRunManger can determine which jobs to delete by itself.
  • Loading branch information
Diego Rodriguez committed Sep 2, 2019
1 parent 2243f0b commit d1f81e1
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 16 deletions.
3 changes: 1 addition & 2 deletions reana_workflow_controller/rest.py
Original file line number Diff line number Diff line change
Expand Up @@ -1829,9 +1829,8 @@ def _stop_workflow(workflow):
"""Stop a given workflow."""
if workflow.status == WorkflowStatus.running:
kwrm = KubernetesWorkflowRunManager(workflow)
job_list = workflow.job_progress.get('running', {}).get('job_ids', [])
workflow.run_stopped_at = datetime.now()
kwrm.stop_batch_workflow_run(job_list)
kwrm.stop_batch_workflow_run()
workflow.status = WorkflowStatus.stopped
current_db_sessions = Session.object_session(workflow)
current_db_sessions.add(workflow)
Expand Down
12 changes: 6 additions & 6 deletions reana_workflow_controller/workflow_run_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -182,10 +182,12 @@ def _workflow_engine_env_vars(self):

return (env_vars)

def resolve_backend_ids(self, reana_job_ids):
def get_workflow_jobs_backend_ids(self):
"""Resolve REANA Job IDs to specific backend IDs."""
session = Session.object_session(self.workflow)
rows = session.query(Job).filter(Job.id_.in_(reana_job_ids))
job_list = self.workflow.job_progress.get(
'running', {}).get('job_ids', [])
rows = session.query(Job).filter(Job.id_.in_(job_list))
backend_ids = [j.backend_job_id for j in rows.all()]
return backend_ids

Expand Down Expand Up @@ -301,16 +303,14 @@ def stop_interactive_session(self):
current_db_sessions.add(self.workflow)
current_db_sessions.commit()

def stop_batch_workflow_run(self, workflow_run_jobs=None):
def stop_batch_workflow_run(self):
"""Stop a batch workflow run along with all its dependent jobs.
:param workflow_run_jobs: List of active job id's spawned by the
workflow run.
"""
workflow_run_name = self._workflow_run_name_generator('batch')
workflow_run_jobs = workflow_run_jobs or []
to_delete = self.resolve_backend_ids(workflow_run_jobs) + \
[workflow_run_name]
to_delete = self.get_workflow_jobs_backend_ids() + [workflow_run_name]
for job in to_delete:
current_k8s_batchv1_api_client.delete_namespaced_job(
job,
Expand Down
25 changes: 21 additions & 4 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,24 +52,41 @@ def test_stop_workflow(workflow, add_kubernetes_jobs_to_workflow):
backend='htcondor',
num_jobs=4)
"""
def add_kubernetes_jobs_to_workflow_callable(workflow_uuid, backend=None,
num_jobs=2):
def add_kubernetes_jobs_to_workflow_callable(workflow, backend=None,
num_jobs=2, status=None):
"""Add Kubernetes jobs.
:param workflow_uuid: Workflow which the jobs should belong to.
:param backend: Backend of the created jobs.
:param num_jobs: Number of jobs to create.
:param status: String representing the status of the created jobs,
by default ``running``.
"""
jobs = []
if status and status not in JobStatus.__members__:
raise ValueError('Unknown status {} use one of {}'.format(
status, JobStatus.__members__))

status = status or JobStatus.running.name
backend = backend or 'kubernetes'
progress_dict = {
'total': {'job_ids': [], 'total': 0},
JobStatus.running.name: {'job_ids': [], 'total': 0},
JobStatus.failed.name: {'job_ids': [], 'total': 0},
JobStatus.finished.name: {'job_ids': [], 'total': 0},
}
for num in range(num_jobs):
reana_job_id = uuid.uuid4()
backend_job_id = uuid.uuid4()
job = Job(id_=reana_job_id,
backend_job_id=str(backend_job_id),
workflow_uuid=workflow_uuid)
workflow_uuid=workflow.id_)
progress_dict[status]['job_ids'].append(str(job.id_))
progress_dict[status]['total'] += 1
session.add(job)
session.commit()
jobs.append(job)
workflow.job_progress = progress_dict
session.add(workflow)
session.commit()
return jobs
yield add_kubernetes_jobs_to_workflow_callable
6 changes: 2 additions & 4 deletions tests/test_workflow_run_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,14 +90,12 @@ def test_stop_workflow_backend_only_kubernetes(
"""Test deletion of workflows with only Kubernetes based jobs."""
workflow = sample_serial_workflow_in_db
workflow.status = WorkflowStatus.running
workflow_jobs = add_kubernetes_jobs_to_workflow(workflow.id_)
workflow_jobs = add_kubernetes_jobs_to_workflow(workflow)
backend_job_ids = [job.backend_job_id for job in workflow_jobs]
workflow_job_reana_ids = \
[str(workflow.id_) for workflow in workflow_jobs]
with patch("reana_workflow_controller.workflow_run_manager."
"current_k8s_batchv1_api_client") as api_client:
kwrm = KubernetesWorkflowRunManager(workflow)
kwrm.stop_batch_workflow_run(workflow_job_reana_ids)
kwrm.stop_batch_workflow_run()
for delete_call in api_client.delete_namespaced_job.call_args_list:
if delete_call.args[0] in backend_job_ids:
del backend_job_ids[backend_job_ids.index(
Expand Down

0 comments on commit d1f81e1

Please sign in to comment.