In [1]:
from yaml import load, dump

In [2]:
import json

In [3]:
from yaml import CLoader as Loader, CDumper as Dumper

In [4]:
import pickle

In [5]:
NODES_LABEL = "nodes"
EDGES_LABEL = "edges"
SOURCE_LABEL = "source"
SOURCE_PORT_LABEL = "sourcePort"
TARGET_LABEL = "target"
TARGET_PORT_LABEL = "targetPort"

In [6]:
def resort_total_lst(total_lst: list, nodes_dict: dict) -> list:
    nodes_with_dep_lst = list(sorted([v[0] for v in total_lst]))
    nodes_without_dep_lst = [
        k for k in nodes_dict.keys() if k not in nodes_with_dep_lst
    ]
    ordered_lst, total_new_lst = [], []
    while len(total_new_lst) < len(total_lst):
        for ind, connect in total_lst:
            if ind not in ordered_lst:
                source_lst = [sd[SOURCE_LABEL] for sd in connect.values()]
                if all(
                    [s in ordered_lst or s in nodes_without_dep_lst for s in source_lst]
                ):
                    ordered_lst.append(ind)
                    total_new_lst.append([ind, connect])
    return total_new_lst

In [7]:
def group_edges(edges_lst: list) -> list:
    edges_sorted_lst = sorted(edges_lst, key=lambda x: x[TARGET_LABEL], reverse=True)
    total_lst, tmp_lst = [], []
    target_id = edges_sorted_lst[0][TARGET_LABEL]
    for ed in edges_sorted_lst:
        if target_id == ed[TARGET_LABEL]:
            tmp_lst.append(ed)
        else:
            total_lst.append((target_id, get_kwargs(lst=tmp_lst)))
            target_id = ed[TARGET_LABEL]
            tmp_lst = [ed]
    total_lst.append((target_id, get_kwargs(lst=tmp_lst)))
    return total_lst

In [8]:
def convert_nodes_list_to_dict(nodes_list: list) -> dict:
    return {
        str(el["id"]): el["value"] if "value" in el else el["name"]
        for el in sorted(nodes_list, key=lambda d: d["id"])
    }


In [9]:
def get_kwargs(lst: list) -> dict:
    return {
        t[TARGET_PORT_LABEL]: {
            SOURCE_LABEL: t[SOURCE_LABEL],
            SOURCE_PORT_LABEL: t[SOURCE_PORT_LABEL],
        }
        for t in lst
    }


In [10]:
def remove_result(workflow_dict):
    node_output_id = [
        n["id"] for n in workflow_dict[NODES_LABEL] if n["type"] == "output"
    ][0]
    return {
        NODES_LABEL: [n for n in workflow_dict[NODES_LABEL] if n["type"] != "output"],
        EDGES_LABEL: [
            e for e in workflow_dict[EDGES_LABEL] if e[TARGET_LABEL] != node_output_id
        ],
    }

In [11]:
def get_function_argument(argument, position=3):
    return {
        argument + '_file': {
            'type': 'File',
            'inputBinding': {'prefix': '--arg_' + argument + '=', 'separate': False, 'position': position},
        },
    }

In [12]:
def get_function_template(function_name):
    return {
        'function': {
            'default': function_name,
            'inputBinding': {'position': 2, 'prefix': '--function=', 'separate': False},
            'type': 'string',
        },
    }

In [13]:
def get_output_name(output_name):
    return {
        output_name + '_file': {
            'type': 'File',
            'outputBinding': {
                'glob': output_name + '.pickle'
            }
        }
    }

In [14]:
with open("workflow.json", "r") as f:
    workflow = json.load(f)

In [15]:
workflow

{'nodes': [{'id': 0, 'type': 'function', 'value': 'workflow.get_prod_and_div'},
  {'id': 1, 'type': 'function', 'value': 'workflow.get_sum'},
  {'id': 2, 'type': 'input', 'value': 1, 'name': 'x'},
  {'id': 3, 'type': 'input', 'value': 2, 'name': 'y'},
  {'id': 4, 'type': 'output', 'name': 'result'}],
 'edges': [{'target': 0, 'targetPort': 'x', 'source': 2, 'sourcePort': None},
  {'target': 0, 'targetPort': 'y', 'source': 3, 'sourcePort': None},
  {'target': 1, 'targetPort': 'x', 'source': 0, 'sourcePort': 'prod'},
  {'target': 1, 'targetPort': 'y', 'source': 0, 'sourcePort': 'div'},
  {'target': 4, 'targetPort': None, 'source': 1, 'sourcePort': None}]}

