Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Restructure Analysis Metadata Fetching for Celery #3461

Merged
merged 36 commits into from
Oct 22, 2019
Merged
Show file tree
Hide file tree
Changes from 26 commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
56acbe7
cut down api requests by passing in result
ilan-gold Oct 4, 2019
c22bd0f
import generate_auxiliary_file to circumvent potential import issue
ilan-gold Oct 4, 2019
0b20f00
remove reference to dsm.tasks
ilan-gold Oct 4, 2019
edb457a
store a tool's response information in a django field for repeated use
ilan-gold Oct 7, 2019
64cfb41
re-format if statement
ilan-gold Oct 7, 2019
e87fa27
allow request to be forced through instead of loading from database
ilan-gold Oct 8, 2019
7d4dce9
use ast.literal_eval for lists instead of json.loads
ilan-gold Oct 8, 2019
547213d
further cache response to invocation call and refactor other cache
ilan-gold Oct 8, 2019
fd824af
reformat and delete unused imports
ilan-gold Oct 8, 2019
a895dad
revert data_set_manager import hack
ilan-gold Oct 9, 2019
c8ec629
bump up the soft_time_limit
ilan-gold Oct 9, 2019
7761fe8
initial commit for restructuring galaxy requests
ilan-gold Oct 15, 2019
4022206
get dataset download list more directly from exposed_datasets
ilan-gold Oct 15, 2019
f10ad15
retrieve analysis download list from the AnalysisNodeConnection list
ilan-gold Oct 15, 2019
aa40889
update download_list to use AnalysisNodeConnection outputs
ilan-gold Oct 15, 2019
f7df2ec
remove cache fields on Tools model and re-use full galaxy dataset list
ilan-gold Oct 15, 2019
37b5c1c
import OUTPUT_CONNECTION
ilan-gold Oct 15, 2019
4523a82
remove force_request parameter
ilan-gold Oct 15, 2019
fbc5952
fix formatting on exposed_dataset_list parameter
ilan-gold Oct 15, 2019
c3f56ba
fix formatting
ilan-gold Oct 15, 2019
25403c9
add creating_job option to create output name to prevent repeat requests
ilan-gold Oct 16, 2019
2786cfa
revert use of rename s3 as separate task
ilan-gold Oct 16, 2019
62094f4
remove RenameS3FileTask task
ilan-gold Oct 16, 2019
2814e62
revert fabricrc.sample file changes
ilan-gold Oct 16, 2019
6a3ae90
fix tool_manager mocking to match new behavior
ilan-gold Oct 16, 2019
4df07bd
fix formatting on test
ilan-gold Oct 16, 2019
9482624
resolve ilya's requests
ilan-gold Oct 21, 2019
d3848d9
update field names
ilan-gold Oct 21, 2019
b056bae
update file extension in name
ilan-gold Oct 21, 2019
5dbd46b
remove check on invocation being None
ilan-gold Oct 21, 2019
f34312b
remove unneeded fields and fix tests
ilan-gold Oct 22, 2019
204dca4
reinsert "galaxy_dataset_name" to AnalysisNodeConnection
ilan-gold Oct 22, 2019
0a43ab5
remove unnecessary get_galaxy_dataset_download_list function and tests
ilan-gold Oct 22, 2019
1118741
handle galaxy error
ilan-gold Oct 22, 2019
4553883
update use of json.loads() and json.dumps() to be literal_eval with dict
ilan-gold Oct 22, 2019
63106a0
change back to ast.literal_eval from json.loads
ilan-gold Oct 22, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
18 changes: 11 additions & 7 deletions refinery/analysis_manager/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from celery.task.sets import TaskSet

