Skip to content

Commit

Permalink
Merge c9e7703 into 27be7df
Browse files Browse the repository at this point in the history
  • Loading branch information
Dinos Kousidis committed Aug 6, 2018
2 parents 27be7df + c9e7703 commit 4582f22
Show file tree
Hide file tree
Showing 7 changed files with 21 additions and 46 deletions.
2 changes: 2 additions & 0 deletions Dockerfile
Expand Up @@ -24,6 +24,7 @@ RUN dnf -y update && \
dnf install -y dnf redhat-rpm-config

RUN pip install --upgrade pip
RUN pip install -e git+git://github.com/dinosk/reana-commons.git@4-merge-workflow-commons#egg=reana-commons

COPY CHANGES.rst README.rst setup.py /code/
COPY reana_workflow_engine_yadage/version.py /code/reana_workflow_engine_yadage/
Expand All @@ -34,6 +35,7 @@ RUN pip install --no-cache-dir requirements-builder && \

COPY . /code


# Debug off by default
ARG DEBUG=false

Expand Down
7 changes: 4 additions & 3 deletions reana_workflow_engine_yadage/externalbackend.py
Expand Up @@ -32,7 +32,7 @@

from . import submit
from .celeryapp import app
from .utils import publish_workflow_status
from .utils import publisher

log = logging.getLogger('yadage.cap.externalproxy')

Expand Down Expand Up @@ -119,8 +119,9 @@ def submit(self, spec, parameters, state, metadata):
image, wrapped_cmd, prettified_cmd)

log.info('submitted job: %s', job_id)
publish_workflow_status(app.current_worker_task.workflow_uuid, 1,
message={"job_id": job_id.decode('utf-8')})
publisher.publish_workflow_status(
app.current_worker_task.workflow_uuid, 1,
message={"job_id": job_id.decode('utf-8')})
return ExternalProxy(
job_id=job_id,
spec=spec,
Expand Down
11 changes: 6 additions & 5 deletions reana_workflow_engine_yadage/tasks.py
Expand Up @@ -33,7 +33,7 @@
from .celeryapp import app
from .config import SHARED_VOLUME_PATH
from .tracker import REANATracker
from .utils import publish_workflow_status
from .utils import publisher

log = logging.getLogger(__name__)

Expand All @@ -60,7 +60,6 @@ def run_yadage_workflow(workflow_uuid, workflow_workspace,
workflow_kwargs = dict(workflow=workflow, toplevel=toplevel)

dataopts = {'initdir': workflow_workspace}

try:
with steering_ctx(dataarg=workflow_workspace,
dataopts=dataopts,
Expand All @@ -72,17 +71,19 @@ def run_yadage_workflow(workflow_uuid, workflow_workspace,
**workflow_kwargs) as ys:

log.info('running workflow on context: {0}'.format(locals()))
publish_workflow_status(workflow_uuid, 1)
publisher.publish_workflow_status(workflow_uuid, 1)

ys.adage_argument(additional_trackers=[
REANATracker(identifier=workflow_uuid)])

publish_workflow_status(workflow_uuid, 2)
publisher.publish_workflow_status(workflow_uuid, 2)

log.info('Workflow {workflow_uuid} finished. Files available '
'at {workflow_workspace}.'.format(
workflow_uuid=workflow_uuid,
workflow_workspace=workflow_workspace))
except Exception as e:
log.info('workflow failed: {0}'.format(e))
publish_workflow_status(workflow_uuid, 3)
publisher.publish_workflow_status(workflow_uuid, 3)
finally:
publisher.close()
11 changes: 4 additions & 7 deletions reana_workflow_engine_yadage/tracker.py
Expand Up @@ -31,11 +31,7 @@
import networkx as nx
from yadage.utils import WithJsonRefEncoder

from .utils import publish_workflow_status

# def publish_workflow_status(*args, **kwargs):
# pass

from .utils import publisher

log = logging.getLogger(__name__)

Expand Down Expand Up @@ -129,8 +125,9 @@ def track(self, adageobj):
{}
'''.format(self.workflow_id,
json.dumps(progress, indent=4), log_message))
publish_workflow_status(self.workflow_id, status=1, message={
"progress": progress}, logs=log_message)
publisher.publish_workflow_status(
self.workflow_id, status=1, logs=log_message,
message={"progress": progress})

def finalize(self, adageobj):
self.track(adageobj)
34 changes: 3 additions & 31 deletions reana_workflow_engine_yadage/utils.py
Expand Up @@ -22,35 +22,7 @@

import json

import pika
from reana_commons.publisher import Publisher

from .config import BROKER_PASS, BROKER_PORT, BROKER_URL, BROKER_USER


def publish_workflow_status(workflow_uuid, status, message=None, logs=None):
"""Update database workflow status.
:param workflow_uuid: UUID which represents the workflow.
:param status: String that represents the analysis status.
:param status_message: String that represents the message related with the
status, if there is any.
"""

broker_credentials = pika.PlainCredentials(BROKER_USER,
BROKER_PASS)
connection = pika.BlockingConnection(
pika.ConnectionParameters(BROKER_URL,
BROKER_PORT,
'/',
broker_credentials))
channel = connection.channel()
channel.queue_declare(queue='jobs-status')
channel.basic_publish(exchange='',
routing_key='jobs-status',
body=json.dumps({"workflow_uuid": workflow_uuid,
"status": status,
"logs": logs,
"message": message}),
properties=pika.BasicProperties(
delivery_mode=2, # msg persistent
))
publisher = Publisher()
publisher.connect()
1 change: 1 addition & 0 deletions requirements-dev.txt
@@ -1,3 +1,4 @@
wdb
ipdb
Flask-DebugToolbar
git+git://github.com/reanahub/reana-commons.git#egg=reana-commons
1 change: 1 addition & 0 deletions setup.py
Expand Up @@ -69,6 +69,7 @@
'pyzmq==16.0.2',
'requests==2.11.1',
'yadage-schemas==0.7.16',
'reana-commons>=0.3.0.dev20180418',
'adage==0.8.5',
'packtivity==0.10.0',
'yadage==0.13.5',
Expand Down

0 comments on commit 4582f22

Please sign in to comment.