Fix .bam file handling for visualizations (#3484)
Fix `.bam` file creation and handling by
1) Being explicit about what needs an auxiliary file (i.e. `.bam`)
2) Creating a clear task structure for generation and import of the auxiliary files
3) Providing an endpoint that serves the `.bai` index file to visualizations that use `.bam`
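In short, auxiliary file creation becomes an explicit, trackable stage of an analysis: each derived `.bam` node yields a Celery chain (generate the `.bai` index, then import it), and the chains run as a single task group whose id is stored on `AnalysisStatus`. A minimal sketch of that flow, assuming `nodes` stands in for an analysis's derived nodes (the `TaskSet` import path may vary by Celery version):

```python
# Sketch of the auxiliary-file flow this commit introduces; `nodes` is an
# illustrative stand-in for an analysis's derived Nodes.
from celery.task.sets import TaskSet

def build_auxiliary_tasks(nodes):
    """Collect one generate-then-import chain per qualifying node."""
    tasks = []
    for node in nodes:
        if node.is_derived() and node.is_auxiliary_node_needed():
            tasks.append(node.generate_auxiliary_node_task())
    return tasks

# The analysis layer runs all chains as one group and saves the group id
# on AnalysisStatus so run_analysis can poll it later:
#   result = TaskSet(tasks=build_auxiliary_tasks(nodes)).apply_async()
#   analysis_status.auxiliary_file_task_group_id = result.taskset_id
```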
ilan-gold committed Nov 12, 2019
1 parent da4535f commit 2a7b804
Showing 11 changed files with 192 additions and 110 deletions.
3 changes: 2 additions & 1 deletion deployment/terraform/modules/ec2/main.tf
@@ -133,7 +133,8 @@ resource "aws_instance" "app_server" {
subnet_id = "${var.subnet_id}"
iam_instance_profile = "${aws_iam_instance_profile.app_server.name}"
root_block_device {
volume_type = "gp2"
volume_size = 12
}
ebs_block_device {
delete_on_termination = false
@@ -0,0 +1,19 @@
# -*- coding: utf-8 -*-
from __future__ import unicode_literals

from django.db import migrations, models


class Migration(migrations.Migration):

dependencies = [
('analysis_manager', '0008_analysisstatus_galaxy_workflow_task_group_id'),
]

operations = [
migrations.AddField(
model_name='analysisstatus',
name='auxiliary_file_task_group_id',
field=models.UUIDField(null=True, editable=False),
),
]
1 change: 1 addition & 0 deletions refinery/analysis_manager/models.py
@@ -25,6 +25,7 @@ class AnalysisStatus(models.Model):
galaxy_import_task_group_id = models.UUIDField(null=True, editable=False)
galaxy_export_task_group_id = models.UUIDField(null=True, editable=False)
galaxy_workflow_task_group_id = models.UUIDField(null=True, editable=False)
auxiliary_file_task_group_id = models.UUIDField(null=True, editable=False)
#: state of Galaxy file imports
galaxy_import_state = CharField(max_length=10, blank=True,
choices=GALAXY_HISTORY_STATES)
48 changes: 46 additions & 2 deletions refinery/analysis_manager/tasks.py
@@ -146,15 +146,58 @@ def _attach_workflow_outputs(analysis_uuid):
"""
analysis = _get_analysis(analysis_uuid)
analysis_status = _get_analysis_status(analysis_uuid)

if analysis.workflow.type == Workflow.ANALYSIS_TYPE:
analysis.attach_derived_nodes_to_dataset()
if not analysis_status.auxiliary_file_task_group_id:
tasks = analysis.attach_derived_nodes_to_dataset()
logger.info(
"Starting auxiliary file creation for analysis %s'", analysis
)
auxiliary_file_tasks = TaskSet(tasks=tasks).apply_async()
auxiliary_file_tasks.save()
analysis_status.auxiliary_file_task_group_id = \
auxiliary_file_tasks.taskset_id
analysis_status.save()
run_analysis.retry(countdown=RETRY_INTERVAL)
# check if auxiliary file generation and import have finished
auxiliary_file_tasks = get_taskset_result(
analysis_status.auxiliary_file_task_group_id
)
if not auxiliary_file_tasks.ready():
logger.debug("Auxiliary file import and generation "
"running for analysis '%s'", analysis)
run_analysis.retry(countdown=RETRY_INTERVAL)
elif not auxiliary_file_tasks.successful():
error_msg = ("Analysis '{}' failed while generating "
"auxiliary file".format(analysis))
logger.error(error_msg)
analysis.set_status(Analysis.FAILURE_STATUS, error_msg)
analysis.send_email()

get_taskset_result(
analysis_status.refinery_import_task_group_id
).delete()
get_taskset_result(
analysis_status.galaxy_import_task_group_id
).delete()
get_taskset_result(
analysis_status.galaxy_export_task_group_id
).delete()
auxiliary_file_tasks.delete()
analysis.galaxy_cleanup()
elif analysis.workflow.type == Workflow.DOWNLOAD_TYPE:
analysis.attach_outputs_downloads()
else:
logger.warning("Unknown workflow type '%s' in analysis '%s'",
analysis.workflow.type, analysis.name)


def _finalize_analysis(analysis_uuid):
"""
Finalize analysis after attaching outputs from Galaxy to the Refinery
file system
"""
analysis = _get_analysis(analysis_uuid)
analysis_status = _get_analysis_status(analysis_uuid)
analysis.set_status(Analysis.SUCCESS_STATUS)
analysis.send_email()
logger.info("Analysis '%s' finished successfully", analysis)
@@ -302,6 +345,7 @@ def run_analysis(analysis_uuid):
_check_galaxy_history_state(analysis_uuid)
_galaxy_file_export(analysis_uuid)
_attach_workflow_outputs(analysis_uuid)
_finalize_analysis(analysis_uuid)


def _run_galaxy_file_import(analysis_uuid):
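The `_attach_workflow_outputs` hunk above reuses the launch-then-poll idiom already applied to the Galaxy import and export stages: the first pass launches the task set and persists its group id, and each later pass of `run_analysis` polls until the set is ready. A condensed sketch of the idiom, assuming this module's own helpers (`run_analysis`, `get_taskset_result`, `RETRY_INTERVAL`) and eliding the failure branch:

```python
# Condensed sketch of the launch-then-poll idiom used above; failure
# handling and taskset cleanup are elided for brevity.
from celery.task.sets import TaskSet

def _poll_auxiliary_tasks(analysis_status, tasks):
    if not analysis_status.auxiliary_file_task_group_id:
        # First pass: launch the group and persist its id for later polls.
        result = TaskSet(tasks=tasks).apply_async()
        result.save()
        analysis_status.auxiliary_file_task_group_id = result.taskset_id
        analysis_status.save()
        run_analysis.retry(countdown=RETRY_INTERVAL)
    group = get_taskset_result(analysis_status.auxiliary_file_task_group_id)
    if not group.ready():
        # Still running: schedule another pass of run_analysis.
        run_analysis.retry(countdown=RETRY_INTERVAL)
```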
5 changes: 4 additions & 1 deletion refinery/config/settings/base.py
@@ -313,7 +313,10 @@ def get_setting(name, settings=local_settings, default=None):
'%(funcName)s[%(task_id)s] - %(message)s'
# for system stability
CELERYD_MAX_TASKS_PER_CHILD = get_setting("CELERYD_MAX_TASKS_PER_CHILD")
CELERY_ROUTES = {"file_store.tasks.FileImportTask": {"queue": "file_import"}}
CELERY_ROUTES = {
"file_store.tasks.FileImportTask": {"queue": "file_import"},
"data_set_manager.tasks.generate_auxiliary_file": {"queue": "file_import"}
}
CELERY_ACCEPT_CONTENT = ['pickle']
CELERYD_TASK_SOFT_TIME_LIMIT = 60 # seconds
CELERYBEAT_SCHEDULE = {
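Routing `generate_auxiliary_file` to the existing `file_import` queue keeps long-running index generation off the default workers. No call-site changes are needed; with this route in place a plain `delay()` lands on that queue. An illustrative call, assuming `node_uuid` holds a `Node` UUID:

```python
# With the CELERY_ROUTES entry above, this call is queued on "file_import"
# automatically; no explicit queue argument is needed at the call site.
from data_set_manager.tasks import generate_auxiliary_file

generate_auxiliary_file.delay(node_uuid)  # node_uuid: illustrative
```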
12 changes: 8 additions & 4 deletions refinery/core/models.py
@@ -1358,7 +1358,7 @@ def attach_derived_nodes_to_dataset(self):
)
)
self._create_derived_data_file_nodes(graph_with_input_nodes_linked)
self._create_annotated_nodes()
return self._create_annotated_nodes()

def attach_outputs_downloads(self):
if self.results.all().count() == 0:
@@ -1409,6 +1409,7 @@ def _prepare_annotated_nodes(self, node_uuids):
Call order is ensured through:
core.tests.test__prepare_annotated_nodes_calls_methods_in_proper_order
"""
auxiliary_file_tasks = []
for result in self.results.all():
try:
item = FileStoreItem.objects.get(uuid=result.file_store_uuid)
@@ -1423,9 +1424,12 @@ def _prepare_annotated_nodes(self, node_uuids):
logger.error("Error retrieving Node with file UUID '%s': %s",
item.uuid, exc)
else:
if node.is_derived():
node.run_generate_auxiliary_node_task()
if node.is_derived() and node.is_auxiliary_node_needed():
auxiliary_file_tasks += [
node.generate_auxiliary_node_task()
]
index_annotated_nodes_selection(node_uuids)
return auxiliary_file_tasks

def _get_output_connection_to_analysis_result_mapping(self):
"""Create and return a dict mapping each "output" type
@@ -1627,7 +1631,7 @@ def _create_annotated_nodes(self):
self.get_input_node_study().uuid,
self.get_input_node_assay().uuid
)
self._prepare_annotated_nodes(node_uuids)
return self._prepare_annotated_nodes(node_uuids)

def get_refinery_import_task_signatures(self):
"""Create and return a list of file import task signatures for the
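The net effect of the core/models.py changes is that `attach_derived_nodes_to_dataset` no longer fires auxiliary tasks inline: the task signatures bubble up through `_create_annotated_nodes` and `_prepare_annotated_nodes` to the caller, which decides when to run them. A toy schematic of that plumbing, with bodies trimmed to the relevant lines:

```python
# Toy schematic of the new return-value plumbing; names mirror the
# methods in core/models.py above, bodies are trimmed.
def _prepare_annotated_nodes(nodes):
    return [
        node.generate_auxiliary_node_task()
        for node in nodes
        if node.is_derived() and node.is_auxiliary_node_needed()
    ]

def _create_annotated_nodes(nodes):
    return _prepare_annotated_nodes(nodes)

def attach_derived_nodes_to_dataset(nodes):
    # Callers such as _attach_workflow_outputs receive the chains and
    # group them into a TaskSet themselves.
    return _create_annotated_nodes(nodes)
```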
59 changes: 26 additions & 33 deletions refinery/data_set_manager/models.py
@@ -14,6 +14,7 @@
from django.dispatch import receiver

from celery.result import AsyncResult
from celery import chain
from django_extensions.db.fields import UUIDField
import requests
from requests.exceptions import HTTPError
@@ -22,7 +23,7 @@
from core.utils import delete_analysis_index, skip_if_test_run
import data_set_manager
from file_store.models import FileStoreItem

from file_store.tasks import FileImportTask
"""
TODO: Refactor import data_set_manager. Importing
data_set_manager.tasks.generate_auxiliary_file()
@@ -497,6 +498,9 @@ class Node(models.Model):
TYPES = ASSAYS | FILES | {
SOURCE, SAMPLE, EXTRACT, LABELED_EXTRACT, SCAN, NORMALIZATION,
DATA_TRANSFORMATION}
# Currently only BAM files need an auxiliary (index) file, but WIG
# files will need one as well
AUXILIARY_FILES_NEEDED_FOR_VISUALIZATION = ['bam']

uuid = UUIDField(unique=True, auto=True)
study = models.ForeignKey(Study, db_index=True)
@@ -573,7 +577,7 @@ def is_orphan(self):
def get_analysis_node_connections(self):
return core.models.AnalysisNodeConnection.objects.filter(node=self)

def _create_and_associate_auxiliary_node(self, filestore_item):
def create_and_associate_auxiliary_node(self, filestore_item):
"""
Tries to create and associate an auxiliary Node with a parent
node.
@@ -626,39 +630,28 @@ def get_auxiliary_nodes(self):

return aux_nodes

def run_generate_auxiliary_node_task(self):
def is_auxiliary_node_needed(self):
    return (
        self.file_item and self.file_item.filetype and
        self.file_item.filetype.used_for_visualization and
        self.file_item.datafile and
        settings.REFINERY_AUXILIARY_FILE_GENERATION == 'on_file_import' and
        self.file_item.get_extension().lower() in
        self.AUXILIARY_FILES_NEEDED_FOR_VISUALIZATION
    )

def generate_auxiliary_node_task(self):
"""This method is initiated after a task_success signal is returned
from the file import task.
Here we check if the imported FileStoreItem returned from the
file import task is in need of the creation of some auxiliary
File/Node. If this is the case, we create auxiliary Node and
FileStoreItem objects, and then proceed to run the
generate_auxiliary_file task, and associate said task's id with the
newly created FileStoreItems `import_task_id` field so that we can
monitor the task state.
from the file import task. It generates the tasks that create an
auxiliary file and import it into Refinery. Callers should check
is_auxiliary_node_needed() before invoking this method.
"""
# Check if the Django setting to generate auxiliary file has been
# set to run when FileStoreItems are imported into Refinery
logger.debug("Checking if some auxiliary Node should be generated")
# Check if we pass the logic to generate aux. Files/Nodes
if (self.file_item and self.file_item.filetype and
self.file_item.filetype.used_for_visualization and
self.file_item.datafile and
settings.REFINERY_AUXILIARY_FILE_GENERATION ==
'on_file_import'):
# Create an empty FileStoreItem (we do the datafile association
# within the generate_auxiliary_file task
auxiliary_file_store_item = FileStoreItem.objects.create()

auxiliary_node = self._create_and_associate_auxiliary_node(
auxiliary_file_store_item
)
result = data_set_manager.tasks.generate_auxiliary_file.delay(
auxiliary_node, self.file_item
)
auxiliary_file_store_item.import_task_id = result.task_id
auxiliary_file_store_item.save()

generate = data_set_manager.tasks.generate_auxiliary_file.subtask(
(self.uuid,)
)
file_import = FileImportTask().subtask()
return chain(generate, file_import)

def get_auxiliary_file_generation_task_state(self):
"""Return the generate_auxiliary_file task state for a given auxiliary
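Because `generate_auxiliary_node_task()` now returns an unexecuted `chain`, callers control when and where it runs; Celery feeds the return value of `generate_auxiliary_file` (presumably an identifier for the new auxiliary `FileStoreItem`) into the following `FileImportTask` as its first argument. A hypothetical single-node usage, with the `node_uuid` lookup purely illustrative:

```python
# Hypothetical single-node usage; node_uuid is illustrative.
node = Node.objects.get(uuid=node_uuid)
if node.is_derived() and node.is_auxiliary_node_needed():
    # Generate the .bai index, then import it as a FileStoreItem; the
    # generate task's result feeds the import task via the chain.
    node.generate_auxiliary_node_task().apply_async()
```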
