Skip to content

Commit

Permalink
Update rename_datafile() (#3040)
Browse files Browse the repository at this point in the history
* Update rename_datafile() to support S3 storage backend
* Remove _rename_file_on_disk()
* Switch to Celery logger in analysis_manager tasks
* Add a default value for progress_report callable arg
  • Loading branch information
hackdna committed Oct 2, 2018
1 parent beefa5b commit 587fb66
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 52 deletions.
6 changes: 4 additions & 2 deletions refinery/analysis_manager/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@
@author: nils
'''
import logging
import urlparse

from django.conf import settings

from bioblend import galaxy
import celery
from celery.result import TaskSetResult
Expand All @@ -20,7 +21,8 @@

from .models import AnalysisStatus

logger = logging.getLogger(__name__)
logger = celery.utils.log.get_task_logger(__name__)
logger.setLevel(celery.utils.LOG_LEVELS[settings.REFINERY_LOG_LEVEL])

RETRY_INTERVAL = 5 # seconds

Expand Down
80 changes: 35 additions & 45 deletions refinery/file_store/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
import re

from django.conf import settings
from django.core.files.storage import default_storage
from django.db import models
from django.db.models.signals import post_delete
from django.dispatch import receiver
Expand All @@ -25,7 +24,8 @@

import constants
import core
from .utils import make_dir
from .utils import (S3MediaStorage, SymlinkedFileSystemStorage, copy_s3_object,
delete_s3_object, make_dir, move_file)

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -187,29 +187,44 @@ def delete_datafile(self, save_instance=True):
else:
logger.info("Deleted datafile '%s'", file_name)

def rename_datafile(self, new_name):
    """Change the name of the data file, in S3 or on local disk.

    Note: the final name may not be the same as the requested name in
    case of conflict with an existing file or S3 object (the storage
    backend's get_name() picks an available variant).

    :param new_name: requested new file name.
    :type new_name: str.
    :returns: None. On success the datafile is renamed and the instance
        saved; on failure an error is logged and state is unchanged.
    """
    if self.datafile:
        logger.debug("Renaming datafile '%s' to '%s'",
                     self.datafile.name, new_name)
        if settings.REFINERY_S3_USER_DATA:
            # S3 has no rename: copy to the new key, then delete the old
            storage = S3MediaStorage()
            new_file_store_name = storage.get_name(new_name)
            try:
                copy_s3_object(storage.bucket_name, self.datafile.name,
                               storage.bucket_name, new_file_store_name)
            except RuntimeError as exc:
                logger.error("Renaming datafile '%s' failed: %s",
                             self.datafile.name, exc)
            else:
                delete_s3_object(storage.bucket_name, self.datafile.name)
                self.datafile.name = new_file_store_name
                self.save()
        else:
            storage = SymlinkedFileSystemStorage()
            new_file_store_name = storage.get_name(new_name)
            new_file_store_path = storage.path(new_file_store_name)
            try:
                move_file(self.datafile.path, new_file_store_path)
            except RuntimeError as exc:
                logger.error("Renaming datafile '%s' failed: %s",
                             self.datafile.name, exc)
            else:
                self.datafile.name = new_file_store_name
                self.save()
    else:
        logger.error(
            "Error renaming datafile of FileStoreItem with UUID '%s': "
            "datafile is not available", self.uuid
        )

def get_datafile_url(self):
"""Returns relative or absolute URL of the datafile depending on file
Expand Down Expand Up @@ -269,31 +284,6 @@ def _delete_datafile(sender, instance, **kwargs):
instance.delete_datafile(save_instance=False)


def _rename_file_on_disk(current_path, new_path):
    """Rename a file given absolute paths, creating any missing
    intermediate directories along the new path.

    :param current_path: Existing absolute file system path.
    :type current_path: str.
    :param new_path: New absolute file system path.
    :type new_path: str.
    :returns: True if renaming succeeded, False if failed.
    """
    try:
        # os.renames() creates missing parents of new_path and prunes
        # directories left empty under current_path
        os.renames(current_path, new_path)
    except OSError as error:
        logger.error(
            "Error renaming file on disk\nOSError: [Errno %s], file name: %s, "
            "error: %s. Current file name: %s. New file name: %s",
            error.errno, error.filename, error.strerror,
            current_path, new_path
        )
        return False
    else:
        logger.debug("Renamed %s to %s", current_path, new_path)
        return True


def _get_extension_from_string(path):
"""Return file extension given a file name, file system path, or URL"""
file_name_parts = os.path.basename(path).split('.')
Expand Down
12 changes: 7 additions & 5 deletions refinery/file_store/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ def get_name(self, name):
return self.get_available_name(get_valid_filename(name))


def copy_file_object(source, destination, progress_report):
def copy_file_object(source, destination, progress_report=lambda _: None):
"""Copy a file object and update progress"""
chunk_size = 10 * 1024 * 1024 # 10MB
logger.debug("Copying '%s' to '%s'", source.name, destination.name)
Expand All @@ -102,7 +102,7 @@ def copy_file_object(source, destination, progress_report):


def copy_s3_object(source_bucket, source_key, destination_bucket,
destination_key, progress_report):
destination_key, progress_report=lambda _: None):
"""Copy S3 object and update task progress"""
s3 = boto3.client('s3')
logger.debug("Started copying from 's3://%s/%s' to 's3://%s/%s'",
Expand Down Expand Up @@ -147,7 +147,8 @@ def delete_s3_object(bucket, key):
logger.info("Deleted 's3://%s/%s'", bucket, key)


def download_file_object(request_response, download_object, progress_report):
def download_file_object(request_response, download_object,
progress_report=lambda _: None):
"""Download file from request response object to a temporary file and
report progress"""
chunk_size = 10 * 1024 * 1024 # 10MB
Expand All @@ -166,7 +167,8 @@ def download_file_object(request_response, download_object, progress_report):
request_response.url, download_object.name)


def download_s3_object(bucket, key, download_object, progress_report):
def download_s3_object(bucket, key, download_object,
progress_report=lambda _: None):
"""Download object from S3 to a temp file and update task progress"""
s3 = boto3.client('s3')
logger.debug("Started downloading from 's3://%s/%s' to '%s'",
Expand Down Expand Up @@ -270,7 +272,7 @@ def symlink_file(source_path, link_path):
logger.info("Created symlink '%s' to '%s'", link_path, source_path)


def upload_file_object(source, bucket, key, progress_report):
def upload_file_object(source, bucket, key, progress_report=lambda _: None):
"""Upload file from source path to S3 bucket and report progress"""
s3 = boto3.client('s3')
logger.debug("Started uploading from '%s' to 's3://%s/%s'",
Expand Down

0 comments on commit 587fb66

Please sign in to comment.