Skip to content

Commit

Permalink
Merge 7e21eaa into 236af39
Browse files Browse the repository at this point in the history
  • Loading branch information
Dinos Kousidis committed Jul 5, 2018
2 parents 236af39 + 7e21eaa commit 09c992c
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 8 deletions.
2 changes: 1 addition & 1 deletion reana_workflow_controller/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
def consume_job_queue():
"""Consumes job queue and updates job status."""
logging.basicConfig(
level=logging.INFO,
level=logging.DEBUG,
format='%(asctime)s - %(threadName)s - %(levelname)s: %(message)s'
)
consumer = Consumer()
Expand Down
11 changes: 7 additions & 4 deletions reana_workflow_controller/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@

from .config import (BROKER_PASS, BROKER_PORT, BROKER_URL, BROKER_USER,
STATUS_QUEUE)
from .tasks import (_update_job_progress, _update_run_progress,
_update_workflow_status)
from .tasks import (_update_job_cache, _update_job_progress,
_update_run_progress, _update_workflow_status)


class Consumer:
Expand Down Expand Up @@ -72,7 +72,10 @@ def on_message(self, channel, method_frame, header_frame, body):
if 'progress' in msg:
_update_run_progress(workflow_uuid, msg)
_update_job_progress(workflow_uuid, msg)
Session.commit()
# Caching: calculate input hash and store in JobCache
if 'caching_info' in msg:
_update_job_cache(msg)
Session.commit()

def consume(self):
"""Start consuming incoming messages."""
Expand All @@ -84,7 +87,7 @@ def consume(self):
except KeyboardInterrupt:
self._channel.stop_consuming()
self._conn.close()
except pika.exceptions.ConnectionClosedByBroker:
except pika.exceptions.ConnectionClosed:
# Uncomment this to make the example not attempt recovery
# from server-initiated connection closure, including
# when the node is stopped cleanly
Expand Down
33 changes: 30 additions & 3 deletions reana_workflow_controller/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@

from celery import Celery
from reana_commons.database import Session
from reana_commons.models import Job, Run, RunJobs, Workflow
from reana_commons.models import Job, JobCache, Run, RunJobs, Workflow
from reana_commons.utils import calculate_hash_of_dir, calculate_job_input_hash

from reana_workflow_controller.config import BROKER, POSSIBLE_JOB_STATUSES

Expand All @@ -53,6 +54,9 @@ def _update_workflow_status(workflow_uuid, status, logs):
def _update_run_progress(workflow_uuid, msg):
"""Register succeeded Jobs to DB."""
run = Session.query(Run).filter_by(workflow_uuid=workflow_uuid).first()
cached_jobs = None
if "cached" in msg['progress']:
cached_jobs = msg['progress']['cached']
for status in POSSIBLE_JOB_STATUSES:
if status in msg['progress']:
previous_total = getattr(run, status)
Expand All @@ -68,9 +72,11 @@ def _update_run_progress(workflow_uuid, msg):
job = Session.query(Job).\
filter_by(id_=job_id).one_or_none()
if job:
if job.status != status:
if job.status != status or \
(cached_jobs and
str(job.id_) in cached_jobs['job_ids']):
new_total += 1
new_total = previous_total + new_total
new_total += previous_total
setattr(run, status, new_total)
Session.add(run)

Expand Down Expand Up @@ -99,3 +105,24 @@ def _update_job_progress(workflow_uuid, msg):
run_job.run_id = current_run.id_
run_job.job_id = job_id
Session.add(run_job)


def _update_job_cache(msg):
    """Update caching information for finished job.

    Reads ``msg['caching_info']``, computes the job input hash and the
    hash of the workflow workspace directory, and adds a ``JobCache`` row
    to the current ``Session``. The caller is responsible for committing.

    :param msg: Decoded status message containing a ``caching_info`` dict
        with ``job_spec`` (including ``cmd``) and ``workflow_json``, and
        optionally ``workflow_workspace``, ``job_id`` and ``result_path``.
        ``msg['caching_info']['job_spec']['cmd']`` is rewritten in place
        when the leading workspace ``cd`` statement is stripped.
    :returns: None. Nothing is added to the session when the workspace
        path is missing or when the directory hash is ``-1``.
    """
    caching_info = msg['caching_info']
    job_spec = caching_info['job_spec']
    # removes cd to workspace, to be refactored
    # Only the statement right after the first ';' is kept. A command
    # without any ';' has no ``cd`` prefix to strip, so it is left as is
    # instead of raising IndexError.
    cmd_parts = job_spec['cmd'].split(';')
    if len(cmd_parts) > 1:
        job_spec['cmd'] = cmd_parts[1]
    input_hash = calculate_job_input_hash(job_spec,
                                          caching_info['workflow_json'])
    workspace = caching_info.get('workflow_workspace')
    if not workspace:
        # Without a workspace path there is nothing to hash, so skip
        # caching rather than failing on ``None.replace``.
        return
    # NOTE(review): ``str.replace`` rewrites every occurrence of 'data' in
    # the path, not only the leading mount point -- confirm this is
    # intended.
    directory_hash = calculate_hash_of_dir(
        workspace.replace('data', 'reana/default'))
    if directory_hash == -1:
        return
    cached_job = JobCache(
        job_id=caching_info.get('job_id'),
        parameters=input_hash,
        result_path=caching_info.get('result_path'),
        workspace_hash=directory_hash)
    Session.add(cached_job)

0 comments on commit 09c992c

Please sign in to comment.