Skip to content

Commit

Permalink
Merge pull request aws#51 from kyleknap/special-files
Browse files Browse the repository at this point in the history
Add support for downloading to special files
  • Loading branch information
kyleknap committed Sep 13, 2016
2 parents 906cc4c + fc10eb5 commit 91b09c0
Show file tree
Hide file tree
Showing 8 changed files with 225 additions and 19 deletions.
5 changes: 5 additions & 0 deletions .changes/next-release/bugfix-Download-21108.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
{
"category": "Download",
"type": "bugfix",
"description": "Add support for downloading to special UNIX file by name"
}
51 changes: 40 additions & 11 deletions s3transfer/download.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,12 +64,14 @@ def __init__(self, osutil, transfer_coordinator, io_executor):
self._io_executor = io_executor

@classmethod
def is_compatible(cls, download_target):
def is_compatible(cls, download_target, osutil):
"""Determines if the target for the download is compatible with manager
:param download_target: The target for which the upload will write
data to.
:param osutil: The os utility to be used for the transfer
:returns: True if the manager can handle the type of target specified
otherwise returns False.
"""
Expand Down Expand Up @@ -138,7 +140,7 @@ def __init__(self, osutil, transfer_coordinator, io_executor):
self._temp_fileobj = None

@classmethod
def is_compatible(cls, download_target):
def is_compatible(cls, download_target, osutil):
return isinstance(download_target, six.string_types)

def get_fileobj_for_io_writes(self, transfer_future):
Expand All @@ -163,18 +165,41 @@ def get_final_io_task(self):
)

def _get_temp_fileobj(self):
f = self._osutil.open(self._temp_filename, 'wb')
f = self._get_fileobj(self._temp_filename)
self._transfer_coordinator.add_failure_cleanup(
self._osutil.remove_file, self._temp_filename)
return f

def _get_fileobj(self, filename):
f = self._osutil.open(filename, 'wb')
# Make sure the file gets closed and we remove the temporary file
# if anything goes wrong during the process.
self._transfer_coordinator.add_failure_cleanup(f.close)
self._transfer_coordinator.add_failure_cleanup(
self._osutil.remove_file, self._temp_filename)
return f


class DownloadSpecialFilenameOutputManager(DownloadFilenameOutputManager):
"""Handles special files that cannot use a temporary file for io writes"""
@classmethod
def is_compatible(cls, download_target, osutil):
return super(DownloadSpecialFilenameOutputManager, cls).is_compatible(
download_target, osutil) and \
osutil.is_special_file(download_target)

def get_fileobj_for_io_writes(self, transfer_future):
fileobj = transfer_future.meta.call_args.fileobj
return self._get_fileobj(fileobj)

def get_final_io_task(self):
# This task will serve the purpose of signaling when all of the io
# writes have finished so done callbacks can be called.
return CompleteDownloadNOOPTask(
transfer_coordinator=self._transfer_coordinator)


class DownloadSeekableOutputManager(DownloadOutputManager):
@classmethod
def is_compatible(cls, download_target):
def is_compatible(cls, download_target, osutil):
return seekable(download_target)

