In [159]:
import yaml
def parse_wf(workflow,inputs):
    meta = {"@type":"wfdesc:Wokflow"}
    with open(workflow, 'r') as cwl_file:  
        wf_dict = yaml.safe_load(cwl_file)
    if isinstance(inputs,dict):
        input_dict = inputs
    else :
        with open(inputs, 'r') as input_file:  
            input_dict = yaml.safe_load(input_file)
    meta['wfdesc:hasInput'] = get_wf_inputs(wf_dict,input_dict)
    meta.update(parse_steps(wf_dict.get('steps')))
    meta['wfdesc:hasOutput'] = get_outputs(wf_dict)
    return(meta)

def parse_procss(process,inputs):
    meta = {"@type":"wfdesc:Process"}
    with open(workflow, 'r') as cwl_file:
        proc_dict = yaml.safe_load(cwl_file)
    with open(inputs, 'r') as input_file:  
        input_dict = yaml.safe_load(input_file)
    meta['wfdesc:hasInput'] = get_wf_inputs(proc_dict,input_dict)
    meta['commandRun'] = proc_dict.get('baseCommand')
    return(meta)

def parse_steps(steps):
    if steps == None:
        return({})
    steps_dict = {"wfdesc:hasProcess":[],"wfdesc:hasSubWorkflow":[]}
    for step in steps:
        try:
            with open(steps[step].get('run')) as f:
                cwl_process = yaml.safe_load(f)
        except:
            steps_dict["wfdesc:hasProcess"].append(step)
            continue
        if cwl_process['class'] == 'Workflow':
            inputs = gather_inputs(steps[step])
            steps_dict['wfdesc:hasSubWorkflow'].append(parse_cwl(cwl_process,inputs))
        else:
            inputs = gather_inputs(steps[step])
            process = {"@type":"wfdesc:Process","name":step}
            process['commandRun'] = cwl_process.get('baseCommand')
            process["wfdesc:hasInput"] = get_process_inputs(cwl_process,inputs)
            process["wfdesc:hasOutput"] = get_outputs(cwl_process)
            steps_dict["wfdesc:hasProcess"].append(process)
    return(steps_dict)
    
def get_wf_inputs(workflow,inputs):
    hasInputs = []
    try:
        for input_name, datatype in gather_inputs(workflow).items():#workflow.get('inputs').items():
            input_dict = {
                "@type":"wfdesc:Parameter",
                "name":input_name,               
            }
            if isinstance(datatype,dict):
                if datatype.get('class'):
                    input_dict['datatype'] = 'File'
                else:
                    if isinstance(datatype.get('type'),dict):
                        input_dict['datatype'] = datatype.get('type').get('type')
                    else: 
                        input_dict['datatype'] = datatype.get('type')
            if input_dict.get("datatype") == 'File':
                input_dict['file'] = inputs.get(input_name).get('path')
            if input_dict['datatype'] == 'array':
                input_dict['items'] = []
                item_struct = datatype.get('type').get('items').get('fields')
                for item in inputs.get(input_name):
                    elements = []
                    for element in item_struct:
                        element_dict = {"@type":"wfdesc:Parameter",
                                       "name":element,
                                       "datatype":item_struct[element].get('type')
                                       }
                        if element_dict.get("datatype") == 'File':
                            element_dict['file'] = item[element].get('path')
                        else:
                            element_dict['value'] = item[element]
                        elements.append(element_dict)
                    input_dict['items'].append(elements)
            else: 
                input_dict['value'] = inputs.get(input_name)
            hasInputs.append(input_dict)
    except:
        if workflow.get('inputs'):
            hasInputs.append("Error parsing cwl. Check all inputs match expected names.")
    return(hasInputs)

