From 8fc57f459620cc2be4fbb67904c45a035d4b487c Mon Sep 17 00:00:00 2001
From: Antonio Gonzalez
Date: Thu, 8 Feb 2024 13:59:47 -0700
Subject: [PATCH 01/10] add rna_copy_counts

---
 qiita_db/processing_job.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/qiita_db/processing_job.py b/qiita_db/processing_job.py
index dcce029a6..5e152f0fe 100644
--- a/qiita_db/processing_job.py
+++ b/qiita_db/processing_job.py
@@ -1020,7 +1020,7 @@ def submit(self, parent_job_id=None, dependent_jobs_list=None):
             # names to know if it should be executed differently and the
             # plugin should let Qiita know that a specific command should be ran
             # as job array or not
-            cnames_to_skip = {'Calculate Cell Counts'}
+            cnames_to_skip = {'Calculate Cell Counts', 'RNA Copy Counts'}
             if 'ENVIRONMENT' in plugin_env_script and cname not in cnames_to_skip:
                 # the job has to be in running state so the plugin can change its`
                 # status

From 0599c18cc2236ba478e109044f5e2fa4780b65b0 Mon Sep 17 00:00:00 2001
From: Antonio Gonzalez
Date: Thu, 8 Feb 2024 15:35:35 -0700
Subject: [PATCH 02/10] RNA -> Calculate RNA

---
 qiita_db/processing_job.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/qiita_db/processing_job.py b/qiita_db/processing_job.py
index 5e152f0fe..a1f7e5baa 100644
--- a/qiita_db/processing_job.py
+++ b/qiita_db/processing_job.py
@@ -1020,7 +1020,7 @@ def submit(self, parent_job_id=None, dependent_jobs_list=None):
             # names to know if it should be executed differently and the
             # plugin should let Qiita know that a specific command should be ran
             # as job array or not
-            cnames_to_skip = {'Calculate Cell Counts', 'RNA Copy Counts'}
+            cnames_to_skip = {'Calculate Cell Counts', 'Calculate RNA Copy Counts'}
             if 'ENVIRONMENT' in plugin_env_script and cname not in cnames_to_skip:
                 # the job has to be in running state so the plugin can change its`
                 # status

From 753a507e43a6602055a1e719bff0b523fd8f972d Mon Sep 17 00:00:00 2001
From: Antonio Gonzalez
Date: Fri, 16 Feb 2024 11:48:47 -0700
Subject: [PATCH 03/10] v != '*'

---
 qiita_db/metadata_template/prep_template.py | 10 ++++++----
 1 file changed, 6 insertions(+), 4 deletions(-)

diff --git a/qiita_db/metadata_template/prep_template.py b/qiita_db/metadata_template/prep_template.py
index f39aaacb7..df10d9665 100644
--- a/qiita_db/metadata_template/prep_template.py
+++ b/qiita_db/metadata_template/prep_template.py
@@ -864,16 +864,18 @@ def _get_predecessors(workflow, node):
         if wk_params['sample']:
             df = ST(self.study_id).to_dataframe(samples=list(self))
             for k, v in wk_params['sample'].items():
-                if k not in df.columns or v not in df[k].unique():
-                    reqs_satisfied = False
+                if k not in df.columns:
+                    if v != '*' and v not in df[k].unique():
+                        reqs_satisfied = False
                 else:
                     total_conditions_satisfied += 1
 
         if wk_params['prep']:
             df = self.to_dataframe()
             for k, v in wk_params['prep'].items():
-                if k not in df.columns or v not in df[k].unique():
-                    reqs_satisfied = False
+                if k not in df.columns:
+                    if v != '*' and v not in df[k].unique():
+                        reqs_satisfied = False
                 else:
                     total_conditions_satisfied += 1
 
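Note: as patch 04 below corrects, the guard patch 03 introduces is inverted: when k is
missing from df.columns the nested test still evaluates df[k] (a KeyError), and when the
column is present the requirement counts as satisfied without v ever being checked. A
minimal, self-contained sketch of the check the series settles on; the helper name and
the toy DataFrame are hypothetical stand-ins for the wk_params['sample'] /
wk_params['prep'] loops:

    import pandas as pd

    def requirements_satisfied(df, requirements):
        # a requirement holds when the column exists and either the value
        # is the '*' wildcard or the value occurs in that column
        for k, v in requirements.items():
            if k not in df.columns or (v != '*' and v not in df[k].unique()):
                return False
        return True

    df = pd.DataFrame({'sample_type': ['stool', 'skin']})
    assert requirements_satisfied(df, {'sample_type': '*'})
    assert requirements_satisfied(df, {'sample_type': 'stool'})
    assert not requirements_satisfied(df, {'sample_type': 'saliva'})
    assert not requirements_satisfied(df, {'missing_column': '*'})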
From b2ac1f7c97dac7457e7e41575944854356b3715d Mon Sep 17 00:00:00 2001
From: Antonio Gonzalez
Date: Fri, 16 Feb 2024 12:15:18 -0700
Subject: [PATCH 04/10] v != '*' : fix conditional

---
 qiita_db/metadata_template/prep_template.py | 12 ++++++------
 1 file changed, 6 insertions(+), 6 deletions(-)

diff --git a/qiita_db/metadata_template/prep_template.py b/qiita_db/metadata_template/prep_template.py
index df10d9665..f69f162f1 100644
--- a/qiita_db/metadata_template/prep_template.py
+++ b/qiita_db/metadata_template/prep_template.py
@@ -864,18 +864,18 @@ def _get_predecessors(workflow, node):
         if wk_params['sample']:
             df = ST(self.study_id).to_dataframe(samples=list(self))
             for k, v in wk_params['sample'].items():
-                if k not in df.columns:
-                    if v != '*' and v not in df[k].unique():
-                        reqs_satisfied = False
+                if k not in df.columns or (v != '*' and v not in
+                                           df[k].unique()):
+                    reqs_satisfied = False
                 else:
                     total_conditions_satisfied += 1
 
         if wk_params['prep']:
             df = self.to_dataframe()
             for k, v in wk_params['prep'].items():
-                if k not in df.columns:
-                    if v != '*' and v not in df[k].unique():
-                        reqs_satisfied = False
+                if k not in df.columns or (v != '*' and v not in
+                                           df[k].unique()):
+                    reqs_satisfied = False
                 else:
                     total_conditions_satisfied += 1
 

From a2ef905bd2cb43fb8c73d04c8d2c213ea4872dd0 Mon Sep 17 00:00:00 2001
From: Antonio Gonzalez
Date: Fri, 16 Feb 2024 13:58:42 -0700
Subject: [PATCH 05/10] prep job only display if success

---
 qiita_pet/handlers/study_handlers/prep_template.py | 9 +++++----
 1 file changed, 5 insertions(+), 4 deletions(-)

diff --git a/qiita_pet/handlers/study_handlers/prep_template.py b/qiita_pet/handlers/study_handlers/prep_template.py
index 0af9949e3..167f981bd 100644
--- a/qiita_pet/handlers/study_handlers/prep_template.py
+++ b/qiita_pet/handlers/study_handlers/prep_template.py
@@ -81,11 +81,12 @@ def get(self):
             res['creation_job_filename'] = fp['filename']
             res['creation_job_filename_body'] = fp['body']
         summary = None
-        if res['creation_job'].outputs:
-            summary = relpath(
+        if res['creation_job'].status == 'success':
+            if res['creation_job'].outputs:
                 # [0] is the id, [1] is the filepath
-                res['creation_job'].outputs['output'].html_summary_fp[1],
-                qiita_config.base_data_dir)
+                _file = res['creation_job'].outputs[
+                    'output'].html_summary_fp[1]
+                summary = relpath(_file, qiita_config.base_data_dir)
         res['creation_job_artifact_summary'] = summary
 
         self.render('study_ajax/prep_summary.html', **res)
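Note: patch 05 makes the summary lookup conditional on the creation job's status, so a
job that is still running or errored no longer dereferences outputs that do not exist
yet. A condensed sketch of the guarded lookup, with a hypothetical job object standing
in for res['creation_job'] and base_data_dir for qiita_config.base_data_dir:

    from os.path import relpath

    def creation_job_summary(job, base_data_dir):
        # only a successful job is guaranteed to have populated outputs
        if job.status == 'success' and job.outputs:
            # html_summary_fp is (id, filepath); [1] is the filepath
            html_fp = job.outputs['output'].html_summary_fp[1]
            return relpath(html_fp, base_data_dir)
        return None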
From ff335467ed501b9d2c20d74e72f03acfe524271b Mon Sep 17 00:00:00 2001
From: Antonio Gonzalez
Date: Mon, 19 Feb 2024 12:09:09 -0700
Subject: [PATCH 06/10] allowing for multiple inputs in workflow

---
 qiita_db/metadata_template/prep_template.py | 60 ++++++++++++---------
 1 file changed, 34 insertions(+), 26 deletions(-)

diff --git a/qiita_db/metadata_template/prep_template.py b/qiita_db/metadata_template/prep_template.py
index f69f162f1..19cf5dc5c 100644
--- a/qiita_db/metadata_template/prep_template.py
+++ b/qiita_db/metadata_template/prep_template.py
@@ -790,20 +790,22 @@ def _get_node_info(workflow, node):
                 ccmd.name, ccmd.merging_scheme, parent_cmd_name,
                 parent_merging_scheme, cparams, [], pparams)
 
-        def _get_predecessors(workflow, node):
-            # recursive method to get predecessors of a given node
-            pred = []
+        def _get_previous_predecessors(workflow, node):
+            parents, cxns = [], []
             for pnode in workflow.graph.predecessors(node):
-                pred = _get_predecessors(workflow, pnode)
-                cxns = {x[0]: x[2]
-                        for x in workflow.graph.get_edge_data(
-                            pnode, node)['connections'].connections}
-                data = [pnode, node, cxns]
-                if pred is None:
-                    pred = [data]
-                else:
-                    pred.append(data)
-            return pred
+                parents.append(pnode)
+                cxns.append(
+                    {x[0]: x[1] for x in workflow.graph.get_edge_data(
+                        pnode, node)['connections'].connections})
+            return [parents, node, cxns]
+
+        def _get_predecessors(workflow, node):
+            data = [_get_previous_predecessors(workflow, node)]
+            for pnode in data[0][0]:
+                d = _get_previous_predecessors(workflow, pnode)
+                if d[0]:
+                    data.append(d)
+            return data
 
         # Note: we are going to use the final BIOMs to figure out which
         # processing is missing from the back/end to the front, as this
@@ -913,24 +915,30 @@ def _get_predecessors(workflow, node):
             previous_jobs = dict()
             for ma, node in wk_data.items():
                 predecessors = _get_predecessors(wk, node)
-                predecessors.reverse()
                 cmds_to_create = []
                 init_artifacts = None
-                for i, (pnode, cnode, cxns) in enumerate(predecessors):
+                for i, (pnodes, cnode, cxns) in enumerate(predecessors):
                     cdp = cnode.default_parameter
                     cdp_cmd = cdp.command
                     params = cdp.values.copy()
 
-                    icxns = {y: x for x, y in cxns.items()}
-                    reqp = {x: icxns[y[1][0]]
+                    # note that this assume that commands cannot have
+                    # multiple inputs with the same name, which should
+                    # be a safe assumption
+                    for pnode, cxn in zip(pnodes, cxns):
+                        info = _get_node_info(wk, pnode)
+                        if init_artifacts is None:
+                            init_artifacts = dict()
+                        for k, v in merging_schemes[info].items():
+                            if k in cxn:
+                                k = cxn[k]
+                            init_artifacts[k] = v
+
+                    reqp = {x: y[1][0]
                             for x, y in cdp_cmd.required_parameters.items()}
+
                     cmds_to_create.append([cdp_cmd, params, reqp])
 
-                    info = _get_node_info(wk, pnode)
-                    if info in merging_schemes:
-                        if set(merging_schemes[info]) >= set(cxns):
-                            init_artifacts = merging_schemes[info]
-                            break
                 if init_artifacts is None:
                     pdp = pnode.default_parameter
                     pdp_cmd = pdp.command
@@ -965,15 +973,15 @@ def _get_predecessors(workflow, node):
                         previous_job = current_job
                     if previous_job is None:
                         req_params = dict()
-                        for iname, dname in rp.items():
-                            if dname not in init_artifacts:
-                                msg = (f'Missing Artifact type: "{dname}" in '
+                        for aname, aid in rp.items():
+                            if aname not in init_artifacts:
+                                msg = (f'Missing Artifact type: "{aname}" in '
                                        'this preparation; this might be due '
                                        'to missing steps or not having the '
                                        'correct raw data.')
                                 # raises option c.
                                 raise ValueError(msg)
-                            req_params[iname] = init_artifacts[dname]
+                            req_params[aname] = init_artifacts[aname]
                         else:
                             req_params = dict()
                             connections = dict()
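Note: the split into _get_previous_predecessors plus a shallow _get_predecessors is what
enables multiple inputs per command: every parent of a node is collected together with
the name mapping carried on its edge, instead of only the last parent visited. A toy
version over a plain networkx DiGraph; a bare dict stands in for the Connections object
the real code reads off each edge:

    import networkx as nx

    def previous_predecessors(graph, node):
        # gather all immediate parents of the node and, per edge, the
        # mapping carried on that edge
        parents, cxns = [], []
        for pnode in graph.predecessors(node):
            parents.append(pnode)
            cxns.append(graph.get_edge_data(pnode, node)['connections'])
        return [parents, node, cxns]

    g = nx.DiGraph()
    g.add_edge('qc', 'profiler', connections={'demultiplexed': 'input'})
    g.add_edge('reference', 'profiler', connections={'db': 'input_db'})
    parents, node, cxns = previous_predecessors(g, 'profiler')
    assert sorted(parents) == ['qc', 'reference'] and len(cxns) == 2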
From b4f3201d8fd9a5a479d6a842806f77c20e43c0b7 Mon Sep 17 00:00:00 2001
From: Antonio Gonzalez
Date: Mon, 19 Feb 2024 12:48:02 -0700
Subject: [PATCH 07/10] fix error

---
 qiita_db/metadata_template/prep_template.py | 23 ++++++++++++----------
 1 file changed, 13 insertions(+), 10 deletions(-)

diff --git a/qiita_db/metadata_template/prep_template.py b/qiita_db/metadata_template/prep_template.py
index 19cf5dc5c..6ad9fb251 100644
--- a/qiita_db/metadata_template/prep_template.py
+++ b/qiita_db/metadata_template/prep_template.py
@@ -927,15 +927,16 @@ def _get_predecessors(workflow, node):
                     # multiple inputs with the same name, which should
                     # be a safe assumption
                     for pnode, cxn in zip(pnodes, cxns):
                         info = _get_node_info(wk, pnode)
-                        if init_artifacts is None:
-                            init_artifacts = dict()
-                        for k, v in merging_schemes[info].items():
-                            if k in cxn:
-                                k = cxn[k]
-                            init_artifacts[k] = v
+                        if info in merging_schemes:
+                            if init_artifacts is None:
+                                init_artifacts = dict()
+                            for k, v in merging_schemes[info].items():
+                                if k in cxn:
+                                    k = cxn[k]
+                                init_artifacts[k] = v
 
-                    reqp = {x: y[1][0]
-                            for x, y in cdp_cmd.required_parameters.items()}
+                    rp = cdp_cmd.required_parameters
+                    reqp = {x: y[1][0] for x, y in rp.items()}
 
                     cmds_to_create.append([cdp_cmd, params, reqp])
@@ -979,8 +980,10 @@ def _get_predecessors(workflow, node):
                                        'this preparation; this might be due '
                                        'to missing steps or not having the '
                                        'correct raw data.')
-                                # raises option c.
-                                raise ValueError(msg)
+                                init_artifacts[aname] = init_artifacts[aid]
+                                if aname not in init_artifacts:
+                                    # raises option c.
+                                    raise ValueError(msg)
                             req_params[aname] = init_artifacts[aname]
                         else:
                             req_params = dict()

From 2ef9604677f0650476307d62e1a46c0b3daddeac Mon Sep 17 00:00:00 2001
From: Antonio Gonzalez
Date: Mon, 19 Feb 2024 17:22:26 -0700
Subject: [PATCH 08/10] just one element

---
 qiita_db/metadata_template/prep_template.py | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/qiita_db/metadata_template/prep_template.py b/qiita_db/metadata_template/prep_template.py
index 6ad9fb251..9ad19cc34 100644
--- a/qiita_db/metadata_template/prep_template.py
+++ b/qiita_db/metadata_template/prep_template.py
@@ -917,7 +917,8 @@ def _get_predecessors(workflow, node):
                 predecessors = _get_predecessors(wk, node)
                 cmds_to_create = []
                 init_artifacts = None
-                for i, (pnodes, cnode, cxns) in enumerate(predecessors):
+                # we only need to "loop" over the first element
+                for pnodes, cnode, cxns in predecessors[:1]:
                     cdp = cnode.default_parameter
                     cdp_cmd = cdp.command
                     params = cdp.values.copy()
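Note: patch 08 leans on the shape of patch 06's _get_predecessors, whose first element
already bundles every immediate parent of the target node, so the command-building loop
only needs that entry. The slice in miniature, with made-up node names and no Qiita
objects involved:

    predecessors = [
        [['qc', 'reference'], 'profiler', [{'demux': 'input'}, {'db': 'db'}]],
        [['raw'], 'qc', [{'raw': 'input'}]],
    ]
    for pnodes, cnode, cxns in predecessors[:1]:
        # same unpacking the patch uses; the body runs exactly once
        assert pnodes == ['qc', 'reference'] and cnode == 'profiler'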
From f063ca033de7ee55e17e2ee648c674bb359af6ee Mon Sep 17 00:00:00 2001
From: Antonio Gonzalez
Date: Tue, 20 Feb 2024 07:15:14 -0700
Subject: [PATCH 09/10] rollback add_default_workflow

---
 qiita_db/metadata_template/prep_template.py | 71 +++++++++------------
 1 file changed, 30 insertions(+), 41 deletions(-)

diff --git a/qiita_db/metadata_template/prep_template.py b/qiita_db/metadata_template/prep_template.py
index 9ad19cc34..a5ce283e9 100644
--- a/qiita_db/metadata_template/prep_template.py
+++ b/qiita_db/metadata_template/prep_template.py
@@ -790,22 +790,21 @@ def _get_node_info(workflow, node):
                 ccmd.name, ccmd.merging_scheme, parent_cmd_name,
                 parent_merging_scheme, cparams, [], pparams)
 
-        def _get_previous_predecessors(workflow, node):
-            parents, cxns = [], []
-            for pnode in workflow.graph.predecessors(node):
-                parents.append(pnode)
-                cxns.append(
-                    {x[0]: x[1] for x in workflow.graph.get_edge_data(
-                        pnode, node)['connections'].connections})
-            return [parents, node, cxns]
-
-        def _get_predecessors(workflow, node):
-            data = [_get_previous_predecessors(workflow, node)]
-            for pnode in data[0][0]:
-                d = _get_previous_predecessors(workflow, pnode)
-                if d[0]:
-                    data.append(d)
-            return data
+        def _get_predecessors(workflow, node):
+            # recursive method to get predecessors of a given node
+            pred = []
+
+            for pnode in workflow.graph.predecessors(node):
+                pred = _get_predecessors(workflow, pnode)
+                cxns = {x[0]: x[2]
+                        for x in workflow.graph.get_edge_data(
+                            pnode, node)['connections'].connections}
+                data = [pnode, node, cxns]
+                if pred is None:
+                    pred = [data]
+                else:
+                    pred.append(data)
+            return pred
 
         # Note: we are going to use the final BIOMs to figure out which
         # processing is missing from the back/end to the front, as this
@@ -915,32 +914,24 @@ def _get_predecessors(workflow, node):
             previous_jobs = dict()
            for ma, node in wk_data.items():
                 predecessors = _get_predecessors(wk, node)
+                predecessors.reverse()
                 cmds_to_create = []
                 init_artifacts = None
-                # we only need to "loop" over the first element
-                for pnodes, cnode, cxns in predecessors[:1]:
+                for i, (pnode, cnode, cxns) in enumerate(predecessors):
                     cdp = cnode.default_parameter
                     cdp_cmd = cdp.command
                     params = cdp.values.copy()
 
-                    # note that this assume that commands cannot have
-                    # multiple inputs with the same name, which should
-                    # be a safe assumption
-                    for pnode, cxn in zip(pnodes, cxns):
-                        info = _get_node_info(wk, pnode)
-                        if info in merging_schemes:
-                            if init_artifacts is None:
-                                init_artifacts = dict()
-                            for k, v in merging_schemes[info].items():
-                                if k in cxn:
-                                    k = cxn[k]
-                                init_artifacts[k] = v
-
-                    rp = cdp_cmd.required_parameters
-                    reqp = {x: y[1][0] for x, y in rp.items()}
-
+                    icxns = {y: x for x, y in cxns.items()}
+                    reqp = {x: icxns[y[1][0]]
+                            for x, y in cdp_cmd.required_parameters.items()}
                     cmds_to_create.append([cdp_cmd, params, reqp])
 
+                    info = _get_node_info(wk, pnode)
+                    if info in merging_schemes:
+                        if set(merging_schemes[info]) >= set(cxns):
+                            init_artifacts = merging_schemes[info]
+                            break
                 if init_artifacts is None:
                     pdp = pnode.default_parameter
                     pdp_cmd = pdp.command
@@ -981,17 +972,15 @@ def _get_predecessors(workflow, node):
                         previous_job = current_job
                     if previous_job is None:
                         req_params = dict()
-                        for aname, aid in rp.items():
-                            if aname not in init_artifacts:
-                                msg = (f'Missing Artifact type: "{aname}" in '
+                        for iname, dname in rp.items():
+                            if dname not in init_artifacts:
+                                msg = (f'Missing Artifact type: "{dname}" in '
                                        'this preparation; this might be due '
                                        'to missing steps or not having the '
                                        'correct raw data.')
-                                init_artifacts[aname] = init_artifacts[aid]
-                                if aname not in init_artifacts:
-                                    # raises option c.
-                                    raise ValueError(msg)
-                            req_params[aname] = init_artifacts[aname]
+                                # raises option c.
+                                raise ValueError(msg)
+                            req_params[iname] = init_artifacts[dname]
                         else:
                             req_params = dict()
                             connections = dict()
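Note: patch 09 abandons the multi-input experiment of patches 06-08 and restores the
original recursive walk, which assumes one parent per step (each recursive call
overwrites pred, so only a single chain survives). A toy run over a linear networkx
graph shows the shape of the value the caller later reverses:

    import networkx as nx

    def get_predecessors(graph, node):
        # one [parent, child, connections] triple per edge, ordered from
        # the root of the chain down to the requested node
        pred = []
        for pnode in graph.predecessors(node):
            pred = get_predecessors(graph, pnode)
            data = [pnode, node,
                    graph.get_edge_data(pnode, node)['connections']]
            if pred is None:
                pred = [data]
            else:
                pred.append(data)
        return pred

    g = nx.DiGraph()
    g.add_edge('raw', 'trimmed', connections={'raw': 'input'})
    g.add_edge('trimmed', 'biom', connections={'trimmed': 'input'})
    assert get_predecessors(g, 'biom') == [
        ['raw', 'trimmed', {'raw': 'input'}],
        ['trimmed', 'biom', {'trimmed': 'input'}]]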
From b7e4d5f887f4335e18a08ba98d1e3b9984407557 Mon Sep 17 00:00:00 2001
From: Antonio Gonzalez
Date: Tue, 20 Feb 2024 10:26:43 -0700
Subject: [PATCH 10/10] simplify add_default_workflow

---
 qiita_db/metadata_template/prep_template.py | 201 ++++++++++----------
 1 file changed, 98 insertions(+), 103 deletions(-)

diff --git a/qiita_db/metadata_template/prep_template.py b/qiita_db/metadata_template/prep_template.py
index a5ce283e9..d05493d3f 100644
--- a/qiita_db/metadata_template/prep_template.py
+++ b/qiita_db/metadata_template/prep_template.py
@@ -893,117 +893,112 @@ def _get_predecessors(workflow, node):
         # let's just keep one, let's give it preference to the one with the
         # most total_conditions_satisfied
-        workflows = sorted(workflows, key=lambda x: x[0], reverse=True)[:1]
+        _, wk = sorted(workflows, key=lambda x: x[0], reverse=True)[0]
         missing_artifacts = dict()
-        for _, wk in workflows:
-            missing_artifacts[wk] = dict()
-            for node, degree in wk.graph.out_degree():
-                if degree != 0:
-                    continue
-                mscheme = _get_node_info(wk, node)
-                if mscheme not in merging_schemes:
-                    missing_artifacts[wk][mscheme] = node
-            if not missing_artifacts[wk]:
-                del missing_artifacts[wk]
+        for node, degree in wk.graph.out_degree():
+            if degree != 0:
+                continue
+            mscheme = _get_node_info(wk, node)
+            if mscheme not in merging_schemes:
+                missing_artifacts[mscheme] = node
         if not missing_artifacts:
             # raises option b.
             raise ValueError('This preparation is complete')
 
         # 3.
-        for wk, wk_data in missing_artifacts.items():
-            previous_jobs = dict()
-            for ma, node in wk_data.items():
-                predecessors = _get_predecessors(wk, node)
-                predecessors.reverse()
-                cmds_to_create = []
-                init_artifacts = None
-                for i, (pnode, cnode, cxns) in enumerate(predecessors):
-                    cdp = cnode.default_parameter
-                    cdp_cmd = cdp.command
-                    params = cdp.values.copy()
-
-                    icxns = {y: x for x, y in cxns.items()}
-                    reqp = {x: icxns[y[1][0]]
-                            for x, y in cdp_cmd.required_parameters.items()}
-                    cmds_to_create.append([cdp_cmd, params, reqp])
-
-                    info = _get_node_info(wk, pnode)
-                    if info in merging_schemes:
-                        if set(merging_schemes[info]) >= set(cxns):
-                            init_artifacts = merging_schemes[info]
-                            break
-                if init_artifacts is None:
-                    pdp = pnode.default_parameter
-                    pdp_cmd = pdp.command
-                    params = pdp.values.copy()
-                    # verifying that the workflow.artifact_type is included
-                    # in the command input types or raise an error
-                    wkartifact_type = wk.artifact_type
-                    reqp = dict()
-                    for x, y in pdp_cmd.required_parameters.items():
-                        if wkartifact_type not in y[1]:
-                            raise ValueError(f'{wkartifact_type} is not part '
-                                             'of this preparation and cannot '
-                                             'be applied')
-                        reqp[x] = wkartifact_type
-
-                    cmds_to_create.append([pdp_cmd, params, reqp])
-
-                    if starting_job is not None:
-                        init_artifacts = {
-                            wkartifact_type: f'{starting_job.id}:'}
-                    else:
-                        init_artifacts = {wkartifact_type: self.artifact.id}
-
-                cmds_to_create.reverse()
-                current_job = None
-                loop_starting_job = starting_job
-                for i, (cmd, params, rp) in enumerate(cmds_to_create):
-                    if loop_starting_job is not None:
-                        previous_job = loop_starting_job
-                        loop_starting_job = None
-                    else:
-                        previous_job = current_job
-                    if previous_job is None:
-                        req_params = dict()
-                        for iname, dname in rp.items():
-                            if dname not in init_artifacts:
-                                msg = (f'Missing Artifact type: "{dname}" in '
-                                       'this preparation; this might be due '
-                                       'to missing steps or not having the '
-                                       'correct raw data.')
-                                # raises option c.
-                                raise ValueError(msg)
-                            req_params[iname] = init_artifacts[dname]
-                    else:
-                        req_params = dict()
-                        connections = dict()
-                        for iname, dname in rp.items():
-                            req_params[iname] = f'{previous_job.id}{dname}'
-                            connections[dname] = iname
-                    params.update(req_params)
-                    job_params = qdb.software.Parameters.load(
-                        cmd, values_dict=params)
-
-                    if params in previous_jobs.values():
-                        for x, y in previous_jobs.items():
-                            if params == y:
-                                current_job = x
-                    else:
-                        if workflow is None:
-                            PW = qdb.processing_job.ProcessingWorkflow
-                            workflow = PW.from_scratch(user, job_params)
-                            current_job = [
-                                j for j in workflow.graph.nodes()][0]
-                        else:
-                            if previous_job is None:
-                                current_job = workflow.add(
-                                    job_params, req_params=req_params)
-                            else:
-                                current_job = workflow.add(
-                                    job_params, req_params=req_params,
-                                    connections={previous_job: connections})
-                    previous_jobs[current_job] = params
+        previous_jobs = dict()
+        for ma, node in missing_artifacts.items():
+            predecessors = _get_predecessors(wk, node)
+            predecessors.reverse()
+            cmds_to_create = []
+            init_artifacts = None
+            for i, (pnode, cnode, cxns) in enumerate(predecessors):
+                cdp = cnode.default_parameter
+                cdp_cmd = cdp.command
+                params = cdp.values.copy()
+
+                icxns = {y: x for x, y in cxns.items()}
+                reqp = {x: icxns[y[1][0]]
+                        for x, y in cdp_cmd.required_parameters.items()}
+                cmds_to_create.append([cdp_cmd, params, reqp])
+
+                info = _get_node_info(wk, pnode)
+                if info in merging_schemes:
+                    if set(merging_schemes[info]) >= set(cxns):
+                        init_artifacts = merging_schemes[info]
+                        break
+            if init_artifacts is None:
+                pdp = pnode.default_parameter
+                pdp_cmd = pdp.command
+                params = pdp.values.copy()
+                # verifying that the workflow.artifact_type is included
+                # in the command input types or raise an error
+                wkartifact_type = wk.artifact_type
+                reqp = dict()
+                for x, y in pdp_cmd.required_parameters.items():
+                    if wkartifact_type not in y[1]:
+                        raise ValueError(f'{wkartifact_type} is not part '
+                                         'of this preparation and cannot '
+                                         'be applied')
+                    reqp[x] = wkartifact_type
+
+                cmds_to_create.append([pdp_cmd, params, reqp])
+
+                if starting_job is not None:
+                    init_artifacts = {
+                        wkartifact_type: f'{starting_job.id}:'}
+                else:
+                    init_artifacts = {wkartifact_type: self.artifact.id}
+
+            cmds_to_create.reverse()
+            current_job = None
+            loop_starting_job = starting_job
+            for i, (cmd, params, rp) in enumerate(cmds_to_create):
+                if loop_starting_job is not None:
+                    previous_job = loop_starting_job
+                    loop_starting_job = None
+                else:
+                    previous_job = current_job
+                if previous_job is None:
+                    req_params = dict()
+                    for iname, dname in rp.items():
+                        if dname not in init_artifacts:
+                            msg = (f'Missing Artifact type: "{dname}" in '
+                                   'this preparation; this might be due '
+                                   'to missing steps or not having the '
+                                   'correct raw data.')
+                            # raises option c.
+                            raise ValueError(msg)
+                        req_params[iname] = init_artifacts[dname]
+                else:
+                    req_params = dict()
+                    connections = dict()
+                    for iname, dname in rp.items():
+                        req_params[iname] = f'{previous_job.id}{dname}'
+                        connections[dname] = iname
+                params.update(req_params)
+                job_params = qdb.software.Parameters.load(
+                    cmd, values_dict=params)
+
+                if params in previous_jobs.values():
+                    for x, y in previous_jobs.items():
+                        if params == y:
+                            current_job = x
+                else:
+                    if workflow is None:
+                        PW = qdb.processing_job.ProcessingWorkflow
+                        workflow = PW.from_scratch(user, job_params)
+                        current_job = [
+                            j for j in workflow.graph.nodes()][0]
+                    else:
+                        if previous_job is None:
+                            current_job = workflow.add(
+                                job_params, req_params=req_params)
+                        else:
+                            current_job = workflow.add(
+                                job_params, req_params=req_params,
+                                connections={previous_job: connections})
+                previous_jobs[current_job] = params
 
         return workflow
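Note: the core of patch 10 in miniature: with only the single best-scoring workflow
kept, missing_artifacts flattens from a per-workflow dict to a plain scheme-to-node
dict, the outer "for wk, wk_data in missing_artifacts.items()" loop collapses, and the
whole job-creation block loses one level of indentation while its logic stays the same.
Scores and names below are made up for illustration:

    workflows = [(2, 'per-sample FASTQ'), (5, 'shotgun'), (3, 'amplicon')]

    # before (patch 09 state): a one-element list that still had to be
    # iterated
    kept = sorted(workflows, key=lambda x: x[0], reverse=True)[:1]

    # after (patch 10): unpack the single winner directly
    _, wk = sorted(workflows, key=lambda x: x[0], reverse=True)[0]

    assert kept == [(5, 'shotgun')] and wk == 'shotgun'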