Skip to content

Commit

Permalink
Merge 3aa4ab4 into 2c9978b
Browse files Browse the repository at this point in the history
  • Loading branch information
dprelipcean committed Sep 10, 2019
2 parents 2c9978b + 3aa4ab4 commit f1de486
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 55 deletions.
64 changes: 19 additions & 45 deletions reana_workflow_engine_yadage/externalbackend.py
Expand Up @@ -7,12 +7,11 @@
# under the terms of the MIT License; see LICENSE file for more details.
"""REANA-Workflow-Engine-yadage REANA packtivity backend."""

import ast
import logging
import os
import pipes

from packtivity.asyncbackends import PacktivityProxyBase
from packtivity.asyncbackends import ExternalAsyncProxy
from packtivity.syncbackends import (build_job, finalize_inputs, packconfig,
publish)
from reana_commons.api_client import JobControllerAPIClient as RJC_API_Client
Expand All @@ -38,40 +37,19 @@ def get_commands(job):
return _prettified_cmd, _wrapped_cmd


class ExternalProxy(PacktivityProxyBase):
class ReanaExternalProxy(ExternalAsyncProxy):
"""REANA yadage external proxy."""

def __init__(self, job_id, spec, pars, state):
"""Initialize yadage external proxy."""
super().__init__()
self.job_id = job_id
self.spec = spec
self.pars = pars
self.state = state

def proxyname(self):
"""Return the proxy name."""
return 'ReanaExternalProxy'

def details(self):
"""Retrieve the proxy details."""
"""Parse details to json format."""
return {
'job_id': self.job_id,
'spec': self.spec,
'pars': self.pars.json(),
'state': self.state.json(),
"resultdata": self.resultdata,
"jobproxy": self.jobproxy,
"spec": self.spec,
"statedata": self.statedata.json(),
"pardata": self.pardata.json(),
}

@classmethod
def fromJSON(cls, data):
"""Retrieve proxy details from JSON."""
return cls(
data['proxydetails']['job_id'],
data['proxydetails']['spec'],
data['proxydetails']['pars'],
data['proxydetails']['state']
)


class ExternalBackend(object):
"""REANA yadage external packtivity backend class."""
Expand Down Expand Up @@ -103,8 +81,8 @@ def submit(self, spec, parameters, state, metadata):
if 'compute_backend' in item.keys():
compute_backend = item['compute_backend']

log.info('state context is %s', state)
log.info('would run job %s', job)
log.info('state context is {0}'.format(state))
log.info('would run job {0}'.format(job))

state.ensure()

Expand All @@ -130,7 +108,7 @@ def submit(self, spec, parameters, state, metadata):

job_id = self.rjc_api_client.submit(**job_request_body)

log.info('submitted job: %s', job_id)
log.info('submitted job:{0}'.format(job_id))
message = {"job_id": str(job_id)}
workflow_uuid = os.getenv('workflow_uuid', 'default')
status_running = 1
Expand All @@ -148,31 +126,27 @@ def submit(self, spec, parameters, state, metadata):
))
log.info('workflow status publish failed: {0}'.format(e))

return ExternalProxy(
job_id=str(job_id),
return ReanaExternalProxy(
jobproxy=job_id,
spec=spec,
pars=parameters,
state=state
pardata=parameters,
statedata=state
)

def result(self, resultproxy):
"""Retrieve the result of a packtivity run by RJC."""
resultproxy.pars, resultproxy.state \
= finalize_inputs(resultproxy.pars, resultproxy.state)

self._fail_info = "Debug params are:\n rp:{} \n rpp:{}\n rps:{}"\
.format(resultproxy.details(), resultproxy.pars, resultproxy.state)
resultproxy.pardata, resultproxy.statedata \
= finalize_inputs(resultproxy.pardata, resultproxy.statedata)

return publish(
resultproxy.spec['publisher'],
resultproxy.pars, resultproxy.state, self.config
resultproxy.pardata, resultproxy.statedata, self.config
)

def _get_state(self, resultproxy):
"""Get the packtivity state."""
resultproxy = ast.literal_eval(resultproxy.job_id)
status_res = self.rjc_api_client.check_status(
resultproxy['job_id'])
resultproxy.jobproxy['job_id'])
return status_res['status']

def ready(self, resultproxy):
Expand Down
22 changes: 12 additions & 10 deletions reana_workflow_engine_yadage/tracker.py
Expand Up @@ -38,15 +38,18 @@ def analyze_progress(adageobj):
continue
if nodeobj.state == nodestate.RUNNING:
nodestates.append(
{'state': 'running', 'job_id': nodeobj.resultproxy.job_id}
{'state': 'running',
'job_id': nodeobj.resultproxy.jobproxy['job_id']}
)
elif dagstate.node_status(nodeobj):
nodestates.append(
{'state': 'succeeded', 'job_id': nodeobj.resultproxy.job_id}
{'state': 'succeeded',
'job_id': nodeobj.resultproxy.jobproxy['job_id']}
)
elif dagstate.node_ran_and_failed(nodeobj):
nodestates.append(
{'state': 'failed', 'job_id': nodeobj.resultproxy.job_id}
{'state': 'failed',
'job_id': nodeobj.resultproxy.jobproxy['job_id']}
)
elif dagstate.upstream_failure(dag, nodeobj):
nodestates.append(
Expand Down Expand Up @@ -92,7 +95,7 @@ def track(self, adageobj):

progress['engine_specific'] = jq.jq('{dag: {edges: .dag.edges, nodes: \
[.dag.nodes[]|{metadata: {name: .task.metadata.name}, id: .id, \
jobid: .proxy.proxydetails.job_id}]}}').transform(purejson)
jobid: .proxy.proxydetails.jobproxy}]}}').transform(purejson)

for node in analyze_progress(adageobj):
key = {
Expand All @@ -103,13 +106,12 @@ def track(self, adageobj):
'scheduled': 'total',
}[node['state']]
progress[key]['total'] += 1
if isinstance(node['job_id'], str):
job_id_dict = ast.literal_eval(node['job_id'])
job_id = job_id_dict['job_id']
if key in ['running', 'finished', 'failed']:
progress[key]['job_ids'].append(job_id)

log_message = 'this is a tracking log at {}\n'\
job_id = node['job_id']
if key in ['running', 'finished', 'failed']:
progress[key]['job_ids'].append(job_id)

log_message = 'this is a tracking log at {0}'\
.format(datetime.datetime.now().isoformat())

log.info('''sending to REANA
Expand Down

0 comments on commit f1de486

Please sign in to comment.