In [16]:
function_nodes_dict = {n['id']: n['value'] for n in workflow["nodes"] if n["type"] == "function"}
function_nodes_dict

{0: 'workflow.get_prod_and_div', 1: 'workflow.get_sum'}

In [17]:
funct_dict = {}
for funct_id in function_nodes_dict.keys():
    target_ports = list(set([e["targetPort"] for e in workflow["edges"] if e["target"] == funct_id]))
    source_ports = list(set([e["sourcePort"] for e in workflow["edges"] if e["source"] == funct_id]))
    funct_dict[funct_id] = {"targetPorts": target_ports, "sourcePorts": source_ports}

In [18]:
funct_dict

{0: {'targetPorts': ['y', 'x'], 'sourcePorts': ['prod', 'div']},
 1: {'targetPorts': ['y', 'x'], 'sourcePorts': [None]}}

In [19]:
for i in range(len(function_nodes_dict)):
    template = {
        'cwlVersion': 'v1.2',
        'class': 'CommandLineTool',
        'baseCommand': 'python',
        'inputs': {
            'wrapper': {
                'type': 'File',
                'inputBinding': {'position': 1},
                'default': {'class': 'File', 'location': 'wrapper.py'}
            },
        },
        'outputs': {
        }
    }
    file_name = function_nodes_dict[i].split(".")[-1] + ".cwl"
    template["inputs"].update(get_function_template(function_name=function_nodes_dict[i]))
    for j, arg in enumerate(funct_dict[i]['targetPorts']):
        template["inputs"].update(get_function_argument(argument=arg, position=3+j))
    for out in funct_dict[i]['sourcePorts']:
        if out is None:
            template["outputs"].update(get_output_name(output_name="result"))
        else:
            template["outputs"].update(get_output_name(output_name=out))
    with open(file_name, "w") as f:
        dump(template, f, Dumper=Dumper)

In [20]:
!cat get_sum.cwl

baseCommand: python
class: CommandLineTool
cwlVersion: v1.2
inputs:
  function:
    default: workflow.get_sum
    inputBinding:
      position: 2
      prefix: --function=
      separate: false
    type: string
  wrapper:
    default:
      class: File
      location: wrapper.py
    inputBinding:
      position: 1
    type: File
  x_file:
    inputBinding:
      position: 4
      prefix: --arg_x=
      separate: false
    type: File
  y_file:
    inputBinding:
      position: 3
      prefix: --arg_y=
      separate: false
    type: File
outputs:
  result_file:
    outputBinding:
      glob: result.pickle
    type: File


In [21]:
!cat get_prod_and_div.cwl

baseCommand: python
class: CommandLineTool
cwlVersion: v1.2
inputs:
  function:
    default: workflow.get_prod_and_div
    inputBinding:
      position: 2
      prefix: --function=
      separate: false
    type: string
  wrapper:
    default:
      class: File
      location: wrapper.py
    inputBinding:
      position: 1
    type: File
  x_file:
    inputBinding:
      position: 4
      prefix: --arg_x=
      separate: false
    type: File
  y_file:
    inputBinding:
      position: 3
      prefix: --arg_y=
      separate: false
    type: File
outputs:
  div_file:
    outputBinding:
      glob: div.pickle
    type: File
  prod_file:
    outputBinding:
      glob: prod.pickle
    type: File


In [22]:
input_dict = {n["name"]: n["value"] for n in workflow["nodes"] if n["type"] == "input"}
input_dict

{'x': 1, 'y': 2}

In [23]:
for k, v in input_dict.items():
    with open(k + ".pickle", "wb") as f:
        pickle.dump(v, f)

In [24]:
with open("workflow.yml", "w") as f:
    dump({k + "_file": {"class": "File", "path": k + ".pickle"} for k in input_dict.keys()}, f, Dumper=Dumper)