def get_process_inputs(workflow,inputs):
    hasInputs = []
    try:
        for input_name, datatype in gather_inputs(workflow).items():#workflow.get('inputs').items():
            input_dict = {
                "@type":"wfdesc:Parameter",
                "name":input_name,
                "datatype":datatype.get('type')                
            }
            input_dict['value'] = inputs.get(input_name)
            hasInputs.append(input_dict)
    except:
        if workflow.get('inputs'):
            hasInputs.append("Error parsing cwl. Check all inputs match expected names.")
    return(hasInputs)

def gather_inputs(step_dict):
    if step_dict.get('in'):
        return step_dict.get('in')
    elif step_dict.get('inputs'):
        return step_dict.get('inputs')
    return({})

def get_outputs(workflow):
    hasOutputs = []
    try:
        for output_name, datatype in gather_outputs(workflow).items():#workflow.get('inputs').items():
            output_dict = {
                "@type":"wfdesc:Parameter",
                "name":output_name,
                "datatype":datatype.get("type")
            }
            if datatype.get('outputSource'):
                output_dict['outputSource'] = datatype.get('outputSource')
            hasOutputs.append(output_dict)
    except:
        if workflow.get('outputs'):
            hasOutputs.append("Error parsing cwl. Check all inputs match expected names.")
    return(hasOutputs)

def gather_outputs(step_dict):
    if step_dict.get('out'):
        return step_dict.get('out')
    elif step_dict.get('outputs'):
        return step_dict.get('outputs')
    return({})

In [160]:
parse_wf('rna_seq_wf.cwl','rna_seq_job.yaml')

{'@type': 'wfdesc:Wokflow',
 'wfdesc:hasInput': [{'@type': 'wfdesc:Parameter',
   'name': 'input_bam',
   'datatype': 'array',
   'items': [[{'@type': 'wfdesc:Parameter',
      'name': 'bam',
      'datatype': 'File',
      'file': 'BAM/TYR-APPs1-16_Aligned.sortedByCoord.out.bam'},
     {'@type': 'wfdesc:Parameter',
      'name': 'output',
      'datatype': 'string',
      'value': 'TYR-APPs1-16.sam'}],
    [{'@type': 'wfdesc:Parameter',
      'name': 'bam',
      'datatype': 'File',
      'file': 'BAM/TYR-APPs1-25_Aligned.sortedByCoord.out.bam'},
     {'@type': 'wfdesc:Parameter',
      'name': 'output',
      'datatype': 'string',
      'value': 'TYR-APPs1-25.sam'}],
    [{'@type': 'wfdesc:Parameter',
      'name': 'bam',
      'datatype': 'File',
      'file': 'BAM/TYR-APPs1-26_Aligned.sortedByCoord.out.bam'},
     {'@type': 'wfdesc:Parameter',
      'name': 'output',
      'datatype': 'string',
      'value': 'TYR-APPs1-26.sam'}],
    [{'@type': 'wfdesc:Parameter',
      'name': 'b

In [155]:
parse_wf("1st-workflow.cwl","1st-workflow-job.yml")

{'@type': 'wfdesc:Wokflow',
 'wfdesc:hasInput': ['Error parsing cwl. Check all inputs match expected names.'],
 'wfdesc:hasProcess': [{'@type': 'wfdesc:Process',
   'name': 'untar',
   'commandRun': ['tar', '--extract'],
   'wfdesc:hasInput': [{'@type': 'wfdesc:Parameter',
     'name': 'tarfile',
     'datatype': 'File',
     'value': 'tarball'},
    {'@type': 'wfdesc:Parameter',
     'name': 'extractfile',
     'datatype': 'string',
     'value': 'name_of_file_to_extract'}],
   'wfdesc:hasOutput': [{'@type': 'wfdesc:Parameter',
     'name': 'extracted_file',
     'datatype': 'File'}]},
  'compile'],
 'wfdesc:hasSubWorkflow': [],
 'wfdesc:hasOutput': [{'@type': 'wfdesc:Parameter',
   'name': 'compiled_class',
   'datatype': 'File',
   'outputSource': 'compile/classfile'}]}

parse_wf("test_workflow.cwl","test.yaml")

<img src="image.jpg">