Skip to content

Commit

Permalink
externalbackend: upgraded yadage
Browse files Browse the repository at this point in the history
* upgraded yadage to newest versions and removed old dependencies

* adjusted reana backend to new yadage API

* upgraded to python 3.6

Signed-off-by: Daniel Prelipcean <daniel.prelipcean@cern.ch>
  • Loading branch information
dprelipcean committed Aug 16, 2019
1 parent 571ef08 commit 2edf862
Show file tree
Hide file tree
Showing 7 changed files with 85 additions and 70 deletions.
4 changes: 2 additions & 2 deletions .travis.yml
@@ -1,5 +1,5 @@
# This file is part of REANA.
# Copyright (C) 2017, 2018 CERN.
# Copyright (C) 2017, 2018, 2019 CERN.
#
# REANA is free software; you can redistribute it and/or modify it
# under the terms of the MIT License; see LICENSE file for more details.
Expand All @@ -15,7 +15,7 @@ cache:
- pip

python:
- "2.7"
- "3.6"

services:
- docker
Expand Down
1 change: 1 addition & 0 deletions AUTHORS.rst
Expand Up @@ -3,6 +3,7 @@ Authors

The list of contributors in alphabetical order:

- `Daniel Prelipcean <https://orcid.org/0000-0002-4855-194X>`_
- `Diego Rodriguez <https://orcid.org/0000-0003-0649-2002>`_
- `Dinos Kousidis <https://orcid.org/0000-0002-4914-4289>`_
- `Jan Okraska <https://orcid.org/0000-0002-1416-3244>`_
Expand Down
2 changes: 1 addition & 1 deletion Dockerfile
Expand Up @@ -4,7 +4,7 @@
# REANA is free software; you can redistribute it and/or modify it
# under the terms of the MIT License; see LICENSE file for more details.

FROM python:2.7-slim
FROM python:3.6-slim

ENV TERM=xterm
RUN apt-get update && \
Expand Down
117 changes: 65 additions & 52 deletions reana_workflow_engine_yadage/externalbackend.py
Expand Up @@ -8,48 +8,42 @@
"""REANA-Workflow-Engine-yadage REANA packtivity backend."""

import ast
import base64
import logging
import os
import pipes

from packtivity.asyncbackends import PacktivityProxyBase
from packtivity.syncbackends import (build_job, contextualize_parameters,
packconfig, publish)
from reana_commons.api_client import JobControllerAPIClient as rjc_api_client
from packtivity.syncbackends import (build_job, finalize_inputs, packconfig,
publish)
from reana_commons.api_client import JobControllerAPIClient as RJC_API_Client

from .config import LOGGING_MODULE, MOUNT_CVMFS
from .utils import REANAWorkflowStatusPublisher

log = logging.getLogger(LOGGING_MODULE)


def make_oneliner(job):
"""Convert a command into oneliner."""
wrapped_cmd = 'sh -c {} '.format(
pipes.quote(job['command'])
)
return wrapped_cmd
def get_commands(job):
"""Convert a command/script into oneliner from its job."""
_prettified_cmd, _wrapped_cmd = None, None

if 'command' in job:
_prettified_cmd = job['command']
_wrapped_cmd = 'sh -c {} '.format(pipes.quote(job['command']))

def make_script(job):
"""Encode script type commands in base64."""
encoded_script = base64.b64encode(job['script'])
cmd = 'echo {encoded}|base64 -d|{interpreter}'.format(
encoded=encoded_script,
interpreter=job['interpreter']
)
wrapped_cmd = 'sh -c {} '.format(
pipes.quote(cmd)
)
return wrapped_cmd
elif 'script' in job:
_prettified_cmd = job['script']
_wrapped_cmd = 'sh -c {} '.format(pipes.quote(job['script']))

return _prettified_cmd, _wrapped_cmd


class ExternalProxy(PacktivityProxyBase):
"""REANA yadage external proxy."""

def __init__(self, job_id, spec, pars, state):
"""Initialize yadage external proxy."""
super().__init__()
self.job_id = job_id
self.spec = spec
self.pars = pars
Expand Down Expand Up @@ -85,24 +79,37 @@ class ExternalBackend(object):
def __init__(self):
"""Initialize the REANA packtivity backend."""
self.config = packconfig()
self.rjc_api_client = rjc_api_client('reana-job-controller')
self.rjc_api_client = RJC_API_Client('reana-job-controller')

def prepublish(self, spec, parameters, context):
"""."""
return None
self._fail_info = None

def submit(self, spec, parameters, state, metadata):
"""Submit a yadage packtivity to RJC."""
parameters = contextualize_parameters(parameters,
state)
job = build_job(spec['process'], parameters, state, self.config)

if 'command' in job:
prettified_cmd = job['command']
wrapped_cmd = make_oneliner(job)
elif 'script' in job:
prettified_cmd = job['script']
wrapped_cmd = make_script(job)
if type(parameters) == tuple:
try:
parameters, state = finalize_inputs(parameters, state)
job = build_job(
spec['process'], parameters, state, self.config
)
except AttributeError:
# Taken from yadage packtivitybackend.py:
# "this is a little hacky, because the packtivity backends
# take unrolled spec/parameters/context while the adage API
# takes generalized task objects
# possibly could use Munch on the packtivity side to
# dynammicaly create .task/.parameters/.state-able objects"
log.info("Parameters are: {}".format(parameters))
parameters, state = finalize_inputs(parameters.parameters,
parameters.state)
job = build_job(
spec['process'], parameters, state, self.config
)

else:
parameters, state = finalize_inputs(parameters, state)
job = build_job(spec['process'], parameters, state, self.config)

