Multiple inputs add default workflow (#3371)
* add rna_copy_counts

* RNA -> Calculate RNA

* v != '*'

* v != '*' : fix conditional

* prep job only display if success

* allowing for multiple inputs in workflow

* fix error

* just one element

* rollback add_default_workflow

* add full workflow case

* do not add ancestors

* single command
antgonza committed Feb 24, 2024
1 parent 57b84cf commit 426268c
Showing 2 changed files with 91 additions and 15 deletions.
66 changes: 51 additions & 15 deletions qiita_db/metadata_template/prep_template.py
@@ -794,16 +794,25 @@ def _get_predecessors(workflow, node):
             # recursive method to get predecessors of a given node
             pred = []
 
-            for pnode in workflow.graph.predecessors(node):
+            parents = list(workflow.graph.predecessors(node))
+            for pnode in parents:
                 pred = _get_predecessors(workflow, pnode)
                 cxns = {x[0]: x[2]
                         for x in workflow.graph.get_edge_data(
                             pnode, node)['connections'].connections}
                 data = [pnode, node, cxns]
                 if pred is None:
-                    pred = [data]
-                else:
-                    pred.append(data)
+                    pred = []
+
+                # making sure that if the node has extra parents they are
+                # generated first
+                parents.remove(pnode)
+                if parents:
+                    for pnode in parents:
+                        # [-1] just adding the parent and not its ancestors
+                        pred.extend([_get_predecessors(workflow, pnode)[-1]])
+
+                pred.append(data)
             return pred
 
         # Note: we are going to use the final BIOMs to figure out which
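
The effect of the reworked recursion is easiest to see on a toy graph. Below is a simplified, self-contained sketch (plain networkx, hypothetical node names, edge metadata and Qiita objects omitted) of the ordering idea: every extra parent of a multi-input step is generated before the step itself, and contributes only its single generating entry rather than its whole ancestor chain.

import networkx as nx

# toy default-workflow DAG: two OTU-picking steps feed one command that
# takes two BIOM inputs (all labels are hypothetical)
g = nx.DiGraph()
g.add_edge('split-libraries', 'pick-otus-1')
g.add_edge('split-libraries', 'pick-otus-2')
g.add_edge('pick-otus-1', 'multi-biom')
g.add_edge('pick-otus-2', 'multi-biom')

def get_predecessors(graph, node):
    # simplified analogue of _get_predecessors: no edge metadata, so each
    # entry is just (parent, child), deepest ancestors first
    pred = []
    parents = list(graph.predecessors(node))
    for pnode in parents:
        pred = get_predecessors(graph, pnode)
        for extra in parents:
            if extra is not pnode:
                # [-1] keeps only the entry that generates the extra
                # parent, not its whole ancestor chain
                pred.append(get_predecessors(graph, extra)[-1])
        pred.append((pnode, node))
        break  # the sibling parents were already handled above
    return pred

print(get_predecessors(g, 'multi-biom'))
# [('split-libraries', 'pick-otus-1'),
#  ('split-libraries', 'pick-otus-2'),
#  ('pick-otus-1', 'multi-biom')]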
@@ -894,8 +903,9 @@ def _get_predecessors(workflow, node):
         # let's just keep one, let's give it preference to the one with the
         # most total_conditions_satisfied
         _, wk = sorted(workflows, key=lambda x: x[0], reverse=True)[0]
+        GH = wk.graph
         missing_artifacts = dict()
-        for node, degree in wk.graph.out_degree():
+        for node, degree in GH.out_degree():
             if degree != 0:
                 continue
             mscheme = _get_node_info(wk, node)
@@ -920,7 +930,7 @@ def _get_predecessors(workflow, node):
                     icxns = {y: x for x, y in cxns.items()}
                     reqp = {x: icxns[y[1][0]]
                             for x, y in cdp_cmd.required_parameters.items()}
-                    cmds_to_create.append([cdp_cmd, params, reqp])
+                    cmds_to_create.append([cdp, cdp_cmd, params, reqp])
 
                     info = _get_node_info(wk, pnode)
                     if info in merging_schemes:
@@ -942,7 +952,7 @@ def _get_predecessors(workflow, node):
                                        'be applied')
                             reqp[x] = wkartifact_type
 
-                cmds_to_create.append([pdp_cmd, params, reqp])
+                cmds_to_create.append([pdp, pdp_cmd, params, reqp])
 
                 if starting_job is not None:
                     init_artifacts = {
@@ -953,14 +963,16 @@ def _get_predecessors(workflow, node):
                 cmds_to_create.reverse()
                 current_job = None
                 loop_starting_job = starting_job
-                for i, (cmd, params, rp) in enumerate(cmds_to_create):
+                previous_dps = dict()
+                for i, (dp, cmd, params, rp) in enumerate(cmds_to_create):
                     if loop_starting_job is not None:
                         previous_job = loop_starting_job
                         loop_starting_job = None
                     else:
                         previous_job = current_job
+
+                    req_params = dict()
                     if previous_job is None:
-                        req_params = dict()
                         for iname, dname in rp.items():
                             if dname not in init_artifacts:
                                 msg = (f'Missing Artifact type: "{dname}" in '
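
The default-parameter object now travels with each queued command, and previous_dps records the job created from each default parameter set so a later multi-input command can find its parent jobs. A minimal sketch of the bookkeeping, with hypothetical ids and strings standing in for real DefaultParameters and ProcessingJob objects:

# map default-parameter-set id -> job created from it
previous_dps = dict()
previous_dps[8] = 'job-pick-otus-1'
previous_dps[9] = 'job-pick-otus-2'

# when wiring a child with several inputs, each predecessor node's
# default-parameter id recovers the job producing that input
parent_job = previous_dps[8]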
@@ -970,12 +982,35 @@ def _get_predecessors(workflow, node):
                                 # raises option c.
                                 raise ValueError(msg)
                             req_params[iname] = init_artifacts[dname]
+                        if len(dp.command.required_parameters) > 1:
+                            for pn in GH.predecessors(node):
+                                info = _get_node_info(wk, pn)
+                                n, cnx, _ = GH.get_edge_data(
+                                    pn, node)['connections'].connections[0]
+                                if info not in merging_schemes or \
+                                        n not in merging_schemes[info]:
+                                    msg = ('This workflow contains a step with '
+                                           'multiple inputs so it cannot be '
+                                           'completed automatically, please add '
+                                           'the commands by hand.')
+                                    raise ValueError(msg)
+                                req_params[cnx] = merging_schemes[info][n]
                     else:
-                        req_params = dict()
-                        connections = dict()
-                        for iname, dname in rp.items():
-                            req_params[iname] = f'{previous_job.id}{dname}'
-                            connections[dname] = iname
+                        if len(dp.command.required_parameters) == 1:
+                            cxns = dict()
+                            for iname, dname in rp.items():
+                                req_params[iname] = f'{previous_job.id}{dname}'
+                                cxns[dname] = iname
+                            connections = {previous_job: cxns}
+                        else:
+                            connections = dict()
+                            for pn in GH.predecessors(node):
+                                pndp = pn.default_parameter
+                                n, cnx, _ = GH.get_edge_data(
+                                    pn, node)['connections'].connections[0]
+                                _job = previous_dps[pndp.id]
+                                req_params[cnx] = f'{_job.id}{n}'
+                                connections[_job] = {n: cnx}
                     params.update(req_params)
                     job_params = qdb.software.Parameters.load(
                         cmd, values_dict=params)
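
The connections argument handed to workflow.add now takes one of two shapes, depending on how many required parameters the command declares. A hedged sketch with hypothetical names (in the real code the keys are ProcessingJob objects, not strings):

# one required input: the old behaviour, a single previous job
# feeding a single input name
connections = {'job-split-libs': {'demultiplexed': 'input_data'}}

# several required inputs: one entry per predecessor job, each
# recovered from previous_dps via its default-parameter id
connections = {
    'job-pick-otus-1': {'OTU table': 'req_artifact_1'},
    'job-pick-otus-2': {'OTU table': 'req_artifact_2'},
}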
@@ -997,8 +1032,9 @@ def _get_predecessors(workflow, node):
                     else:
                         current_job = workflow.add(
                             job_params, req_params=req_params,
-                            connections={previous_job: connections})
+                            connections=connections)
                     previous_jobs[current_job] = params
+                    previous_dps[dp.id] = current_job
 
         return workflow

40 changes: 40 additions & 0 deletions qiita_db/metadata_template/test/test_prep_template.py
@@ -1477,6 +1477,46 @@ def test_artifact_setter(self):
                                     "the parameters are the same as jobs"):
             pt.add_default_workflow(qdb.user.User('test@foo.bar'))
 
+        # Then, let's clean up again and add a new command/step with 2
+        # BIOM input artifacts
+        for pj in wk.graph.nodes:
+            pj._set_error('Killed')
+        cmd = qdb.software.Command.create(
+            qdb.software.Software(1), "Multiple BIOM as inputs", "", {
+                'req_artifact_1': ['artifact:["BIOM"]', None],
+                'req_artifact_2': ['artifact:["BIOM"]', None],
+            }, outputs={'MB-output': 'BIOM'})
+        cmd_dp = qdb.software.DefaultParameters.create("", cmd)
+        # creating the new node for the cmd and linking its two inputs to
+        # the two parent BIOM outputs
+        sql = f"""
+            INSERT INTO qiita.default_workflow_node (
+                default_workflow_id, default_parameter_set_id)
+            VALUES (1, {cmd_dp.id});
+            INSERT INTO qiita.default_workflow_edge (
+                parent_id, child_id)
+            VALUES (8, 10);
+            INSERT INTO qiita.default_workflow_edge (
+                parent_id, child_id)
+            VALUES (9, 10);
+            INSERT INTO qiita.default_workflow_edge_connections (
+                default_workflow_edge_id, parent_output_id, child_input_id)
+            VALUES (6, 3, 99);
+            INSERT INTO qiita.default_workflow_edge_connections (
+                default_workflow_edge_id, parent_output_id, child_input_id)
+            VALUES (7, 3, 100)
+        """
+        qdb.sql_connection.perform_as_transaction(sql)
+        wk = pt.add_default_workflow(qdb.user.User('test@foo.bar'))
+        self.assertEqual(len(wk.graph.nodes), 6)
+        self.assertEqual(len(wk.graph.edges), 5)
+        self.assertCountEqual(
+            [x.command.name for x in wk.graph.nodes],
+            # we should have 2 split libraries and 3 closed-reference
+            ['Split libraries FASTQ', 'Split libraries FASTQ',
+             'Pick closed-reference OTUs', 'Pick closed-reference OTUs',
+             'Pick closed-reference OTUs', 'Multiple BIOM as inputs'])
+
         # now let's test that an error is raised when there is no valid initial
         # input data; this moves the data type from FASTQ to taxa_summary for
         # the default_workflow_id = 1
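
For orientation, one plausible shape for the 6-node / 5-edge graph the new assertions describe, sketched with networkx (hypothetical labels; in the test the nodes are ProcessingJob objects):

import networkx as nx

expected = nx.DiGraph([
    ('split-libs-1', 'pick-otus-1a'),
    ('split-libs-1', 'pick-otus-1b'),
    ('split-libs-2', 'pick-otus-2'),
    ('pick-otus-1a', 'multiple-biom-as-inputs'),
    ('pick-otus-1b', 'multiple-biom-as-inputs'),
])
assert len(expected.nodes) == 6 and len(expected.edges) == 5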
