Scottx611x/fix derived node attribute inheritance (#2174)
* Code tidying

* Add `is_orphan()` to Node model

* Clearer parameter naming

* Remove unused code

* Fix `attach_outputs_dataset` so that derived data nodes are properly connected to their parents

* Create AnalysisNodeConnection outputs for all Galaxy History datasets; only create WorkflowFilesDL for asterisked Workflow outputs

* Update existing tests and mock data

* Add test for `is_orphan`

* Refactor `_get_analysis_group_number`, `_get_creating_job_output_name`, and `_get_workflow_step`

* Fix typo

* Update Galaxy Mock data

* Revert "Refactor `_get_analysis_group_number`, `_get_creating_job_output_name`, and `_get_workflow_step`"

This reverts commit 61cd954.

* Update mock data

* Update tests (test_attach_outputs_dataset_makes_proper_node_inheritance_chain still needs stronger assertions)

* Use `CREATING_JOB` constant

* Use factories that already exist

* Fix comment

* Add stronger assertions to `test_attach_outputs_dataset_makes_proper_node_inheritance_chain`

* Add methods to AnalysisNodeConnection to build these identifiers w/ tests

* `output_connection.step` was clearer about what we were dealing with

* Did not need the sorting here

* `analysis_group` is already a number; no need to be redundant

* Fix docstring

* Clearer naming inside `_get_galaxy_history_dataset_list`

* Clean up `_get_exposed_workflow_outputs`

* Better naming in `_get_exposed_workflow_outputs`

* Add comment, remove unnecessary assertion

* Refactor `_get_workflow_step` to be more concise

* Remove loggers from debugging

* Update test now that I removed the `sorted()` from `_get_output_connection_to_analysis_result_mapping`
scottx611x authored and mccalluc committed Sep 22, 2017
1 parent ebe3ee9 commit 6003a4a
Showing 9 changed files with 538 additions and 182 deletions.
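
For orientation, the crux of the fix: `attach_outputs_dataset` must chain derived data nodes to their parents (input node → data transformation node → derived data file node) so that attribute inheritance works, and the new `Node.is_orphan()` reports whether a node was left out of that chain. Below is a minimal, self-contained sketch of the intended parent/child semantics — plain Python, not the actual Django models; `SketchNode` is hypothetical:

    class SketchNode:
        # stand-in for the Refinery Node model's parent/child API
        def __init__(self, name):
            self.name = name
            self.parents = []
            self.children = []

        def add_child(self, child):
            self.children.append(child)
            child.parents.append(self)

        def is_orphan(self):
            # mirrors the new Node.is_orphan(): True if the node has no parents
            return len(self.parents) == 0

    input_node = SketchNode("input file")
    tool_node = SketchNode("tool_x (data transformation)")
    derived_node = SketchNode("derived output")

    input_node.add_child(tool_node)      # step 3 in attach_outputs_dataset
    tool_node.add_child(derived_node)    # step 4 in attach_outputs_dataset

    assert not derived_node.is_orphan()  # properly connected to its parents
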
117 changes: 76 additions & 41 deletions refinery/core/models.py
@@ -1609,15 +1609,21 @@ def rename_results(self):
             if item.get_filetype() == zipfile:
                 new_file_name = ''.join([root, '.zip'])
             renamed_file_store_item_uuid = rename(
-                result.file_store_uuid, new_file_name)
+                result.file_store_uuid,
+                new_file_name
+            )

             # Try to generate an auxiliary node for visualization purposes
             # NOTE: We have to do this after renaming happens because before
             # renaming, said FileStoreItems's datafile field does not point
             # to an actual file
+            file_store_item_uuid = (
+                renamed_file_store_item_uuid if
+                renamed_file_store_item_uuid else result.file_store_uuid
+            )
             try:
-                node = Node.objects.get(
-                    file_uuid=renamed_file_store_item_uuid)
+                node = Node.objects.get(file_uuid=file_store_item_uuid)
             except (Node.DoesNotExist, Node.MultipleObjectsReturned) as e:
                 logger.error("Error Fetching Node: %s", e)
             else:
@@ -1653,27 +1659,35 @@ def attach_outputs_dataset(self):
         # for testing: attach workflow graph and output files to data set graph
         # 0. get study and assay from the first input node
         study = AnalysisNodeConnection.objects.filter(
-            analysis=self, direction=INPUT_CONNECTION)[0].node.study
+            analysis=self,
+            direction=INPUT_CONNECTION
+        ).first().node.study
         assay = AnalysisNodeConnection.objects.filter(
-            analysis=self, direction=INPUT_CONNECTION)[0].node.assay
+            analysis=self,
+            direction=INPUT_CONNECTION
+        ).first().node.assay
         # 1. read workflow into graph
         graph = create_expanded_workflow_graph(
             ast.literal_eval(self.workflow_copy)
         )
         # 2. create data transformation nodes for all tool nodes
-        data_transformation_nodes = [graph.node[node_id]
-                                     for node_id in graph.nodes()
-                                     if graph.node[node_id]['type'] == "tool"]
+        data_transformation_nodes = [
+            graph.node[node_id] for node_id in graph.nodes()
+            if graph.node[node_id]['type'] == "tool"
+        ]
         for data_transformation_node in data_transformation_nodes:
             # TODO: incorporate subanalysis id in tool name???
+            node_name = "{}_{}".format(
+                data_transformation_node['tool_id'],
+                data_transformation_node['name']
+            )
             data_transformation_node['node'] = (
                 Node.objects.create(
                     study=study,
                     assay=assay,
                     analysis_uuid=self.uuid,
                     type=Node.DATA_TRANSFORMATION,
-                    name=data_transformation_node['tool_id'] + '_' +
-                         data_transformation_node['name']
+                    name=node_name
                 )
             )
         # 3. create connection from input nodes to first data transformation
@@ -1684,12 +1698,13 @@
         )
         for input_connection in input_node_connections:
             for edge in graph.edges_iter([input_connection.step]):
-                if (graph[edge[0]][edge[1]]['output_id'] ==
-                        str(input_connection.step) + '_' +
-                        input_connection.filename):
+                input_id = input_connection.get_input_connection_id()
+
+                if graph[edge[0]][edge[1]]['output_id'] == input_id:
                     input_node_id = edge[1]
-                    data_transformation_node = \
-                        graph.node[input_node_id]['node']
+                    data_transformation_node = (
+                        graph.node[input_node_id]['node']
+                    )
                     input_connection.node.add_child(data_transformation_node)
         # 4. create derived data file nodes for all entries and connect to data
         # transformation nodes
@@ -1711,53 +1726,62 @@
                     workflow_output=output_connection.name
                 )
             )
-            # retrieve uuid of corresponding output file if exists
-            logger.info("Results for '%s' and %s.%s: %s",
-                        self.uuid, output_connection,
-                        output_connection.filetype, analysis_result)
-
-            derived_data_file_node.file_uuid = analysis_result.file_store_uuid
-
-            logger.debug("Output file %s ('%s') assigned to node %s ('%s')",
-                         output_connection, analysis_result.file_store_uuid,
-                         derived_data_file_node.name,
-                         derived_data_file_node.uuid)
-
+            if output_connection.is_refinery_file:
+                # retrieve uuid of corresponding output file if exists
+                logger.info(
+                    "Results for '%s' and %s.%s: %s",
+                    self.uuid,
+                    output_connection,
+                    output_connection.filetype,
+                    analysis_result
+                )
+                derived_data_file_node.file_uuid = (
+                    analysis_result.file_store_uuid
+                )
+                logger.debug(
+                    "Output file %s ('%s') assigned to node %s ('%s')",
+                    output_connection,
+                    analysis_result.file_store_uuid,
+                    derived_data_file_node.name,
+                    derived_data_file_node.uuid
+                )
             output_connection.node = derived_data_file_node
             output_connection.save()

             # get graph edge that corresponds to this output node:
             # a. attach output node to source data transformation node
             # b. attach output node to target data transformation node
             # (if exists)
             if len(graph.edges([output_connection.step])) > 0:
                 for edge in graph.edges_iter([output_connection.step]):
-                    output_id = "{}_{}".format(
-                        output_connection.step,
-                        output_connection.filename
-                    )
+                    output_id = output_connection.get_output_connection_id()

                     if graph[edge[0]][edge[1]]['output_id'] == output_id:
-                        output_node_id = edge[0]
-                        input_node_id = edge[1]
-                        data_transformation_output_node = (
-                            graph.node[output_node_id]['node']
-                        )
+                        input_node_id = edge[0]
+                        output_node_id = edge[1]
+
                         data_transformation_input_node = (
                             graph.node[input_node_id]['node']
                         )
-                        data_transformation_output_node.add_child(
+                        data_transformation_output_node = (
+                            graph.node[output_node_id]['node']
+                        )
+                        data_transformation_input_node.add_child(
                             derived_data_file_node
                         )
                         derived_data_file_node.add_child(
-                            data_transformation_input_node
+                            data_transformation_output_node
                         )
                 # TODO: here we could add a (Refinery internal)
                 # attribute to the derived data file node to
                 # indicate which output of the tool it corresponds to

             # connect outputs that are not inputs for any data transformation
             if (output_connection.is_refinery_file and
                     derived_data_file_node.parents.count() == 0):
                 graph.node[output_connection.step]['node'].add_child(
-                    derived_data_file_node)
+                    derived_data_file_node
+                )
             # delete output nodes that are not refinery files and don't have
             # any children
             if (not output_connection.is_refinery_file and
@@ -1868,9 +1892,14 @@ def _get_output_connection_to_analysis_result_mapping(self):
                 analysis_uuid=self.uuid,
                 file_name=output_connection.filename
             )[index]
-            output_connections_to_analysis_results.append(
-                (output_connection, analysis_result)
-            )
+            if output_connection.is_refinery_file:
+                output_connections_to_analysis_results.append(
+                    (output_connection, analysis_result)
+                )
+            else:
+                output_connections_to_analysis_results.append(
+                    (output_connection, None)
+                )
         return output_connections_to_analysis_results

     @property
@@ -1928,6 +1957,12 @@ def __unicode__(self):
             self.name + " (" + str(self.is_refinery_file) + ")"
         )

+    def get_input_connection_id(self):
+        return "{}_{}".format(self.step, self.filename)
+
+    def get_output_connection_id(self):
+        return "{}_{}".format(self.step, self.name)
+

 class Download(TemporaryResource, OwnableResource):
     data_set = models.ForeignKey(DataSet)
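
The two new `AnalysisNodeConnection` methods above replace the ad-hoc `str(step) + '_' + filename` concatenations previously used to match graph edge `output_id` labels. A standalone sketch of how the identifiers compose — `FakeConnection` and the sample values are illustrative, not from the codebase:

    class FakeConnection:
        # stand-in exposing only the fields the two helpers read
        def __init__(self, step, filename, name):
            self.step = step
            self.filename = filename
            self.name = name

        def get_input_connection_id(self):
            return "{}_{}".format(self.step, self.filename)

        def get_output_connection_id(self):
            return "{}_{}".format(self.step, self.name)

    connection = FakeConnection(step=1, filename="output.bam", name="bam_file")
    assert connection.get_input_connection_id() == "1_output.bam"
    assert connection.get_output_connection_id() == "1_bam_file"
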
59 changes: 42 additions & 17 deletions refinery/core/tests.py
@@ -1448,16 +1448,28 @@ def setUp(self):
                 node=self.node,
                 step=1,
                 filename=self.node_filename,
-                direction=OUTPUT_CONNECTION
+                direction=OUTPUT_CONNECTION,
+                is_refinery_file=True
             )
         )
         self.analysis_node_connection_b = (
             AnalysisNodeConnection.objects.create(
                 analysis=self.analysis,
                 node=self.node,
-                step=1,
+                step=2,
                 filename=self.node_filename,
-                direction=OUTPUT_CONNECTION
+                direction=OUTPUT_CONNECTION,
+                is_refinery_file=False
             )
         )
+        self.analysis_node_connection_c = (
+            AnalysisNodeConnection.objects.create(
+                analysis=self.analysis,
+                node=self.node,
+                step=3,
+                filename=self.node_filename,
+                direction=OUTPUT_CONNECTION,
+                is_refinery_file=True
+            )
+        )
         self.analysis_node_connection_with_node_analyzed_further = (
@@ -1561,29 +1573,42 @@ def test__prepare_annotated_nodes_calls_methods_in_proper_order(
         )

     def test___get_output_connection_to_analysis_result_mapping(self):
-        analysis_result_b = AnalysisResult.objects.create(
-            analysis_uuid=self.analysis.uuid,
-            file_store_uuid=self.node.file_uuid,
-            file_name=self.node_filename,
-            file_type=self.node.get_file_store_item().filetype
-        )
-        analysis_result_a = AnalysisResult.objects.create(
-            analysis_uuid=self.analysis.uuid,
-            file_store_uuid=self.node.file_uuid,
-            file_name=self.node_filename,
-            file_type=self.node.get_file_store_item().filetype
-        )
+        common_params = {
+            "analysis_uuid": self.analysis.uuid,
+            "file_store_uuid": self.node.file_uuid,
+            "file_name": self.node_filename,
+            "file_type": self.node.get_file_store_item().filetype
+        }
+        analysis_result_0 = AnalysisResult.objects.create(**common_params)
+        AnalysisResult.objects.create(**common_params)
+        analysis_result_1 = AnalysisResult.objects.create(**common_params)

         output_mapping = (
             self.analysis._get_output_connection_to_analysis_result_mapping()
         )
         self.assertEqual(
             output_mapping,
             [
-                (self.analysis_node_connection_b, analysis_result_b),
-                (self.analysis_node_connection_a, analysis_result_a)
+                (self.analysis_node_connection_c, analysis_result_0),
+                (self.analysis_node_connection_b, None),
+                (self.analysis_node_connection_a, analysis_result_1)
             ]
         )

+    def test_analysis_node_connection_input_id(self):
+        self.assertEqual(
+            self.analysis_node_connection_a.get_input_connection_id(),
+            "{}_{}".format(self.analysis_node_connection_a.step,
+                           self.analysis_node_connection_a.filename)
+        )
+
+    def test_analysis_node_connection_output_id(self):
+        self.assertEqual(
+            self.analysis_node_connection_a.get_output_connection_id(),
+            "{}_{}".format(self.analysis_node_connection_a.step,
+                           self.analysis_node_connection_a.name)
+        )
+

 class UtilitiesTest(TestCase):
     def setUp(self):
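
The updated mapping test encodes the new pairing rule in `_get_output_connection_to_analysis_result_mapping`: output connections that are not Refinery files are paired with `None` instead of an `AnalysisResult`. A standalone illustration of just that rule — the names are placeholders, not real objects:

    connections = [
        ("connection_c", True),   # second element: is_refinery_file
        ("connection_b", False),
        ("connection_a", True),
    ]
    results = iter(["analysis_result_0", "analysis_result_1"])

    # pair each Refinery-file connection with the next result, others with None
    mapping = [
        (name, next(results) if is_refinery_file else None)
        for name, is_refinery_file in connections
    ]

    assert mapping == [
        ("connection_c", "analysis_result_0"),
        ("connection_b", None),
        ("connection_a", "analysis_result_1"),
    ]
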
3 changes: 3 additions & 0 deletions refinery/data_set_manager/models.py
@@ -478,6 +478,9 @@ def is_derived(self):
         """
         return self.type in self._get_derived_node_types()

+    def is_orphan(self):
+        return self.parents.count() == 0
+
     def get_analysis_node_connections(self):
         return core.models.AnalysisNodeConnection.objects.filter(node=self)
5 changes: 5 additions & 0 deletions refinery/data_set_manager/tests.py
@@ -1446,6 +1446,11 @@ def test_get_parents(self):
         # Check inverse relationship:
         self.assertEqual(self.another_node.uuid, self.node.get_children()[0])

+    def test_is_orphan(self):
+        self.assertTrue(self.another_node.is_orphan())
+        self.node.add_child(self.another_node)
+        self.assertFalse(self.another_node.is_orphan())
+
     # Auxiliary nodes:

     def test_create_and_associate_auxiliary_node(self):
1 change: 0 additions & 1 deletion refinery/data_set_manager/utils.py
@@ -563,7 +563,6 @@ def _add_annotated_nodes(

     if len(bulk_list) > 0:
         AnnotatedNode.objects.bulk_create(bulk_list)
-        bulk_list = []

     end = time.time()
4 changes: 2 additions & 2 deletions refinery/galaxy_connector/galaxy_workflow.py
@@ -527,9 +527,9 @@ def configure_workflow(workflow_dict, ret_list):
     return new_workflow, history_download, analysis_node_connections


-def create_expanded_workflow_graph(dictionary):
+def create_expanded_workflow_graph(galaxy_workflow_dict):
     graph = nx.MultiDiGraph()
-    steps = dictionary["steps"]
+    steps = galaxy_workflow_dict["steps"]
     galaxy_input_types = tool_manager.models.WorkflowTool.GALAXY_INPUT_TYPES

     # iterate over steps to create nodes
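
The rename from `dictionary` to `galaxy_workflow_dict` documents the expected input: a Galaxy workflow description with a "steps" mapping. A minimal sketch of feeding such a dict to a graph builder — the step fields shown are illustrative placeholders, not the full Galaxy schema, and `sketch_expanded_workflow_graph` only mirrors the first lines of the real function:

    import networkx as nx

    def sketch_expanded_workflow_graph(galaxy_workflow_dict):
        graph = nx.MultiDiGraph()
        steps = galaxy_workflow_dict["steps"]
        for step_id, step in steps.items():
            # one graph node per workflow step, keeping a couple of attributes
            graph.add_node(step_id, type=step.get("type"), name=step.get("name"))
        return graph

    graph = sketch_expanded_workflow_graph(
        {"steps": {"0": {"type": "data_input", "name": "Input dataset"}}}
    )
    print(graph.nodes(data=True))
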
