
Commit

Merge d9dd899 into 571ef08
dprelipcean committed Aug 26, 2019
2 parents 571ef08 + d9dd899 commit 019a145
Showing 8 changed files with 103 additions and 74 deletions.
4 changes: 2 additions & 2 deletions .travis.yml
@@ -1,5 +1,5 @@
# This file is part of REANA.
# Copyright (C) 2017, 2018 CERN.
# Copyright (C) 2017, 2018, 2019 CERN.
#
# REANA is free software; you can redistribute it and/or modify it
# under the terms of the MIT License; see LICENSE file for more details.
@@ -15,7 +15,7 @@ cache:
- pip

python:
- "2.7"
- "3.6"

services:
- docker
1 change: 1 addition & 0 deletions AUTHORS.rst
@@ -3,6 +3,7 @@ Authors

The list of contributors in alphabetical order:

- `Daniel Prelipcean <https://orcid.org/0000-0002-4855-194X>`_
- `Diego Rodriguez <https://orcid.org/0000-0003-0649-2002>`_
- `Dinos Kousidis <https://orcid.org/0000-0002-4914-4289>`_
- `Jan Okraska <https://orcid.org/0000-0002-1416-3244>`_
2 changes: 1 addition & 1 deletion Dockerfile
@@ -4,7 +4,7 @@
# REANA is free software; you can redistribute it and/or modify it
# under the terms of the MIT License; see LICENSE file for more details.

FROM python:2.7-slim
FROM python:3.6-slim

ENV TERM=xterm
RUN apt-get update && \
11 changes: 8 additions & 3 deletions reana_workflow_engine_yadage/cli.py
@@ -73,9 +73,12 @@ def run_yadage_workflow(workflow_uuid,
workflow_kwargs = dict(workflow=workflow, toplevel=toplevel)

dataopts = {'initdir': workflow_workspace}

try:

check_connection_to_job_controller()
publisher = REANAWorkflowStatusPublisher()

with steering_ctx(dataarg=workflow_workspace,
dataopts=dataopts,
initdata=workflow_parameters if workflow_parameters
@@ -87,12 +90,12 @@
**workflow_kwargs) as ys:

log.info('running workflow on context: {0}'.format(locals()))
publisher.publish_workflow_status(workflow_uuid, 1)

ys.adage_argument(additional_trackers=[
REANATracker(identifier=workflow_uuid)])

publisher.publish_workflow_status(workflow_uuid, 2)
publisher.publish_workflow_status(workflow_uuid, 2,
logs='debugging 2')

log.info('Workflow {workflow_uuid} finished. Files available '
'at {workflow_workspace}.'.format(
@@ -101,7 +104,9 @@
except Exception as e:
log.info('workflow failed: {0}'.format(e))
if publisher:
publisher.publish_workflow_status(workflow_uuid, 3)
publisher.publish_workflow_status(
workflow_uuid, 3, logs='workflow failed: {0}'.format(e)
)
else:
log.error('Workflow {workflow_uuid} failed but status '
'could not be published.'.format(
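
The cli.py hunks above revolve around one pattern: publish a numeric workflow status (1 appears to mean running, 2 finished and 3 failed, judging from where each is emitted) through REANAWorkflowStatusPublisher, now with an optional logs string attached on completion and on failure. A minimal sketch of that pattern follows; the workflow id and run_workflow() are illustrative placeholders, and the publisher is assumed to be able to reach the REANA message broker.

from reana_workflow_engine_yadage.utils import REANAWorkflowStatusPublisher

workflow_uuid = 'illustrative-uuid'  # placeholder, not a real workflow id
publisher = REANAWorkflowStatusPublisher()


def run_workflow():
    """Stand-in for the yadage steering context shown in cli.py above."""
    pass


try:
    publisher.publish_workflow_status(workflow_uuid, 1)  # workflow is running
    run_workflow()
    publisher.publish_workflow_status(workflow_uuid, 2,
                                      logs='workflow finished')
except Exception as e:
    # Mirror the diff: attach the failure reason to the final status update.
    publisher.publish_workflow_status(
        workflow_uuid, 3, logs='workflow failed: {0}'.format(e))
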
119 changes: 65 additions & 54 deletions reana_workflow_engine_yadage/externalbackend.py
@@ -8,63 +8,57 @@
"""REANA-Workflow-Engine-yadage REANA packtivity backend."""

import ast
import base64
import logging
import os
import pipes

from packtivity.asyncbackends import PacktivityProxyBase
from packtivity.syncbackends import (build_job, contextualize_parameters,
packconfig, publish)
from reana_commons.api_client import JobControllerAPIClient as rjc_api_client
from packtivity.syncbackends import (build_job, finalize_inputs, packconfig,
publish)
from reana_commons.api_client import JobControllerAPIClient as RJC_API_Client

from .config import LOGGING_MODULE, MOUNT_CVMFS
from .utils import REANAWorkflowStatusPublisher

log = logging.getLogger(LOGGING_MODULE)


def make_oneliner(job):
"""Convert a command into oneliner."""
wrapped_cmd = 'sh -c {} '.format(
pipes.quote(job['command'])
)
return wrapped_cmd
def get_commands(job):
"""Convert a command/script into oneliner from its job."""
_prettified_cmd, _wrapped_cmd = None, None

if 'command' in job:
_prettified_cmd = job['command']
_wrapped_cmd = 'sh -c {} '.format(pipes.quote(job['command']))

def make_script(job):
"""Encode script type commands in base64."""
encoded_script = base64.b64encode(job['script'])
cmd = 'echo {encoded}|base64 -d|{interpreter}'.format(
encoded=encoded_script,
interpreter=job['interpreter']
)
wrapped_cmd = 'sh -c {} '.format(
pipes.quote(cmd)
)
return wrapped_cmd
elif 'script' in job:
_prettified_cmd = job['script']
_wrapped_cmd = 'sh -c {} '.format(pipes.quote(job['script']))

return _prettified_cmd, _wrapped_cmd
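
get_commands() replaces the former make_oneliner()/make_script() pair: whichever of 'command' or 'script' the built job carries is kept verbatim for display and also wrapped into a single shell-safe 'sh -c ...' invocation via pipes.quote (note that the previous base64 encoding of scripts is dropped in favour of plain quoting). A small usage sketch with a made-up job payload; the import path is simply the module being edited here.

from reana_workflow_engine_yadage.externalbackend import get_commands

job = {'command': 'echo "Hello REANA" && ls -la'}  # illustrative payload
prettified_cmd, wrapped_cmd = get_commands(job)

print(prettified_cmd)  # echo "Hello REANA" && ls -la
print(wrapped_cmd)     # roughly: sh -c 'echo "Hello REANA" && ls -la'
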


class ExternalProxy(PacktivityProxyBase):
"""REANA yadage external proxy."""

def __init__(self, job_id, spec, pars, state):
"""Initialize yadage external proxy."""
super().__init__()
self.job_id = job_id
self.spec = spec
self.pars = pars
self.state = state

def proxyname(self):
"""Return the proxy name."""
return 'ExternalProxy'
return 'ReanaExternalProxy'

def details(self):
"""Retrieve the proxy details."""
return {
'job_id': self.job_id,
'spec': self.spec,
'pars': self.pars,
'pars': self.pars.json(),
'state': self.state.json(),
}

@@ -85,24 +79,35 @@ class ExternalBackend(object):
def __init__(self):
"""Initialize the REANA packtivity backend."""
self.config = packconfig()
self.rjc_api_client = rjc_api_client('reana-job-controller')
self.rjc_api_client = RJC_API_Client('reana-job-controller')

def prepublish(self, spec, parameters, context):
"""."""
return None
self._fail_info = None

def submit(self, spec, parameters, state, metadata):
"""Submit a yadage packtivity to RJC."""
parameters = contextualize_parameters(parameters,
state)
job = build_job(spec['process'], parameters, state, self.config)

if 'command' in job:
prettified_cmd = job['command']
wrapped_cmd = make_oneliner(job)
elif 'script' in job:
prettified_cmd = job['script']
wrapped_cmd = make_script(job)
if type(parameters) == tuple:
try:
parameters, state = finalize_inputs(parameters, state)
job = build_job(
spec['process'], parameters, state, self.config
)
except AttributeError:
# Taken from yadage/packtivitybackend.py:
# "this is a little hacky, because the packtivity backends
# take unrolled spec/parameters/context while the packtivity
# API takes generalized task objects
log.info("Parameters are: {}".format(parameters))
parameters, state = finalize_inputs(parameters.parameters,
parameters.state)
job = build_job(
spec['process'], parameters, state, self.config
)

else:
parameters, state = finalize_inputs(parameters, state)
job = build_job(spec['process'], parameters, state, self.config)

prettified_cmd, wrapped_cmd = get_commands(job)

image = spec['environment']['image']
# tag = spec['environment']['imagetag']
@@ -129,7 +134,7 @@ def submit(self, spec, parameters, state, metadata):
job_id = self.rjc_api_client.submit(*job_request_body)

log.info('submitted job: %s', job_id)
message = {"job_id": str(job_id).decode('utf-8')}
message = {"job_id": str(job_id)}
workflow_uuid = os.getenv('workflow_uuid', 'default')
status_running = 1
try:
@@ -154,28 +159,34 @@ def submit(self, spec, parameters, state, metadata):
)
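
The submit() changes swap contextualize_parameters for packtivity's finalize_inputs and guard against parameters arriving either as a plain mapping or as a task-like object that still carries its own .parameters/.state (the AttributeError fallback quoted from yadage/packtivitybackend.py). A condensed sketch of just that normalisation step, not the project's API: the diff additionally special-cases tuples, which this sketch collapses into a single try/except, and finalize_inputs is assumed to keep the (parameters, state) signature used above.

from packtivity.syncbackends import build_job, finalize_inputs, packconfig


def build_reana_job(spec, parameters, state, config=None):
    """Hedged stand-in for the normalisation inside ExternalBackend.submit()."""
    config = config or packconfig()
    try:
        parameters, state = finalize_inputs(parameters, state)
    except AttributeError:
        # Fallback mirrored from the diff: unroll a task-like object that
        # still wraps its own parameters and state.
        parameters, state = finalize_inputs(parameters.parameters,
                                            parameters.state)
    return build_job(spec['process'], parameters, state, config)
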

def result(self, resultproxy):
"""Retrieve the result of a pactivity run by RJC."""
resultproxy.pars = contextualize_parameters(resultproxy.pars,
resultproxy.state)
"""Retrieve the result of a packtivity run by RJC."""
resultproxy.pars, resultproxy.state \
= finalize_inputs(resultproxy.pars, resultproxy.state)

self._fail_info = "Debug params are:\n rp:{} \n rpp:{}\n rps:{}"\
.format(resultproxy.details(), resultproxy.pars, resultproxy.state)

return publish(
resultproxy.spec['publisher'],
resultproxy.pars, resultproxy.state, self.config
)
resultproxy.spec['publisher'],
resultproxy.pars, resultproxy.state, self.config
)

def ready(self, resultproxy):
"""Check if a packtivity is finished."""
def _get_state(self, resultproxy):
"""Get the packtivity state."""
resultproxy = ast.literal_eval(resultproxy.job_id)
status_res = self.rjc_api_client.check_status(
resultproxy['job_id'])
return status_res['status'] != 'started'
return status_res['status']

def ready(self, resultproxy):
"""Check if a packtivity is finished."""
return self._get_state(resultproxy) != 'started'

def successful(self, resultproxy):
"""Check if the pactivity was successful."""
resultproxy = ast.literal_eval(resultproxy.job_id)
status_res = self.rjc_api_client.check_status(
resultproxy['job_id'])
return status_res['status'] == 'succeeded'
"""Check if the packtivity was successful."""
return self._get_state(resultproxy) == 'succeeded'

def fail_info(self, resultproxy):
"""Retreive the fail info."""
pass
"""Retrieve the fail info."""
self._fail_info += "\nraw info: {}".format(resultproxy)
return self._fail_info
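
ready() and successful() are now thin wrappers over a shared _get_state() helper that asks reana-job-controller for the job status. A toy illustration of the status convention they rely on: 'started' while a job runs and 'succeeded' on success come straight from the diff, while 'failed' is assumed here as the remaining terminal state.

def ready(status):
    """A job is considered ready once it is no longer reported as 'started'."""
    return status != 'started'


def successful(status):
    """Only 'succeeded' counts as success."""
    return status == 'succeeded'


for status in ('started', 'succeeded', 'failed'):
    print(status, '-> ready:', ready(status), 'successful:', successful(status))
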
16 changes: 10 additions & 6 deletions reana_workflow_engine_yadage/tracker.py
@@ -83,6 +83,7 @@ def track(self, adageobj):

progress = {
"engine_specific": None,
"planned": {"total": 0, "job_ids": []},
"failed": {"total": 0, "job_ids": []},
"total": {"total": 0, "job_ids": []},
"running": {"total": 0, "job_ids": []},
@@ -93,7 +94,9 @@
[.dag.nodes[]|{metadata: {name: .task.metadata.name}, id: .id, \
jobid: .proxy.proxydetails.job_id}]}}').transform(purejson)

for node in analyze_progress(adageobj):
nodes = analyze_progress(adageobj)

for node in nodes:
key = {
'running': 'running',
'succeeded': 'finished',
@@ -108,9 +111,8 @@
if key in ['running', 'finished', 'failed']:
progress[key]['job_ids'].append(job_id)

log_message = 'this is a tracking log at {}'.format(
datetime.datetime.now().isoformat()
)
log_message = 'this is a tracking log at {}\n'\
.format(datetime.datetime.now().isoformat())

log.info('''sending to REANA
uuid: {}
@@ -119,7 +121,8 @@
message:
{}
'''.format(self.workflow_id,
json.dumps(progress, indent=4), log_message))
json.dumps(progress, indent=4),
log_message))
message = {"progress": progress}
status_running = 1
try:
@@ -138,7 +141,8 @@
log.info('workflow status publish failed: {0}'.format(e))

def finalize(self, adageobj):
"""Finilizes the progress tracking."""
"""Finalize the progress tracking."""
self.track(adageobj)
log.info("Finalizing the progress tracking for: {}".format(adageobj))
if self.reana_status_publisher:
self.reana_status_publisher.close()
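
The tracker changes add a "planned" bucket to the progress payload and route each node reported by analyze_progress() into running/finished/failed before publishing. The following toy tally, outside the real tracker, uses made-up node states and job ids; the mapping of 'succeeded' onto 'finished' is taken from the diff, while incrementing 'total' is assumed, since that part of the loop is not shown.

nodes = [('running', 'job-1'), ('succeeded', 'job-2'), ('failed', 'job-3')]  # illustrative

progress = {name: {"total": 0, "job_ids": []}
            for name in ("planned", "running", "finished", "failed")}
bucket = {'running': 'running', 'succeeded': 'finished', 'failed': 'failed'}

for state, job_id in nodes:
    key = bucket.get(state)
    if key in ('running', 'finished', 'failed'):
        progress[key]['total'] += 1
        progress[key]['job_ids'].append(job_id)

print(progress)
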
2 changes: 1 addition & 1 deletion reana_workflow_engine_yadage/version.py
@@ -14,4 +14,4 @@

from __future__ import absolute_import, print_function

__version__ = "0.5.0"
__version__ = "0.6.0dev20190815"
22 changes: 15 additions & 7 deletions setup.py
@@ -36,25 +36,31 @@
continue
extras_require['all'].extend(reqs)


setup_requires = [
'pytest-runner>=2.7',
]

install_requires = [
'adage==0.8.5',
'click>=7',
'enum34>=1.1.6',
'packtivity==0.10.0',
'graphviz>=0.12', # FIXME remove once yadage-schemas solves yadage deps.
'jq==0.1.6',
'networkx==1.11',
'packtivity==0.14.21',
'pydot2>=1.0.33', # FIXME remove once yadage-schemas solves yadage deps.
'pydotplus>=2.0.2', # FIXME remove once yadage-schemas solves yadage deps.
'pygraphviz>=1.5', # FIXME remove once yadage-schemas solves yadage deps.
'pyOpenSSL==17.5.0', # FIXME remove once yadage-schemas solves deps.
'reana-commons>=0.6.0.dev20190812,<0.7.0',
'requests==2.20.0',
'rfc3987==1.3.7', # FIXME remove once yadage-schemas solves deps.
'requests==2.22.0',
'rfc3987==1.3.7', # FIXME remove once yadage-schemas solves yadage deps.
'strict-rfc3339==0.7', # FIXME remove once yadage-schemas solves deps.
'SQLAlchemy-Utils>=0.32.18',
'SQLAlchemy>=1.1.14',
'yadage-schemas==0.7.16',
'yadage==0.13.5',
'webcolors==1.7', # FIXME remove once yadage-schemas solves deps.
'yadage==0.20.0',
'yadage-schemas==0.10.6',
'webcolors==1.7', # FIXME remove once yadage-schemas solves yadage deps.
]

packages = find_packages()
@@ -96,6 +102,8 @@
'Operating System :: OS Independent',
'Programming Language :: Python :: 2',
'Programming Language :: Python :: 2.7',
'Programming Language :: Python :: 3',
'Programming Language :: Python :: 3.6',
'Programming Language :: Python :: Implementation :: CPython',
'Programming Language :: Python',
'Topic :: Internet :: WWW/HTTP :: Dynamic Content',