Skip to content

Commit

Permalink
Merge c27c364 into 7ab7458
Browse files Browse the repository at this point in the history
  • Loading branch information
mvidalgarcia committed Apr 7, 2020
2 parents 7ab7458 + c27c364 commit 9756df0
Show file tree
Hide file tree
Showing 6 changed files with 121 additions and 85 deletions.
2 changes: 1 addition & 1 deletion reana_client/cli/files.py
Expand Up @@ -21,7 +21,7 @@
filter_data, parse_parameters)
from reana_client.config import ERROR_MESSAGES, JSON, URL
from reana_client.errors import FileDeletionError, FileUploadError
from reana_client.utils import (get_reana_yaml_file_path, load_reana_spec,
from reana_client.utils import (get_reana_yaml_file_path,
workflow_uuid_or_name)
from reana_commons.utils import click_table_printer

Expand Down
15 changes: 15 additions & 0 deletions reana_client/cli/utils.py
Expand Up @@ -136,6 +136,21 @@ def validate_workflow_name(ctx, _, workflow_name):
return workflow_name


def key_value_to_dict(ctx, param, value):
"""Convert tuple params to dictionary. e.g `(foo=bar)` to `{'foo': 'bar'}`.
:param options: A tuple with CLI operational options.
:returns: A dictionary representation of the given options.
"""
try:
return dict(op.split('=') for op in value)
except ValueError as err:
click.secho('==> ERROR: Input parameter "{0}" is not valid. '
'It must follow format "param=value".'
.format(' '.join(value)), err=True, fg='red'),
sys.exit(1)


class NotRequiredIf(click.Option):
"""Allow only one of two arguments to be missing."""

Expand Down
70 changes: 37 additions & 33 deletions reana_client/cli/workflow.py
Expand Up @@ -20,17 +20,18 @@
from reana_client.cli.utils import (add_access_token_options,
add_workflow_option, check_connection,
filter_data, format_session_uri,
parse_parameters, validate_workflow_name)
key_value_to_dict, parse_parameters,
validate_workflow_name)
from reana_client.config import ERROR_MESSAGES, TIMECHECK
from reana_client.utils import (get_reana_yaml_file_path,
get_workflow_name_and_run_number,
get_workflow_status_change_msg, is_uuid_v4,
load_reana_spec,
validate_cwl_operational_options,
validate_input_parameters,
validate_serial_operational_options,
workflow_uuid_or_name)
from reana_commons.config import INTERACTIVE_SESSION_TYPES
from reana_commons.errors import REANAValidationError
from reana_commons.operational_options import validate_operational_options
from reana_commons.utils import click_table_printer


