Skip to content

Commit

Permalink
Restructure Analysis Metadata Fetching for Celery (#3461)
Browse files Browse the repository at this point in the history
* cut down api requests by passing in result

* import generate_auxiliary_file to circumvent potential import issue

* remove reference to dsm.tasks

* store a tool's response information in a django field for repeated use

* re-format if statement

* allow request to be forced through instead of loading from database

* use ast.literal_eval for lists instead of json.loads

* further cache response to invocation call and refactor other cache

* reformat and delete unused imports

* revert data_set_manager import hack

* bump up the soft_time_limit

* initial commit for restructuring galaxy requests

* get dataset download list more directly from exposed_datasets

* retrieve analysis download list from the AnalysisNodeConnection list

* update download_list to use AnalysisNodeConnection outputs

* remove cache fields on Tools model and re-use full galaxy dataset list

* import OUTPUT_CONNECTION

* remove force_request parameter

* fix formatting on exposed_dataset_list parameter

* fix formatting

* add creating_job option to create output name to prevent repeat requests

* revert use of rename s3 as separate task

* remove RenameS3FileTask task

* revert fabricrc.sample file changes

* fix tool_manager mocking to match new behavior

* fix formatting on test

* resolve ilya's requests

* update field names

* update file extension in name

* remove check on invocation being None

* remove unneeded fields and fix tests

* reinsert "galaxy_dataset_name" to AnalysisNodeConnection

* remove unnecessary get_galaxy_dataset_download_list function and tests

* handle galaxy error

* update use of json.loads() and json.dumps() to be literal_eval with dict

* change back to ast.literal_eval from json.loads
  • Loading branch information
ilan-gold committed Oct 22, 2019
1 parent 1d30e04 commit 223158a
Show file tree
Hide file tree
Showing 4 changed files with 114 additions and 88 deletions.
9 changes: 4 additions & 5 deletions refinery/analysis_manager/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -470,29 +470,28 @@ def _get_galaxy_download_task_ids(analysis):
task_id_list = []
# retrieving list of files to download for workflow
tool = _get_workflow_tool(analysis.uuid)
tool.create_analysis_output_node_connections()
galaxy_instance = analysis.workflow.workflow_engine.instance
try:
download_list = tool.get_galaxy_dataset_download_list()
download_list = tool.create_analysis_output_node_connections()
except galaxy.client.ConnectionError as exc:
error_msg = \
"Error downloading Galaxy history files for analysis '%s': %s"
logger.error(error_msg, analysis.name, exc.message)
analysis.set_status(Analysis.FAILURE_STATUS, error_msg)
analysis.galaxy_cleanup()
return task_id_list
galaxy_instance = analysis.workflow.workflow_engine.instance

# Iterating through files in current galaxy history
for results in download_list:
# download file if result state is "ok"
if results['state'] == 'ok':
file_extension = results["type"]
file_extension = results['file_ext']
result_name = "{}.{}".format(results['name'], file_extension)
# size of file defined by galaxy
file_size = results['file_size']
file_store_item = FileStoreItem(source=urlparse.urljoin(
galaxy_instance.base_url,
"datasets/{}/display?to_ext=txt".format(results['dataset_id'])
"datasets/{}/display?to_ext=txt".format(results['id'])
))
# workaround to set the correct file type for zip archives of
# FastQC HTML reports produced by Galaxy dynamically
Expand Down
19 changes: 19 additions & 0 deletions refinery/tool_manager/migrations/0030_auto_20191007_0941.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
# -*- coding: utf-8 -*-
from __future__ import unicode_literals

from django.db import migrations, models


class Migration(migrations.Migration):
    """Add a text field to ``WorkflowTool`` that caches the raw Galaxy
    workflow-invocation response, so repeated lookups do not require
    repeated Galaxy API requests."""

    dependencies = [
        ('tool_manager', '0029_parameter_uuid'),
    ]

    operations = [
        migrations.AddField(
            model_name='workflowtool',
            name='invocation',
            # blank=True: the field stays empty until the first invocation
            # response is fetched and cached
            field=models.TextField(blank=True),
        )
    ]
97 changes: 57 additions & 40 deletions refinery/tool_manager/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -673,6 +673,7 @@ class WorkflowTool(Tool):
REVERSE = "reverse"
TOOL_ID = "tool_id"
WORKFLOW_OUTPUTS = "workflow_outputs"
invocation = models.TextField(blank=True)

class Meta:
verbose_name = "workflowtool"
Expand Down Expand Up @@ -793,8 +794,15 @@ def create_analysis_output_node_connections(self):
"""Create the AnalysisNodeConnection objects corresponding to the
output Nodes (Derived Data) of a WorkflowTool launch
"""
exposed_workflow_outputs = self._get_exposed_galaxy_datasets()
for galaxy_dataset in self._get_galaxy_history_dataset_list():
exposed_dataset_list = self._get_galaxy_history_dataset_list()
exposed_workflow_outputs = self._get_exposed_galaxy_datasets(
exposed_dataset_list=exposed_dataset_list
)
connection_dataset_list = []
connection_dataset_list_fields = [
'file_ext', 'name', 'state', 'file_size', 'id'
]
for galaxy_dataset in exposed_dataset_list:
AnalysisNodeConnection.objects.create(
analysis=self.analysis, direction=OUTPUT_CONNECTION,
name=self._get_creating_job_output_name(galaxy_dataset),
Expand All @@ -803,8 +811,15 @@ def create_analysis_output_node_connections(self):
filename=self._get_galaxy_dataset_filename(galaxy_dataset),
filetype=galaxy_dataset["file_ext"],
is_refinery_file=galaxy_dataset in exposed_workflow_outputs,
galaxy_dataset_name=galaxy_dataset["name"]
galaxy_dataset_name=galaxy_dataset['name']
)
connection_dataset_list += [
{
field: galaxy_dataset[field]
for field in connection_dataset_list_fields
}
]
return connection_dataset_list

def _create_collection_description(self):
"""
Expand Down Expand Up @@ -1030,29 +1045,6 @@ def _get_analysis_node_connection_input_filename(self):
else self.INPUT_DATASET
)

@handle_bioblend_exceptions
def get_galaxy_dataset_download_list(self):
"""
Return a list of dicts containing information about Galaxy Datasets
in our Workflow invocation's history if said Datasets correspond to a
user-defined `workflow_output`.
"""
exposed_galaxy_datasets = self._get_exposed_galaxy_datasets()
exposed_galaxy_dataset_ids = [
galaxy_dataset["id"] for galaxy_dataset in exposed_galaxy_datasets
]

history_file_list = self.galaxy_instance.get_history_file_list(
self.analysis.history_id
)
retained_download_list = [
galaxy_dataset for galaxy_dataset in history_file_list
if galaxy_dataset["dataset_id"] in exposed_galaxy_dataset_ids
]
assert len(retained_download_list) >= 1, \
"There should be at least one dataset to download from Galaxy."
return retained_download_list

@handle_bioblend_exceptions
def _get_galaxy_dataset_job(self, galaxy_dataset_dict):
return self.galaxy_connection.jobs.show_job(
Expand Down Expand Up @@ -1095,7 +1087,7 @@ def _get_galaxy_history_dataset_list(self):
]
return retained_datasets

def _get_exposed_galaxy_datasets(self):
def _get_exposed_galaxy_datasets(self, exposed_dataset_list=None):
"""
Retrieve all Galaxy Datasets that correspond to an asterisked
output in the Galaxy workflow editor.
Expand All @@ -1108,7 +1100,9 @@ def _get_exposed_galaxy_datasets(self):
explicitly exposed
"""
exposed_galaxy_datasets = []
for galaxy_dataset in self._get_galaxy_history_dataset_list():
if exposed_dataset_list is None:
exposed_dataset_list = self._get_galaxy_history_dataset_list()
for galaxy_dataset in exposed_dataset_list:
creating_job = self._get_galaxy_dataset_job(galaxy_dataset)

# `tool_id` corresponds to the descriptive name of a galaxy
Expand All @@ -1119,7 +1113,9 @@ def _get_exposed_galaxy_datasets(self):
)
workflow_steps_dict = self._get_workflow_dict()["steps"]
creating_job_output_name = (
self._get_creating_job_output_name(galaxy_dataset)
self._get_creating_job_output_name(
galaxy_dataset, creating_job
)
)
workflow_step_output_names = [
workflow_output["output_name"] for workflow_output in
Expand Down Expand Up @@ -1164,10 +1160,19 @@ def _get_galaxy_workflow_invocation(self):
"""
Fetch our Galaxy Workflow's invocation data.
"""
return self.galaxy_connection.workflows.show_invocation(
self.galaxy_workflow_history_id,
self.get_galaxy_dict()[self.GALAXY_WORKFLOW_INVOCATION_DATA]["id"]
)
# separate if-then assignment needed to avoid using the dict stored
# in self.invocation before .save() is called
if self.invocation == '':
invocation = self.galaxy_connection.workflows.show_invocation(
self.galaxy_workflow_history_id,
self.get_galaxy_dict()
[self.GALAXY_WORKFLOW_INVOCATION_DATA]["id"]
)
self.invocation = json.dumps(invocation)
self.save()
else:
invocation = json.loads(self.invocation)
return invocation

@handle_bioblend_exceptions
def _get_refinery_input_file_id(self, galaxy_dataset_dict):
Expand Down Expand Up @@ -1248,36 +1253,48 @@ def _get_tool_inputs_dict(self, workflow_step):

@handle_bioblend_exceptions
def _get_workflow_dict(self):
return self.galaxy_connection.workflows.export_workflow_dict(
self.get_workflow_internal_id()
)
# separate if-then assignment needed to avoid using the dict stored
# in workflow_copy before .save() is called
if self.analysis.workflow_copy == '' \
or self.analysis.workflow_copy is None:
workflow_copy = \
self.galaxy_connection.workflows.export_workflow_dict(
self.get_workflow_internal_id()
)
self.analysis.workflow_copy = workflow_copy
self.analysis.save()
else:
workflow_copy = ast.literal_eval(self.analysis.workflow_copy)
return workflow_copy

def get_workflow_internal_id(self):
return self.tool_definition.workflow.internal_id

def _get_workflow_step(self, galaxy_dataset_dict):
for step in self._get_galaxy_workflow_invocation()["steps"]:
if step["job_id"] == galaxy_dataset_dict[self.CREATING_JOB]:
return step["order_index"]
return step["order_index"]

# If we reach this point and have no workflow_steps, this means that
# the galaxy dataset in question corresponds to an `upload` or
# `input` step i.e. `0`
return self.INPUT_STEP_NUMBER

def _get_creating_job_output_name(self, galaxy_dataset_dict):
def _get_creating_job_output_name(self, galaxy_dataset_dict,
creating_job=None):
"""
Retrieve the specified output name from the creating Galaxy Job that
corresponds to a Galaxy Dataset
:param galaxy_dataset_dict: dict containing information about a
Galaxy Dataset.
:param creating_job: an optional argument to prevent repeat request
This is useful if there are any post-job-actions in place to do
renaming of said output dataset.
:return: The proper output name of our galaxy dataset
"""
creating_job = self._get_galaxy_dataset_job(galaxy_dataset_dict)
if creating_job is None:
creating_job = self._get_galaxy_dataset_job(galaxy_dataset_dict)
creating_job_outputs = creating_job["outputs"]
workflow_step_output_name = [
output_name for output_name in creating_job_outputs.keys()
Expand Down
77 changes: 34 additions & 43 deletions refinery/tool_manager/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -1135,7 +1135,7 @@ def test__get_galaxy_datasets_list(self):

def test__get_exposed_galaxy_datasets(self):
galaxy_datasets_list_mock = self.galaxy_datasets_list_mock.start()
self.show_job_mock.side_effect = self.show_job_side_effect
self.show_job_mock.side_effect = [galaxy_job_a, galaxy_job_b]
self.create_tool(ToolDefinition.WORKFLOW)
all_galaxy_datasets = self.tool._get_galaxy_history_dataset_list()
datasets_marked_as_output = self.tool._get_exposed_galaxy_datasets()
Expand All @@ -1158,6 +1158,11 @@ def test__get_workflow_step(self):
self.assertTrue(galaxy_datasets_list_mock.called)

def test__get_galaxy_download_tasks(self):
self.show_job_mock.side_effect = [galaxy_job_a, galaxy_job_b,
galaxy_job_a, galaxy_job_a,
galaxy_job_a, galaxy_job_b,
galaxy_job_b, galaxy_job_a,
galaxy_job_b]
task_id_list = self._get_galaxy_download_task_ids_wrapper()

self.assertEqual(AnalysisResult.objects.count(), 2)
Expand All @@ -1176,7 +1181,11 @@ def test_create_analysis_node_connections(self):
self.show_dataset_provenance_mock.side_effect = (
self.show_dataset_provenance_side_effect * 3
)
self.show_job_mock.side_effect = self.show_job_side_effect * 3
self.show_job_mock.side_effect = [galaxy_job_a, galaxy_job_b,
galaxy_job_a, galaxy_job_a,
galaxy_job_a, galaxy_job_b,
galaxy_job_b, galaxy_job_a,
galaxy_job_b]
self.create_tool(ToolDefinition.WORKFLOW,
file_relationships=self.LIST_BASIC)
self.tool.create_analysis_output_node_connections()
Expand Down Expand Up @@ -1481,24 +1490,6 @@ def test_create_analysis_input_node_connections_non_dsc_input(self):
WorkflowTool.INPUT_DATASET)
self.assertFalse(analysis_node_connections[index].is_refinery_file)

def _create_analysis_node_connections_wrapper(self):
self.show_job_mock.side_effect = self.show_job_side_effect * 3
self.tool.create_analysis_output_node_connections()

def _get_galaxy_download_list_wrapper(self,
datasets_have_same_names=False):
self.show_job_mock.side_effect = [galaxy_job_a, galaxy_job_a,
galaxy_job_b, galaxy_job_b,
galaxy_job_a, galaxy_job_a]
if datasets_have_same_names:
self.show_dataset_mock.side_effect = (
galaxy_datasets_list_same_output_names
)
else:
self.show_dataset_mock.side_effect = galaxy_datasets_list

return self.tool.get_galaxy_dataset_download_list()

def _get_galaxy_download_task_ids_wrapper(
self,
datasets_have_same_names=False,
Expand Down Expand Up @@ -1526,19 +1517,12 @@ def _get_galaxy_download_task_ids_wrapper(
self.show_dataset_provenance_side_effect * 3
)

self._create_analysis_node_connections_wrapper()
download_list = self._get_galaxy_download_list_wrapper(
datasets_have_same_names=datasets_have_same_names
)
download_ids = self.tool.create_analysis_output_node_connections()

mock.patch(
"tool_manager.models.WorkflowTool."
"create_analysis_output_node_connections"
).start()
mock.patch(
"tool_manager.models.WorkflowTool"
".get_galaxy_dataset_download_list",
return_value=download_list
"create_analysis_output_node_connections",
return_value=download_ids
).start()

return _get_galaxy_download_task_ids(self.tool.analysis)
Expand All @@ -1548,18 +1532,33 @@ def _attach_derived_nodes_to_dataset_assertions(self):
self.assertEqual(self.show_dataset_provenance_mock.call_count, 8)

def test_attach_derived_nodes_to_dataset_dsc(self):
self.show_job_mock.side_effect = [galaxy_job_a, galaxy_job_b,
galaxy_job_a, galaxy_job_a,
galaxy_job_a, galaxy_job_b,
galaxy_job_b, galaxy_job_a,
galaxy_job_b]
self._get_galaxy_download_task_ids_wrapper(
tool_is_data_set_collection_based=True
)
self.tool.analysis.attach_derived_nodes_to_dataset()
self._attach_derived_nodes_to_dataset_assertions()

def test_attach_derived_nodes_to_dataset_non_dsc(self):
self.show_job_mock.side_effect = [galaxy_job_a, galaxy_job_b,
galaxy_job_a, galaxy_job_a,
galaxy_job_a, galaxy_job_b,
galaxy_job_b, galaxy_job_a,
galaxy_job_b]
self._get_galaxy_download_task_ids_wrapper()
self.tool.analysis.attach_derived_nodes_to_dataset()
self._attach_derived_nodes_to_dataset_assertions()

def test_attach_derived_nodes_to_dataset_same_name_workflow_results(self):
self.show_job_mock.side_effect = [galaxy_job_a, galaxy_job_b,
galaxy_job_a, galaxy_job_a,
galaxy_job_a, galaxy_job_b,
galaxy_job_b, galaxy_job_a,
galaxy_job_b]
self._get_galaxy_download_task_ids_wrapper(
datasets_have_same_names=True
)
Expand All @@ -1581,6 +1580,11 @@ def test_attach_derived_nodes_to_dataset_same_name_workflow_results(self):
self._attach_derived_nodes_to_dataset_assertions()

def test_attach_derived_nodes_to_dataset_proper_node_inheritance(self):
self.show_job_mock.side_effect = [galaxy_job_a, galaxy_job_b,
galaxy_job_a, galaxy_job_a,
galaxy_job_a, galaxy_job_b,
galaxy_job_b, galaxy_job_a,
galaxy_job_b]
self._get_galaxy_download_task_ids_wrapper()

exposed_output_connections = AnalysisNodeConnection.objects.filter(
Expand Down Expand Up @@ -1644,19 +1648,6 @@ def test_create_galaxy_library_sets_analysis_library_id(self):
self.tool.create_galaxy_library()
self.assertEqual(self.tool.analysis.library_id, library_dict["id"])

def test_get_galaxy_dataset_download_list(self):
self.galaxy_datasets_list_mock.start()
self.show_job_mock.side_effect = [galaxy_job_a, galaxy_job_a,
galaxy_job_b, galaxy_job_b,
galaxy_job_a, galaxy_job_a]
self.show_dataset_mock.side_effect = galaxy_datasets_list

self.create_tool(ToolDefinition.WORKFLOW)
self.assertEqual(
len(self.tool.get_galaxy_dataset_download_list()),
2
)

def test__get_creating_job_output_name(self):
self.show_job_mock.side_effect = [galaxy_job_a, galaxy_job_b]
self.create_tool(ToolDefinition.WORKFLOW)
Expand Down

0 comments on commit 223158a

Please sign in to comment.