Skip to content

Commit

Permalink
Merge e7d9cc6 into 6b9dfb1
Browse files Browse the repository at this point in the history
  • Loading branch information
Dinos Kousidis authored Jun 25, 2018
2 parents 6b9dfb1 + e7d9cc6 commit e5aca74
Show file tree
Hide file tree
Showing 5 changed files with 86 additions and 55 deletions.
1 change: 1 addition & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ ENV UWSGI_PROCESSES ${UWSGI_PROCESSES:-2}
ARG UWSGI_THREADS=2
ENV UWSGI_THREADS ${UWSGI_THREADS:-2}
ENV TERM=xterm
ENV PYTHONPATH=/workdir

CMD uwsgi --module reana_workflow_controller.app:app \
--http-socket 0.0.0.0:5000 --master \
Expand Down
53 changes: 11 additions & 42 deletions reana_workflow_controller/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,17 @@

import json
import logging
import uuid

import click
import pika
from reana_commons.database import Session
from reana_commons.models import Job, Run, RunJobs, Workflow, WorkflowStatus
from reana_commons.models import WorkflowStatus

from reana_workflow_controller.config import (BROKER, BROKER_PASS, BROKER_PORT,
from reana_workflow_controller.config import (BROKER_PASS, BROKER_PORT,
BROKER_URL, BROKER_USER)
from reana_workflow_controller.tasks import (_update_job_progress,
_update_run_progress,
_update_workflow_status)


@click.command('consume-job-queue')
Expand All @@ -43,50 +45,17 @@ def consume_job_queue():
format='%(asctime)s - %(threadName)s - %(levelname)s: %(message)s'
)

job_statuses = ['submitted', 'succeeded', 'failed', 'planned']

def _update_run_progress(workflow_uuid, msg):
"""Register succeeded Jobs to DB."""
logging.info(
'Updating progress for workflow {0}:\n {1}'.format(workflow_uuid,
msg))
Session.query(Run).filter_by(workflow_uuid=workflow_uuid).\
update({status: msg['progress'][status]['total']
for status in job_statuses})

def _update_job_progress(workflow_uuid, msg):
"""Update job progress for jobs in received message."""
current_run = Session.query(Run).filter_by(
workflow_uuid=workflow_uuid).one_or_none()
for status in job_statuses:
status_progress = msg['progress'][status]
for job_id in status_progress['job_ids']:
Session.query(Job).filter_by(id_=job_id).\
update({'workflow_uuid': workflow_uuid,
'status': status})
run_job = Session.query(RunJobs).filter_by(
run_id=current_run.id_,
job_id=job_id).first()
if not run_job:
run_job = RunJobs()
run_job.id_ = uuid.uuid4()
run_job.run_id = current_run.id_
run_job.job_id = job_id
Session.add(run_job)
logging.info(
'Registering job {0} to run {1}.'.format(
job_id, current_run.id_))

def _callback_job_status(ch, method, properties, body):
body_dict = json.loads(body)
workflow_uuid = body_dict.get('workflow_uuid')
if workflow_uuid:
status = WorkflowStatus(body_dict.get('status'))
logging.info("Received workflow_uuid: {0} status: {1}".
format(workflow_uuid, status))
status = body_dict.get('status')
if status:
status = WorkflowStatus(status)
print(" [x] Received workflow_uuid: {0} status: {1}".
format(workflow_uuid, status))
logs = body_dict.get('logs') or ''
Workflow.update_workflow_status(Session, workflow_uuid,
status, logs, None)
_update_workflow_status(workflow_uuid, status, logs)
if 'message' in body_dict and body_dict.get('message'):
msg = body_dict['message']
if 'progress' in msg:
Expand Down
2 changes: 2 additions & 0 deletions reana_workflow_controller/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,3 +71,5 @@

WORKFLOW_TIME_FORMAT = "%Y-%m-%dT%H:%M:%S"
"""Time format for workflow starting time, created time etc."""

POSSIBLE_JOB_STATUSES = ['submitted', 'succeeded', 'failed', 'planned']
18 changes: 7 additions & 11 deletions reana_workflow_controller/rest.py
Original file line number Diff line number Diff line change
Expand Up @@ -1182,17 +1182,13 @@ def get_workflow_status(workflow_id_or_name): # noqa

if run_info:
current_command = ''
if workflow.type_ == 'serial':
total_jobs = run_info.planned
current_job_commands = _get_current_job_commands(run_info.id_)
try:
current_job_id, current_command = current_job_commands.\
popitem()
except Exception:
pass
else:
total_jobs = run_info.planned + run_info.succeeded \
+ run_info.submitted + run_info.failed
total_jobs = run_info.planned
current_job_commands = _get_current_job_commands(run_info.id_)
try:
current_job_id, current_command = current_job_commands.\
popitem()
except Exception:
pass
# all_run_job_ids = _get_all_run_job_ids(run_info)
progress = {'planned': run_info.planned,
'submitted': run_info.submitted,
Expand Down
67 changes: 65 additions & 2 deletions reana_workflow_controller/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,78 @@

from __future__ import absolute_import

import uuid

from celery import Celery
from reana_commons.database import Session
from reana_commons.models import Job, Run, RunJobs, Workflow

from reana_workflow_controller.config import BROKER
from reana_workflow_controller.config import BROKER, POSSIBLE_JOB_STATUSES

celery = Celery('tasks', broker=BROKER)
celery = Celery('tasks',
broker=BROKER)

celery.conf.update(CELERY_ACCEPT_CONTENT=['json'],
CELERY_TASK_SERIALIZER='json')


run_yadage_workflow = celery.signature('tasks.run_yadage_workflow')
run_cwl_workflow = celery.signature('tasks.run_cwl_workflow')
run_serial_workflow = celery.signature('tasks.run_serial_workflow')


def _update_workflow_status(workflow_uuid, status, logs):
"""Update workflow status in DB."""
Workflow.update_workflow_status(Session, workflow_uuid,
status, logs, None)


def _update_run_progress(workflow_uuid, msg):
"""Register succeeded Jobs to DB."""
run = Session.query(Run).filter_by(workflow_uuid=workflow_uuid).first()
for status in POSSIBLE_JOB_STATUSES:
if status in msg['progress']:
previous_total = getattr(run, status)
if status == 'planned':
if previous_total > 0:
continue
else:
setattr(run, status,
msg['progress']['planned']['total'])
else:
new_total = 0
for job_id in msg['progress'][status]['job_ids']:
job = Session.query(Job).\
filter_by(id_=job_id).one_or_none()
if job:
if job.status != status:
new_total += 1
new_total = previous_total + new_total
setattr(run, status, new_total)
Session.add(run)


def _update_job_progress(workflow_uuid, msg):
"""Update job progress for jobs in received message."""
current_run = Session.query(Run).filter_by(
workflow_uuid=workflow_uuid).one_or_none()
for status in POSSIBLE_JOB_STATUSES:
if status in msg['progress']:
status_progress = msg['progress'][status]
for job_id in status_progress['job_ids']:
try:
uuid.UUID(job_id)
except Exception:
continue
Session.query(Job).filter_by(id_=job_id).\
update({'workflow_uuid': workflow_uuid,
'status': status})
run_job = Session.query(RunJobs).filter_by(
run_id=current_run.id_,
job_id=job_id).first()
if not run_job and current_run:
run_job = RunJobs()
run_job.id_ = uuid.uuid4()
run_job.run_id = current_run.id_
run_job.job_id = job_id
Session.add(run_job)

0 comments on commit e5aca74

Please sign in to comment.