
Commit

Merge 75933d5 into 5e31c15
Diego committed Aug 12, 2019
2 parents 5e31c15 + 75933d5 commit 3576e32
Showing 4 changed files with 124 additions and 153 deletions.
126 changes: 123 additions & 3 deletions reana_workflow_controller/consumer.py
@@ -8,14 +8,23 @@

"""REANA Workflow Controller MQ Consumer."""

from __future__ import absolute_import

import json
import uuid

from kubernetes.client.rest import ApiException
from reana_commons.consumer import BaseConsumer
from reana_commons.k8s.api_client import current_k8s_batchv1_api_client
from reana_commons.utils import (calculate_file_access_time,
                                 calculate_hash_of_dir,
                                 calculate_job_input_hash)
from reana_db.database import Session
from reana_db.models import WorkflowStatus
from reana_db.models import Job, JobCache, Workflow, WorkflowStatus
from sqlalchemy.orm.attributes import flag_modified

from .tasks import (_update_job_cache, _update_job_progress,
                    _update_run_progress, _update_workflow_status)
from reana_workflow_controller.config import PROGRESS_STATUSES
from reana_workflow_controller.errors import REANAWorkflowControllerError


class JobStatusConsumer(BaseConsumer):
@@ -52,3 +61,114 @@ def on_message(self, body, message):
            if 'caching_info' in msg:
                _update_job_cache(msg)
            Session.commit()
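
For orientation, a hypothetical message body that this handler could process is sketched below. The keys are inferred from the helper functions that follow; the actual schema is defined by the publishing side, so treat every field and value here as an assumption.

# Hypothetical job-status message (assumed shape; all identifiers are
# illustrative placeholders, not real REANA values):
msg = {
    'progress': {
        'total': {'total': 3, 'job_ids': []},
        'finished': {'total': 1,
                     'job_ids': ['2ec25833-4e4f-459b-8a1c-a8e6eee0a421']},
    },
    'caching_info': {'job_id': '2ec25833-4e4f-459b-8a1c-a8e6eee0a421'},
}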


def _update_workflow_status(workflow_uuid, status, logs):
    """Update workflow status in DB."""
    Workflow.update_workflow_status(Session, workflow_uuid,
                                    status, logs, None)
    alive_statuses = \
        [WorkflowStatus.created, WorkflowStatus.running,
         WorkflowStatus.queued]
    if status not in alive_statuses:
        _delete_workflow_engine_pod(workflow_uuid)


def _update_run_progress(workflow_uuid, msg):
    """Update workflow run progress in DB."""
    workflow = Session.query(Workflow).filter_by(id_=workflow_uuid).\
        one_or_none()
    cached_jobs = None
    job_progress = workflow.job_progress
    if "cached" in msg['progress']:
        cached_jobs = msg['progress']['cached']
    for status in PROGRESS_STATUSES:
        if status in msg['progress']:
            previous_status = workflow.job_progress.get(status)
            previous_total = 0
            if previous_status:
                previous_total = previous_status.get('total') or 0
            if status == 'total':
                if previous_total > 0:
                    continue
                else:
                    job_progress['total'] = msg['progress']['total']
            else:
                if previous_status:
                    new_job_ids = set(previous_status.get('job_ids') or
                                      set()) | \
                        set(msg['progress'][status]['job_ids'])
                else:
                    new_job_ids = set(msg['progress'][status]['job_ids'])
                job_progress[status] = {'total': len(new_job_ids),
                                        'job_ids': list(new_job_ids)}
    workflow.job_progress = job_progress
    flag_modified(workflow, 'job_progress')
    Session.add(workflow)
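
A minimal sketch of the merge performed above, with the message shape assumed: job IDs already recorded for a status are unioned with the incoming ones, so a re-delivered message does not inflate the counts.

# Sketch of the union above (dict shapes are assumptions):
previous = {'total': 1, 'job_ids': ['job-a']}
incoming = {'job_ids': ['job-a', 'job-b']}
new_job_ids = set(previous.get('job_ids') or set()) | \
    set(incoming['job_ids'])
assert new_job_ids == {'job-a', 'job-b'}  # duplicates collapse
# 'total' itself is only written once, by the first message carrying it.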


def _update_job_progress(workflow_uuid, msg):
    """Update job progress for jobs in received message."""
    for status in PROGRESS_STATUSES:
        if status in msg['progress']:
            status_progress = msg['progress'][status]
            for job_id in status_progress['job_ids']:
                try:
                    uuid.UUID(job_id)
                except Exception:
                    continue
                Session.query(Job).filter_by(id_=job_id).\
                    update({'workflow_uuid': workflow_uuid,
                            'status': status})
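
The uuid.UUID(job_id) guard above simply skips identifiers that do not parse as UUIDs; presumably some producers report symbolic names alongside real job IDs, though that is an assumption. For instance:

import uuid

uuid.UUID('2ec25833-4e4f-459b-8a1c-a8e6eee0a421')  # parses: row is updated
try:
    uuid.UUID('step-1')  # not a UUID, raises ValueError ...
except ValueError:
    pass                 # ... so this job_id is skipped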


def _update_job_cache(msg):
    """Update caching information for finished job."""
    cached_job = Session.query(JobCache).filter_by(
        job_id=msg['caching_info'].get('job_id')).first()

    input_files = []
    if cached_job:
        file_access_times = calculate_file_access_time(
            msg['caching_info'].get('workflow_workspace'))
        for filename in cached_job.access_times:
            if filename in file_access_times:
                input_files.append(filename)
    else:
        return
    cmd = msg['caching_info']['job_spec']['cmd']
    # Strip the leading `cd <workspace>` from the command; to be refactored.
    clean_cmd = ';'.join(cmd.split(';')[1:])
    msg['caching_info']['job_spec']['cmd'] = clean_cmd

    if 'workflow_workspace' in msg['caching_info']['job_spec']:
        del msg['caching_info']['job_spec']['workflow_workspace']
    input_hash = calculate_job_input_hash(msg['caching_info']['job_spec'],
                                          msg['caching_info']['workflow_json'])
    workspace_hash = calculate_hash_of_dir(
        msg['caching_info'].get('workflow_workspace'), input_files)
    if workspace_hash == -1:
        return

    cached_job.parameters = input_hash
    cached_job.result_path = msg['caching_info'].get('result_path')
    cached_job.workspace_hash = workspace_hash
    Session.add(cached_job)
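
A hypothetical 'caching_info' payload, with the shape inferred from the lookups above; every value is an illustrative placeholder, not a real REANA path or ID:

# Assumed payload shape for _update_job_cache:
msg = {'caching_info': {
    'job_id': '2ec25833-4e4f-459b-8a1c-a8e6eee0a421',
    'workflow_workspace': '/var/reana/users/0000/workflows/1111',
    'job_spec': {
        'cmd': 'cd /var/reana/users/0000/workflows/1111; ./run.sh'},
    'workflow_json': {'steps': []},
    'result_path': '/var/reana/users/0000/workflows/1111/results',
}}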


def _delete_workflow_engine_pod(workflow_uuid):
    """Delete workflow engine pod."""
    try:
        jobs = current_k8s_batchv1_api_client.list_namespaced_job(
            namespace='default',
        )
        for job in jobs.items:
            if workflow_uuid in job.metadata.name:
                current_k8s_batchv1_api_client.delete_namespaced_job(
                    namespace='default',
                    propagation_policy="Background",
                    name=job.metadata.name)
                break
    except ApiException as e:
        raise REANAWorkflowControllerError(
            "Workflow engine pod could not be deleted: {}.".format(e))
143 changes: 0 additions & 143 deletions reana_workflow_controller/tasks.py

This file was deleted.

3 changes: 1 addition & 2 deletions setup.py
@@ -44,14 +44,13 @@
]

install_requires = [
    'celery>=4.1.0,<4.3',
    'Flask-SQLAlchemy>=2.2',
    'Flask>=0.12',
    'fs>=2.0',
    'jsonpickle>=0.9.6',
    'marshmallow>=2.13',
    'packaging>=18.0',
    'reana-commons[kubernetes]>=0.6.0.dev20190809,<0.7.0',
    'reana-commons[kubernetes]>=0.6.0.dev20190812,<0.7.0',
    'reana-db>=0.6.0.dev20190715,<0.7.0',
    'requests==2.20.0',
    'sqlalchemy-utils>=0.31.0',
5 changes: 0 additions & 5 deletions tests/test_views.py
@@ -474,7 +474,6 @@ def test_set_workflow_status(app, corev1_api_client_with_user_secrets,
        Workflow.id_ == workflow_created_uuid).first()
    assert workflow.status == WorkflowStatus.created
    payload = START
    # replace celery task with Mock()
    with mock.patch(
            'reana_workflow_controller.workflow_run_manager.'
            'current_k8s_batchv1_api_client') as k8s_api_client:
@@ -489,8 +488,6 @@
"status": "start"})
json_response = json.loads(res.data.decode())
assert json_response.get('status') == status_dict[payload].name
# assert the celery task was called with the following
# arguments
k8s_api_client.create_namespaced_job.assert_called_once()


@@ -513,7 +510,6 @@ def test_start_already_started_workflow(app, session, default_user,
        Workflow.id_ == workflow_created_uuid).first()
    assert workflow.status == WorkflowStatus.created
    payload = START
    # replace celery task with Mock()
    with mock.patch('reana_workflow_controller.workflow_run_manager.'
                    'current_k8s_batchv1_api_client'):
        # provide user secret store
@@ -778,7 +774,6 @@ def test_start_input_parameters(app, session, default_user, user_secrets,
    payload = START
    parameters = {'input_parameters': {'first': 'test'},
                  'operational_options': {}}
    # replace celery task with Mock()
    with mock.patch('reana_workflow_controller.workflow_run_manager.'
                    'current_k8s_batchv1_api_client'):
        # provide user secret store
