Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Scottx611x/refactor attach outputs dataset #2591

Merged
merged 11 commits into from
Feb 15, 2018
2 changes: 1 addition & 1 deletion refinery/analysis_manager/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ def _attach_workflow_outputs(analysis_uuid):
analysis_status = _get_analysis_status(analysis_uuid)

if analysis.workflow.type == Workflow.ANALYSIS_TYPE:
analysis.attach_outputs_dataset()
analysis.attach_derived_nodes_to_dataset()
elif analysis.workflow.type == Workflow.DOWNLOAD_TYPE:
analysis.attach_outputs_downloads()
else:
Expand Down
308 changes: 172 additions & 136 deletions refinery/core/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -1161,6 +1161,11 @@ class Meta:
)
ordering = ['-time_end', '-time_start']

def get_workflow_graph(self):
    """Return the expanded workflow graph built from this analysis'
    stored ``workflow_copy``.

    ``workflow_copy`` holds a Python-literal representation of the
    workflow; it is parsed with ``ast.literal_eval`` and expanded via
    ``tool_manager.utils.create_expanded_workflow_graph``.
    """
    workflow = ast.literal_eval(self.workflow_copy)
    return tool_manager.utils.create_expanded_workflow_graph(workflow)

def delete(self, **kwargs):
"""
Overrides the Analysis model's delete method and checks if
Expand Down Expand Up @@ -1505,145 +1510,17 @@ def rename_results(self):
if node.is_derived():
node.run_generate_auxiliary_node_task()

def attach_derived_nodes_to_dataset(self):
    """Attach this analysis' workflow outputs to the data set graph.

    Pipeline:
      1. create DATA_TRANSFORMATION nodes for the workflow's tool nodes
      2. link the analysis' input nodes to those transformation nodes
      3. create derived data file nodes for the outputs and wire them in
      4. create and index annotated nodes for the new derived files
    """
    graph = self._create_data_transformation_nodes(self.get_workflow_graph())
    graph = self._link_input_nodes_to_data_transformation_nodes(graph)
    self._create_derived_data_file_nodes(graph)
    self._create_annotated_nodes()

def attach_outputs_downloads(self):
analysis_results = AnalysisResult.objects.filter(
Expand Down Expand Up @@ -1753,6 +1630,165 @@ def _create_derived_data_file_node(self, study,
workflow_output=analysis_node_connection.name
)

def _get_input_node(self):
    """Return the Node of this analysis' first INPUT connection.

    NOTE(review): assumes at least one INPUT AnalysisNodeConnection
    exists for this analysis; otherwise ``.first()`` returns None and
    the ``.node`` attribute access raises — confirm callers guarantee
    this.
    """
    return AnalysisNodeConnection.objects.filter(
        analysis=self,
        direction=INPUT_CONNECTION
    ).first().node

def get_input_node_study(self):
    """Return the Study of this analysis' first input node."""
    return self._get_input_node().study

def get_input_node_assay(self):
    """Return the Assay of this analysis' first input node."""
    return self._get_input_node().assay

def _create_data_transformation_nodes(self, graph):
    """Create a DATA_TRANSFORMATION Node for every "tool" node of the
    workflow graph and store it on the graph node under the 'node' key.

    Returns the mutated graph.
    """
    # Hoisted out of the loop: each helper call issues a database query
    # (filter + first on AnalysisNodeConnection) and the result is
    # loop-invariant.
    study = self.get_input_node_study()
    assay = self.get_input_node_assay()

    data_transformation_nodes = [
        graph.node[node_id] for node_id in graph.nodes()
        if graph.node[node_id]['type'] == "tool"
    ]
    for data_transformation_node in data_transformation_nodes:
        # TODO: incorporate subanalysis id in tool name???
        data_transformation_node['node'] = Node.objects.create(
            study=study,
            assay=assay,
            analysis_uuid=self.uuid,
            type=Node.DATA_TRANSFORMATION,
            name="{}_{}".format(
                data_transformation_node['tool_id'],
                data_transformation_node['name']
            )
        )
    return graph

