From 3aa4ab4f1833fa9e539fbbae99cba0bfe77d14c9 Mon Sep 17 00:00:00 2001 From: Daniel Prelipcean Date: Fri, 6 Sep 2019 17:13:09 +0200 Subject: [PATCH] externalproxy: refactor * Use default yadage external proxy * Closes #125 Signed-off-by: Daniel Prelipcean --- .../externalbackend.py | 64 ++++++------------- reana_workflow_engine_yadage/tracker.py | 22 ++++--- 2 files changed, 31 insertions(+), 55 deletions(-) diff --git a/reana_workflow_engine_yadage/externalbackend.py b/reana_workflow_engine_yadage/externalbackend.py index 0ef6388..490b3b0 100644 --- a/reana_workflow_engine_yadage/externalbackend.py +++ b/reana_workflow_engine_yadage/externalbackend.py @@ -7,12 +7,11 @@ # under the terms of the MIT License; see LICENSE file for more details. """REANA-Workflow-Engine-yadage REANA packtivity backend.""" -import ast import logging import os import pipes -from packtivity.asyncbackends import PacktivityProxyBase +from packtivity.asyncbackends import ExternalAsyncProxy from packtivity.syncbackends import (build_job, finalize_inputs, packconfig, publish) from reana_commons.api_client import JobControllerAPIClient as RJC_API_Client @@ -38,40 +37,19 @@ def get_commands(job): return _prettified_cmd, _wrapped_cmd -class ExternalProxy(PacktivityProxyBase): +class ReanaExternalProxy(ExternalAsyncProxy): """REANA yadage external proxy.""" - def __init__(self, job_id, spec, pars, state): - """Initialize yadage external proxy.""" - super().__init__() - self.job_id = job_id - self.spec = spec - self.pars = pars - self.state = state - - def proxyname(self): - """Return the proxy name.""" - return 'ReanaExternalProxy' - def details(self): - """Retrieve the proxy details.""" + """Parse details to json format.""" return { - 'job_id': self.job_id, - 'spec': self.spec, - 'pars': self.pars.json(), - 'state': self.state.json(), + "resultdata": self.resultdata, + "jobproxy": self.jobproxy, + "spec": self.spec, + "statedata": self.statedata.json(), + "pardata": self.pardata.json(), } - @classmethod - def fromJSON(cls, data): - """Retrieve proxy details from JSON.""" - return cls( - data['proxydetails']['job_id'], - data['proxydetails']['spec'], - data['proxydetails']['pars'], - data['proxydetails']['state'] - ) - class ExternalBackend(object): """REANA yadage external packtivity backend class.""" @@ -103,8 +81,8 @@ def submit(self, spec, parameters, state, metadata): if 'compute_backend' in item.keys(): compute_backend = item['compute_backend'] - log.info('state context is %s', state) - log.info('would run job %s', job) + log.info('state context is {0}'.format(state)) + log.info('would run job {0}'.format(job)) state.ensure() @@ -130,7 +108,7 @@ def submit(self, spec, parameters, state, metadata): job_id = self.rjc_api_client.submit(**job_request_body) - log.info('submitted job: %s', job_id) + log.info('submitted job:{0}'.format(job_id)) message = {"job_id": str(job_id)} workflow_uuid = os.getenv('workflow_uuid', 'default') status_running = 1 @@ -148,31 +126,27 @@ def submit(self, spec, parameters, state, metadata): )) log.info('workflow status publish failed: {0}'.format(e)) - return ExternalProxy( - job_id=str(job_id), + return ReanaExternalProxy( + jobproxy=job_id, spec=spec, - pars=parameters, - state=state + pardata=parameters, + statedata=state ) def result(self, resultproxy): """Retrieve the result of a packtivity run by RJC.""" - resultproxy.pars, resultproxy.state \ - = finalize_inputs(resultproxy.pars, resultproxy.state) - - self._fail_info = "Debug params are:\n rp:{} \n rpp:{}\n rps:{}"\ - .format(resultproxy.details(), resultproxy.pars, resultproxy.state) + resultproxy.pardata, resultproxy.statedata \ + = finalize_inputs(resultproxy.pardata, resultproxy.statedata) return publish( resultproxy.spec['publisher'], - resultproxy.pars, resultproxy.state, self.config + resultproxy.pardata, resultproxy.statedata, self.config ) def _get_state(self, resultproxy): """Get the packtivity state.""" - resultproxy = ast.literal_eval(resultproxy.job_id) status_res = self.rjc_api_client.check_status( - resultproxy['job_id']) + resultproxy.jobproxy['job_id']) return status_res['status'] def ready(self, resultproxy): diff --git a/reana_workflow_engine_yadage/tracker.py b/reana_workflow_engine_yadage/tracker.py index 2a3d1cf..0ac5b1e 100644 --- a/reana_workflow_engine_yadage/tracker.py +++ b/reana_workflow_engine_yadage/tracker.py @@ -38,15 +38,18 @@ def analyze_progress(adageobj): continue if nodeobj.state == nodestate.RUNNING: nodestates.append( - {'state': 'running', 'job_id': nodeobj.resultproxy.job_id} + {'state': 'running', + 'job_id': nodeobj.resultproxy.jobproxy['job_id']} ) elif dagstate.node_status(nodeobj): nodestates.append( - {'state': 'succeeded', 'job_id': nodeobj.resultproxy.job_id} + {'state': 'succeeded', + 'job_id': nodeobj.resultproxy.jobproxy['job_id']} ) elif dagstate.node_ran_and_failed(nodeobj): nodestates.append( - {'state': 'failed', 'job_id': nodeobj.resultproxy.job_id} + {'state': 'failed', + 'job_id': nodeobj.resultproxy.jobproxy['job_id']} ) elif dagstate.upstream_failure(dag, nodeobj): nodestates.append( @@ -92,7 +95,7 @@ def track(self, adageobj): progress['engine_specific'] = jq.jq('{dag: {edges: .dag.edges, nodes: \ [.dag.nodes[]|{metadata: {name: .task.metadata.name}, id: .id, \ - jobid: .proxy.proxydetails.job_id}]}}').transform(purejson) + jobid: .proxy.proxydetails.jobproxy}]}}').transform(purejson) for node in analyze_progress(adageobj): key = { @@ -103,13 +106,12 @@ def track(self, adageobj): 'scheduled': 'total', }[node['state']] progress[key]['total'] += 1 - if isinstance(node['job_id'], str): - job_id_dict = ast.literal_eval(node['job_id']) - job_id = job_id_dict['job_id'] - if key in ['running', 'finished', 'failed']: - progress[key]['job_ids'].append(job_id) - log_message = 'this is a tracking log at {}\n'\ + job_id = node['job_id'] + if key in ['running', 'finished', 'failed']: + progress[key]['job_ids'].append(job_id) + + log_message = 'this is a tracking log at {0}'\ .format(datetime.datetime.now().isoformat()) log.info('''sending to REANA