Skip to content

Commit

Permalink
[Fixes #49092759] Added extensive testing for correctness of workflow…
Browse files Browse the repository at this point in the history
… inputs and input relationships.
  • Loading branch information
ngehlenborg committed May 2, 2013
1 parent 1f9ff13 commit 9417ab3
Showing 1 changed file with 209 additions and 31 deletions.
240 changes: 209 additions & 31 deletions refinery/workflow_manager/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

from celery.task import task
from core.models import Workflow, WorkflowDataInput, WorkflowEngine, \
WorkflowInputRelationships, TYPE_1_1
WorkflowInputRelationships, TYPE_1_1, TYPE_REPLICATE, NR_TYPES
from django.contrib.auth.models import Group
from django.contrib.sites.models import Site
from galaxy_connector.connection import Connection
Expand All @@ -28,9 +28,14 @@
GALAXY_WORKFLOW_STEPS = 'steps'

GALAXY_TOOL_OUTPUTS = 'outputs'
GALAXY_TOOL_INPUTS = 'inputs' # only set in input nodes (this is different from input_connections, i.e. incoming edges)
GALAXY_TOOL_ANNOTATION = 'annotation'
GALAXY_TOOL_ANNOTATION_REQUIRED_FIELDS = [('name',str)] # [(field, type), ...]
GALAXY_TOOL_ANNOTATION_OPTIONAL_FIELDS = [('type',str), ('description',str)]

GALAXY_INPUT_RELATIONSHIP_REQUIRED_FIELDS = [('category',str), ('set1',str), ('set2',str)] # [(field, type), ...]
GALAXY_INPUT_RELATIONSHIP_OPTIONAL_FIELDS = [] # [(field, type), ...]
GALAXY_INPUT_RELATIONSHIP_CATEGORIES = [ category[0] for category in NR_TYPES ]

@task()
def get_workflows( workflow_engine ):
Expand Down Expand Up @@ -68,6 +73,9 @@ def get_workflows( workflow_engine ):
def import_workflow( workflow, workflow_engine, workflow_dictionary ):

issues = []
has_step_issues = False
has_input_issues = False
has_annotation_issues = False

workflow_annotation = get_workflow_annotation( workflow_dictionary )

Expand All @@ -76,25 +84,33 @@ def import_workflow( workflow, workflow_engine, workflow_dictionary ):
return issues

workflow_type = get_workflow_type( workflow_annotation )
workflow_relationships = get_workflow_relationships( workflow_annotation )

if workflow_type is None:
issues.append( "Workflow type not found." )
return issues


# check workflow inputs for correct annotations
workflow_input_issues = check_workflow_inputs(workflow_dictionary)
if len( workflow_input_issues ) > 0:
has_input_issues = True
issues = issues + workflow_input_issues

# check workflow steps for correct annotations and skip import if problems are detected
workflow_step_issues = check_steps(workflow_dictionary)
if workflow_step_issues is None: # no error in parsing but no outputs defined
issues.append("Workflow does not declare outputs.")
has_step_issues = True
else:
if len( workflow_step_issues ) > 0:
has_step_issues = True
issues = issues + workflow_step_issues

# skip import if workflow has incorrect input annotations or step annotation
if has_step_issues or has_input_issues:
return issues

# import workflow
if workflow_type is not None: # if workflow is meant for refinery

# check workflow steps for correct annotations and skip import if problems are detected
step_issues = check_steps( workflow_dictionary )

if step_issues is None: # no error in parsing but no outputs defined
issues.append( "Workflow does not declare outputs." )
return issues

if len( step_issues ) > 0:
# store issues to return to calling function?
issues = issues + step_issues
return issues

workflow_object = Workflow.objects.create( name=workflow.name, internal_id=workflow.identifier, workflow_engine=workflow_engine, is_active=True, type=workflow_type, graph=json.dumps( workflow_dictionary ) )
workflow_object.set_manager_group( workflow_engine.get_manager_group() )
Expand Down Expand Up @@ -126,17 +142,18 @@ def import_workflow( workflow, workflow_engine, workflow_dictionary ):
# check to input NodeRelationshipType
# noderelationship types defined for workflows with greater than 1 input
# refinery_relationship=[{"category":"N-1", "set1":"input_file"}]
workflow_relationships = get_input_relationships( workflow_annotation )

if workflow_relationships is not None:
if (len(inputs) > 1):
for opt_r in workflow_relationships:
try:
temp_relationship = WorkflowInputRelationships(**opt_r)
temp_relationship.save()
workflow_object.input_relationships.add(temp_relationship)

workflow_object.input_relationships.add(temp_relationship)
except KeyError, e:
logger.error("refinery_relationship option error: %s" % e)
return
logger.error()
issues.append( "Input relationship option error: %s" % e )
return issues


Expand Down Expand Up @@ -270,6 +287,41 @@ def get_step_outputs( step_definition ):
return step_definition[GALAXY_TOOL_OUTPUTS]


def get_step_inputs( step_definition ):
"""
Return the inputs of the step or None.
"""

# no input field
if GALAXY_TOOL_INPUTS not in step_definition:
return None

# empty input field
if len( step_definition[GALAXY_TOOL_INPUTS] ) == 0:
return None;

return step_definition[GALAXY_TOOL_INPUTS]


def get_input_steps( workflow_dictionary ):
"""
Return the step definitions of the input steps of the workflow.
"""

steps = get_workflow_steps( workflow_dictionary )

if steps is None:
return None

input_steps = []

for step in steps:
if get_step_inputs( step ) is not None:
input_steps.append( step )

return input_steps


def check_step_annotation_syntax( annotation ):
"""
Check if step annotation contains all required fields and only required or optional fields (syntactic correctness).
Expand All @@ -293,7 +345,7 @@ def check_step_annotation_syntax( annotation ):
# test if all fields have the correct type
for field in GALAXY_TOOL_ANNOTATION_REQUIRED_FIELDS + GALAXY_TOOL_ANNOTATION_OPTIONAL_FIELDS:
if field[0] in output_file_settings.keys():
if not isinstance( output_file_settings[field[0]], field[1] ):
if not isinstance( output_file_settings[field[0]], field[1] ):
issues.append( 'In annotation of output file "' + str( output_file_name ) + '" field "' + field[0] + '" is of type "' + type( output_file_settings[field[0]] ).__name__ + '" but type "' + field[1].__name__ + '" is expected.' )

# test if there are undefined fields (i.e. fields that are neither required nor optional)
Expand Down Expand Up @@ -354,6 +406,9 @@ def check_step( step_definition ):
def check_steps( workflow_dictionary ):
steps = get_workflow_steps( workflow_dictionary )

if steps is None:
return None

correct_step_found = False
incorrect_step_found = False
issues = []
Expand All @@ -378,37 +433,160 @@ def check_steps( workflow_dictionary ):
return issues


def get_workflow_type( annotation ):
def get_workflow_type( workflow_annotation ):
"""
Determine the workflow type. If the workflow annotation does not contain the "refinery_type" tag, the workflow will be ignored.
:param annotation: Workflow annotation dictionary.
:type annotation: A Python dictionary.
:param workflow_annotation: Workflow annotation dictionary.
:type workflow_annotation: A Python dictionary.
"""

if GALAXY_WORKFLOW_TYPE not in annotation:
if GALAXY_WORKFLOW_TYPE not in workflow_annotation:
return None

# test if the "type" string in the annotation matches any of the defined types
for choice in Workflow.TYPE_CHOICES:
if choice[0] == annotation[GALAXY_WORKFLOW_TYPE]:
if choice[0] == workflow_annotation[GALAXY_WORKFLOW_TYPE]:
return choice[0]

logger.warning( '"' + annotation[GALAXY_WORKFLOW_TYPE] + '" is not a defined workflow type, the workflow will be ignored.' )
logger.warning( '"' + workflow_annotation[GALAXY_WORKFLOW_TYPE] + '" is not a defined workflow type, the workflow will be ignored.' )
return None


def get_workflow_relationships( annotation ):
def get_input_relationships( workflow_annotation ):
"""
Get the list of workflow input relationship. If the workflow annotation does not contain the "relationships" tag, None will be returned.
:param annotation: Workflow annotation dictionary.
:type annotation: A Python dictionary.
:param workflow_annotation: Workflow annotation dictionary.
:type workflow_annotation: A Python dictionary.
"""

if 'refinery_relationships' not in annotation:
if 'refinery_relationships' not in workflow_annotation:
return None

return annotation[GALAXY_WORKFLOW_RELATIONSHIPS]
return workflow_annotation[GALAXY_WORKFLOW_RELATIONSHIPS]


def check_workflow_inputs( workflow_dictionary ):

issues = []

# -------------------------------------------------------------------------
# TESTS FOR WORKFLOW INPUTS
# -------------------------------------------------------------------------

workflow_annotation = get_workflow_annotation( workflow_dictionary )

# TEST: if workflow has annotation
if workflow_annotation is None:
issues.append( 'Workflow annotation not found.' )
return issues

input_steps = get_input_steps( workflow_dictionary )

# TEST: if workflow defines inputs
if input_steps is None:
issues.append( 'Workflow does not define inputs.' )
return issues

# extract names of workflow input (if input step defines more than one input only the first one will be used)
workflow_input_names = []
for step in input_steps:
step_inputs = get_step_inputs( step )
if step_inputs is not None and 'name' in step_inputs[0]:
workflow_input_names.append( step_inputs[0]['name'] )

# TEST: are workflow input names unique?
unique_workflow_input_names = {}
map(unique_workflow_input_names.__setitem__, workflow_input_names, [])
if len( unique_workflow_input_names.keys() ) < len( workflow_input_names ):
issues.append( 'Workflow input names are not unique: ' + ', '.join( workflow_input_names ) )


# -------------------------------------------------------------------------
# TESTS FOR INPUT RELATIONSHIPS
# -------------------------------------------------------------------------

input_relationships = get_input_relationships( workflow_annotation )

# TEST: if workflow define more than one input relationship if there is only one input
if len( input_steps ) == 1:
if input_relationships is not None and len( input_relationships ) > 1:
issues.append( 'Workflow has only one input but more than one input relationship is defined.' )

# test input relationships
if input_relationships is not None:
input_relationship_issues = []
for input_relationship in input_relationships:
category = None
set1 = None
set2 = None

# TEST: test if only declared fields are being used as keys
found_key_set = set( input_relationship.keys() )
allowed_key_set = set( [ field[0] for field in GALAXY_INPUT_RELATIONSHIP_OPTIONAL_FIELDS ] + [ field[0] for field in GALAXY_INPUT_RELATIONSHIP_REQUIRED_FIELDS ] )
undefined_keys = list( found_key_set.symmetric_difference( allowed_key_set ) )

if len( undefined_keys ) > 0:
input_relationship_issues.append( 'Input relationship contains undefined keys: ' + ', '.join( undefined_keys ) + '.' )

# TEST: test if category is defined and referring to a declared input relationship category
if 'category' in input_relationship.keys():
category = input_relationship['category']
if category not in GALAXY_INPUT_RELATIONSHIP_CATEGORIES:
input_relationship_issues.append( 'Undefined category "' + category + '". Allowed categories are ' + ", ".join( GALAXY_INPUT_RELATIONSHIP_CATEGORIES ) + "." )
category = None
else:
input_relationship_issues.append( 'Input relationship does not define a category.' )

# TEST: test if input relationship is "replicate" if workflow has only one input
if len( input_steps ) == 1:
if category is not TYPE_REPLICATE:
input_relationship_issues.append( 'Workflow has only one input but input relationship is not of category "' + TYPE_REPLICATE + '".' )

# TEST: test if set1 is defined
if 'set1' in input_relationship.keys():
set1 = input_relationship['set1']
else:
input_relationship_issues.append( 'Input relationship does not define required field "set1".' )

# TEST: test if set2 is defined and if defined, test if it is required for the given category
if 'set2' in input_relationship.keys():
set2 = input_relationship['set2']

if category == TYPE_REPLICATE:
input_relationship_issues.append( 'Input relationship category is "' + TYPE_REPLICATE + '", but relationship does define field "set2".' )
else:
if category != TYPE_REPLICATE:
input_relationship_issues.append( 'Input relationship category is not "' + TYPE_REPLICATE + '" and relationship does not define required field "set2".' )

# TEST: test if set1 is referring to existing inputs of the workflow
if set1 is not None and set1 not in workflow_input_names:
input_relationship_issues.append( '"set1" refers to undefined input "' + set1 + '" but needs to refer to a defined input: ' + ', '.join( workflow_input_names ) )

# TEST: test if set2 is referring to existing inputs of the workflow
if set2 is not None and set2 not in workflow_input_names:
input_relationship_issues.append( '"set2" refers to undefined input "' + set2 + '" but needs to refer to a defined input: ' + ', '.join( workflow_input_names ) )

# TEST: test if set1 and set2 are referring to the same input
if set1 == set2:
input_relationship_issues.append( '"set1" and "set2" are both referring to input "' + set1 + '" but need to be referring to different inputs.' )

if len( input_relationship_issues ) > 0:
issues.append( 'Input relationship "' + str( input_relationship ) + "' is defined incorrectly:" )
issues = issues + input_relationship_issues

return issues













0 comments on commit 9417ab3

Please sign in to comment.