Scottx611x/fix same name analysis results #2099

Merged Sep 11, 2017 (29 commits)

Commits
85a06cd
Code tidying
scottx611x Sep 6, 2017
7a68b85
Associate AnalysisNodeConnection outputs with AnalysisResults regardl…
scottx611x Sep 6, 2017
ebcc590
Utilize some constants
scottx611x Sep 6, 2017
ffe265b
Code tidying
scottx611x Sep 6, 2017
07df5cb
Add test coverage for input AnalysisNodeConnection generation and `An…
scottx611x Sep 6, 2017
f1e68f9
Add `GALAXY_INPUT_TYPES` constant
scottx611x Sep 6, 2017
4cc610e
Fix tests for `Analysis.attach_outputs_dataset`
scottx611x Sep 7, 2017
70bad72
It is possible to have no AnalysisResults for an AnalysisNodeConnecti…
scottx611x Sep 7, 2017
6d8bdf8
Make some stronger assertions about our AnalysisNodeConnection output…
scottx611x Sep 7, 2017
eda7468
Code tidying
scottx611x Sep 7, 2017
9125461
More tidying
scottx611x Sep 7, 2017
a2a087f
I think it may be clearer to have the inline if/else in its own method
scottx611x Sep 7, 2017
f50723e
Nodes can share the same `file_uuid`, preserve uniqueness in the `get…
scottx611x Sep 7, 2017
dfe1eb4
Pin down logic for associating the proper AnalysisResult with its Ana…
scottx611x Sep 7, 2017
0116f79
Utilize UUID_RE constant
scottx611x Sep 7, 2017
77762ab
No need for two `if`s here
scottx611x Sep 7, 2017
d5361b3
Code cleanup
scottx611x Sep 8, 2017
f9d0a36
Add test coverage for edge case where many derived Galaxy Datasets ca…
scottx611x Sep 8, 2017
61bfd04
Don't start these mocks unless we actually need them since they refer…
scottx611x Sep 8, 2017
fcaafc1
Explicitly define galaxy mock data with the same filenames
scottx611x Sep 8, 2017
531542b
Update tests
scottx611x Sep 8, 2017
8f73c10
Add logger statement
scottx611x Sep 8, 2017
0da22a2
Add stronger assertions about the many AnalysisResults w/ same name e…
scottx611x Sep 8, 2017
6c2c0fc
Simplify association of AnalysisNodeConnections to AnalysisResults
scottx611x Sep 11, 2017
ed6188d
Update test
scottx611x Sep 11, 2017
cc3f4f2
Don't need an ordered dict here
scottx611x Sep 11, 2017
8f926ab
Add test coverage for `Analysis._get_output_connection_to_analysis_re…
scottx611x Sep 11, 2017
607de7b
Swap creation of AnalysisResults to make final test structure we asse…
scottx611x Sep 11, 2017
f944f5c
Utilize default dict and use tuples for the AnalysisNodeConnection to…
scottx611x Sep 11, 2017
134 changes: 83 additions & 51 deletions refinery/core/models.py
@@ -6,6 +6,7 @@
from __future__ import absolute_import

import ast
from collections import defaultdict
import copy
from datetime import datetime
import json
@@ -1655,24 +1656,31 @@ def attach_outputs_dataset(self):
analysis=self, direction=INPUT_CONNECTION)[0].node.assay
# 1. read workflow into graph
graph = create_expanded_workflow_graph(
ast.literal_eval(self.workflow_copy))
ast.literal_eval(self.workflow_copy)
)
# 2. create data transformation nodes for all tool nodes
data_transformation_nodes = [graph.node[node_id]
for node_id in graph.nodes()
if graph.node[node_id]['type'] == "tool"]
for data_transformation_node in data_transformation_nodes:
# TODO: incorporate subanalysis id in tool name???
data_transformation_node['node'] = \
data_transformation_node['node'] = (
Node.objects.create(
study=study, assay=assay, analysis_uuid=self.uuid,
study=study,
assay=assay,
analysis_uuid=self.uuid,
type=Node.DATA_TRANSFORMATION,
name=data_transformation_node['tool_id'] + '_' +
data_transformation_node['name']
)
)
# 3. create connection from input nodes to first data transformation
# nodes (input tool nodes in the graph are skipped)
for input_connection in AnalysisNodeConnection.objects.filter(
analysis=self, direction=INPUT_CONNECTION):
input_node_connections = AnalysisNodeConnection.objects.filter(
analysis=self,
direction=INPUT_CONNECTION
)
for input_connection in input_node_connections:
for edge in graph.edges_iter([input_connection.step]):
if (graph[edge[0]][edge[1]]['output_id'] ==
str(input_connection.step) + '_' +
@@ -1683,14 +1691,17 @@ def attach_outputs_dataset(self):
input_connection.node.add_child(data_transformation_node)
# 4. create derived data file nodes for all entries and connect to data
# transformation nodes
for output_connection in AnalysisNodeConnection.objects.filter(
analysis=self,
direction=OUTPUT_CONNECTION
):
output_connection_to_analysis_result_mapping = (
self._get_output_connection_to_analysis_result_mapping()
)
output_mappings = output_connection_to_analysis_result_mapping

for output_connection, analysis_result in output_mappings:
# create derived data file node
derived_data_file_node = (
Node.objects.create(
study=study, assay=assay,
study=study,
assay=assay,
type=Node.DERIVED_DATA_FILE,
name=output_connection.name,
analysis_uuid=self.uuid,
@@ -1700,38 +1711,16 @@ def attach_outputs_dataset(self):
)
# retrieve uuid of corresponding output file if exists
logger.info("Results for '%s' and %s.%s: %s",
self.uuid,
output_connection.filename, output_connection.filetype,
str(AnalysisResult.objects.filter(
analysis_uuid=self.uuid,
file_name=(output_connection.name + "." +
output_connection.filetype)).count()))
analysis_results = AnalysisResult.objects.filter(
analysis_uuid=self.uuid,
file_name=(output_connection.name + "." +
output_connection.filetype))

if analysis_results.count() == 0:
logger.info("No output file found for node '%s' ('%s')",
derived_data_file_node.name,
derived_data_file_node.uuid)

if analysis_results.count() == 1:
derived_data_file_node.file_uuid = \
analysis_results[0].file_store_uuid
logger.debug(
"Output file %s.%s ('%s') assigned to node %s ('%s')",
output_connection.name,
output_connection.filetype,
analysis_results[0].file_store_uuid,
derived_data_file_node.name,
derived_data_file_node.uuid)

if analysis_results.count() > 1:
logger.warning("Multiple output files returned for '%s.%s'." +
"No assignment to output node was made.",
output_connection.filename,
output_connection.filetype)
self.uuid, output_connection,
output_connection.filetype, analysis_result)

derived_data_file_node.file_uuid = analysis_result.file_store_uuid

logger.debug("Output file %s ('%s') assigned to node %s ('%s')",
output_connection, analysis_result.file_store_uuid,
derived_data_file_node.name,
derived_data_file_node.uuid)

output_connection.node = derived_data_file_node
output_connection.save()
# get graph edge that corresponds to this output node:
@@ -1740,22 +1729,28 @@ def attach_outputs_dataset(self):
# (if exists)
if len(graph.edges([output_connection.step])) > 0:
for edge in graph.edges_iter([output_connection.step]):
if (graph[edge[0]][edge[1]]['output_id'] ==
str(output_connection.step) + "_" +
output_connection.filename):
output_id = "{}_{}".format(
output_connection.step,
output_connection.filename
)
if graph[edge[0]][edge[1]]['output_id'] == output_id:
output_node_id = edge[0]
input_node_id = edge[1]
data_transformation_output_node = \
data_transformation_output_node = (
graph.node[output_node_id]['node']
data_transformation_input_node = \
)
data_transformation_input_node = (
graph.node[input_node_id]['node']
)
data_transformation_output_node.add_child(
derived_data_file_node)
derived_data_file_node
)
derived_data_file_node.add_child(
data_transformation_input_node)
data_transformation_input_node
)
# TODO: here we could add a (Refinery internal)
# attribute to the derived data file node to indicate
# which output of the tool it corresponds to
# attribute to the derived data file node to
# indicate which output of the tool it corresponds to
# connect outputs that are not inputs for any data transformation
if (output_connection.is_refinery_file and
derived_data_file_node.parents.count() == 0):
@@ -1839,6 +1834,43 @@ def _prepare_annotated_nodes(self, node_uuids):
self.rename_results()
index_annotated_nodes_selection(node_uuids)

def _get_output_connection_to_analysis_result_mapping(self):
"""
Create and return a list of tuples mapping each "output" type
AnalysisNodeConnection to its respective AnalysisResult.

This is especially useful when we run into the edge-case described
here: https://github.com/
refinery-platform/refinery-platform/pull/2099#issue-255989396
"""
distinct_filenames_map = defaultdict(lambda: [])
output_connections_to_analysis_results = []

output_node_connections = AnalysisNodeConnection.objects.filter(
analysis=self,
direction=OUTPUT_CONNECTION
)
# Fetch the distinct file names from our output
# AnalysisNodeConnections for this Analysis and construct a dict
# mapping the unique file names to a list of AnalysisNodeConnections
# sharing said filename.
for output_connection in output_node_connections:
distinct_filenames_map[output_connection.filename].append(
output_connection
)
# Associate the AnalysisNodeConnections with their respective
# AnalysisResults
for output_connections in distinct_filenames_map.values():
for index, output_connection in enumerate(output_connections):
analysis_result = AnalysisResult.objects.filter(
analysis_uuid=self.uuid,
file_name=output_connection.filename
)[index]
output_connections_to_analysis_results.append(
(output_connection, analysis_result)
)
return output_connections_to_analysis_results

@property
def is_tool_based(self):
try:
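The heart of this PR is the pairing performed by the new `_get_output_connection_to_analysis_result_mapping` method above. Below is a minimal sketch of that pairing logic using plain-Python stand-ins for the Django querysets; the `connection_*` and `result_*` names are hypothetical, not from the PR.

from collections import defaultdict

# Hypothetical stand-in for the OUTPUT_CONNECTION queryset:
# (filename, connection) pairs in queryset order.
output_connections = [
    ("copy.txt", "connection_a"),
    ("copy.txt", "connection_b"),
    ("unique.txt", "connection_c"),
]
# Hypothetical stand-in for AnalysisResult.objects.filter(file_name=...),
# again in queryset order.
analysis_results = {
    "copy.txt": ["result_1", "result_2"],
    "unique.txt": ["result_3"],
}

# Group the connections by filename, as the method does.
distinct_filenames_map = defaultdict(list)
for filename, connection in output_connections:
    distinct_filenames_map[filename].append(connection)

# Pair the Nth connection for each filename with the Nth result
# sharing that file name.
mapping = []
for filename, connections in distinct_filenames_map.items():
    for index, connection in enumerate(connections):
        mapping.append((connection, analysis_results[filename][index]))

print(mapping)
# [('connection_a', 'result_1'), ('connection_b', 'result_2'),
#  ('connection_c', 'result_3')]

Because the Nth connection for a filename is paired with the Nth result carrying that file name, two outputs that share a name no longer compete for the same AnalysisResult, which is the edge case this PR fixes.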
74 changes: 62 additions & 12 deletions refinery/core/tests.py
@@ -22,14 +22,15 @@
from analysis_manager.models import AnalysisStatus
from data_set_manager.models import Assay, Contact, Investigation, Node, Study
from factory_boy.utils import create_dataset_with_necessary_models
from file_store.models import FileStoreItem
from file_store.models import FileStoreItem, FileType
from galaxy_connector.models import Instance

from .api import AnalysisResource
from .management.commands.create_user import init_user
from .models import (Analysis, AnalysisNodeConnection, DataSet, ExtendedGroup,
InvestigationLink, NodeSet, Project, Tutorials,
UserProfile, Workflow, WorkflowDataInputMap,
from .models import (INPUT_CONNECTION, OUTPUT_CONNECTION, Analysis,
AnalysisNodeConnection, AnalysisResult, DataSet,
ExtendedGroup, InvestigationLink, NodeSet, Project,
Tutorials, UserProfile, Workflow, WorkflowDataInputMap,
WorkflowEngine, create_nodeset, delete_nodeset,
get_nodeset, invalidate_cached_object, update_nodeset)
from .search_indexes import DataSetIndex
@@ -1347,12 +1348,15 @@ def setUp(self):
self.workflow1 = Workflow.objects.create(
name="Workflow1", workflow_engine=self.workflow_engine)

text_filetype = FileType.objects.get(name="TXT")

# Create FileStoreItems
self.file_store_item = FileStoreItem.objects.create(
datafile=SimpleUploadedFile(
'test_file.txt',
'Coffee is delicious!'
)
),
filetype=text_filetype
)
self.file_store_item1 = FileStoreItem.objects.create(
datafile=SimpleUploadedFile(
@@ -1412,6 +1416,7 @@ def setUp(self):
self.node = Node.objects.create(
assay=self.assay,
study=self.study,
name="test_node",
analysis_uuid=self.analysis.uuid,
file_uuid=self.file_store_item.uuid
)
@@ -1431,17 +1436,38 @@ def setUp(self):
workflow_data_input_name="input 2",
data_uuid=self.node2.uuid
)
self.node_filename = "{}.{}".format(
self.node.name,
self.node.get_file_store_item().get_file_extension()
)

# Create AnalysisNodeConnections
self.analysis_node_connection = \
AnalysisNodeConnection.objects.create(analysis=self.analysis,
node=self.node, step=1,
direction="out")
self.analysis_node_connection_with_node_analyzed_further = \
self.analysis_node_connection_a = (
AnalysisNodeConnection.objects.create(
analysis=self.analysis,
node=self.node,
step=1,
filename=self.node_filename,
direction=OUTPUT_CONNECTION
)
)
self.analysis_node_connection_b = (
AnalysisNodeConnection.objects.create(
analysis=self.analysis,
node=self.node,
step=1,
filename=self.node_filename,
direction=OUTPUT_CONNECTION
)
)
self.analysis_node_connection_with_node_analyzed_further = (
AnalysisNodeConnection.objects.create(
analysis=self.analysis_with_node_analyzed_further,
node=self.node2, step=2,
direction="in")
node=self.node2,
step=0,
direction=INPUT_CONNECTION
)
)

# Add wf_data_input_maps to Analysis M2M relationship
self.analysis.workflow_data_input_maps.add(self.wf_data_input_map,
@@ -1534,6 +1560,30 @@ def test__prepare_annotated_nodes_calls_methods_in_proper_order(
]
)

def test___get_output_connection_to_analysis_result_mapping(self):
analysis_result_b = AnalysisResult.objects.create(
analysis_uuid=self.analysis.uuid,
file_store_uuid=self.node.file_uuid,
file_name=self.node_filename,
file_type=self.node.get_file_store_item().filetype
)
analysis_result_a = AnalysisResult.objects.create(
analysis_uuid=self.analysis.uuid,
file_store_uuid=self.node.file_uuid,
file_name=self.node_filename,
file_type=self.node.get_file_store_item().filetype
)
output_mapping = (
self.analysis._get_output_connection_to_analysis_result_mapping()
)
self.assertEqual(
output_mapping,
[
(self.analysis_node_connection_b, analysis_result_b),
(self.analysis_node_connection_a, analysis_result_a)
]
)


class UtilitiesTest(TestCase):
def setUp(self):
42 changes: 22 additions & 20 deletions refinery/galaxy_connector/galaxy_workflow.py
@@ -530,10 +530,7 @@ def configure_workflow(workflow_dict, ret_list):
def create_expanded_workflow_graph(dictionary):
graph = nx.MultiDiGraph()
steps = dictionary["steps"]
galaxy_input_types = [
'data_input',
tool_manager.models.WorkflowTool.DATA_COLLECTION_INPUT
]
galaxy_input_types = tool_manager.models.WorkflowTool.GALAXY_INPUT_TYPES

# iterate over steps to create nodes
for current_node_id, step in steps.iteritems():
@@ -542,20 +539,23 @@ def create_expanded_workflow_graph(dictionary):
# create node
graph.add_node(current_node_id)
# add node attributes
graph.node[current_node_id]['name'] = \
str(current_node_id) + ": " + step['name']
graph.node[current_node_id]['name'] = "{}:{}".format(
current_node_id,
step['name']
)
graph.node[current_node_id]['tool_id'] = step['tool_id']
graph.node[current_node_id]['type'] = step['type']
graph.node[current_node_id]['position'] = (
int(step['position']['left']), -int(step['position']['top']))
int(step['position']['left']), -int(step['position']['top'])
)
graph.node[current_node_id]['node'] = None
# iterate over steps to create edges (this is done by looking at
# input_connections, i.e. only by looking at tool nodes)
for current_node_id, step in steps.iteritems():
# ensure node id is an integer
current_node_id = int(current_node_id)
for current_node_input_name, input_connection in \
step['input_connections'].iteritems():
input_connections = step['input_connections'].iteritems()
for current_node_input_name, input_connection in input_connections:
parent_node_id = input_connection["id"]
# test if parent node is a tool node or an input node to pick the
# right name for the outgoing edge
Expand All @@ -565,16 +565,18 @@ def create_expanded_workflow_graph(dictionary):
)
else:
parent_node_output_name = input_connection['output_name']

edge_output_id = str(
parent_node_id) + '_' + parent_node_output_name
edge_input_id = str(
current_node_id) + '_' + current_node_input_name
edge_id = edge_output_id + '___' + edge_input_id
edge_output_id = "{}_{}".format(
parent_node_id,
parent_node_output_name
)
edge_input_id = "{}_{}".format(
current_node_id,
current_node_input_name
)
edge_id = "{}___{}".format(edge_output_id, edge_input_id)
graph.add_edge(parent_node_id, current_node_id, key=edge_id)
graph[parent_node_id][current_node_id]['output_id'] = str(
parent_node_id) + '_' + parent_node_output_name
graph[parent_node_id][current_node_id]['input_id'] = str(
current_node_id) + '_' + current_node_input_name

graph[parent_node_id][current_node_id]['output_id'] = (
edge_output_id
)
graph[parent_node_id][current_node_id]['input_id'] = edge_input_id
return graph
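
For reference, the `{step}_{output_name}` and `{output}___{input}` identifiers built above are what `Analysis.attach_outputs_dataset` later compares against `"{step}_{filename}"` for each AnalysisNodeConnection. Below is a minimal sketch of the naming scheme on a made-up two-step workflow; it is written against the current networkx 2.x API (the PR itself targets Python 2 and the older `graph.node`/`edges_iter` API, and it attaches `output_id`/`input_id` to the adjacency dict slightly differently).

import networkx as nx

# A made-up two-step workflow: an input dataset (step 0) feeding a tool (step 1).
graph = nx.MultiDiGraph()
graph.add_node(0, name="0:Input Dataset", type="data_input", node=None)
graph.add_node(1, name="1:FastQC", type="tool", node=None)

parent_node_id, current_node_id = 0, 1
parent_node_output_name = "output"      # from the step's input_connections
current_node_input_name = "input_file"

edge_output_id = "{}_{}".format(parent_node_id, parent_node_output_name)
edge_input_id = "{}_{}".format(current_node_id, current_node_input_name)
edge_id = "{}___{}".format(edge_output_id, edge_input_id)

# Key the edge by the combined ID and store both halves as attributes.
graph.add_edge(parent_node_id, current_node_id, key=edge_id,
               output_id=edge_output_id, input_id=edge_input_id)

# The edge key is "0_output___1_input_file"; the output_id half is what
# attach_outputs_dataset matches against "{step}_{filename}".
assert graph[0][1][edge_id]["output_id"] == "0_output"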