From 8e2819df19e971e9f4ca37210e10d6b1aede4c96 Mon Sep 17 00:00:00 2001 From: Dinos Kousidis Date: Tue, 5 Jun 2018 16:08:29 +0200 Subject: [PATCH 1/4] tasks: receive job caching info Signed-off-by: Dinos Kousidis --- Dockerfile | 2 +- reana_workflow_controller/tasks.py | 22 +++++++++++++++++++++- 2 files changed, 22 insertions(+), 2 deletions(-) diff --git a/Dockerfile b/Dockerfile index 1f1f9040..8846a7d3 100644 --- a/Dockerfile +++ b/Dockerfile @@ -24,7 +24,7 @@ RUN apt-get update && \ apt-get install -y vim-tiny && \ pip install --upgrade pip -RUN pip install -e git://github.com/reanahub/reana-commons.git@master#egg=reana-commons +RUN pip install -e git://github.com/dinosk/reana-commons.git@models-job-table-dev#egg=reana-commons COPY CHANGES.rst README.rst setup.py /code/ COPY reana_workflow_controller/version.py /code/reana_workflow_controller/ diff --git a/reana_workflow_controller/tasks.py b/reana_workflow_controller/tasks.py index 12a46763..8579efb1 100644 --- a/reana_workflow_controller/tasks.py +++ b/reana_workflow_controller/tasks.py @@ -24,11 +24,14 @@ from __future__ import absolute_import +import json import uuid +import hashlib from celery import Celery from reana_commons.database import Session -from reana_commons.models import Job, Run, RunJobs, Workflow +from reana_commons.models import Job, JobCache, Run, RunJobs, Workflow +from reana_commons.utils import calculate_hash_of_dir from reana_workflow_controller.config import BROKER, POSSIBLE_JOB_STATUSES @@ -99,3 +102,20 @@ def _update_job_progress(workflow_uuid, msg): run_job.run_id = current_run.id_ run_job.job_id = job_id Session.add(run_job) + # Caching: calculate input hash and store in JobCache + if 'caching_info' in msg: + job_md5_buffer = hashlib.md5() + job_md5_buffer.update(json.dumps( + msg['caching_info']['job_spec']).encode('utf-8')) + job_md5_buffer.update(json.dumps( + msg['caching_info']['workflow_json']).encode('utf-8')) + input_hash = job_md5_buffer.digest() + + cached_job = JobCache( + job_id=msg['caching_info']['job_id'], + parameters=input_hash, + result_path=msg['caching_info']['result_path'], + workspace_hash=calculate_hash_of_dir( + msg['caching_info']['workflow_directory']. + replace('data', 'reana/default'))) + Session.add(cached_job) From 539e59d8638913138de2fde1d9ee297b888df306 Mon Sep 17 00:00:00 2001 From: Dinos Kousidis Date: Fri, 29 Jun 2018 17:49:45 +0300 Subject: [PATCH 2/4] tasks: update job cache task Signed-off-by: Dinos Kousidis --- Dockerfile | 2 +- reana_workflow_controller/cli.py | 2 +- reana_workflow_controller/consumer.py | 9 ++++--- reana_workflow_controller/tasks.py | 38 ++++++++++++++------------- 4 files changed, 28 insertions(+), 23 deletions(-) diff --git a/Dockerfile b/Dockerfile index 8846a7d3..e8ad16d6 100644 --- a/Dockerfile +++ b/Dockerfile @@ -24,7 +24,7 @@ RUN apt-get update && \ apt-get install -y vim-tiny && \ pip install --upgrade pip -RUN pip install -e git://github.com/dinosk/reana-commons.git@models-job-table-dev#egg=reana-commons +RUN pip install -e git://github.com/dinosk/reana-commons.git@job-caching#egg=reana-commons COPY CHANGES.rst README.rst setup.py /code/ COPY reana_workflow_controller/version.py /code/reana_workflow_controller/ diff --git a/reana_workflow_controller/cli.py b/reana_workflow_controller/cli.py index 140f5a19..c7994f19 100644 --- a/reana_workflow_controller/cli.py +++ b/reana_workflow_controller/cli.py @@ -33,7 +33,7 @@ def consume_job_queue(): """Consumes job queue and updates job status.""" logging.basicConfig( - level=logging.INFO, + level=logging.DEBUG, format='%(asctime)s - %(threadName)s - %(levelname)s: %(message)s' ) consumer = Consumer() diff --git a/reana_workflow_controller/consumer.py b/reana_workflow_controller/consumer.py index 4cb3fd2e..87d4959e 100644 --- a/reana_workflow_controller/consumer.py +++ b/reana_workflow_controller/consumer.py @@ -31,7 +31,7 @@ from .config import (BROKER_PASS, BROKER_PORT, BROKER_URL, BROKER_USER, STATUS_QUEUE) from .tasks import (_update_job_progress, _update_run_progress, - _update_workflow_status) + _update_workflow_status, _update_job_cache) class Consumer: @@ -72,7 +72,10 @@ def on_message(self, channel, method_frame, header_frame, body): if 'progress' in msg: _update_run_progress(workflow_uuid, msg) _update_job_progress(workflow_uuid, msg) - Session.commit() + # Caching: calculate input hash and store in JobCache + if 'caching_info' in msg: + _update_job_cache(msg) + Session.commit() def consume(self): """Start consuming incoming messages.""" @@ -84,7 +87,7 @@ def consume(self): except KeyboardInterrupt: self._channel.stop_consuming() self._conn.close() - except pika.exceptions.ConnectionClosedByBroker: + except pika.exceptions.ConnectionClosed: # Uncomment this to make the example not attempt recovery # from server-initiated connection closure, including # when the node is stopped cleanly diff --git a/reana_workflow_controller/tasks.py b/reana_workflow_controller/tasks.py index 8579efb1..fe415045 100644 --- a/reana_workflow_controller/tasks.py +++ b/reana_workflow_controller/tasks.py @@ -73,7 +73,7 @@ def _update_run_progress(workflow_uuid, msg): if job: if job.status != status: new_total += 1 - new_total = previous_total + new_total + new_total += previous_total setattr(run, status, new_total) Session.add(run) @@ -102,20 +102,22 @@ def _update_job_progress(workflow_uuid, msg): run_job.run_id = current_run.id_ run_job.job_id = job_id Session.add(run_job) - # Caching: calculate input hash and store in JobCache - if 'caching_info' in msg: - job_md5_buffer = hashlib.md5() - job_md5_buffer.update(json.dumps( - msg['caching_info']['job_spec']).encode('utf-8')) - job_md5_buffer.update(json.dumps( - msg['caching_info']['workflow_json']).encode('utf-8')) - input_hash = job_md5_buffer.digest() - - cached_job = JobCache( - job_id=msg['caching_info']['job_id'], - parameters=input_hash, - result_path=msg['caching_info']['result_path'], - workspace_hash=calculate_hash_of_dir( - msg['caching_info']['workflow_directory']. - replace('data', 'reana/default'))) - Session.add(cached_job) + + +def _update_job_cache(msg): + """Update caching information for finished job.""" + job_md5_buffer = hashlib.md5() + job_md5_buffer.update(json.dumps( + msg['caching_info']['job_spec']).encode('utf-8')) + job_md5_buffer.update(json.dumps( + msg['caching_info']['workflow_json']).encode('utf-8')) + input_hash = job_md5_buffer.digest() + + cached_job = JobCache( + job_id=msg['caching_info'].get('job_id'), + parameters=input_hash, + result_path=msg['caching_info'].get('result_path'), + workspace_hash=calculate_hash_of_dir( + msg['caching_info'].get('workflow_workspace'). + replace('data', 'reana/default'))) + Session.add(cached_job) From 20274a47088c8d715f64dc2bd0fac36cc7d42267 Mon Sep 17 00:00:00 2001 From: Dinos Kousidis Date: Sun, 1 Jul 2018 17:17:26 +0300 Subject: [PATCH 3/4] tasks: refactor calculate_job_input_hash method Signed-off-by: Dinos Kousidis --- Dockerfile | 2 +- reana_workflow_controller/consumer.py | 4 ++-- reana_workflow_controller/tasks.py | 23 ++++++++++++----------- 3 files changed, 15 insertions(+), 14 deletions(-) diff --git a/Dockerfile b/Dockerfile index e8ad16d6..1f1f9040 100644 --- a/Dockerfile +++ b/Dockerfile @@ -24,7 +24,7 @@ RUN apt-get update && \ apt-get install -y vim-tiny && \ pip install --upgrade pip -RUN pip install -e git://github.com/dinosk/reana-commons.git@job-caching#egg=reana-commons +RUN pip install -e git://github.com/reanahub/reana-commons.git@master#egg=reana-commons COPY CHANGES.rst README.rst setup.py /code/ COPY reana_workflow_controller/version.py /code/reana_workflow_controller/ diff --git a/reana_workflow_controller/consumer.py b/reana_workflow_controller/consumer.py index 87d4959e..89314d93 100644 --- a/reana_workflow_controller/consumer.py +++ b/reana_workflow_controller/consumer.py @@ -30,8 +30,8 @@ from .config import (BROKER_PASS, BROKER_PORT, BROKER_URL, BROKER_USER, STATUS_QUEUE) -from .tasks import (_update_job_progress, _update_run_progress, - _update_workflow_status, _update_job_cache) +from .tasks import (_update_job_cache, _update_job_progress, + _update_run_progress, _update_workflow_status) class Consumer: diff --git a/reana_workflow_controller/tasks.py b/reana_workflow_controller/tasks.py index fe415045..a9ea2a86 100644 --- a/reana_workflow_controller/tasks.py +++ b/reana_workflow_controller/tasks.py @@ -24,14 +24,12 @@ from __future__ import absolute_import -import json import uuid -import hashlib from celery import Celery from reana_commons.database import Session from reana_commons.models import Job, JobCache, Run, RunJobs, Workflow -from reana_commons.utils import calculate_hash_of_dir +from reana_commons.utils import calculate_hash_of_dir, calculate_job_input_hash from reana_workflow_controller.config import BROKER, POSSIBLE_JOB_STATUSES @@ -56,6 +54,9 @@ def _update_workflow_status(workflow_uuid, status, logs): def _update_run_progress(workflow_uuid, msg): """Register succeeded Jobs to DB.""" run = Session.query(Run).filter_by(workflow_uuid=workflow_uuid).first() + cached_jobs = None + if "cached" in msg['progress']: + cached_jobs = msg['progress']['cached'] for status in POSSIBLE_JOB_STATUSES: if status in msg['progress']: previous_total = getattr(run, status) @@ -71,7 +72,9 @@ def _update_run_progress(workflow_uuid, msg): job = Session.query(Job).\ filter_by(id_=job_id).one_or_none() if job: - if job.status != status: + if job.status != status or \ + (cached_jobs and + str(job.id_) in cached_jobs['job_ids']): new_total += 1 new_total += previous_total setattr(run, status, new_total) @@ -106,13 +109,11 @@ def _update_job_progress(workflow_uuid, msg): def _update_job_cache(msg): """Update caching information for finished job.""" - job_md5_buffer = hashlib.md5() - job_md5_buffer.update(json.dumps( - msg['caching_info']['job_spec']).encode('utf-8')) - job_md5_buffer.update(json.dumps( - msg['caching_info']['workflow_json']).encode('utf-8')) - input_hash = job_md5_buffer.digest() - + cmd = msg['caching_info']['job_spec']['cmd'] + clean_cmd = cmd.split(';')[1] + msg['caching_info']['job_spec']['cmd'] = clean_cmd + input_hash = calculate_job_input_hash(msg['caching_info']['job_spec'], + msg['caching_info']['workflow_json']) cached_job = JobCache( job_id=msg['caching_info'].get('job_id'), parameters=input_hash, From 7e21eaa0bde60b43b99d524179f5588ba3603bbb Mon Sep 17 00:00:00 2001 From: Dinos Kousidis Date: Thu, 5 Jul 2018 15:45:52 +0200 Subject: [PATCH 4/4] tasks: return if workspace hash is missing Signed-off-by: Dinos Kousidis --- reana_workflow_controller/tasks.py | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/reana_workflow_controller/tasks.py b/reana_workflow_controller/tasks.py index a9ea2a86..59487cc9 100644 --- a/reana_workflow_controller/tasks.py +++ b/reana_workflow_controller/tasks.py @@ -110,15 +110,19 @@ def _update_job_progress(workflow_uuid, msg): def _update_job_cache(msg): """Update caching information for finished job.""" cmd = msg['caching_info']['job_spec']['cmd'] + # removes cd to workspace, to be refactored clean_cmd = cmd.split(';')[1] msg['caching_info']['job_spec']['cmd'] = clean_cmd input_hash = calculate_job_input_hash(msg['caching_info']['job_spec'], msg['caching_info']['workflow_json']) + directory_hash = calculate_hash_of_dir( + msg['caching_info'].get('workflow_workspace'). + replace('data', 'reana/default')) + if directory_hash == -1: + return cached_job = JobCache( job_id=msg['caching_info'].get('job_id'), parameters=input_hash, result_path=msg['caching_info'].get('result_path'), - workspace_hash=calculate_hash_of_dir( - msg['caching_info'].get('workflow_workspace'). - replace('data', 'reana/default'))) + workspace_hash=directory_hash) Session.add(cached_job)