def _link_input_nodes_to_data_transformation_nodes(self, graph):
    """Create connections from input nodes to the first data
    transformation nodes (input tool nodes in the graph are skipped).

    Returns the mutated graph.
    """
    input_node_connections = AnalysisNodeConnection.objects.filter(
        analysis=self,
        direction=INPUT_CONNECTION
    )
    for input_connection in input_node_connections:
        # Loop-invariant for every edge of this connection; hoisted out
        # of the inner loop.
        input_id = input_connection.get_input_connection_id()
        for edge in graph.edges_iter([input_connection.step]):
            if graph[edge[0]][edge[1]]['output_id'] == input_id:
                # edge[1] is the consuming tool node; attach its
                # DATA_TRANSFORMATION Node as a child of the input node
                data_transformation_node = graph.node[edge[1]]['node']
                input_connection.node.add_child(data_transformation_node)
    return graph

def _create_derived_data_file_nodes(self, graph):
    """Create derived data file nodes for all output connection /
    AnalysisResult pairs and connect them to the data transformation
    nodes of the graph."""
    # Hoisted out of the loop: each helper call issues a database query
    # and the first input node's study/assay do not change mid-loop.
    study = self.get_input_node_study()
    assay = self.get_input_node_assay()

    output_mappings = (
        self._get_output_connection_to_analysis_result_mapping()
    )
    for output_connection, analysis_result in output_mappings:
        derived_data_file_node = self._create_derived_data_file_node(
            study, assay, output_connection
        )
        if output_connection.is_refinery_file:
            # retrieve uuid of corresponding output file if it exists
            logger.info(
                "Results for '%s' and %s.%s: %s",
                self.uuid,
                output_connection,
                output_connection.filetype,
                analysis_result
            )
            derived_data_file_node.file_uuid = (
                analysis_result.file_store_uuid
            )
            logger.debug(
                "Output file %s ('%s') assigned to node %s ('%s')",
                output_connection,
                analysis_result.file_store_uuid,
                derived_data_file_node.name,
                derived_data_file_node.uuid
            )
        # node is assigned even for non-refinery files: the linking step
        # below may delete output_connection.node for childless entries
        output_connection.node = derived_data_file_node
        output_connection.save()

        self._link_derived_data_file_node_to_data_transformation_node(
            graph,
            output_connection,
            derived_data_file_node
        )

def _link_derived_data_file_node_to_data_transformation_node(
        self,
        graph,
        output_connection,
        derived_data_file_node
):
    """Get the graph edge that corresponds to this output node:
    a. attach output node to source data transformation node
    b. attach output node to target data transformation node
       (if exists)
    """
    # Loop-invariant; hoisted out of the edge loop.
    output_id = output_connection.get_output_connection_id()
    # No need to guard with len(graph.edges(...)) > 0: iterating an
    # empty edge list simply does nothing.
    for edge in graph.edges_iter([output_connection.step]):
        if graph[edge[0]][edge[1]]['output_id'] == output_id:
            data_transformation_input_node = graph.node[edge[0]]['node']
            data_transformation_output_node = graph.node[edge[1]]['node']
            data_transformation_input_node.add_child(
                derived_data_file_node
            )
            derived_data_file_node.add_child(
                data_transformation_output_node
            )
            # TODO: here we could add a (Refinery internal)
            # attribute to the derived data file node to
            # indicate which output of the tool it corresponds to

    # connect outputs that are not inputs for any data transformation
    if (output_connection.is_refinery_file and
            derived_data_file_node.parents.count() == 0):
        graph.node[output_connection.step]['node'].add_child(
            derived_data_file_node
        )
    # delete output nodes that are not refinery files and don't have
    # any children
    if (not output_connection.is_refinery_file and
            derived_data_file_node.children.count() == 0):
        output_connection.node.delete()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe:

