Skip to content

Commit

Permalink
simplify add_default_workflow
Browse files Browse the repository at this point in the history
  • Loading branch information
antgonza committed Feb 20, 2024
1 parent f063ca0 commit b7e4d5f
Showing 1 changed file with 98 additions and 103 deletions.
201 changes: 98 additions & 103 deletions qiita_db/metadata_template/prep_template.py
Original file line number Diff line number Diff line change
Expand Up @@ -893,117 +893,112 @@ def _get_predecessors(workflow, node):

# let's just keep one, let's give it preference to the one with the
# most total_conditions_satisfied
workflows = sorted(workflows, key=lambda x: x[0], reverse=True)[:1]
_, wk = sorted(workflows, key=lambda x: x[0], reverse=True)[0]
missing_artifacts = dict()
for _, wk in workflows:
missing_artifacts[wk] = dict()
for node, degree in wk.graph.out_degree():
if degree != 0:
continue
mscheme = _get_node_info(wk, node)
if mscheme not in merging_schemes:
missing_artifacts[wk][mscheme] = node
if not missing_artifacts[wk]:
del missing_artifacts[wk]
for node, degree in wk.graph.out_degree():
if degree != 0:
continue
mscheme = _get_node_info(wk, node)
if mscheme not in merging_schemes:
missing_artifacts[mscheme] = node
if not missing_artifacts:
# raises option b.
raise ValueError('This preparation is complete')

# 3.
for wk, wk_data in missing_artifacts.items():
previous_jobs = dict()
for ma, node in wk_data.items():
predecessors = _get_predecessors(wk, node)
predecessors.reverse()
cmds_to_create = []
init_artifacts = None
for i, (pnode, cnode, cxns) in enumerate(predecessors):
cdp = cnode.default_parameter
cdp_cmd = cdp.command
params = cdp.values.copy()

icxns = {y: x for x, y in cxns.items()}
reqp = {x: icxns[y[1][0]]
for x, y in cdp_cmd.required_parameters.items()}
cmds_to_create.append([cdp_cmd, params, reqp])

info = _get_node_info(wk, pnode)
if info in merging_schemes:
if set(merging_schemes[info]) >= set(cxns):
init_artifacts = merging_schemes[info]
break
if init_artifacts is None:
pdp = pnode.default_parameter
pdp_cmd = pdp.command
params = pdp.values.copy()
# verifying that the workflow.artifact_type is included
# in the command input types or raise an error
wkartifact_type = wk.artifact_type
reqp = dict()
for x, y in pdp_cmd.required_parameters.items():
if wkartifact_type not in y[1]:
raise ValueError(f'{wkartifact_type} is not part '
'of this preparation and cannot '
'be applied')
reqp[x] = wkartifact_type

cmds_to_create.append([pdp_cmd, params, reqp])

if starting_job is not None:
init_artifacts = {
wkartifact_type: f'{starting_job.id}:'}
else:
init_artifacts = {wkartifact_type: self.artifact.id}

cmds_to_create.reverse()
current_job = None
loop_starting_job = starting_job
for i, (cmd, params, rp) in enumerate(cmds_to_create):
if loop_starting_job is not None:
previous_job = loop_starting_job
loop_starting_job = None
else:
previous_job = current_job
if previous_job is None:
req_params = dict()
for iname, dname in rp.items():
if dname not in init_artifacts:
msg = (f'Missing Artifact type: "{dname}" in '
'this preparation; this might be due '
'to missing steps or not having the '
'correct raw data.')
# raises option c.
raise ValueError(msg)
req_params[iname] = init_artifacts[dname]
else:
req_params = dict()
connections = dict()
for iname, dname in rp.items():
req_params[iname] = f'{previous_job.id}{dname}'
connections[dname] = iname
params.update(req_params)
job_params = qdb.software.Parameters.load(
cmd, values_dict=params)

if params in previous_jobs.values():
for x, y in previous_jobs.items():
if params == y:
current_job = x
previous_jobs = dict()
for ma, node in missing_artifacts.items():
predecessors = _get_predecessors(wk, node)
predecessors.reverse()
cmds_to_create = []
init_artifacts = None
for i, (pnode, cnode, cxns) in enumerate(predecessors):
cdp = cnode.default_parameter
cdp_cmd = cdp.command
params = cdp.values.copy()

icxns = {y: x for x, y in cxns.items()}
reqp = {x: icxns[y[1][0]]
for x, y in cdp_cmd.required_parameters.items()}
cmds_to_create.append([cdp_cmd, params, reqp])

info = _get_node_info(wk, pnode)
if info in merging_schemes:
if set(merging_schemes[info]) >= set(cxns):
init_artifacts = merging_schemes[info]
break
if init_artifacts is None:
pdp = pnode.default_parameter
pdp_cmd = pdp.command
params = pdp.values.copy()
# verifying that the workflow.artifact_type is included
# in the command input types or raise an error
wkartifact_type = wk.artifact_type
reqp = dict()
for x, y in pdp_cmd.required_parameters.items():
if wkartifact_type not in y[1]:
raise ValueError(f'{wkartifact_type} is not part '
'of this preparation and cannot '
'be applied')
reqp[x] = wkartifact_type

cmds_to_create.append([pdp_cmd, params, reqp])

if starting_job is not None:
init_artifacts = {
wkartifact_type: f'{starting_job.id}:'}
else:
init_artifacts = {wkartifact_type: self.artifact.id}

cmds_to_create.reverse()
current_job = None
loop_starting_job = starting_job
for i, (cmd, params, rp) in enumerate(cmds_to_create):
if loop_starting_job is not None:
previous_job = loop_starting_job
loop_starting_job = None
else:
previous_job = current_job
if previous_job is None:
req_params = dict()
for iname, dname in rp.items():
if dname not in init_artifacts:
msg = (f'Missing Artifact type: "{dname}" in '
'this preparation; this might be due '
'to missing steps or not having the '
'correct raw data.')
# raises option c.
raise ValueError(msg)
req_params[iname] = init_artifacts[dname]
else:
req_params = dict()
connections = dict()
for iname, dname in rp.items():
req_params[iname] = f'{previous_job.id}{dname}'
connections[dname] = iname
params.update(req_params)
job_params = qdb.software.Parameters.load(
cmd, values_dict=params)

if params in previous_jobs.values():
for x, y in previous_jobs.items():
if params == y:
current_job = x
else:
if workflow is None:
PW = qdb.processing_job.ProcessingWorkflow
workflow = PW.from_scratch(user, job_params)
current_job = [
j for j in workflow.graph.nodes()][0]
else:
if workflow is None:
PW = qdb.processing_job.ProcessingWorkflow
workflow = PW.from_scratch(user, job_params)
current_job = [
j for j in workflow.graph.nodes()][0]
if previous_job is None:
current_job = workflow.add(
job_params, req_params=req_params)
else:
if previous_job is None:
current_job = workflow.add(
job_params, req_params=req_params)
else:
current_job = workflow.add(
job_params, req_params=req_params,
connections={previous_job: connections})
previous_jobs[current_job] = params
current_job = workflow.add(
job_params, req_params=req_params,
connections={previous_job: connections})
previous_jobs[current_job] = params

return workflow

Expand Down

0 comments on commit b7e4d5f

Please sign in to comment.