Skip to content

Commit

Permalink
Merge b7e4d5f into 58e15a4
Browse files Browse the repository at this point in the history
  • Loading branch information
antgonza committed Feb 20, 2024
2 parents 58e15a4 + b7e4d5f commit a725d0d
Show file tree
Hide file tree
Showing 3 changed files with 109 additions and 110 deletions.
208 changes: 103 additions & 105 deletions qiita_db/metadata_template/prep_template.py
Original file line number Diff line number Diff line change
Expand Up @@ -793,6 +793,7 @@ def _get_node_info(workflow, node):
def _get_predecessors(workflow, node):
# recursive method to get predecessors of a given node
pred = []

for pnode in workflow.graph.predecessors(node):
pred = _get_predecessors(workflow, pnode)
cxns = {x[0]: x[2]
Expand Down Expand Up @@ -864,15 +865,17 @@ def _get_predecessors(workflow, node):
if wk_params['sample']:
df = ST(self.study_id).to_dataframe(samples=list(self))
for k, v in wk_params['sample'].items():
if k not in df.columns or v not in df[k].unique():
if k not in df.columns or (v != '*' and v not in
df[k].unique()):
reqs_satisfied = False
else:
total_conditions_satisfied += 1

if wk_params['prep']:
df = self.to_dataframe()
for k, v in wk_params['prep'].items():
if k not in df.columns or v not in df[k].unique():
if k not in df.columns or (v != '*' and v not in
df[k].unique()):
reqs_satisfied = False
else:
total_conditions_satisfied += 1
Expand All @@ -890,117 +893,112 @@ def _get_predecessors(workflow, node):

# let's just keep one, giving preference to the one with the
# most total_conditions_satisfied
workflows = sorted(workflows, key=lambda x: x[0], reverse=True)[:1]
_, wk = sorted(workflows, key=lambda x: x[0], reverse=True)[0]
missing_artifacts = dict()
for _, wk in workflows:
missing_artifacts[wk] = dict()
for node, degree in wk.graph.out_degree():
if degree != 0:
continue
mscheme = _get_node_info(wk, node)
if mscheme not in merging_schemes:
missing_artifacts[wk][mscheme] = node
if not missing_artifacts[wk]:
del missing_artifacts[wk]
for node, degree in wk.graph.out_degree():
if degree != 0:
continue
mscheme = _get_node_info(wk, node)
if mscheme not in merging_schemes:
missing_artifacts[mscheme] = node
if not missing_artifacts:
# raises option b.
raise ValueError('This preparation is complete')

# 3.
for wk, wk_data in missing_artifacts.items():
previous_jobs = dict()
for ma, node in wk_data.items():
predecessors = _get_predecessors(wk, node)
predecessors.reverse()
cmds_to_create = []
init_artifacts = None
for i, (pnode, cnode, cxns) in enumerate(predecessors):
cdp = cnode.default_parameter
cdp_cmd = cdp.command
params = cdp.values.copy()

icxns = {y: x for x, y in cxns.items()}
reqp = {x: icxns[y[1][0]]
for x, y in cdp_cmd.required_parameters.items()}
cmds_to_create.append([cdp_cmd, params, reqp])

info = _get_node_info(wk, pnode)
if info in merging_schemes:
if set(merging_schemes[info]) >= set(cxns):
init_artifacts = merging_schemes[info]
break
if init_artifacts is None:
pdp = pnode.default_parameter
pdp_cmd = pdp.command
params = pdp.values.copy()
# verifying that the workflow.artifact_type is included
# in the command input types or raise an error
wkartifact_type = wk.artifact_type
reqp = dict()
for x, y in pdp_cmd.required_parameters.items():
if wkartifact_type not in y[1]:
raise ValueError(f'{wkartifact_type} is not part '
'of this preparation and cannot '
'be applied')
reqp[x] = wkartifact_type

cmds_to_create.append([pdp_cmd, params, reqp])

if starting_job is not None:
init_artifacts = {
wkartifact_type: f'{starting_job.id}:'}
else:
init_artifacts = {wkartifact_type: self.artifact.id}

cmds_to_create.reverse()
current_job = None
loop_starting_job = starting_job
for i, (cmd, params, rp) in enumerate(cmds_to_create):
if loop_starting_job is not None:
previous_job = loop_starting_job
loop_starting_job = None
else:
previous_job = current_job
if previous_job is None:
req_params = dict()
for iname, dname in rp.items():
if dname not in init_artifacts:
msg = (f'Missing Artifact type: "{dname}" in '
'this preparation; this might be due '
'to missing steps or not having the '
'correct raw data.')
# raises option c.
raise ValueError(msg)
req_params[iname] = init_artifacts[dname]
else:
req_params = dict()
connections = dict()
for iname, dname in rp.items():
req_params[iname] = f'{previous_job.id}{dname}'
connections[dname] = iname
params.update(req_params)
job_params = qdb.software.Parameters.load(
cmd, values_dict=params)

if params in previous_jobs.values():
for x, y in previous_jobs.items():
if params == y:
current_job = x
previous_jobs = dict()
for ma, node in missing_artifacts.items():
predecessors = _get_predecessors(wk, node)
predecessors.reverse()
cmds_to_create = []
init_artifacts = None
for i, (pnode, cnode, cxns) in enumerate(predecessors):
cdp = cnode.default_parameter
cdp_cmd = cdp.command
params = cdp.values.copy()

icxns = {y: x for x, y in cxns.items()}
reqp = {x: icxns[y[1][0]]
for x, y in cdp_cmd.required_parameters.items()}
cmds_to_create.append([cdp_cmd, params, reqp])

info = _get_node_info(wk, pnode)
if info in merging_schemes:
if set(merging_schemes[info]) >= set(cxns):
init_artifacts = merging_schemes[info]
break
if init_artifacts is None:
pdp = pnode.default_parameter
pdp_cmd = pdp.command
params = pdp.values.copy()
# verifying that the workflow.artifact_type is included
# in the command input types or raise an error
wkartifact_type = wk.artifact_type
reqp = dict()
for x, y in pdp_cmd.required_parameters.items():
if wkartifact_type not in y[1]:
raise ValueError(f'{wkartifact_type} is not part '
'of this preparation and cannot '
'be applied')
reqp[x] = wkartifact_type

cmds_to_create.append([pdp_cmd, params, reqp])

if starting_job is not None:
init_artifacts = {
wkartifact_type: f'{starting_job.id}:'}
else:
init_artifacts = {wkartifact_type: self.artifact.id}

cmds_to_create.reverse()
current_job = None
loop_starting_job = starting_job
for i, (cmd, params, rp) in enumerate(cmds_to_create):
if loop_starting_job is not None:
previous_job = loop_starting_job
loop_starting_job = None
else:
previous_job = current_job
if previous_job is None:
req_params = dict()
for iname, dname in rp.items():
if dname not in init_artifacts:
msg = (f'Missing Artifact type: "{dname}" in '
'this preparation; this might be due '
'to missing steps or not having the '
'correct raw data.')
# raises option c.
raise ValueError(msg)
req_params[iname] = init_artifacts[dname]
else:
req_params = dict()
connections = dict()
for iname, dname in rp.items():
req_params[iname] = f'{previous_job.id}{dname}'
connections[dname] = iname
params.update(req_params)
job_params = qdb.software.Parameters.load(
cmd, values_dict=params)

if params in previous_jobs.values():
for x, y in previous_jobs.items():
if params == y:
current_job = x
else:
if workflow is None:
PW = qdb.processing_job.ProcessingWorkflow
workflow = PW.from_scratch(user, job_params)
current_job = [
j for j in workflow.graph.nodes()][0]
else:
if workflow is None:
PW = qdb.processing_job.ProcessingWorkflow
workflow = PW.from_scratch(user, job_params)
current_job = [
j for j in workflow.graph.nodes()][0]
if previous_job is None:
current_job = workflow.add(
job_params, req_params=req_params)
else:
if previous_job is None:
current_job = workflow.add(
job_params, req_params=req_params)
else:
current_job = workflow.add(
job_params, req_params=req_params,
connections={previous_job: connections})
previous_jobs[current_job] = params
current_job = workflow.add(
job_params, req_params=req_params,
connections={previous_job: connections})
previous_jobs[current_job] = params

return workflow

Expand Down
2 changes: 1 addition & 1 deletion qiita_db/processing_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -1020,7 +1020,7 @@ def submit(self, parent_job_id=None, dependent_jobs_list=None):
# names to know if it should be executed differently and the
# plugin should let Qiita know that a specific command should be run
# as a job array or not
cnames_to_skip = {'Calculate Cell Counts'}
cnames_to_skip = {'Calculate Cell Counts', 'Calculate RNA Copy Counts'}
if 'ENVIRONMENT' in plugin_env_script and cname not in cnames_to_skip:
# the job has to be in running state so the plugin can change its
# status
Expand Down
9 changes: 5 additions & 4 deletions qiita_pet/handlers/study_handlers/prep_template.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,11 +81,12 @@ def get(self):
res['creation_job_filename'] = fp['filename']
res['creation_job_filename_body'] = fp['body']
summary = None
if res['creation_job'].outputs:
summary = relpath(
if res['creation_job'].status == 'success':
if res['creation_job'].outputs:
# [0] is the id, [1] is the filepath
res['creation_job'].outputs['output'].html_summary_fp[1],
qiita_config.base_data_dir)
_file = res['creation_job'].outputs[
'output'].html_summary_fp[1]
summary = relpath(_file, qiita_config.base_data_dir)
res['creation_job_artifact_summary'] = summary

self.render('study_ajax/prep_summary.html', **res)
Expand Down

0 comments on commit a725d0d

Please sign in to comment.