From c9e77032156660ecf10c46348513b58c657c8062 Mon Sep 17 00:00:00 2001 From: Dinos Kousidis Date: Mon, 6 Aug 2018 14:23:24 +0200 Subject: [PATCH] global: use publisher from reana-commons * Imports publisher from reana-commons to publish the workflow progress. Signed-off-by: Dinos Kousidis --- Dockerfile | 2 ++ .../externalbackend.py | 7 ++-- reana_workflow_engine_yadage/tasks.py | 11 +++--- reana_workflow_engine_yadage/tracker.py | 11 +++--- reana_workflow_engine_yadage/utils.py | 34 ++----------------- requirements-dev.txt | 1 + setup.py | 1 + 7 files changed, 21 insertions(+), 46 deletions(-) diff --git a/Dockerfile b/Dockerfile index bd1d606..d4c07f0 100644 --- a/Dockerfile +++ b/Dockerfile @@ -24,6 +24,7 @@ RUN dnf -y update && \ dnf install -y dnf redhat-rpm-config RUN pip install --upgrade pip +RUN pip install -e git+git://github.com/dinosk/reana-commons.git@4-merge-workflow-commons#egg=reana-commons COPY CHANGES.rst README.rst setup.py /code/ COPY reana_workflow_engine_yadage/version.py /code/reana_workflow_engine_yadage/ @@ -34,6 +35,7 @@ RUN pip install --no-cache-dir requirements-builder && \ COPY . /code + # Debug off by default ARG DEBUG=false diff --git a/reana_workflow_engine_yadage/externalbackend.py b/reana_workflow_engine_yadage/externalbackend.py index 112bbaf..1e8cc0a 100644 --- a/reana_workflow_engine_yadage/externalbackend.py +++ b/reana_workflow_engine_yadage/externalbackend.py @@ -32,7 +32,7 @@ from . import submit from .celeryapp import app -from .utils import publish_workflow_status +from .utils import publisher log = logging.getLogger('yadage.cap.externalproxy') @@ -119,8 +119,9 @@ def submit(self, spec, parameters, state, metadata): image, wrapped_cmd, prettified_cmd) log.info('submitted job: %s', job_id) - publish_workflow_status(app.current_worker_task.workflow_uuid, 1, - message={"job_id": job_id.decode('utf-8')}) + publisher.publish_workflow_status( + app.current_worker_task.workflow_uuid, 1, + message={"job_id": job_id.decode('utf-8')}) return ExternalProxy( job_id=job_id, spec=spec, diff --git a/reana_workflow_engine_yadage/tasks.py b/reana_workflow_engine_yadage/tasks.py index bbb9bd2..c76efbf 100644 --- a/reana_workflow_engine_yadage/tasks.py +++ b/reana_workflow_engine_yadage/tasks.py @@ -33,7 +33,7 @@ from .celeryapp import app from .config import SHARED_VOLUME_PATH from .tracker import REANATracker -from .utils import publish_workflow_status +from .utils import publisher log = logging.getLogger(__name__) @@ -60,7 +60,6 @@ def run_yadage_workflow(workflow_uuid, workflow_workspace, workflow_kwargs = dict(workflow=workflow, toplevel=toplevel) dataopts = {'initdir': workflow_workspace} - try: with steering_ctx(dataarg=workflow_workspace, dataopts=dataopts, @@ -72,12 +71,12 @@ def run_yadage_workflow(workflow_uuid, workflow_workspace, **workflow_kwargs) as ys: log.info('running workflow on context: {0}'.format(locals())) - publish_workflow_status(workflow_uuid, 1) + publisher.publish_workflow_status(workflow_uuid, 1) ys.adage_argument(additional_trackers=[ REANATracker(identifier=workflow_uuid)]) - publish_workflow_status(workflow_uuid, 2) + publisher.publish_workflow_status(workflow_uuid, 2) log.info('Workflow {workflow_uuid} finished. Files available ' 'at {workflow_workspace}.'.format( @@ -85,4 +84,6 @@ def run_yadage_workflow(workflow_uuid, workflow_workspace, workflow_workspace=workflow_workspace)) except Exception as e: log.info('workflow failed: {0}'.format(e)) - publish_workflow_status(workflow_uuid, 3) + publisher.publish_workflow_status(workflow_uuid, 3) + finally: + publisher.close() diff --git a/reana_workflow_engine_yadage/tracker.py b/reana_workflow_engine_yadage/tracker.py index 72bdf75..ee251c2 100644 --- a/reana_workflow_engine_yadage/tracker.py +++ b/reana_workflow_engine_yadage/tracker.py @@ -31,11 +31,7 @@ import networkx as nx from yadage.utils import WithJsonRefEncoder -from .utils import publish_workflow_status - -# def publish_workflow_status(*args, **kwargs): -# pass - +from .utils import publisher log = logging.getLogger(__name__) @@ -129,8 +125,9 @@ def track(self, adageobj): {} '''.format(self.workflow_id, json.dumps(progress, indent=4), log_message)) - publish_workflow_status(self.workflow_id, status=1, message={ - "progress": progress}, logs=log_message) + publisher.publish_workflow_status( + self.workflow_id, status=1, logs=log_message, + message={"progress": progress}) def finalize(self, adageobj): self.track(adageobj) diff --git a/reana_workflow_engine_yadage/utils.py b/reana_workflow_engine_yadage/utils.py index 8fa648b..de8cfa7 100644 --- a/reana_workflow_engine_yadage/utils.py +++ b/reana_workflow_engine_yadage/utils.py @@ -22,35 +22,7 @@ import json -import pika +from reana_commons.publisher import Publisher -from .config import BROKER_PASS, BROKER_PORT, BROKER_URL, BROKER_USER - - -def publish_workflow_status(workflow_uuid, status, message=None, logs=None): - """Update database workflow status. - - :param workflow_uuid: UUID which represents the workflow. - :param status: String that represents the analysis status. - :param status_message: String that represents the message related with the - status, if there is any. - """ - - broker_credentials = pika.PlainCredentials(BROKER_USER, - BROKER_PASS) - connection = pika.BlockingConnection( - pika.ConnectionParameters(BROKER_URL, - BROKER_PORT, - '/', - broker_credentials)) - channel = connection.channel() - channel.queue_declare(queue='jobs-status') - channel.basic_publish(exchange='', - routing_key='jobs-status', - body=json.dumps({"workflow_uuid": workflow_uuid, - "status": status, - "logs": logs, - "message": message}), - properties=pika.BasicProperties( - delivery_mode=2, # msg persistent - )) +publisher = Publisher() +publisher.connect() diff --git a/requirements-dev.txt b/requirements-dev.txt index d4e6008..7641d6e 100644 --- a/requirements-dev.txt +++ b/requirements-dev.txt @@ -1,3 +1,4 @@ wdb ipdb Flask-DebugToolbar +git+git://github.com/reanahub/reana-commons.git#egg=reana-commons diff --git a/setup.py b/setup.py index 8aaf022..5b0642d 100644 --- a/setup.py +++ b/setup.py @@ -69,6 +69,7 @@ 'pyzmq==16.0.2', 'requests==2.11.1', 'yadage-schemas==0.7.16', + 'reana-commons>=0.3.0.dev20180418', 'adage==0.8.5', 'packtivity==0.10.0', 'yadage==0.13.5',