-
Notifications
You must be signed in to change notification settings - Fork 24
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix .bam file handling for visualizations #3484
Changes from 34 commits
0a7ed9c
3c4068e
43db45e
6e8fc3b
28f2c52
bff4862
7a76172
08ae2c6
bedc1f7
312bc8d
529c65a
4999476
f680b4c
82dca41
48e0137
38b6afe
2819048
482cca7
4d18e92
fe17dbc
8dbfa6f
f92a055
5e0f73e
26cbda8
f3b192e
515575f
f374ffc
7ca8a6d
acff06d
06729db
f85df5e
21fb8fd
b1c9706
12df5f5
ba0dfe1
77ebce6
4c31562
4ae5da8
c77d4f4
32e6ca6
6af7517
084449f
023de8d
f38b064
b9b4b02
a762c93
60c1ee8
41b0f6d
74a3821
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,19 @@ | ||
# -*- coding: utf-8 -*- | ||
from __future__ import unicode_literals | ||
|
||
from django.db import migrations, models | ||
|
||
|
||
class Migration(migrations.Migration): | ||
|
||
dependencies = [ | ||
('analysis_manager', '0008_analysisstatus_galaxy_workflow_task_group_id'), | ||
] | ||
|
||
operations = [ | ||
migrations.AddField( | ||
model_name='analysisstatus', | ||
name='auxiliary_file_task_group_id', | ||
field=models.UUIDField(null=True, editable=False), | ||
), | ||
] |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -146,9 +146,48 @@ def _attach_workflow_outputs(analysis_uuid): | |
""" | ||
analysis = _get_analysis(analysis_uuid) | ||
analysis_status = _get_analysis_status(analysis_uuid) | ||
|
||
if analysis.workflow.type == Workflow.ANALYSIS_TYPE: | ||
analysis.attach_derived_nodes_to_dataset() | ||
if not analysis_status.auxiliary_file_task_group_id: | ||
auxiliary_file_tasks_signatures = \ | ||
analysis.attach_derived_nodes_to_dataset() | ||
logger.info( | ||
"Starting auxiliary file generation and import for analysis " | ||
"'%s'", analysis) | ||
auxiliary_file_tasks = TaskSet( | ||
tasks=auxiliary_file_tasks_signatures | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
).apply_async() | ||
auxiliary_file_tasks.save() | ||
analysis_status.auxiliary_file_task_group_id = ( | ||
auxiliary_file_tasks.taskset_id | ||
) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. analysis_status.auxiliary_file_task_group_id = \
auxiliary_file_tasks.taskset_id |
||
analysis_status.save() | ||
run_analysis.retry(countdown=RETRY_INTERVAL) | ||
# check if analysis results have finished downloading from Galaxy | ||
auxiliary_file_tasks = get_taskset_result( | ||
analysis_status.auxiliary_file_task_group_id | ||
) | ||
if not auxiliary_file_tasks.ready(): | ||
logger.debug("Auxiliary file import and generation " | ||
"running for analysis '%s'", analysis) | ||
run_analysis.retry(countdown=RETRY_INTERVAL) | ||
elif not auxiliary_file_tasks.successful(): | ||
error_msg = ("Analysis '{}' failed while generating " | ||
"auxiliary file".format(analysis)) | ||
logger.error(error_msg) | ||
analysis.set_status(Analysis.FAILURE_STATUS, error_msg) | ||
analysis.send_email() | ||
|
||
get_taskset_result( | ||
analysis_status.refinery_import_task_group_id | ||
).delete() | ||
get_taskset_result( | ||
analysis_status.galaxy_import_task_group_id | ||
).delete() | ||
get_taskset_result( | ||
analysis_status.galaxy_export_task_group_id | ||
).delete() | ||
auxiliary_file_tasks.delete() | ||
analysis.galaxy_cleanup() | ||
elif analysis.workflow.type == Workflow.DOWNLOAD_TYPE: | ||
analysis.attach_outputs_downloads() | ||
else: | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -1358,7 +1358,8 @@ def attach_derived_nodes_to_dataset(self): | |
) | ||
) | ||
self._create_derived_data_file_nodes(graph_with_input_nodes_linked) | ||
self._create_annotated_nodes() | ||
auxiliary_file_import_tasks = self._create_annotated_nodes() | ||
return auxiliary_file_import_tasks | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
|
||
def attach_outputs_downloads(self): | ||
if self.results.all().count() == 0: | ||
|
@@ -1409,6 +1410,7 @@ def _prepare_annotated_nodes(self, node_uuids): | |
Call order is ensured through: | ||
core.tests.test__prepare_annotated_nodes_calls_methods_in_proper_order | ||
""" | ||
auxiliary_file_tasks = [] | ||
for result in self.results.all(): | ||
try: | ||
item = FileStoreItem.objects.get(uuid=result.file_store_uuid) | ||
|
@@ -1424,8 +1426,11 @@ def _prepare_annotated_nodes(self, node_uuids): | |
item.uuid, exc) | ||
else: | ||
if node.is_derived(): | ||
node.run_generate_auxiliary_node_task() | ||
subtask = node.run_generate_auxiliary_node_task() | ||
if subtask is not None: | ||
auxiliary_file_tasks += [subtask] | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. may be |
||
index_annotated_nodes_selection(node_uuids) | ||
return auxiliary_file_tasks | ||
|
||
def _get_output_connection_to_analysis_result_mapping(self): | ||
"""Create and return a dict mapping each "output" type | ||
|
@@ -1627,7 +1632,8 @@ def _create_annotated_nodes(self): | |
self.get_input_node_study().uuid, | ||
self.get_input_node_assay().uuid | ||
) | ||
self._prepare_annotated_nodes(node_uuids) | ||
auxiliary_file_tasks = self._prepare_annotated_nodes(node_uuids) | ||
return auxiliary_file_tasks | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
|
||
def get_refinery_import_task_signatures(self): | ||
"""Create and return a list of file import task signatures for the | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -14,6 +14,7 @@ | |
from django.dispatch import receiver | ||
|
||
from celery.result import AsyncResult | ||
from celery import chain | ||
from django_extensions.db.fields import UUIDField | ||
import requests | ||
from requests.exceptions import HTTPError | ||
|
@@ -22,7 +23,7 @@ | |
from core.utils import delete_analysis_index, skip_if_test_run | ||
import data_set_manager | ||
from file_store.models import FileStoreItem | ||
|
||
from file_store.tasks import FileImportTask | ||
""" | ||
TODO: Refactor import data_set_manager. Importing | ||
data_set_manager.tasks.generate_auxiliary_file() | ||
|
@@ -497,6 +498,9 @@ class Node(models.Model): | |
TYPES = ASSAYS | FILES | { | ||
SOURCE, SAMPLE, EXTRACT, LABELED_EXTRACT, SCAN, NORMALIZATION, | ||
DATA_TRANSFORMATION} | ||
# Currently we only need to create an auxiliary file for bam, but WIG | ||
# needs an index file as well | ||
AUXILIARY_FILES_NEEDED_FOR_VISUALIZATION = ['bam'] | ||
|
||
uuid = UUIDField(unique=True, auto=True) | ||
study = models.ForeignKey(Study, db_index=True) | ||
|
@@ -598,11 +602,18 @@ def _create_and_associate_auxiliary_node(self, filestore_item): | |
self.add_child(node_object) | ||
return node_object | ||
|
||
def get_children(self): | ||
def get_children(self, auxiliary_filter=None): | ||
""" | ||
Return a list of child Node's uuids for a given Node | ||
""" | ||
return [child.uuid for child in self.children.all()] | ||
if auxiliary_filter is None: | ||
return [child.uuid for child in self.children.all()] | ||
else: | ||
return [ | ||
child.uuid for child in self.children.filter( | ||
is_auxiliary_node=auxiliary_filter | ||
) | ||
] | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This can be a lot more readable as a separate function (e.g., |
||
|
||
def get_parents(self): | ||
""" | ||
|
@@ -646,19 +657,28 @@ def run_generate_auxiliary_node_task(self): | |
self.file_item.filetype.used_for_visualization and | ||
self.file_item.datafile and | ||
settings.REFINERY_AUXILIARY_FILE_GENERATION == | ||
'on_file_import'): | ||
'on_file_import' and | ||
self.file_item.get_extension().lower() in | ||
self.AUXILIARY_FILES_NEEDED_FOR_VISUALIZATION): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This list of conditions is a good candidate for factoring out into a helper function (at least a part of it). Also, ideally, this should be checked before calling this function, so you would not have to return None at the end. |
||
# Create an empty FileStoreItem (we do the datafile association | ||
# within the generate_auxiliary_file task | ||
auxiliary_file_store_item = FileStoreItem.objects.create() | ||
|
||
auxiliary_node = self._create_and_associate_auxiliary_node( | ||
auxiliary_file_store_item | ||
) | ||
result = data_set_manager.tasks.generate_auxiliary_file.delay( | ||
auxiliary_node, self.file_item | ||
generate = data_set_manager.tasks.generate_auxiliary_file.subtask( | ||
(auxiliary_node, self.file_item,) | ||
) | ||
auxiliary_file_store_item.import_task_id = result.task_id | ||
auxiliary_file_store_item.save() | ||
file_import = FileImportTask().subtask( | ||
(auxiliary_node.file_item.uuid, None,), | ||
immutable=True | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. what is "this" and why is it unnecessary? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Line 675 (if a comment doesn't specify a line range then it is referring to the line directly above it) but now that you mention it, the args on line 674 would not be needed also since "the first task executes passing its return value to the next task in the chain" (http://docs.celeryproject.org/en/3.1/userguide/canvas.html#the-primitives). There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ah i see - generate_auxiliary_file does not currently do that. were you saying you wanted that? it's not like the FileStoreItem is generated there There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, the name implies that it should generate the FileStoreItem and return its UUID but there may be more refactoring required. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ok i have done some refactoring |
||
) | ||
generate_and_import = chain(generate, file_import) | ||
return generate_and_import | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
else: | ||
logger.debug("No auxiliary Node needs be generated") | ||
return None | ||
|
||
def get_auxiliary_file_generation_task_state(self): | ||
"""Return the generate_auxiliary_file task state for a given auxiliary | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,18 +1,21 @@ | ||
from datetime import date | ||
import logging | ||
import os | ||
import time | ||
|
||
from django.conf import settings | ||
from django.contrib.auth.models import User | ||
from django.db import transaction | ||
|
||
import botocore | ||
import celery | ||
from celery.task import task | ||
import pysam | ||
import tempfile | ||
|
||
from core.models import DataSet, ExtendedGroup, FileStoreItem | ||
from file_store.models import FileExtension, generate_file_source_translator | ||
from file_store.tasks import FileImportTask | ||
from file_store.tasks import FileImportTask, download_s3_object | ||
|
||
from .isa_tab_parser import IsaTabParser | ||
from .models import Investigation, Node, initialize_attribute_order | ||
|
@@ -273,7 +276,7 @@ def parse_isatab(username, public, path, identity_id=None, | |
return data_set_uuid | ||
|
||
|
||
@task() | ||
@task(soft_time_limit=3600) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is default 60 seconds not sufficient? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Since this is a file-based operation (we have to both move a file and do an operation on it), the timeout should match the FileImportTask's timeout There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The FileImportTask timeout was chosen to accommodate downloads from sites on public Internet which can take a really long time (e.g., from ftp://ftp.sra.ebi.ac.uk). There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ok i'll do that now |
||
def generate_auxiliary_file(auxiliary_node, parent_node_file_store_item): | ||
"""Task that will generate an auxiliary file for visualization purposes | ||
with specific file generation tasks going on for different FileTypes | ||
|
@@ -287,8 +290,13 @@ def generate_auxiliary_file(auxiliary_node, parent_node_file_store_item): | |
:type parent_node_file_store_item: FileStoreItem | ||
""" | ||
generate_auxiliary_file.update_state(state=celery.states.STARTED) | ||
datafile = parent_node_file_store_item.datafile | ||
|
||
try: | ||
datafile_path = parent_node_file_store_item.datafile.path | ||
if not settings.REFINERY_S3_USER_DATA: | ||
datafile_path = datafile.path | ||
else: | ||
datafile_path = datafile.name | ||
except (NotImplementedError, ValueError): | ||
datafile_path = None | ||
try: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This try block is huge. I think the only function that can raise exceptions is |
||
|
@@ -305,6 +313,7 @@ def generate_auxiliary_file(auxiliary_node, parent_node_file_store_item): | |
|
||
logger.debug("Auxiliary file for %s generated in %s " | ||
"seconds." % (datafile_path, time.time() - start_time)) | ||
|
||
except Exception as e: | ||
logger.error( | ||
"Something went wrong while trying to generate the auxiliary file " | ||
|
@@ -332,13 +341,25 @@ def generate_bam_index(auxiliary_file_store_item_uuid, datafile_path): | |
# fail if we can't get what we want. | ||
bam_index_file_extension = FileExtension.objects.get(name="bai").name | ||
auxiliary_file_store_item = FileStoreItem.objects.get( | ||
uuid=auxiliary_file_store_item_uuid) | ||
uuid=auxiliary_file_store_item_uuid | ||
) | ||
|
||
# Leverage pysam library to generate bam index file | ||
# FIXME: This should be refactored once we don't have a need for | ||
# Standalone IGV because this is creating a bam_index file in the same | ||
# directory as it's bam file | ||
pysam.index(bytes(datafile_path)) | ||
if settings.REFINERY_S3_USER_DATA: | ||
key = datafile_path | ||
bucket = settings.MEDIA_BUCKET | ||
temp_file = os.path.join(tempfile.gettempdir(), key) | ||
os.makedirs(os.path.abspath(os.path.join(temp_file, os.pardir))) | ||
with open(temp_file, 'wb') as destination: | ||
download_s3_object(bucket, key, destination) | ||
pysam.index(bytes(temp_file)) | ||
datafile_path = temp_file | ||
os.remove(temp_file) | ||
else: | ||
pysam.index(bytes(datafile_path)) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The index file should not be written directly to the file store dir - it is part of the reason this code is broken (see also line 348). There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. maybe i am confused by what you mean by "file store dir" - this https://pysam.readthedocs.io/en/latest/usage.html#using-samtools-commands-within-python There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. i realized this morning coming in you weren't referring to the s3 bit but the local bit - will update There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Also, I could not find any documentation about There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. no - i should have been clearer about that, sorry! There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. OK, can we also update pysam to the latest version 0.15.3? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. i don't see why not - will add it in |
||
|
||
# Map source field of FileStoreItem to path of newly created bam index file | ||
auxiliary_file_store_item.source = "{}.{}".format( | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd just keep it on one line.