-
Notifications
You must be signed in to change notification settings - Fork 24
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Scottx611x/refactor attach outputs dataset #2591
Changes from 3 commits
fbe3974
2a2be1a
dc53581
33b7091
5159f67
f4bd7c8
9f43aaf
b1fe518
c302cfb
90e55fd
91ecd5b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -1161,6 +1161,12 @@ class Meta: | |
) | ||
ordering = ['-time_end', '-time_start'] | ||
|
||
@property | ||
def workflow_graph(self): | ||
return tool_manager.utils.create_expanded_workflow_graph( | ||
ast.literal_eval(self.workflow_copy) | ||
) | ||
|
||
def delete(self, **kwargs): | ||
""" | ||
Overrides the Analysis model's delete method and checks if | ||
|
@@ -1505,145 +1511,17 @@ def rename_results(self): | |
if node.is_derived(): | ||
node.run_generate_auxiliary_node_task() | ||
|
||
def attach_outputs_dataset(self): | ||
# for testing: attach workflow graph and output files to data set graph | ||
# 0. get study and assay from the first input node | ||
study = AnalysisNodeConnection.objects.filter( | ||
analysis=self, | ||
direction=INPUT_CONNECTION | ||
).first().node.study | ||
assay = AnalysisNodeConnection.objects.filter( | ||
analysis=self, | ||
direction=INPUT_CONNECTION | ||
).first().node.assay | ||
# 1. read workflow into graph | ||
graph = tool_manager.utils.create_expanded_workflow_graph( | ||
ast.literal_eval(self.workflow_copy) | ||
def attach_derived_nodes_to_dataset(self): | ||
graph_with_data_transformation_nodes = ( | ||
self._create_data_transformation_nodes(self.workflow_graph) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We just talked about this... probably more clear just to call? |
||
) | ||
# 2. create data transformation nodes for all tool nodes | ||
data_transformation_nodes = [ | ||
graph.node[node_id] for node_id in graph.nodes() | ||
if graph.node[node_id]['type'] == "tool" | ||
] | ||
for data_transformation_node in data_transformation_nodes: | ||
# TODO: incorporate subanalysis id in tool name??? | ||
node_name = "{}_{}".format( | ||
data_transformation_node['tool_id'], | ||
data_transformation_node['name'] | ||
) | ||
data_transformation_node['node'] = ( | ||
Node.objects.create( | ||
study=study, | ||
assay=assay, | ||
analysis_uuid=self.uuid, | ||
type=Node.DATA_TRANSFORMATION, | ||
name=node_name | ||
) | ||
) | ||
# 3. create connection from input nodes to first data transformation | ||
# nodes (input tool nodes in the graph are skipped) | ||
input_node_connections = AnalysisNodeConnection.objects.filter( | ||
analysis=self, | ||
direction=INPUT_CONNECTION | ||
) | ||
for input_connection in input_node_connections: | ||
for edge in graph.edges_iter([input_connection.step]): | ||
input_id = input_connection.get_input_connection_id() | ||
|
||
if graph[edge[0]][edge[1]]['output_id'] == input_id: | ||
input_node_id = edge[1] | ||
data_transformation_node = ( | ||
graph.node[input_node_id]['node'] | ||
) | ||
input_connection.node.add_child(data_transformation_node) | ||
# 4. create derived data file nodes for all entries and connect to data | ||
# transformation nodes | ||
output_connection_to_analysis_result_mapping = ( | ||
self._get_output_connection_to_analysis_result_mapping() | ||
) | ||
output_mappings = output_connection_to_analysis_result_mapping | ||
|
||
for output_connection, analysis_result in output_mappings: | ||
# create derived data file node | ||
derived_data_file_node = self._create_derived_data_file_node( | ||
study, assay, output_connection | ||
graph_with_input_nodes_linked = ( | ||
self._link_input_nodes_to_data_transformation_nodes( | ||
graph_with_data_transformation_nodes | ||
) | ||
if output_connection.is_refinery_file: | ||
# retrieve uuid of corresponding output file if exists | ||
logger.info( | ||
"Results for '%s' and %s.%s: %s", | ||
self.uuid, | ||
output_connection, | ||
output_connection.filetype, | ||
analysis_result | ||
) | ||
derived_data_file_node.file_uuid = ( | ||
analysis_result.file_store_uuid | ||
) | ||
logger.debug( | ||
"Output file %s ('%s') assigned to node %s ('%s')", | ||
output_connection, | ||
analysis_result.file_store_uuid, | ||
derived_data_file_node.name, | ||
derived_data_file_node.uuid | ||
) | ||
output_connection.node = derived_data_file_node | ||
output_connection.save() | ||
|
||
# get graph edge that corresponds to this output node: | ||
# a. attach output node to source data transformation node | ||
# b. attach output node to target data transformation node | ||
# (if exists) | ||
if len(graph.edges([output_connection.step])) > 0: | ||
for edge in graph.edges_iter([output_connection.step]): | ||
output_id = output_connection.get_output_connection_id() | ||
|
||
if graph[edge[0]][edge[1]]['output_id'] == output_id: | ||
input_node_id = edge[0] | ||
output_node_id = edge[1] | ||
|
||
data_transformation_input_node = ( | ||
graph.node[input_node_id]['node'] | ||
) | ||
data_transformation_output_node = ( | ||
graph.node[output_node_id]['node'] | ||
) | ||
data_transformation_input_node.add_child( | ||
derived_data_file_node | ||
) | ||
derived_data_file_node.add_child( | ||
data_transformation_output_node | ||
) | ||
# TODO: here we could add a (Refinery internal) | ||
# attribute to the derived data file node to | ||
# indicate which output of the tool it corresponds to | ||
|
||
# connect outputs that are not inputs for any data transformation | ||
if (output_connection.is_refinery_file and | ||
derived_data_file_node.parents.count() == 0): | ||
graph.node[output_connection.step]['node'].add_child( | ||
derived_data_file_node | ||
) | ||
# delete output nodes that are not refinery files and don't have | ||
# any children | ||
if (not output_connection.is_refinery_file and | ||
derived_data_file_node.children.count() == 0): | ||
output_connection.node.delete() | ||
|
||
# 5. create annotated nodes and index new nodes | ||
node_uuids = AnalysisNodeConnection.objects.filter( | ||
analysis=self, | ||
direction=OUTPUT_CONNECTION, | ||
is_refinery_file=True | ||
).values_list('node__uuid', flat=True) | ||
|
||
add_annotated_nodes_selection( | ||
node_uuids, | ||
Node.DERIVED_DATA_FILE, | ||
study.uuid, | ||
assay.uuid | ||
) | ||
self._prepare_annotated_nodes(node_uuids) | ||
self._create_derived_data_file_nodes(graph_with_input_nodes_linked) | ||
self._create_annotated_nodes() | ||
|
||
def attach_outputs_downloads(self): | ||
analysis_results = AnalysisResult.objects.filter( | ||
|
@@ -1753,6 +1631,165 @@ def _create_derived_data_file_node(self, study, | |
workflow_output=analysis_node_connection.name | ||
) | ||
|
||
def _get_input_node(self): | ||
return AnalysisNodeConnection.objects.filter( | ||
analysis=self, | ||
direction=INPUT_CONNECTION | ||
).first().node | ||
|
||
def get_input_node_study(self): | ||
return self._get_input_node().study | ||
|
||
def get_input_node_assay(self): | ||
return self._get_input_node().assay | ||
|
||
def _create_data_transformation_nodes(self, graph): | ||
"""create data transformation nodes for all Tool nodes""" | ||
|
||
data_transformation_nodes = [ | ||
graph.node[node_id] for node_id in graph.nodes() | ||
if graph.node[node_id]['type'] == "tool" | ||
] | ||
for data_transformation_node in data_transformation_nodes: | ||
# TODO: incorporate subanalysis id in tool name??? | ||
node_name = "{}_{}".format( | ||
data_transformation_node['tool_id'], | ||
data_transformation_node['name'] | ||
) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. maybe: "{tool_id}_{name}".format(**data_transformation_node) and then it's almost short enough to just put in the |
||
data_transformation_node['node'] = ( | ||
Node.objects.create( | ||
study=self.get_input_node_study(), | ||
assay=self.get_input_node_assay(), | ||
analysis_uuid=self.uuid, | ||
type=Node.DATA_TRANSFORMATION, | ||
name=node_name | ||
) | ||
) | ||
return graph | ||
|
||
def _link_input_nodes_to_data_transformation_nodes(self, graph): | ||
"""create connection from input nodes to first data transformation | ||
nodes (input tool nodes in the graph are skipped)""" | ||
input_node_connections = AnalysisNodeConnection.objects.filter( | ||
analysis=self, | ||
direction=INPUT_CONNECTION | ||
) | ||
for input_connection in input_node_connections: | ||
for edge in graph.edges_iter([input_connection.step]): | ||
input_id = input_connection.get_input_connection_id() | ||
|
||
if graph[edge[0]][edge[1]]['output_id'] == input_id: | ||
input_node_id = edge[1] | ||
data_transformation_node = ( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I would tend to just back-slash in a case like this... maybe to avoid the closing paren, or the risk of a stray comma turning it into a tuple, but this could be the wrong call. |
||
graph.node[input_node_id]['node'] | ||
) | ||
input_connection.node.add_child(data_transformation_node) | ||
return graph | ||
|
||
def _create_derived_data_file_nodes(self, graph): | ||
"""create derived data file nodes for all entries and connect to data | ||
transformation nodes""" | ||
output_connection_to_analysis_result_mapping = ( | ||
self._get_output_connection_to_analysis_result_mapping() | ||
) | ||
output_mappings = output_connection_to_analysis_result_mapping | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. just use this name in the first place? |
||
|
||
for output_connection, analysis_result in output_mappings: | ||
derived_data_file_node = self._create_derived_data_file_node( | ||
self.get_input_node_study(), | ||
self.get_input_node_assay(), | ||
output_connection | ||
) | ||
if output_connection.is_refinery_file: | ||
# retrieve uuid of corresponding output file if exists | ||
logger.info( | ||
"Results for '%s' and %s.%s: %s", | ||
self.uuid, | ||
output_connection, | ||
output_connection.filetype, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Below, we debug with just output_connection... maybe that's enough? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't think the repr() of output_connection has the filetype information There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Update the repr? |
||
analysis_result | ||
) | ||
derived_data_file_node.file_uuid = ( | ||
analysis_result.file_store_uuid | ||
) | ||
logger.debug( | ||
"Output file %s ('%s') assigned to node %s ('%s')", | ||
output_connection, | ||
analysis_result.file_store_uuid, | ||
derived_data_file_node.name, | ||
derived_data_file_node.uuid | ||
) | ||
output_connection.node = derived_data_file_node | ||
output_connection.save() | ||
|
||
self._link_derived_data_file_node_to_data_transformation_node( | ||
graph, | ||
output_connection, | ||
derived_data_file_node | ||
) | ||
|
||
def _link_derived_data_file_node_to_data_transformation_node( | ||
self, | ||
graph, | ||
output_connection, | ||
derived_data_file_node | ||
): | ||
"""get graph edge that corresponds to this output node: | ||
a. attach output node to source data transformation node | ||
b. attach output node to target data transformation node | ||
(if exists)""" | ||
if len(graph.edges([output_connection.step])) > 0: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. but if it were zero, then the loop below would just exit quickly? |
||
for edge in graph.edges_iter([output_connection.step]): | ||
output_id = output_connection.get_output_connection_id() | ||
|
||
if graph[edge[0]][edge[1]]['output_id'] == output_id: | ||
input_node_id = edge[0] | ||
output_node_id = edge[1] | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. a lot like line 1681? maybe define the variables first? |
||
|
||
data_transformation_input_node = ( | ||
graph.node[input_node_id]['node'] | ||
) | ||
data_transformation_output_node = ( | ||
graph.node[output_node_id]['node'] | ||
) | ||
data_transformation_input_node.add_child( | ||
derived_data_file_node | ||
) | ||
derived_data_file_node.add_child( | ||
data_transformation_output_node | ||
) | ||
# TODO: here we could add a (Refinery internal) | ||
# attribute to the derived data file node to | ||
# indicate which output of the tool it corresponds to | ||
|
||
# connect outputs that are not inputs for any data transformation | ||
if (output_connection.is_refinery_file and | ||
derived_data_file_node.parents.count() == 0): | ||
graph.node[output_connection.step]['node'].add_child( | ||
derived_data_file_node | ||
) | ||
# delete output nodes that are not refinery files and don't have | ||
# any children | ||
if (not output_connection.is_refinery_file and | ||
derived_data_file_node.children.count() == 0): | ||
output_connection.node.delete() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. maybe: if derived_data_file_node.children.count() == 0:
if output_connection.is_refinery_file:
...
else:
... There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I thought we could make the split here too, but looking at theses two ifs one is worried about |
||
|
||
def _create_annotated_nodes(self): | ||
"""create and index annotated nodes""" | ||
node_uuids = AnalysisNodeConnection.objects.filter( | ||
analysis=self, | ||
direction=OUTPUT_CONNECTION, | ||
is_refinery_file=True | ||
).values_list('node__uuid', flat=True) | ||
|
||
add_annotated_nodes_selection( | ||
node_uuids, | ||
Node.DERIVED_DATA_FILE, | ||
self.get_input_node_study().uuid, | ||
self.get_input_node_assay().uuid | ||
) | ||
self._prepare_annotated_nodes(node_uuids) | ||
|
||
|
||
#: Defining available relationship types | ||
INPUT_CONNECTION = 'in' | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What's the benefit of making this a property?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No benefit, changing it now