File import bug fixes #2723

Merged 21 commits on Apr 19, 2018

Commits
a00d8ca
Terminate file import tasks on FileStoreItem deletion and code cleanup
hackdna Mar 20, 2018
697c994
Merge branch 'develop' into hackdna/terminate-file-import
hackdna Mar 22, 2018
95a6330
Attempt to cancel file import on every file deletion, code cleanup
hackdna Mar 22, 2018
ced0c65
Stop import_file task only when task ID is available and update unit …
hackdna Mar 23, 2018
7839c05
Add unit tests for file import task termination on file delete
hackdna Mar 23, 2018
cd5083f
Avoid canceling file import when not replacing the datafile, add unit…
hackdna Mar 26, 2018
b400d1b
Add unit tests for data file symlinking and test cleanup
hackdna Mar 27, 2018
cd104bd
PEP8 fix and test code cleanup
hackdna Mar 27, 2018
b59fb9e
Test code cleanup
hackdna Mar 28, 2018
34088dd
Remove datafile and import_file task updates from FileStoreItem.save(…
hackdna Mar 30, 2018
edc1055
Cleanup update_solr_index()
hackdna Apr 2, 2018
325b08f
Merge branch 'develop' into hackdna/terminate-file-import
hackdna Apr 6, 2018
8d57457
Improve logging in delete_datafile() and remove commented out code fr…
hackdna Apr 9, 2018
d9e83a1
Move files from file store temp into the file store dir during import
hackdna Apr 9, 2018
43fe610
Make _mkdir() more robust and update import_file to avoid removing em…
hackdna Apr 9, 2018
de2a0f5
Improve logging in Celery tasks
hackdna Apr 10, 2018
b67101b
Improve logging in CheckDataFilesView
hackdna Apr 17, 2018
8b9cf88
Refactor import_file task to fail in case of errors and make sure fil…
hackdna Apr 17, 2018
38b2976
Merge branch 'develop' into hackdna/terminate-file-import
hackdna Apr 18, 2018
2c05fd5
Improve directory making logic
hackdna Apr 18, 2018
4b985f4
Code cleanup
hackdna Apr 18, 2018
5 changes: 5 additions & 0 deletions refinery/config/settings/base.py
@@ -317,6 +317,11 @@ def get_setting(name, settings=local_settings, default=None):

EMAIL_BACKEND = "django.core.mail.backends.console.EmailBackend"

# for external functions called in Celery tasks
CELERYD_LOG_FORMAT = '%(asctime)s %(levelname)-8s %(name)s:%(lineno)s ' \
'%(funcName)s() - %(message)s'
CELERYD_TASK_LOG_FORMAT = '%(asctime)s %(levelname)-8s %(name)s:%(lineno)s ' \
'%(funcName)s[%(task_id)s] - %(message)s'
# for system stability
CELERYD_MAX_TASKS_PER_CHILD = get_setting("CELERYD_MAX_TASKS_PER_CHILD")
CELERY_ROUTES = {"file_store.tasks.import_file": {"queue": "file_import"}}
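For illustration, a minimal sketch (outside of Celery, using the standard logging module with a hypothetical logger name and function) of what log lines produced with the CELERYD_LOG_FORMAT above look like:

import logging

# apply the same format string to a plain logging handler to preview the output
handler = logging.StreamHandler()
handler.setFormatter(logging.Formatter(
    '%(asctime)s %(levelname)-8s %(name)s:%(lineno)s %(funcName)s() - %(message)s'
))
logger = logging.getLogger('file_store.utils')  # hypothetical logger name
logger.addHandler(handler)
logger.setLevel(logging.DEBUG)

def copy_file():
    # prints something like:
    # 2018-04-18 14:05:03,123 INFO     file_store.utils:15 copy_file() - copying data file
    logger.info("copying data file")

copy_file()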
9 changes: 1 addition & 8 deletions refinery/core/models.py
@@ -799,19 +799,12 @@ def _dataset_delete(sender, instance, *args, **kwargs):
See: https://docs.djangoproject.com/en/1.8/topics/db/models/
#overriding-model-methods
"""

# terminate any running file import tasks
for file_store_item in instance.get_file_store_items():
file_store_item.terminate_file_import_task()

related_investigation_links = instance.get_investigation_links()

with transaction.atomic():
# delete FileStoreItem and datafile corresponding to the
# metadata file used to generate the DataSet
instance.get_metadata_as_file_store_item().delete()

for investigation_link in related_investigation_links:
for investigation_link in instance.get_investigation_links():
investigation_link.get_node_collection().delete()

delete_data_set_index(instance)
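The terminate-on-delete behavior now lives on FileStoreItem (see commit a00d8ca) and its implementation is not part of this diff, so the following is only a hedged sketch of what terminating a running import task looks like with the Celery API that the updated tests patch (celery.result.AsyncResult):

from celery.result import AsyncResult

def terminate_file_import_task(import_task_id):
    # hypothetical standalone version; the real method is defined on FileStoreItem
    if import_task_id:
        # revoke the task, terminating it if it is already running
        AsyncResult(import_task_id).revoke(terminate=True)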
3 changes: 2 additions & 1 deletion refinery/core/tests.py
@@ -737,7 +737,8 @@ def test_analysis_deletion_removes_related_objects(self):
def test_analysis_bulk_deletion_removes_related_objects(self):
# make a second Analysis
make_analyses_with_single_dataset(1, self.user)
Analysis.objects.all().delete()
with mock.patch('celery.result.AsyncResult'):
Analysis.objects.all().delete()

self.assertEqual(Analysis.objects.count(), 0)
self.assertEqual(AnalysisNodeConnection.objects.count(), 0)
63 changes: 15 additions & 48 deletions refinery/data_set_manager/search_indexes.py
@@ -9,12 +9,11 @@

from django.conf import settings

from celery.states import PENDING, SUCCESS
from constants import NOT_AVAILABLE
from djcelery.models import TaskMeta
import celery
from haystack import indexes
from haystack.exceptions import SkipDocument

import constants
import core
from file_store.models import FileStoreItem

@@ -118,46 +117,17 @@ def prepare(self, node):
FileStoreItem.MultipleObjectsReturned) as e:
logger.error("Couldn't properly fetch FileStoreItem: %s", e)
file_store_item = None
download_url = NOT_AVAILABLE
download_url_or_state = constants.NOT_AVAILABLE
data['filetype_Characteristics' + NodeIndex.GENERIC_SUFFIX] = ''
else:
data['filetype_Characteristics' + NodeIndex.GENERIC_SUFFIX] = \
file_store_item.filetype
download_url = file_store_item.get_datafile_url()
if download_url is None:
if not file_store_item.import_task_id:
logger.debug("No import_task_id yet for FileStoreItem "
"with UUID: %s", file_store_item.uuid)
download_url = PENDING
else:
logger.debug(
"FileStoreItem with UUID: %s has import_task_id: %s",
file_store_item.uuid,
file_store_item.import_task_id
)
if file_store_item.get_import_status() == SUCCESS:
download_url = NOT_AVAILABLE
else:
# The underlying Celery code in
# FileStoreItem.get_import_status() makes an assumption
# that a result is "probably" PENDING even if it can't
# find an associated Task. See:
# https://github.com/celery/celery/blob/v3.1.20/celery/
# backends/amqp.py#L192-L193 So we double check here to
# make sure said assumption holds up
try:
TaskMeta.objects.get(
task_id=file_store_item.import_task_id
)
except TaskMeta.DoesNotExist:
logger.debug(
"No file_import task for FileStoreItem with "
"UUID: %s",
file_store_item.uuid
)
download_url = NOT_AVAILABLE
else:
download_url = PENDING
download_url_or_state = file_store_item.get_datafile_url()
Review comment (Member):
Can we just call this download_url and below use import_state/import_status? It seems that get_datafile_url() won't return a FileStoreItem's import state, and get_import_status() won't return download url information.

Reply from @hackdna (Member, Author), Apr 18, 2018:
Yes, this would be great. However, it would require changing the front-end code to deal with file import state and download URL separately. So, I simplified the logic as much as I could without going too far down the rabbit hole and renamed this variable to reflect its actual current use.

if download_url_or_state is None:
download_url_or_state = file_store_item.get_import_status()
# UI can not handle FAILURE state
if download_url_or_state == celery.states.FAILURE:
download_url_or_state = constants.NOT_AVAILABLE

data.update(self._assay_data(node))

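To summarize the change discussed in the review comment above, a minimal standalone sketch (not the actual method) of how the indexed DOWNLOAD_URL value is now resolved: it carries either a real datafile URL, a Celery import state such as PENDING or STARTED, or N/A when the import has failed.

from celery import states

NOT_AVAILABLE = 'N/A'  # stand-in for refinery's constants.NOT_AVAILABLE

def resolve_download_url_or_state(file_store_item):
    value = file_store_item.get_datafile_url()
    if value is None:
        # no URL yet: fall back to the import task state
        value = file_store_item.get_import_status()
        if value == states.FAILURE:  # the UI cannot display FAILURE
            value = NOT_AVAILABLE
    return value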
@@ -191,31 +161,28 @@ def prepare(self, node):
if value != "":
data[key].add(value)
else:
data[key].add(NOT_AVAILABLE)
data[key].add(constants.NOT_AVAILABLE)

# iterate over all keys in data and join sets into strings
for key, value in data.iteritems():
if type(value) is set:
data[key] = " + ".join(sorted(value))

data.update({
NodeIndex.DOWNLOAD_URL:
download_url,
NodeIndex.TYPE_PREFIX + id_suffix:
node.type,
NodeIndex.NAME_PREFIX + id_suffix:
node.name,
NodeIndex.DOWNLOAD_URL: download_url_or_state,
NodeIndex.TYPE_PREFIX + id_suffix: node.type,
NodeIndex.NAME_PREFIX + id_suffix: node.name,
NodeIndex.FILETYPE_PREFIX + id_suffix:
"" if file_store_item is None
else file_store_item.filetype,
NodeIndex.ANALYSIS_UUID_PREFIX + id_suffix:
NOT_AVAILABLE if node.get_analysis() is None
constants.NOT_AVAILABLE if node.get_analysis() is None
else node.get_analysis().name,
NodeIndex.SUBANALYSIS_PREFIX + id_suffix:
(-1 if node.subanalysis is None # TODO: upgrade flake8
else node.subanalysis), # and remove parentheses
NodeIndex.WORKFLOW_OUTPUT_PREFIX + id_suffix:
NOT_AVAILABLE if node.workflow_output is None
constants.NOT_AVAILABLE if node.workflow_output is None
else node.workflow_output
})

29 changes: 15 additions & 14 deletions refinery/data_set_manager/tests.py
@@ -17,15 +17,15 @@
from django.http import QueryDict
from django.test import LiveServerTestCase, TestCase

from celery.states import PENDING, STARTED, SUCCESS
from constants import NOT_AVAILABLE
from celery.states import FAILURE, PENDING, STARTED, SUCCESS
from djcelery.models import TaskMeta
from guardian.shortcuts import assign_perm
from haystack.exceptions import SkipDocument
import mock
from mock import ANY
from rest_framework.test import APIClient, APIRequestFactory, APITestCase

import constants
from core.models import (INPUT_CONNECTION, OUTPUT_CONNECTION, Analysis,
AnalysisNodeConnection, DataSet, ExtendedGroup,
InvestigationLink)
@@ -1913,9 +1913,6 @@ def setUp(self):

self.maxDiff = None

def tearDown(self):
FileStoreItem.objects.all().delete()

def test_skip_types(self):
self.node.type = 'Unknown File Type'
with self.assertRaises(SkipDocument):
@@ -2007,34 +2004,38 @@ def test_prepare_node_pending_non_existent_file_import_task(self):
self.import_task.delete()
with mock.patch.object(FileStoreItem, 'get_datafile_url',
return_value=None):
self._assert_node_index_prepared_correctly(
self._prepare_node_index(self.node),
expected_download_url=NOT_AVAILABLE
)
with mock.patch.object(FileStoreItem, 'get_import_status',
return_value=FAILURE):
self._assert_node_index_prepared_correctly(
self._prepare_node_index(self.node),
expected_download_url=constants.NOT_AVAILABLE
)

def test_prepare_node_no_file_import_task_id_yet(self):
self.file_store_item.import_task_id = ""
self.file_store_item.save()
self.import_task.delete()
self._assert_node_index_prepared_correctly(
self._prepare_node_index(self.node), expected_download_url=PENDING
self._prepare_node_index(self.node),
expected_download_url=constants.NOT_AVAILABLE
)

def test_prepare_node_no_file_store_item(self):
self.file_store_item.delete()
with mock.patch('celery.result.AsyncResult'):
self.file_store_item.delete()
self._assert_node_index_prepared_correctly(
self._prepare_node_index(self.node),
expected_download_url=NOT_AVAILABLE, expected_filetype=''
expected_download_url=constants.NOT_AVAILABLE, expected_filetype=''
)

def test_prepare_node_s3_file_store_item_source_no_datafile(self):
self.file_store_item.source = 's3://test/test.txt'
self.file_store_item.save()
with mock.patch.object(FileStoreItem, 'get_import_status',
return_value=SUCCESS):
return_value=FAILURE):
self._assert_node_index_prepared_correctly(
self._prepare_node_index(self.node),
expected_download_url=NOT_AVAILABLE,
expected_download_url=constants.NOT_AVAILABLE,
expected_filetype=self.file_store_item.filetype
)

20 changes: 11 additions & 9 deletions refinery/data_set_manager/views.py
@@ -329,14 +329,6 @@ def post(self, request, *args, **kwargs):
else:
dataset_uuid = parse_isatab_invocation

try:
os.unlink(response['data']['temp_file_path'])
except OSError as e:
logger.error(
"Couldn't unlink temporary file: %s %s",
response['data']['temp_file_path'], e
)

# import data files
if dataset_uuid:
try:
@@ -585,23 +577,33 @@ def post(self, request, *args, **kwargs):
# get a list of all uploaded S3 objects for the user
s3 = boto3.resource('s3')
s3_bucket = s3.Bucket(settings.UPLOAD_BUCKET)
# TODO: handle ParamValidationError (return error msg in response?)
for s3_object in s3_bucket.objects.filter(Prefix=identity_id):
uploaded_s3_key_list.append(s3_object.key)

for input_file_path in input_file_list:
if not isinstance(input_file_path, unicode):
bad_file_list.append(input_file_path)
logger.error("Uploaded file path '%s' is not a string",
input_file_path)
else:
input_file_path = translate_file_source(input_file_path)
if settings.REFINERY_DEPLOYMENT_PLATFORM == 'aws':
# check if S3 object key exists
bucket_name, key = parse_s3_url(input_file_path)
if key not in uploaded_s3_key_list:
bad_file_list.append(os.path.basename(key))
logger.debug("Object key '%s' does not exist in '%s'",
key, bucket_name)
else:
logger.debug("Object key '%s' exists in '%s'",
key, bucket_name)
else: # POSIX file system
if not os.path.exists(input_file_path):
bad_file_list.append(input_file_path)
logger.debug("Checked file path: '%s'", input_file_path)
logger.debug("File '%s' does not exist")
else:
logger.debug("File '%s' exists")

# prefix output to protect from JSON vulnerability (stripped by
# Angular)
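For context, a minimal sketch (with hypothetical bucket and identity values) of the S3 existence-check pattern used above: the view lists the user's uploaded objects once by prefix and then tests each parsed key against that collection, rather than issuing a separate HEAD request per file.

import boto3

s3 = boto3.resource('s3')
bucket = s3.Bucket('refinery-upload')  # hypothetical UPLOAD_BUCKET value
identity_id = 'us-east-1:example-identity'  # hypothetical Cognito identity ID

# one LIST request per check instead of one HEAD request per file;
# a set makes the per-key membership tests fast
uploaded_keys = {obj.key for obj in bucket.objects.filter(Prefix=identity_id)}

def is_uploaded(key):
    # membership test against the keys collected above
    return key in uploaded_keys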