In [25]:
workflow_template = {
    'cwlVersion': 'v1.2',
    'class': 'Workflow',
    'inputs': {},
    'steps': {},
    'outputs': {},
}

In [26]:
workflow_template["inputs"].update({k + "_file": "File" for k in input_dict.keys()})

In [27]:
workflow

{'nodes': [{'id': 0, 'type': 'function', 'value': 'workflow.get_prod_and_div'},
  {'id': 1, 'type': 'function', 'value': 'workflow.get_sum'},
  {'id': 2, 'type': 'input', 'value': 1, 'name': 'x'},
  {'id': 3, 'type': 'input', 'value': 2, 'name': 'y'},
  {'id': 4, 'type': 'output', 'name': 'result'}],
 'edges': [{'target': 0, 'targetPort': 'x', 'source': 2, 'sourcePort': None},
  {'target': 0, 'targetPort': 'y', 'source': 3, 'sourcePort': None},
  {'target': 1, 'targetPort': 'x', 'source': 0, 'sourcePort': 'prod'},
  {'target': 1, 'targetPort': 'y', 'source': 0, 'sourcePort': 'div'},
  {'target': 4, 'targetPort': None, 'source': 1, 'sourcePort': None}]}

In [28]:
result_id = [n["id"] for n in workflow["nodes"] if n["type"] == "output"][0]

In [29]:
last_compute_id = [e["source"] for e in workflow["edges"] if e["target"] == result_id][0]

In [30]:
if funct_dict[last_compute_id]["sourcePorts"] == [None]:
    workflow_template["outputs"] = {"result_file": {"type": "File", "outputSource": function_nodes_dict[last_compute_id].split(".")[-1] + "/result_file"}}
else:
    raise ValueError()

In [31]:
workflow_template

{'cwlVersion': 'v1.2',
 'class': 'Workflow',
 'inputs': {'x_file': 'File', 'y_file': 'File'},
 'steps': {},
 'outputs': {'result_file': {'type': 'File',
   'outputSource': 'get_sum/result_file'}}}

In [32]:
content = remove_result(workflow_dict=workflow)
content

{'nodes': [{'id': 0, 'type': 'function', 'value': 'workflow.get_prod_and_div'},
  {'id': 1, 'type': 'function', 'value': 'workflow.get_sum'},
  {'id': 2, 'type': 'input', 'value': 1, 'name': 'x'},
  {'id': 3, 'type': 'input', 'value': 2, 'name': 'y'}],
 'edges': [{'target': 0, 'targetPort': 'x', 'source': 2, 'sourcePort': None},
  {'target': 0, 'targetPort': 'y', 'source': 3, 'sourcePort': None},
  {'target': 1, 'targetPort': 'x', 'source': 0, 'sourcePort': 'prod'},
  {'target': 1, 'targetPort': 'y', 'source': 0, 'sourcePort': 'div'}]}

In [33]:
edges_new_lst = content[EDGES_LABEL]
total_lst = group_edges(edges_new_lst)
nodes_new_dict = {int(k): v for k, v in convert_nodes_list_to_dict(nodes_list=content[NODES_LABEL]).items()}

In [34]:
total_lst, nodes_new_dict, edges_new_lst

([(1,
   {'x': {'source': 0, 'sourcePort': 'prod'},
    'y': {'source': 0, 'sourcePort': 'div'}}),
  (0,
   {'x': {'source': 2, 'sourcePort': None},
    'y': {'source': 3, 'sourcePort': None}})],
 {0: 'workflow.get_prod_and_div', 1: 'workflow.get_sum', 2: 1, 3: 2},
 [{'target': 0, 'targetPort': 'x', 'source': 2, 'sourcePort': None},
  {'target': 0, 'targetPort': 'y', 'source': 3, 'sourcePort': None},
  {'target': 1, 'targetPort': 'x', 'source': 0, 'sourcePort': 'prod'},
  {'target': 1, 'targetPort': 'y', 'source': 0, 'sourcePort': 'div'}])

In [35]:
total_new_lst = resort_total_lst(total_lst=total_lst, nodes_dict=nodes_new_dict)
total_new_lst

[[0,
  {'x': {'source': 2, 'sourcePort': None},
   'y': {'source': 3, 'sourcePort': None}}],
 [1,
  {'x': {'source': 0, 'sourcePort': 'prod'},
   'y': {'source': 0, 'sourcePort': 'div'}}]]