if derived_data_file_node.children.count() == 0:
  if output_connection.is_refinery_file:
    ...
  else:
    ...

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought we could make the split here too, but looking at these two ifs, one is concerned with derived_data_file_node.parents and the other with derived_data_file_node.children


def _create_annotated_nodes(self):
    """Create and index annotated nodes for the derived data files."""
    study = self.get_input_node_study()
    assay = self.get_input_node_assay()

    # uuids of all nodes attached to exposed (refinery-file) outputs
    node_uuids = AnalysisNodeConnection.objects.filter(
        analysis=self,
        direction=OUTPUT_CONNECTION,
        is_refinery_file=True
    ).values_list('node__uuid', flat=True)

    add_annotated_nodes_selection(
        node_uuids,
        Node.DERIVED_DATA_FILE,
        study.uuid,
        assay.uuid
    )
    self._prepare_annotated_nodes(node_uuids)


#: Defining available relationship types
INPUT_CONNECTION = 'in'
Expand Down
24 changes: 12 additions & 12 deletions refinery/tool_manager/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -2494,27 +2494,27 @@ def _get_galaxy_download_task_ids_wrapper(

return _get_galaxy_download_task_ids(self.tool.analysis)

def _attach_derived_nodes_to_dataset_assertions(self):
    """Shared assertions for attach_derived_nodes_to_dataset() tests."""
    self._assert_analysis_node_connection_outputs_validity()
    self.assertEqual(self.show_dataset_provenance_mock.call_count, 8)

def test_attach_derived_nodes_to_dataset_dsc(self):
    # Exercise attach_derived_nodes_to_dataset() for a
    # dataset-collection-based tool, then run the shared assertions.
    self._get_galaxy_download_task_ids_wrapper(
        tool_is_data_set_collection_based=True
    )
    self.tool.analysis.attach_derived_nodes_to_dataset()
    self._attach_derived_nodes_to_dataset_assertions()

def test_attach_derived_nodes_to_dataset_non_dsc(self):
    # Exercise attach_derived_nodes_to_dataset() for a tool that is not
    # dataset-collection based, then run the shared assertions.
    self._get_galaxy_download_task_ids_wrapper()
    self.tool.analysis.attach_derived_nodes_to_dataset()
    self._attach_derived_nodes_to_dataset_assertions()

def test_attach_outputs_dataset_same_name_workflow_results(self):
def test_attach_derived_nodes_to_dataset_same_name_workflow_results(self):
self._get_galaxy_download_task_ids_wrapper(
datasets_have_same_names=True
)
self.tool.analysis.attach_outputs_dataset()
self.tool.analysis.attach_derived_nodes_to_dataset()

output_connections = AnalysisNodeConnection.objects.filter(
analysis=self.tool.analysis,
Expand All @@ -2531,9 +2531,9 @@ def test_attach_outputs_dataset_same_name_workflow_results(self):
file_name=output_connection_filename
)
self.assertGreater(analysis_results.count(), 1)
self._attach_outputs_dataset_assertions()
self._attach_derived_nodes_to_dataset_assertions()

def test_attach_outputs_dataset_makes_proper_node_inheritance_chain(self):
def test_attach_derived_nodes_to_dataset_proper_node_inheritance(self):
self._get_galaxy_download_task_ids_wrapper()

exposed_output_connections = AnalysisNodeConnection.objects.filter(
Expand All @@ -2546,7 +2546,7 @@ def test_attach_outputs_dataset_makes_proper_node_inheritance_chain(self):
for output_connection in exposed_output_connections:
self.assertIsNone(output_connection.node)

self.tool.analysis.attach_outputs_dataset()
self.tool.analysis.attach_derived_nodes_to_dataset()

# Have to fetch again here since AnalysisNodeConnections have been
# updated
Expand Down