prettified_cmd, wrapped_cmd = get_commands(job)

image = spec['environment']['image']
# tag = spec['environment']['imagetag']
Expand All @@ -129,7 +136,7 @@ def submit(self, spec, parameters, state, metadata):
job_id = self.rjc_api_client.submit(*job_request_body)

log.info('submitted job: %s', job_id)
message = {"job_id": str(job_id).decode('utf-8')}
message = {"job_id": str(job_id)}
workflow_uuid = os.getenv('workflow_uuid', 'default')
status_running = 1
try:
Expand All @@ -154,28 +161,34 @@ def submit(self, spec, parameters, state, metadata):
)

def result(self, resultproxy):
"""Retrieve the result of a pactivity run by RJC."""
resultproxy.pars = contextualize_parameters(resultproxy.pars,
resultproxy.state)
"""Retrieve the result of a packtivity run by RJC."""
resultproxy.pars, resultproxy.state \
= finalize_inputs(resultproxy.pars, resultproxy.state)

self._fail_info = "Debug params are:\n rp:{} \n rpp:{}\n rps:{}"\
.format(resultproxy.details(), resultproxy.pars, resultproxy.state)

return publish(
resultproxy.spec['publisher'],
resultproxy.pars, resultproxy.state, self.config
)
resultproxy.spec['publisher'],
resultproxy.pars, resultproxy.state, self.config
)

def ready(self, resultproxy):
"""Check if a packtivity is finished."""
def _get_state(self, resultproxy):
"""Get the packtivity state."""
resultproxy = ast.literal_eval(resultproxy.job_id)
status_res = self.rjc_api_client.check_status(
resultproxy['job_id'])
return status_res['status'] != 'started'
return status_res['status']

def ready(self, resultproxy):
"""Check if a packtivity is finished."""
return self._get_state(resultproxy) != 'started'

def successful(self, resultproxy):
"""Check if the pactivity was successful."""
resultproxy = ast.literal_eval(resultproxy.job_id)
status_res = self.rjc_api_client.check_status(
resultproxy['job_id'])
return status_res['status'] == 'succeeded'
"""Check if the packtivity was successful."""
return self._get_state(resultproxy) == 'succeeded'

def fail_info(self, resultproxy):
"""Retreive the fail info."""
pass
"""Retrieve the fail info."""
self._fail_info += "\nraw info: {}".format(resultproxy)
return self._fail_info
11 changes: 6 additions & 5 deletions reana_workflow_engine_yadage/tracker.py
Expand Up @@ -83,6 +83,7 @@ def track(self, adageobj):

progress = {
"engine_specific": None,
"planned": {"total": 0, "job_ids": []},
"failed": {"total": 0, "job_ids": []},
"total": {"total": 0, "job_ids": []},
"running": {"total": 0, "job_ids": []},
Expand All @@ -108,9 +109,8 @@ def track(self, adageobj):
if key in ['running', 'finished', 'failed']:
progress[key]['job_ids'].append(job_id)

log_message = 'this is a tracking log at {}'.format(
datetime.datetime.now().isoformat()
)
log_message = 'this is a tracking log at {} '\
.format(datetime.datetime.now().isoformat())

log.info('''sending to REANA
uuid: {}
Expand All @@ -119,7 +119,8 @@ def track(self, adageobj):
message:
{}
'''.format(self.workflow_id,
json.dumps(progress, indent=4), log_message))
json.dumps(progress, indent=4),
log_message))
message = {"progress": progress}
status_running = 1
try:
Expand All @@ -138,7 +139,7 @@ def track(self, adageobj):
log.info('workflow status publish failed: {0}'.format(e))

def finalize(self, adageobj):
"""Finilizes the progress tracking."""
"""Finalize the progress tracking."""
self.track(adageobj)
if self.reana_status_publisher:
self.reana_status_publisher.close()
2 changes: 1 addition & 1 deletion reana_workflow_engine_yadage/version.py
Expand Up @@ -14,4 +14,4 @@

from __future__ import absolute_import, print_function

__version__ = "0.5.0"
__version__ = "0.6.0dev20190815"
18 changes: 9 additions & 9 deletions setup.py
Expand Up @@ -41,20 +41,18 @@
]

install_requires = [
'adage==0.8.5',
'adage==0.10.0',
'click>=7',
'enum34>=1.1.6',
'packtivity==0.10.0',
'pyOpenSSL==17.5.0', # FIXME remove once yadage-schemas solves deps.
'jq==0.1.6',
'networkx==1.11',
'packtivity==0.14.21',
'reana-commons>=0.6.0.dev20190812,<0.7.0',
'requests==2.20.0',
'rfc3987==1.3.7', # FIXME remove once yadage-schemas solves deps.
'strict-rfc3339==0.7', # FIXME remove once yadage-schemas solves deps.
'requests==2.22.0',
'SQLAlchemy-Utils>=0.32.18',
'SQLAlchemy>=1.1.14',
'yadage-schemas==0.7.16',
'yadage==0.13.5',
'webcolors==1.7', # FIXME remove once yadage-schemas solves deps.
'yadage==0.20.0',
'yadage-schemas==0.10.6',
]

packages = find_packages()
Expand Down Expand Up @@ -96,6 +94,8 @@
'Operating System :: OS Independent',
'Programming Language :: Python :: 2',
'Programming Language :: Python :: 2.7',
'Programming Language :: Python :: 3',
'Programming Language :: Python :: 3.6',
'Programming Language :: Python :: Implementation :: CPython',
'Programming Language :: Python',
'Topic :: Internet :: WWW/HTTP :: Dynamic Content',
Expand Down

0 comments on commit 2edf862

Please sign in to comment.