import core
from core.models import Analysis, AnalysisResult, Workflow
from core.models import Analysis, AnalysisResult, Workflow, OUTPUT_CONNECTION
from file_store.models import FileStoreItem, FileExtension
from file_store.tasks import FileImportTask
import tool_manager
Expand Down Expand Up @@ -473,7 +473,11 @@ def _get_galaxy_download_task_ids(analysis):
tool.create_analysis_output_node_connections()
galaxy_instance = analysis.workflow.workflow_engine.instance
try:
download_list = tool.get_galaxy_dataset_download_list()
download_list = tool_manager.models.AnalysisNodeConnection.\
objects.filter(
is_refinery_file=True, analysis=analysis,
direction=OUTPUT_CONNECTION
)
ilan-gold marked this conversation as resolved.
Show resolved Hide resolved
except galaxy.client.ConnectionError as exc:
error_msg = \
"Error downloading Galaxy history files for analysis '%s': %s"
Expand All @@ -485,14 +489,14 @@ def _get_galaxy_download_task_ids(analysis):
# Iterating through files in current galaxy history
for results in download_list:
# download file if result state is "ok"
if results['state'] == 'ok':
file_extension = results["type"]
result_name = "{}.{}".format(results['name'], file_extension)
if results.state == 'ok':
file_extension = results.filetype
result_name = "{}".format(results.filename)
# size of file defined by galaxy
file_size = results['file_size']
file_size = results.file_size
file_store_item = FileStoreItem(source=urlparse.urljoin(
galaxy_instance.base_url,
"datasets/{}/display?to_ext=txt".format(results['dataset_id'])
"datasets/{}/display?to_ext=txt".format(results.dataset_id)
))
# workaround to set the correct file type for zip archives of
# FastQC HTML reports produced by Galaxy dynamically
Expand Down
2 changes: 1 addition & 1 deletion refinery/config/settings/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,7 @@ def get_setting(name, settings=local_settings, default=None):
CELERYD_MAX_TASKS_PER_CHILD = get_setting("CELERYD_MAX_TASKS_PER_CHILD")
CELERY_ROUTES = {"file_store.tasks.FileImportTask": {"queue": "file_import"}}
CELERY_ACCEPT_CONTENT = ['pickle']
CELERYD_TASK_SOFT_TIME_LIMIT = 60 # seconds
CELERYD_TASK_SOFT_TIME_LIMIT = 180 # seconds
ilan-gold marked this conversation as resolved.
Show resolved Hide resolved
CELERYBEAT_SCHEDULE = {
'collect_site_statistics': {
'task': 'core.tasks.collect_site_statistics',
Expand Down
29 changes: 29 additions & 0 deletions refinery/core/migrations/0038_auto_20191015_1423.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# -*- coding: utf-8 -*-
from __future__ import unicode_literals

from django.db import migrations, models


class Migration(migrations.Migration):
    """Add Galaxy result metadata to AnalysisNodeConnection.

    Stores the dataset state, file size, and Galaxy dataset id directly
    on the connection so they can be reused without repeat API requests.
    """

    dependencies = [
        ('core', '0037_remove_analysisresult_analysis_uuid'),
    ]

    operations = [
        # Galaxy dataset identifier for the output file
        migrations.AddField(
            model_name='analysisnodeconnection',
            name='dataset_id',
            field=models.CharField(blank=True, null=True, max_length=100),
        ),
        # file size as reported by Galaxy
        migrations.AddField(
            model_name='analysisnodeconnection',
            name='file_size',
            field=models.IntegerField(blank=True, null=True),
        ),
        # dataset state reported by Galaxy (e.g. "ok")
        migrations.AddField(
            model_name='analysisnodeconnection',
            name='state',
            field=models.CharField(blank=True, null=True, max_length=100),
        ),
    ]
9 changes: 7 additions & 2 deletions refinery/core/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
'''
from __future__ import absolute_import

import ast
from collections import defaultdict
from datetime import datetime
import json
Expand Down Expand Up @@ -1067,7 +1066,7 @@ def __str__(self):

def get_expanded_workflow_graph(self):
return tool_manager.utils.create_expanded_workflow_graph(
ast.literal_eval(self.workflow_copy)
json.loads(self.workflow_copy)
ilan-gold marked this conversation as resolved.
Show resolved Hide resolved
)

def has_nodes_used_in_downstream_analyses(self):
Expand Down Expand Up @@ -1730,6 +1729,12 @@ class AnalysisNodeConnection(models.Model):
default=False)
galaxy_dataset_name = models.CharField(null=True, blank=True,
max_length=250)
# state from galaxy
state = models.CharField(null=True, blank=True, max_length=100)
# size of the file
file_size = models.IntegerField(null=True, blank=True)
# galaxy dataset id
dataset_id = models.CharField(null=True, blank=True, max_length=100)
ilan-gold marked this conversation as resolved.
Show resolved Hide resolved

def __unicode__(self):
return "{}: {}_{} ({}) {}".format(
Expand Down
19 changes: 19 additions & 0 deletions refinery/tool_manager/migrations/0030_auto_20191007_0941.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
# -*- coding: utf-8 -*-
from __future__ import unicode_literals

from django.db import migrations, models


class Migration(migrations.Migration):
    """Add an ``invocation`` cache field to WorkflowTool.

    Holds the serialized Galaxy workflow-invocation response so the
    tool does not have to re-fetch it on every access.
    """

    dependencies = [
        ('tool_manager', '0029_parameter_uuid'),
    ]

    operations = [
        migrations.AddField(
            model_name='workflowtool',
            name='invocation',
            field=models.TextField(blank=True, null=True),
        ),
    ]
66 changes: 49 additions & 17 deletions refinery/tool_manager/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -673,6 +673,7 @@ class WorkflowTool(Tool):
REVERSE = "reverse"
TOOL_ID = "tool_id"
WORKFLOW_OUTPUTS = "workflow_outputs"
invocation = models.TextField(null=True, blank=True)
ilan-gold marked this conversation as resolved.
Show resolved Hide resolved

class Meta:
verbose_name = "workflowtool"
Expand Down Expand Up @@ -769,7 +770,7 @@ def _create_analysis(self):
self.set_analysis(analysis.uuid)

workflow_dict = self._get_workflow_dict()
self.analysis.workflow_copy = workflow_dict
self.analysis.workflow_copy = json.dumps(workflow_dict)
self.analysis.workflow_steps_num = len(workflow_dict["steps"].keys())
self.analysis.set_owner(self.get_owner())
self.analysis.save()
Expand All @@ -793,8 +794,11 @@ def create_analysis_output_node_connections(self):
"""Create the AnalysisNodeConnection objects corresponding to the
output Nodes (Derived Data) of a WorkflowTool launch
"""
exposed_workflow_outputs = self._get_exposed_galaxy_datasets()
for galaxy_dataset in self._get_galaxy_history_dataset_list():
exposed_dataset_list = self._get_galaxy_history_dataset_list()
exposed_workflow_outputs = self._get_exposed_galaxy_datasets(
exposed_dataset_list=exposed_dataset_list
)
ilan-gold marked this conversation as resolved.
Show resolved Hide resolved
for galaxy_dataset in exposed_dataset_list:
AnalysisNodeConnection.objects.create(
analysis=self.analysis, direction=OUTPUT_CONNECTION,
name=self._get_creating_job_output_name(galaxy_dataset),
Expand All @@ -803,7 +807,10 @@ def create_analysis_output_node_connections(self):
filename=self._get_galaxy_dataset_filename(galaxy_dataset),
filetype=galaxy_dataset["file_ext"],
is_refinery_file=galaxy_dataset in exposed_workflow_outputs,
galaxy_dataset_name=galaxy_dataset["name"]
galaxy_dataset_name=galaxy_dataset["name"],
state=galaxy_dataset['state'],
file_size=galaxy_dataset['file_size'],
dataset_id=galaxy_dataset['id']
)

def _create_collection_description(self):
Expand Down Expand Up @@ -1095,7 +1102,7 @@ def _get_galaxy_history_dataset_list(self):
]
return retained_datasets

def _get_exposed_galaxy_datasets(self):
def _get_exposed_galaxy_datasets(self, exposed_dataset_list=None):
ilan-gold marked this conversation as resolved.
Show resolved Hide resolved
"""
Retrieve all Galaxy Datasets that correspond to an asterisked
output in the Galaxy workflow editor.
Expand All @@ -1108,7 +1115,9 @@ def _get_exposed_galaxy_datasets(self):
explicitly exposed
"""
exposed_galaxy_datasets = []
for galaxy_dataset in self._get_galaxy_history_dataset_list():
if exposed_dataset_list is None:
exposed_dataset_list = self._get_galaxy_history_dataset_list()
for galaxy_dataset in exposed_dataset_list:
creating_job = self._get_galaxy_dataset_job(galaxy_dataset)

# `tool_id` corresponds to the descriptive name of a galaxy
Expand All @@ -1119,7 +1128,9 @@ def _get_exposed_galaxy_datasets(self):
)
workflow_steps_dict = self._get_workflow_dict()["steps"]
creating_job_output_name = (
self._get_creating_job_output_name(galaxy_dataset)
self._get_creating_job_output_name(
galaxy_dataset, creating_job
)
)
workflow_step_output_names = [
workflow_output["output_name"] for workflow_output in
Expand Down Expand Up @@ -1164,10 +1175,19 @@ def _get_galaxy_workflow_invocation(self):
"""
Fetch our Galaxy Workflow's invocation data.
"""
return self.galaxy_connection.workflows.show_invocation(
self.galaxy_workflow_history_id,
self.get_galaxy_dict()[self.GALAXY_WORKFLOW_INVOCATION_DATA]["id"]
)
# separate if-then assignment needed to avoid using the dict stored
# in self.invocation before .save() is called
if self.invocation == '' or self.invocation is None:
ilan-gold marked this conversation as resolved.
Show resolved Hide resolved
invocation = self.galaxy_connection.workflows.show_invocation(
self.galaxy_workflow_history_id,
self.get_galaxy_dict()
[self.GALAXY_WORKFLOW_INVOCATION_DATA]["id"]
)
self.invocation = json.dumps(invocation)
self.save()
else:
invocation = json.loads(self.invocation)
return invocation

@handle_bioblend_exceptions
def _get_refinery_input_file_id(self, galaxy_dataset_dict):
Expand Down Expand Up @@ -1248,9 +1268,19 @@ def _get_tool_inputs_dict(self, workflow_step):

@handle_bioblend_exceptions
def _get_workflow_dict(self):
return self.galaxy_connection.workflows.export_workflow_dict(
self.get_workflow_internal_id()
)
# separate if-then assignment needed to avoid using the dict stored
# in workflow_copy before .save() is called
if self.analysis.workflow_copy == '' \
or self.analysis.workflow_copy is None:
workflow_copy = \
self.galaxy_connection.workflows.export_workflow_dict(
self.get_workflow_internal_id()
)
self.analysis.workflow_copy = json.dumps(workflow_copy)
self.analysis.save()
else:
workflow_copy = json.loads(self.analysis.workflow_copy)
return workflow_copy

def get_workflow_internal_id(self):
return self.tool_definition.workflow.internal_id
Expand All @@ -1265,19 +1295,21 @@ def _get_workflow_step(self, galaxy_dataset_dict):
# `input` step i.e. `0`
return self.INPUT_STEP_NUMBER

def _get_creating_job_output_name(self, galaxy_dataset_dict):
def _get_creating_job_output_name(self, galaxy_dataset_dict,
creating_job=None):
"""
Retrieve the specified output name from the creating Galaxy Job that
corresponds to a Galaxy Dataset
:param galaxy_dataset_dict: dict containing information about a
Galaxy Dataset.

:param creating_job: an optional argument to prevent repeat request
This is useful if there are any post-job-actions in place to do
renaming of said output dataset.

:return: The proper output name of our galaxy dataset
"""
creating_job = self._get_galaxy_dataset_job(galaxy_dataset_dict)
if creating_job is None:
creating_job = self._get_galaxy_dataset_job(galaxy_dataset_dict)
creating_job_outputs = creating_job["outputs"]
workflow_step_output_name = [
output_name for output_name in creating_job_outputs.keys()
Expand Down