In [36]:
step_name_lst = {t[0]: function_nodes_dict[t[0]].split(".")[-1] for t in total_new_lst}
step_name_lst

{0: 'get_prod_and_div', 1: 'get_sum'}

In [37]:
input_id_dict = {n["id"]: n["name"] for n in workflow["nodes"] if n["type"] == "input"}
input_id_dict

{2: 'x', 3: 'y'}

In [38]:
for t in total_new_lst:
    ind = t[0]
    node_script = step_name_lst[ind] + ".cwl"
    output = [o + "_file" if o is not None else "result_file" for o in funct_dict[ind]['sourcePorts']]
    in_dict = {}
    for k, v in t[1].items():
        if v["source"] in input_id_dict:
            in_dict[k + "_file"]= input_id_dict[v["source"]] + "_file"
        else:
            in_dict[k + "_file"] = step_name_lst[v["source"]] + "/" + v['sourcePort'] + "_file"
    workflow_template["steps"].update({step_name_lst[ind]: {"run": node_script, "in": in_dict, "out": output}})

In [39]:
workflow_template

{'cwlVersion': 'v1.2',
 'class': 'Workflow',
 'inputs': {'x_file': 'File', 'y_file': 'File'},
 'steps': {'get_prod_and_div': {'run': 'get_prod_and_div.cwl',
   'in': {'x_file': 'x_file', 'y_file': 'y_file'},
   'out': ['prod_file', 'div_file']},
  'get_sum': {'run': 'get_sum.cwl',
   'in': {'x_file': 'get_prod_and_div/prod_file',
    'y_file': 'get_prod_and_div/div_file'},
   'out': ['result_file']}},
 'outputs': {'result_file': {'type': 'File',
   'outputSource': 'get_sum/result_file'}}}

In [40]:
with open("workflow.cwl", "w") as f:
    dump(workflow_template, f, Dumper=Dumper)

In [41]:
!cat workflow.cwl

class: Workflow
cwlVersion: v1.2
inputs:
  x_file: File
  y_file: File
outputs:
  result_file:
    outputSource: get_sum/result_file
    type: File
steps:
  get_prod_and_div:
    in:
      x_file: x_file
      y_file: y_file
    out:
    - prod_file
    - div_file
    run: get_prod_and_div.cwl
  get_sum:
    in:
      x_file: get_prod_and_div/prod_file
      y_file: get_prod_and_div/div_file
    out:
    - result_file
    run: get_sum.cwl


In [42]:
! cwltool workflow.cwl workflow.yml

  sys.exit(run())
[1;30mINFO[0m /srv/conda/envs/notebook/bin/cwltool 3.1.20250110105449
[1;30mINFO[0m Resolved 'workflow.cwl' to 'file:///home/jovyan/workflow.cwl'
[1;30mINFO[0m [workflow ] start
[1;30mINFO[0m [workflow ] starting step get_prod_and_div
[1;30mINFO[0m [step get_prod_and_div] start
[1;30mINFO[0m [job get_prod_and_div] /tmp/p21hzmvp$ python \
    /tmp/ya8kx5o9/stge44f0af3-2853-4cf7-a996-4fcdded8c44e/wrapper.py \
    --function=workflow.get_prod_and_div \
    --arg_y=/tmp/ya8kx5o9/stg8dd211e4-95b8-4ff7-8e34-72497f77eb56/y.pickle \
    --arg_x=/tmp/ya8kx5o9/stg86270edb-ff44-41bd-8510-1ae80ad19bee/x.pickle
[1;30mINFO[0m [job get_prod_and_div] completed success
[1;30mINFO[0m [step get_prod_and_div] completed success
[1;30mINFO[0m [workflow ] starting step get_sum
[1;30mINFO[0m [step get_sum] start
[1;30mINFO[0m [job get_sum] /tmp/7hhnyskz$ python \
    /tmp/opjynpu_/stg6326a473-46ef-4188-b6f4-1ecfc34c1136/wrapper.py \
    --function=workflow.get_sum \
   

In [43]:
with open("result.pickle", "rb") as f:
    print(pickle.load(f))

2.5
