
Fix .bam file handling for visualizations #3484

Merged
49 commits merged on Nov 12, 2019
Changes shown from 41 of the 49 commits

Commits
0a7ed9c
rename files in flight in galaxy
ilan-gold Oct 18, 2019
3c4068e
remove unnessary rename_results call
ilan-gold Oct 18, 2019
43db45e
remove rename_results function entirely
ilan-gold Oct 18, 2019
6e8fc3b
remove unused import
ilan-gold Oct 18, 2019
28f2c52
fix json-unicode bug with ast.literal_eval
ilan-gold Oct 23, 2019
bff4862
append auxiliary file url information to
ilan-gold Oct 23, 2019
7a76172
import auxiliary into refinery file system
ilan-gold Oct 23, 2019
08ae2c6
Merge branch 'ilan-gold/rename_on_download' of https://github.com/ref…
ilan-gold Oct 23, 2019
bedc1f7
remove file import subtask
ilan-gold Oct 23, 2019
312bc8d
bump soft_time_limit and use FileImportTask within auxiliary file
ilan-gold Oct 23, 2019
529c65a
add in auxiliary file generation/import task monitoring
ilan-gold Oct 23, 2019
4999476
update tests to reflect new solr field
ilan-gold Oct 23, 2019
f680b4c
Merge branches 'ilan-gold/bam_file_fix' and 'ilan-gold/text-json-bug'…
ilan-gold Oct 23, 2019
82dca41
add data mgiration to add auxiliary file generation/import task
ilan-gold Oct 23, 2019
48e0137
no need for getting the task_id on a Signature object
ilan-gold Oct 23, 2019
38b6afe
remove task_id for auxiliary_file_store_item
ilan-gold Oct 23, 2019
2819048
check if subtask is None and only then add to task list
ilan-gold Oct 24, 2019
482cca7
srv
ilan-gold Oct 24, 2019
4d18e92
get source of FileStoreItem, not the datafile itself
ilan-gold Oct 24, 2019
fe17dbc
access bucket/key in s3 properly and use BASSE_DIR
ilan-gold Oct 24, 2019
8dbfa6f
make directory before creating file
ilan-gold Oct 24, 2019
f92a055
need to create directories recursively
ilan-gold Oct 24, 2019
5e0f73e
execute file generation subtasks as a chain
ilan-gold Oct 24, 2019
26cbda8
remove data_set_manager tasks import
ilan-gold Oct 24, 2019
f3b192e
make file import task immutable
ilan-gold Oct 24, 2019
515575f
remove debugging changes
ilan-gold Oct 24, 2019
f374ffc
add in botocore iport
ilan-gold Oct 25, 2019
7ca8a6d
remove extra return
ilan-gold Oct 25, 2019
acff06d
update logging
ilan-gold Oct 25, 2019
06729db
move temp storage setting to base settings
ilan-gold Oct 28, 2019
f85df5e
use tempfile instead of Django setting
ilan-gold Oct 28, 2019
21fb8fd
Merge branches 'develop' and 'ilan-gold/bam_file_fix' of https://gith…
ilan-gold Oct 30, 2019
b1c9706
add stricter checking for auxiliary file generation
ilan-gold Oct 31, 2019
12df5f5
update spelling
ilan-gold Oct 31, 2019
ba0dfe1
address ilya's comments
ilan-gold Nov 4, 2019
77ebce6
address ilya's task comments
ilan-gold Nov 4, 2019
4c31562
address local file usage in bam
ilan-gold Nov 5, 2019
4ae5da8
upgrade pysam
ilan-gold Nov 5, 2019
c77d4f4
use and and not two if's
ilan-gold Nov 5, 2019
32e6ca6
remove unnecessary commment
ilan-gold Nov 5, 2019
6af7517
update indentation
ilan-gold Nov 5, 2019
084449f
update try-except to be smaller
ilan-gold Nov 5, 2019
023de8d
update root_volume size and celery handling
ilan-gold Nov 5, 2019
f38b064
Merge branch 'ilan-gold/bam_file_fix' of https://github.com/refinery-…
ilan-gold Nov 5, 2019
b9b4b02
Rewrite generate_bam_index() and refactor generate_auxiliary_file()
hackdna Nov 5, 2019
a762c93
actually run post-file import task and change function signature
ilan-gold Nov 6, 2019
60c1ee8
pass bytes to psyam.index
ilan-gold Nov 6, 2019
41b0f6d
update comments and logging
ilan-gold Nov 6, 2019
74a3821
address ilya's comments
ilan-gold Nov 6, 2019
New migration file (19 additions): adds auxiliary_file_task_group_id to AnalysisStatus
@@ -0,0 +1,19 @@
# -*- coding: utf-8 -*-
from __future__ import unicode_literals

from django.db import migrations, models


class Migration(migrations.Migration):

dependencies = [
('analysis_manager', '0008_analysisstatus_galaxy_workflow_task_group_id'),
]

operations = [
migrations.AddField(
model_name='analysisstatus',
name='auxiliary_file_task_group_id',
field=models.UUIDField(null=True, editable=False),
),
]
1 change: 1 addition & 0 deletions refinery/analysis_manager/models.py
@@ -25,6 +25,7 @@ class AnalysisStatus(models.Model):
galaxy_import_task_group_id = models.UUIDField(null=True, editable=False)
galaxy_export_task_group_id = models.UUIDField(null=True, editable=False)
galaxy_workflow_task_group_id = models.UUIDField(null=True, editable=False)
auxiliary_file_task_group_id = models.UUIDField(null=True, editable=False)
#: state of Galaxy file imports
galaxy_import_state = CharField(max_length=10, blank=True,
choices=GALAXY_HISTORY_STATES)
48 changes: 46 additions & 2 deletions refinery/analysis_manager/tasks.py
@@ -146,15 +146,58 @@ def _attach_workflow_outputs(analysis_uuid):
"""
analysis = _get_analysis(analysis_uuid)
analysis_status = _get_analysis_status(analysis_uuid)

if analysis.workflow.type == Workflow.ANALYSIS_TYPE:
analysis.attach_derived_nodes_to_dataset()
if not analysis_status.auxiliary_file_task_group_id:
tasks = analysis.attach_derived_nodes_to_dataset()
logger.info(
"Starting auxiliary file creation for analysis %s'", analysis
)
auxiliary_file_tasks = TaskSet(tasks=tasks).apply_async()
auxiliary_file_tasks.save()
analysis_status.auxiliary_file_task_group_id = \
auxiliary_file_tasks.taskset_id
analysis_status.save()
run_analysis.retry(countdown=RETRY_INTERVAL)
# check if auxiliary file generation and import tasks have finished
auxiliary_file_tasks = get_taskset_result(
analysis_status.auxiliary_file_task_group_id
)
if not auxiliary_file_tasks.ready():
logger.debug("Auxiliary file import and generation "
"running for analysis '%s'", analysis)
run_analysis.retry(countdown=RETRY_INTERVAL)
elif not auxiliary_file_tasks.successful():
error_msg = ("Analysis '{}' failed while generating "
"auxiliary file".format(analysis))
logger.error(error_msg)
analysis.set_status(Analysis.FAILURE_STATUS, error_msg)
analysis.send_email()

get_taskset_result(
analysis_status.refinery_import_task_group_id
).delete()
get_taskset_result(
analysis_status.galaxy_import_task_group_id
).delete()
get_taskset_result(
analysis_status.galaxy_export_task_group_id
).delete()
auxiliary_file_tasks.delete()
analysis.galaxy_cleanup()
elif analysis.workflow.type == Workflow.DOWNLOAD_TYPE:
analysis.attach_outputs_downloads()
else:
logger.warning("Unknown workflow type '%s' in analysis '%s'",
analysis.workflow.type, analysis.name)


def _finalize_analysis(analysis_uuid):
"""
finalize analysis after attaching outputs from galaxy to the refinery file
system
"""
analysis = _get_analysis(analysis_uuid)
analysis_status = _get_analysis_status(analysis_uuid)
analysis.set_status(Analysis.SUCCESS_STATUS)
analysis.send_email()
logger.info("Analysis '%s' finished successfully", analysis)
@@ -302,6 +345,7 @@ def run_analysis(analysis_uuid):
_check_galaxy_history_state(analysis_uuid)
_galaxy_file_export(analysis_uuid)
_attach_workflow_outputs(analysis_uuid)
_finalize_analysis(analysis_uuid)


def _run_galaxy_file_import(analysis_uuid):
12 changes: 8 additions & 4 deletions refinery/core/models.py
@@ -1358,7 +1358,7 @@ def attach_derived_nodes_to_dataset(self):
)
)
self._create_derived_data_file_nodes(graph_with_input_nodes_linked)
self._create_annotated_nodes()
return self._create_annotated_nodes()

def attach_outputs_downloads(self):
if self.results.all().count() == 0:
@@ -1409,6 +1409,7 @@ def _prepare_annotated_nodes(self, node_uuids):
Call order is ensured through:
core.tests.test__prepare_annotated_nodes_calls_methods_in_proper_order
"""
auxiliary_file_tasks = []
for result in self.results.all():
try:
item = FileStoreItem.objects.get(uuid=result.file_store_uuid)
@@ -1423,9 +1424,12 @@ def _prepare_annotated_nodes(self, node_uuids):
logger.error("Error retrieving Node with file UUID '%s': %s",
item.uuid, exc)
else:
if node.is_derived():
node.run_generate_auxiliary_node_task()
if node.is_derived() and node.is_auxiliary_node_needed():
auxiliary_file_tasks += [
node.run_generate_auxiliary_node_task()
]
index_annotated_nodes_selection(node_uuids)
return auxiliary_file_tasks

def _get_output_connection_to_analysis_result_mapping(self):
"""Create and return a dict mapping each "output" type
@@ -1627,7 +1631,7 @@ def _create_annotated_nodes(self):
self.get_input_node_study().uuid,
self.get_input_node_assay().uuid
)
self._prepare_annotated_nodes(node_uuids)
return self._prepare_annotated_nodes(node_uuids)

def get_refinery_import_task_signatures(self):
"""Create and return a list of file import task signatures for the
43 changes: 23 additions & 20 deletions refinery/data_set_manager/models.py
@@ -14,6 +14,7 @@
from django.dispatch import receiver

from celery.result import AsyncResult
from celery import chain
from django_extensions.db.fields import UUIDField
import requests
from requests.exceptions import HTTPError
@@ -22,7 +23,7 @@
from core.utils import delete_analysis_index, skip_if_test_run
import data_set_manager
from file_store.models import FileStoreItem

from file_store.tasks import FileImportTask
"""
TODO: Refactor import data_set_manager. Importing
data_set_manager.tasks.generate_auxiliary_file()
@@ -497,6 +498,9 @@ class Node(models.Model):
TYPES = ASSAYS | FILES | {
SOURCE, SAMPLE, EXTRACT, LABELED_EXTRACT, SCAN, NORMALIZATION,
DATA_TRANSFORMATION}
# Currently we only need to create an auxiliary file for bam, but WIG
# needs an index file as well
AUXILIARY_FILES_NEEDED_FOR_VISUALIZATION = ['bam']

uuid = UUIDField(unique=True, auto=True)
study = models.ForeignKey(Study, db_index=True)
@@ -573,7 +577,7 @@ def is_orphan(self):
def get_analysis_node_connections(self):
return core.models.AnalysisNodeConnection.objects.filter(node=self)

def _create_and_associate_auxiliary_node(self, filestore_item):
def create_and_associate_auxiliary_node(self, filestore_item):
"""
Tries to create and associate an auxiliary Node with a parent
node.
@@ -626,6 +630,15 @@ def get_auxiliary_nodes(self):

return aux_nodes

def is_auxiliary_node_needed(self):
return self.file_item and self.file_item.filetype and \
self.file_item.filetype.used_for_visualization and \
self.file_item.datafile and \
settings.REFINERY_AUXILIARY_FILE_GENERATION == \
'on_file_import' and \
self.file_item.get_extension().lower() in \
self.AUXILIARY_FILES_NEEDED_FOR_VISUALIZATION

def run_generate_auxiliary_node_task(self):
"""This method is initiated after a task_success signal is returned
from the file import task.
@@ -641,24 +654,14 @@ def run_generate_auxiliary_node_task(self):
# Check if the Django setting to generate auxiliary file has been
# set to run when FileStoreItems are imported into Refinery
logger.debug("Checking if some auxiliary Node should be generated")
# Check if we pass the logic to generate aux. Files/Nodes
if (self.file_item and self.file_item.filetype and
self.file_item.filetype.used_for_visualization and
self.file_item.datafile and
settings.REFINERY_AUXILIARY_FILE_GENERATION ==
'on_file_import'):
# Create an empty FileStoreItem (we do the datafile association
# within the generate_auxiliary_file task
auxiliary_file_store_item = FileStoreItem.objects.create()

auxiliary_node = self._create_and_associate_auxiliary_node(
auxiliary_file_store_item
)
result = data_set_manager.tasks.generate_auxiliary_file.delay(
auxiliary_node, self.file_item
)
auxiliary_file_store_item.import_task_id = result.task_id
auxiliary_file_store_item.save()
# Create an empty FileStoreItem (we do the datafile association
# within the generate_auxiliary_file task

generate = data_set_manager.tasks.generate_auxiliary_file.subtask(
(self.uuid,)
)
file_import = FileImportTask().subtask()
return chain(generate, file_import)

def get_auxiliary_file_generation_task_state(self):
"""Return the generate_auxiliary_file task state for a given auxiliary
64 changes: 45 additions & 19 deletions refinery/data_set_manager/tasks.py
@@ -1,18 +1,22 @@
from datetime import date
import logging
import os
import time

from django.conf import settings
from django.contrib.auth.models import User
from django.db import transaction

import botocore
import celery
from celery.task import task
import pysam
import tempfile

from core.models import DataSet, ExtendedGroup, FileStoreItem
from file_store.models import FileExtension, generate_file_source_translator
from file_store.tasks import FileImportTask
from file_store.tasks import FileImportTask, download_s3_object, \
copy_file_object

from .isa_tab_parser import IsaTabParser
from .models import Investigation, Node, initialize_attribute_order
@@ -273,22 +277,26 @@ def parse_isatab(username, public, path, identity_id=None,
return data_set_uuid


@task()
def generate_auxiliary_file(auxiliary_node, parent_node_file_store_item):
@task(soft_time_limit=3600)

Review comment (Member): Is the default 60 seconds not sufficient?

Reply (Member Author): Since this is a file-based operation (we have to both move a file and do an operation on it), the timeout should match the FileImportTask's timeout.

Reply (Member): The FileImportTask timeout was chosen to accommodate downloads from sites on the public Internet, which can take a really long time (e.g., from ftp://ftp.sra.ebi.ac.uk). Here we are dealing with transfers to/from S3 within the AWS network. It would be great to benchmark how long this operation takes for a typical BAM file (download from S3 + indexing + upload to S3) and set the timeout accordingly (perhaps with a 30% margin?).

Reply (Member Author): OK, I'll do that now.
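
For reference, a rough benchmarking sketch along the lines of the suggestion above might look like the following. It is illustrative only and not part of the PR: the bucket name and object key are placeholders, and plain boto3 calls are assumed rather than Refinery's own S3 helpers.

    # Times S3 download, pysam indexing, and S3 upload for one BAM file.
    import os
    import tempfile
    import time

    import boto3
    import pysam

    BUCKET = "example-media-bucket"  # placeholder bucket name
    KEY = "example/sample.bam"       # placeholder object key

    s3 = boto3.client("s3")
    local_bam = os.path.join(tempfile.gettempdir(), os.path.basename(KEY))

    start = time.time()
    s3.download_file(BUCKET, KEY, local_bam)  # download BAM from S3
    downloaded = time.time()

    pysam.index(local_bam)  # writes <local_bam>.bai next to the BAM
    indexed = time.time()

    s3.upload_file(local_bam + ".bai", BUCKET, KEY + ".bai")  # upload index
    uploaded = time.time()

    print("download %.1fs, index %.1fs, upload %.1fs, total %.1fs" % (
        downloaded - start, indexed - downloaded,
        uploaded - indexed, uploaded - start))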

def generate_auxiliary_file(parent_node_uuid):
"""Task that will generate an auxiliary file for visualization purposes
with specific file generation tasks going on for different FileTypes
flagged as: `used_for_visualization`.
:param auxiliary_node: a Node instance
:type auxiliary_node: Node
:param datafile_path: relative path to datafile used to generate aux file
:type datafile_path: String
:param parent_node_file_store_item: FileStoreItem associated with the
parent Node
:type parent_node_file_store_item: FileStoreItem
:param parent_node_uuid: UUID of the parent Node
:type parent_node_uuid: str
"""
generate_auxiliary_file.update_state(state=celery.states.STARTED)
parent_node = Node.objects.get(uuid=parent_node_uuid)
datafile = parent_node.file_item.datafile
auxiliary_file_store_item = FileStoreItem.objects.create()
auxiliary_node = parent_node.create_and_associate_auxiliary_node(
auxiliary_file_store_item
)
try:
datafile_path = parent_node_file_store_item.datafile.path
if not settings.REFINERY_S3_USER_DATA:
datafile_path = datafile.path
else:
datafile_path = datafile.name
except (NotImplementedError, ValueError):
datafile_path = None
try:

Review comment (Member): This try block is huge. I think the only function that can raise exceptions is generate_bam_index().
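
A narrower structure along the lines of this comment might look roughly like the sketch below. It is illustrative only: it reuses names already defined earlier in generate_auxiliary_file() (parent_node, auxiliary_node, auxiliary_file_store_item, datafile_path, start_time) and wraps only the call that can actually raise.

    # Sketch: tail of generate_auxiliary_file() with a minimal try block.
    if parent_node.file_item.get_extension().lower() == 'bam':
        try:
            generate_bam_index(auxiliary_node.file_item.uuid, datafile_path)
        except Exception as exc:
            logger.error("Auxiliary file generation failed for %s: %s",
                         datafile_path, exc)
            raise
    generate_auxiliary_file.update_state(state=celery.states.SUCCESS)
    logger.debug("Auxiliary file for %s generated in %s seconds.",
                 datafile_path, time.time() - start_time)
    return auxiliary_file_store_item.uuid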

@@ -298,13 +306,15 @@ def generate_auxiliary_file(auxiliary_node, parent_node_file_store_item):
# Here we are checking for the FileExtension of the ParentNode's
# FileStoreItem because we will create auxiliary files based on what
# said value is
if parent_node_file_store_item.get_extension().lower() == 'bam':
if parent_node.file_item.get_extension().lower() == 'bam':
generate_bam_index(auxiliary_node.file_item.uuid, datafile_path)

generate_auxiliary_file.update_state(state=celery.states.SUCCESS)

logger.debug("Auxiliary file for %s generated in %s "
"seconds." % (datafile_path, time.time() - start_time))
return auxiliary_file_store_item.uuid

Review comment (Member): Should the task fail here also?

Reply (Member Author): OK.
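
Regarding the question above, one common Celery pattern (a sketch only, not necessarily what was committed in later revisions) is to mark the task state and re-raise, which records the failure and prevents any chained task, such as the FileImportTask here, from running:

    # Minimal, self-contained illustration of failing a task from an
    # except block, using the same old-style Celery API as this codebase.
    import celery
    from celery.task import task


    def do_work():
        """Placeholder for the operation that may raise (e.g. indexing)."""
        raise RuntimeError("simulated failure")


    @task()
    def example_task():
        try:
            do_work()
        except Exception:
            # update_state() is optional here; re-raising alone also marks
            # the task as failed and aborts the rest of the chain.
            example_task.update_state(state=celery.states.FAILURE)
            raise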


except Exception as e:
logger.error(
"Something went wrong while trying to generate the auxiliary file "
@@ -332,13 +342,28 @@ def generate_bam_index(auxiliary_file_store_item_uuid, datafile_path):
# fail if we can't get what we want.
bam_index_file_extension = FileExtension.objects.get(name="bai").name
auxiliary_file_store_item = FileStoreItem.objects.get(
uuid=auxiliary_file_store_item_uuid)
uuid=auxiliary_file_store_item_uuid
)

# Leverage pysam library to generate bam index file
# FIXME: This should be refactored once we don't have a need for
# Standalone IGV because this is creating a bam_index file in the same
# directory as its bam file
pysam.index(bytes(datafile_path))
if settings.REFINERY_S3_USER_DATA:
key = datafile_path
bucket = settings.MEDIA_BUCKET
temp_file = os.path.join(tempfile.gettempdir(), key)
os.makedirs(os.path.abspath(os.path.join(temp_file, os.pardir)))
with open(temp_file, 'wb') as destination:
download_s3_object(bucket, key, destination)
pysam.index(bytes(temp_file))
datafile_path = temp_file
os.remove(temp_file)
else:
temp_file = os.path.join(tempfile.gettempdir(), datafile_path)
os.makedirs(os.path.abspath(os.path.join(temp_file, os.pardir)))
with open(temp_file, 'wb') as destination, \
open(datafile_path, 'rb') as source:
copy_file_object(source, destination)
pysam.index(bytes(temp_file))
datafile_path = temp_file
os.remove(temp_file)

# Map source field of FileStoreItem to path of newly created bam index file
auxiliary_file_store_item.source = "{}.{}".format(
@@ -367,7 +392,8 @@ def post_process_file_import(**kwargs):
node.update_solr_index()
logger.info("Updated Solr index with file import state for Node '%s'",
node.uuid)
if kwargs['state'] == celery.states.SUCCESS:
if kwargs['state'] == celery.states.SUCCESS and \
node.is_auxiliary_node_needed():
node.run_generate_auxiliary_node_task()


6 changes: 3 additions & 3 deletions refinery/data_set_manager/test_models.py
@@ -122,7 +122,7 @@ def test_is_orphan(self):

def test_create_and_associate_auxiliary_node(self):
self.assertEqual(self.node.get_children(), [])
self.node._create_and_associate_auxiliary_node(self.filestore_item)
self.node.create_and_associate_auxiliary_node(self.filestore_item)
self.assertIsNotNone(self.node.get_children())
self.assertIsNotNone(Node.objects.get(file_item=self.filestore_item))
self.assertEqual(self.node.get_children()[0],
@@ -138,15 +138,15 @@ def test_create_and_associate_auxiliary_node(self):
def test_get_auxiliary_nodes(self):
self.assertEqual(self.node.get_children(), [])
for i in xrange(2):
self.node._create_and_associate_auxiliary_node(self.filestore_item)
self.node.create_and_associate_auxiliary_node(self.filestore_item)
# Still just one child even on second time
self.assertEqual(len(self.node.get_children()), 1)

def test_get_auxiliary_file_generation_task_state(self):
# Normal nodes will always return None
self.assertIsNone(self.node.get_auxiliary_file_generation_task_state())
# Auxiliary nodes will have a task state
self.node._create_and_associate_auxiliary_node(self.filestore_item)
self.node.create_and_associate_auxiliary_node(self.filestore_item)
auxiliary = Node.objects.get(uuid=self.node.get_children()[0])
state = auxiliary.get_auxiliary_file_generation_task_state()
# Values from:
9 changes: 8 additions & 1 deletion refinery/tool_manager/models.py
@@ -487,6 +487,7 @@ class VisualizationTool(Tool):
"""
API_PREFIX = "api_prefix"
FILE_URL = "file_url"
AUXILIARY_FILE_LIST = "auxiliary_file_list"
INPUT_NODE_INFORMATION = "node_info"
NODE_SOLR_INFO = "node_solr_info"
ALL_NODE_INFORMATION = "all_node_info"
@@ -553,7 +554,13 @@ def _get_detailed_nodes_dict(self, node_uuid_list,
self.FILE_URL: get_file_url_from_node_uuid(
node["uuid"],
require_valid_url=require_valid_urls
)
),
self.AUXILIARY_FILE_LIST: [
get_file_url_from_node_uuid(
child.uuid, require_valid_url=require_valid_urls
) for child in Node.objects.get(uuid=node["uuid"]).
get_auxiliary_nodes()
]
}
for node in solr_response_json["nodes"]
}