Expand Down Expand Up @@ -280,13 +281,15 @@ def workflow_create(ctx, file, name,
@click.option(
'-p', '--parameter', 'parameters',
multiple=True,
callback=key_value_to_dict,
help='Additional input parameters to override '
'original ones from reana.yaml. '
'E.g. -p myparam1=myval1 -p myparam2=myval2.',
)
@click.option(
'-o', '--option', 'options',
multiple=True,
callback=key_value_to_dict,
help='Additional operational options for the workflow execution. '
'E.g. CACHE=off. (workflow engine - serial) '
'E.g. --debug (workflow engine - cwl)',
Expand Down Expand Up @@ -320,29 +323,28 @@ def workflow_start(ctx, workflow, access_token,
for p in ctx.params:
logging.debug('{param}: {value}'.format(param=p, value=ctx.params[p]))

parsed_parameters = {'input_parameters':
dict(p.split('=') for p in parameters)}
parsed_parameters['operational_options'] = ' '.join(options).split()
parsed_parameters = {'input_parameters': parameters,
'operational_options': options}
if workflow:
if parameters or options:
try:
response = get_workflow_parameters(workflow, access_token)
if response['type'] == 'cwl':
validate_cwl_operational_options(
parsed_parameters['operational_options'])
if response['type'] == 'serial':
parsed_parameters['operational_options'] = \
validate_serial_operational_options(
parsed_parameters['operational_options'])
workflow_type = response['type']
original_parameters = response['parameters']
validate_operational_options(
workflow_type,
parsed_parameters['operational_options'])

parsed_parameters['input_parameters'] = \
validate_input_parameters(
parsed_parameters['input_parameters'],
response['parameters'])
original_parameters)
except REANAValidationError as e:
click.secho(e.message, err=True, fg='red')
sys.exit(1)
except Exception as e:
click.echo(
click.style('Could not apply given input parameters: '
'{0} \n{1}'.format(parameters, str(e))),
err=True)
click.secho('Could not apply given input parameters: '
'{0} \n{1}'.format(parameters, str(e)), err=True)
try:
logging.info('Connecting to {0}'.format(get_api_url()))
response = start_workflow(workflow,
Expand Down Expand Up @@ -392,13 +394,15 @@ def workflow_start(ctx, workflow, access_token,
@click.option(
'-p', '--parameter', 'parameters',
multiple=True,
callback=key_value_to_dict,
help='Additional input parameters to override '
'original ones from reana.yaml. '
'E.g. -p myparam1=myval1 -p myparam2=myval2.',
)
@click.option(
'-o', '--option', 'options',
multiple=True,
callback=key_value_to_dict,
help='Additional operational options for the workflow execution. '
'E.g. CACHE=off. (workflow engine - serial) '
'E.g. --debug (workflow engine - cwl)',
Expand Down Expand Up @@ -439,10 +443,8 @@ def workflow_restart(ctx, workflow, access_token,
for p in ctx.params:
logging.debug('{param}: {value}'.format(param=p, value=ctx.params[p]))

parsed_parameters = {'input_parameters':
dict(p.split('=') for p in parameters)}
parsed_parameters['operational_options'] = ' '.join(options).split()
parsed_parameters['restart'] = True
parsed_parameters = {'input_parameters': parameters,
'operational_options': options, 'restart': True}
if file:
parsed_parameters['reana_specification'] = \
load_reana_spec(click.format_filename(file))
Expand All @@ -460,22 +462,22 @@ def workflow_restart(ctx, workflow, access_token,
response = get_workflow_parameters(workflow, access_token)
workflow_type = response['type']
original_parameters = response['parameters']
if workflow_type == 'cwl':
validate_cwl_operational_options(

parsed_parameters['operational_options'] = \
validate_operational_options(
workflow_type,
parsed_parameters['operational_options'])
if workflow_type == 'serial':
parsed_parameters['operational_options'] = \
validate_serial_operational_options(
parsed_parameters['operational_options'])
parsed_parameters['input_parameters'] = \
validate_input_parameters(
parsed_parameters['input_parameters'],
original_parameters)

except REANAValidationError as e:
click.secho(e.message, err=True, fg='red')
sys.exit(1)
except Exception as e:
click.echo(
click.style('Could not apply given input parameters: '
'{0} \n{1}'.format(parameters, str(e))),
err=True)
click.secho('Could not apply given input parameters: '
'{0} \n{1}'.format(parameters, str(e)), err=True)
try:
logging.info('Connecting to {0}'.format(get_api_url()))
response = start_workflow(workflow,
Expand Down Expand Up @@ -811,14 +813,16 @@ def workflow_stop(ctx, workflow, force_stop, access_token): # noqa: D301
@click.option(
'-p', '--parameter', 'parameters',
multiple=True,
callback=key_value_to_dict,
help='Additional input parameters to override '
'original ones from reana.yaml. '
'E.g. -p myparam1=myval1 -p myparam2=myval2.',
)
@click.option(
'-o', '--option', 'options',
multiple=True,
help='Additional operatioal options for the workflow execution. '
callback=key_value_to_dict,
help='Additional operational options for the workflow execution. '
'E.g. CACHE=off.',
)
@click.option(
Expand Down
6 changes: 6 additions & 0 deletions reana_client/schemas/reana_analysis_schema.json
Expand Up @@ -109,6 +109,12 @@
"title": "Analysis parameters.",
"description": "Key value data structure which represents the analysis parameters.",
"optional": true
},
"options": {
"id": "/properties/workflow/properties/options",
"type": "object",
"title": "Workflow operational options.",
"description": "Extra operational options accepted by workflow engines."
}
}
},
Expand Down
111 changes: 61 additions & 50 deletions reana_client/utils.py
Expand Up @@ -19,6 +19,8 @@
import yadageschemas
import yaml
from jsonschema import ValidationError, validate
from reana_commons.errors import REANAValidationError
from reana_commons.operational_options import validate_operational_options
from reana_commons.serial import serial_load
from reana_commons.utils import get_workflow_status_change_verb

Expand All @@ -38,7 +40,7 @@ def workflow_uuid_or_name(ctx, param, value):
return value


def yadage_load(workflow_file, toplevel='.'):
def yadage_load(workflow_file, toplevel='.', **kwargs):
"""Validate and return yadage workflow specification.
:param workflow_file: A specification file compliant with
Expand All @@ -65,15 +67,15 @@ def yadage_load(workflow_file, toplevel='.'):
}

try:
return yadageschemas.load(spec=workflow_file, specopts=specopts,
validopts=validopts, validate=True)
yadageschemas.load(spec=workflow_file, specopts=specopts,
validopts=validopts, validate=True)

except ValidationError as e:
e.message = str(e)
raise e


def cwl_load(workflow_file):
def cwl_load(workflow_file, **kwargs):
"""Validate and return cwl workflow specification.
:param workflow_file: A specification file compliant with
Expand All @@ -88,14 +90,6 @@ def cwl_load(workflow_file):
return json.loads(value)


workflow_load = {
'yadage': yadage_load,
'cwl': cwl_load,
'serial': serial_load,
}
"""Dictionary to extend with new workflow specification loaders."""


def load_workflow_spec(workflow_type, workflow_file, **kwargs):
"""Validate and return machine readable workflow specifications.
Expand All @@ -104,16 +98,45 @@ def load_workflow_spec(workflow_type, workflow_file, **kwargs):
specification.
:returns: A dictionary which represents the valid workflow specification.
"""
workflow_load = {
'yadage': yadage_load,
'cwl': cwl_load,
'serial': serial_load,
}
"""Dictionary to extend with new workflow specification loaders."""

return workflow_load[workflow_type](workflow_file, **kwargs)


def load_reana_spec(filepath, skip_validation=False):
"""Load and validate reana specification file.
:raises IOError: Error while reading REANA spec file from given filepath`.
:raises IOError: Error while reading REANA spec file from given `filepath`.
:raises ValidationError: Given REANA spec file does not validate against
REANA specification.
"""

def _prepare_kwargs(reana_yaml):
kwargs = {}
workflow_type = reana_yaml['workflow']['type']
if workflow_type == 'serial':
kwargs['specification'] = reana_yaml['workflow'].\
get('specification')
kwargs['parameters'] = \
reana_yaml.get('inputs', {}).get('parameters', {})
kwargs['original'] = True

if 'options' in reana_yaml['inputs']:
try:
reana_yaml['inputs']['options'] = validate_operational_options(
workflow_type,
reana_yaml['inputs']['options'])
except REANAValidationError as e:
click.secho(e.message, err=True, fg='red')
sys.exit(1)
kwargs.update(reana_yaml['inputs']['options'])
return kwargs

try:
with open(filepath) as f:
reana_yaml = yaml.load(f.read(), Loader=yaml.FullLoader)
Expand All @@ -123,21 +146,14 @@ def load_reana_spec(filepath, skip_validation=False):
.format(filepath=filepath))
_validate_reana_yaml(reana_yaml)

kwargs = {}
if reana_yaml['workflow']['type'] == 'serial':
kwargs['specification'] = reana_yaml['workflow'].\
get('specification')
kwargs['parameters'] = \
reana_yaml.get('inputs', {}).get('parameters', {})
kwargs['original'] = True

workflow_type = reana_yaml['workflow']['type']
reana_yaml['workflow']['specification'] = load_workflow_spec(
reana_yaml['workflow']['type'],
workflow_type,
reana_yaml['workflow'].get('file'),
**kwargs
**_prepare_kwargs(reana_yaml)
)

if reana_yaml['workflow']['type'] == 'cwl' and \
if workflow_type == 'cwl' and \
'inputs' in reana_yaml:
with open(reana_yaml['inputs']['parameters']['input']) as f:
reana_yaml['inputs']['parameters'] = \
Expand Down Expand Up @@ -224,32 +240,27 @@ def get_workflow_name_and_run_number(workflow_name):
return workflow_name, ''


def validate_cwl_operational_options(operational_options):
"""Validate cwl operational options."""
forbidden_args = ['--debug', '--tmpdir-prefix', '--tmp-outdir-prefix'
'--default-container', '--outdir']
for option in operational_options:
if option in forbidden_args:
click.echo('Operational option {0} are not allowed. \n'
.format(operational_options), err=True)
sys.exit(1)
cmd = 'cwltool --version {0}'.format(' '.join(operational_options))
try:
subprocess.check_call(cmd, shell=True)
except subprocess.CalledProcessError as err:
click.echo('Operational options {0} are not valid. \n'
'{1}'.format(operational_options, err), err=True)
sys.exit(1)


def validate_serial_operational_options(operational_options):
"""Return validated serial operational options."""
try:
return dict(p.split('=') for p in operational_options)
except Exception as err:
click.echo('Operational options {0} are not valid. \n'
'{1}'.format(operational_options, err), err=True)
sys.exit(1)
def get_workflow_root():
"""Return the current workflow root directory."""
reana_yaml = get_reana_yaml_file_path()
workflow_root = os.getcwd()
while True:
file_list = os.listdir(workflow_root)
parent_dir = os.path.dirname(workflow_root)
if reana_yaml in file_list:
break
else:
if workflow_root == parent_dir:
click.echo(click.style(
'Not a workflow directory (or any of the parent'
' directories).\nPlease upload from inside'
' the directory containing the reana.yaml '
'file of your workflow.', fg='red'))
sys.exit(1)
else:
workflow_root = parent_dir
workflow_root += '/'
return workflow_root


def validate_input_parameters(live_parameters, original_parameters):
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Expand Up @@ -50,7 +50,7 @@
'cwltool==1.0.20191022103248',
'pyOpenSSL>=19.0.0', # FIXME remove once yadage-schemas solves deps.
'jsonpointer>=2.0',
'reana-commons>=0.7.0.dev20200319,<0.8.0',
'reana-commons>=0.7.0.dev20200407,<0.8.0',
'rfc3987>=1.3.8', # FIXME remove once yadage-schemas solves deps.
'six==1.12.0',
'strict-rfc3339>=0.7', # FIXME remove once yadage-schemas solves deps.
Expand Down

0 comments on commit 9756df0

Please sign in to comment.