def get_fileobj_for_io_writes(self, transfer_future):
Expand All @@ -199,7 +224,7 @@ def __init__(self, osutil, transfer_coordinator, io_executor,
self._io_submit_lock = threading.Lock()

@classmethod
def is_compatible(cls, download_target):
def is_compatible(cls, download_target, osutil):
return hasattr(download_target, 'write')

def get_download_task_tag(self):
Expand Down Expand Up @@ -234,25 +259,29 @@ def queue_file_io_task(self, fileobj, data, offset):
class DownloadSubmissionTask(SubmissionTask):
"""Task for submitting tasks to execute a download"""

def _get_download_output_manager_cls(self, transfer_future):
def _get_download_output_manager_cls(self, transfer_future, osutil):
"""Retrieves a class for managing output for a download
:type transfer_future: s3transfer.futures.TransferFuture
:param transfer_future: The transfer future for the request
:type osutil: s3transfer.utils.OSUtils
:param osutil: The os utility associated to the transfer
:rtype: class of DownloadOutputManager
:returns: The appropriate class to use for managing a specific type of
input for downloads.
"""
download_manager_resolver_chain = [
DownloadSpecialFilenameOutputManager,
DownloadFilenameOutputManager,
DownloadSeekableOutputManager,
DownloadNonSeekableOutputManager,
]

fileobj = transfer_future.meta.call_args.fileobj
for download_manager_cls in download_manager_resolver_chain:
if download_manager_cls.is_compatible(fileobj):
if download_manager_cls.is_compatible(fileobj, osutil):
return download_manager_cls
raise RuntimeError(
'Output %s of type: %s is not supported.' % (
Expand Down Expand Up @@ -294,8 +323,8 @@ def _submit(self, client, config, osutil, request_executor, io_executor,
response['ContentLength'])

download_output_manager = self._get_download_output_manager_cls(
transfer_future)(osutil, self._transfer_coordinator,
io_executor)
transfer_future, osutil)(osutil, self._transfer_coordinator,
io_executor)

# If it is greater than threshold do a ranged download, otherwise
# do a regular GetObject download.
Expand Down
30 changes: 30 additions & 0 deletions s3transfer/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import functools
import math
import os
import stat
import string
import logging
import threading
Expand Down Expand Up @@ -243,6 +244,35 @@ def remove_file(self, filename):
def rename_file(self, current_filename, new_filename):
rename_file(current_filename, new_filename)

def is_special_file(cls, filename):
"""Checks to see if a file is a special UNIX file.
It checks if the file is a character special device, block special
device, FIFO, or socket.
:param filename: Name of the file
:returns: True if the file is a special file. False, if is not.
"""
# If it does not exist, it must be a new file so it cannot be
# a special file.
if not os.path.exists(filename):
return False
mode = os.stat(filename).st_mode
# Character special device.
if stat.S_ISCHR(mode):
return True
# Block special device
if stat.S_ISBLK(mode):
return True
# FIFO.
if stat.S_ISFIFO(mode):
return True
# Socket.
if stat.S_ISSOCK(mode):
return True
return False


class DeferredOpenFile(object):
OPEN_METHOD = open
Expand Down
17 changes: 17 additions & 0 deletions tests/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import hashlib
import math
import os
import platform
import shutil
import string
import tempfile
Expand Down Expand Up @@ -65,6 +66,22 @@ def random_bucket_name(prefix='s3transfer', num_chars=10):
return prefix + ''.join([base[b % len(base)] for b in random_bytes])


def skip_if_windows(reason):
"""Decorator to skip tests that should not be run on windows.
Example usage:
@skip_if_windows("Not valid")
def test_some_non_windows_stuff(self):
self.assertEqual(...)
"""
def decorator(func):
return unittest.skipIf(
platform.system() not in ['Darwin', 'Linux'], reason)(func)
return decorator


class StreamWithError(object):
"""A wrapper to simulate errors while reading from a stream
Expand Down
43 changes: 43 additions & 0 deletions tests/functional/test_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
# Copyright 2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
# http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
import os
import shutil
import stat
import socket
import tempfile

from tests import unittest
from tests import skip_if_windows
from s3transfer.utils import OSUtils


@skip_if_windows('Windows does not support UNIX special files')
class TestOSUtilsSpecialFiles(unittest.TestCase):
def setUp(self):
self.tempdir = tempfile.mkdtemp()
self.filename = os.path.join(self.tempdir, 'myfile')

def tearDown(self):
shutil.rmtree(self.tempdir)

def test_character_device(self):
self.assertTrue(OSUtils().is_special_file('/dev/null'))

def test_fifo(self):
os.mkfifo(self.filename)
self.assertTrue(OSUtils().is_special_file(self.filename))

def test_socket(self):
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
sock.bind(self.filename)
self.assertTrue(OSUtils().is_special_file(self.filename))
16 changes: 16 additions & 0 deletions tests/integration/test_download.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from concurrent.futures import CancelledError

from tests import assert_files_equal
from tests import skip_if_windows
from tests import RecordingSubscriber
from tests import NonSeekableWriter
from tests.integration import BaseTransferManagerIntegTest
Expand Down Expand Up @@ -165,3 +166,18 @@ def test_above_threshold_for_nonseekable_fileobj(self):
self.bucket_name, '20mb.txt', NonSeekableWriter(f))
future.result()
assert_files_equal(filename, download_path)

@skip_if_windows('Windows does not support UNIX special files')
def test_download_to_special_file(self):
transfer_manager = self.create_transfer_manager(self.config)
filename = self.files.create_file_with_size(
'foo.txt', filesize=1024 * 1024)
self.upload_file(filename, '1mb.txt')
future = transfer_manager.download(
self.bucket_name, '1mb.txt', '/dev/null')
try:
future.result()
except Exception as e:
self.fail(
'Should have been able to download to /dev/null but received '
'following exception %s' % e)
Loading

0 comments on commit 91b09c0

Please sign in to comment.