Skip to content

Commit

Permalink
Hit 100% test coverage
Browse files Browse the repository at this point in the history
  • Loading branch information
invisibleroads committed Oct 29, 2020
1 parent f3d7799 commit b142edc
Show file tree
Hide file tree
Showing 7 changed files with 105 additions and 58 deletions.
5 changes: 5 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
# 1.1
- Restore archive_safely, unarchive_safely
- Restore check_relative_path, get_relative_path
- Restore has_extension, is_matching_paths, walk_paths

# 1.0
- Restore TemporaryStorage, make_folder, make_unique_folder
- Restore remove_folder, remove_path
Expand Down
1 change: 1 addition & 0 deletions invisibleroads_macros_disk/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
ARCHIVE_ZIP_EXTENSIONS,
TEMPORARY_FOLDER)
from .exceptions import (
BadArchiveError,
FileExtensionError,
InvisibleRoadsMacrosDiskError,
PathValidationError)
Expand Down
4 changes: 4 additions & 0 deletions invisibleroads_macros_disk/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,7 @@ class PathValidationError(InvisibleRoadsMacrosDiskError):

class FileExtensionError(InvisibleRoadsMacrosDiskError):
pass


class BadArchiveError(InvisibleRoadsMacrosDiskError):
pass
37 changes: 24 additions & 13 deletions invisibleroads_macros_disk/operations.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,18 @@
import re
import tarfile
from invisibleroads_macros_security import make_random_string
from os import makedirs, remove, unlink
from os.path import islink, join, splitext
from shutil import rmtree, unpack_archive
from os import makedirs, remove
from os.path import isdir, islink, join, splitext
from shutil import rmtree
from tempfile import mkdtemp
from zipfile import ZipFile, ZIP_DEFLATED
from zipfile import BadZipfile, ZipFile, ZIP_DEFLATED

from .constants import (
ARCHIVE_TAR_EXTENSIONS,
ARCHIVE_ZIP_EXTENSIONS,
TEMPORARY_FOLDER)
from .exceptions import (
BadArchiveError,
FileExtensionError)
from .resolutions import (
get_relative_path,
Expand Down Expand Up @@ -59,7 +60,7 @@ def archive_zip_safely(source_folder, target_path=None, excluded_paths=None):
with ZipFile(
target_path, 'w', ZIP_DEFLATED, allowZip64=True,
) as target_file:
_archive_folder(source_folder, excluded_paths, target_file.write)
_compress_folder(source_folder, excluded_paths, target_file.write)
return target_path


Expand All @@ -75,27 +76,37 @@ def archive_tar_safely(source_folder, target_path=None, excluded_paths=None):
with tarfile.open(
target_path, 'w:' + compression_format, dereference=False,
) as target_file:
_archive_folder(source_folder, excluded_paths, target_file.add)
_compress_folder(source_folder, excluded_paths, target_file.add)
return target_path


def unarchive_safely(source_path, target_folder=None):
'Unarchive folder without symbolic links'
if has_extension(source_path, ARCHIVE_ZIP_EXTENSIONS):
try:
source_file = ZipFile(source_path, 'r')
except BadZipfile:
raise BadArchiveError({'source_path': 'is unreadable'})
extension_expression = r'\.zip$'
items = [_ for _ in source_file.infolist() if (
_.external_attr >> 28) != 0xA]
elif has_extension(source_path, ARCHIVE_TAR_EXTENSIONS):
compression_format = splitext(source_path)[1].lstrip('.')
try:
source_file = tarfile.open(source_path, 'r:' + compression_format)
except tarfile.ReadError:
raise BadArchiveError({'source_path': 'is unreadable'})
extension_expression = r'\.tar\.%s$' % compression_format
items = [_ for _ in source_file.getmembers() if not _.issym()]
else:
archive_extensions = ARCHIVE_ZIP_EXTENSIONS + ARCHIVE_TAR_EXTENSIONS
raise FileExtensionError({
'source_path': 'must end in ' + ' or '.join(archive_extensions)})
if not target_folder:
target_folder = re.sub(extension_expression, '', source_path)
unpack_archive(source_path, target_folder)
for path in walk_paths(target_folder):
if islink(path):
unlink(path)
for item in items:
source_file.extract(item, target_folder)
source_file.close()
return target_folder


Expand Down Expand Up @@ -152,11 +163,11 @@ def remove_path(path):
return path


def _archive_folder(source_folder, excluded_paths, archive_path):
def _compress_folder(source_folder, excluded_paths, compress_path):
for source_path in walk_paths(source_folder):
if islink(source_path):
if isdir(source_path) or islink(source_path):
continue
if is_matching_path(source_path, excluded_paths or []):
continue
target_path = get_relative_path(source_path, source_folder)
archive_path(source_path, target_path)
compress_path(source_path, target_path)
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@

setup(
name='invisibleroads-macros-disk',
version='1.0.3',
version='1.1.0',
description='Shortcut functions for disk operations',
long_description=DESCRIPTION,
long_description_content_type='text/markdown',
Expand Down
Binary file added tests/examples.zip
Binary file not shown.
114 changes: 70 additions & 44 deletions tests/test_operations.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
import tarfile
from invisibleroads_macros_disk import (
BadArchiveError,
FileExtensionError,
TemporaryStorage,
archive_safely,
archive_tar_safely,
Expand All @@ -10,47 +13,94 @@
remove_path,
unarchive_safely,
ARCHIVE_TAR_EXTENSIONS,
ARCHIVE_ZIP_EXTENSIONS,
FileExtensionError)
ARCHIVE_ZIP_EXTENSIONS)
from invisibleroads_macros_security import ALPHABET
from os.path import basename, exists, islink, join
from os.path import basename, exists, join
from pytest import raises
from shutil import make_archive
from tempfile import mkstemp
from zipfile import ZipFile

from conftest import A_FOLDER, B_FOLDER, FILE_CONTENT, FILE_NAME
from conftest import EXAMPLES_FOLDER, FILE_CONTENT, FILE_NAME


def test_archive_safely(tmpdir):
check_archive_functionality(
archive_safely, tmpdir,
bad_extension='.x',
good_extension=ARCHIVE_TAR_EXTENSIONS[0])
source_folder = tmpdir.mkdir('x')
source_folder.join(FILE_NAME).write(FILE_CONTENT)
source_folder.join(FILE_NAME + '_').write(FILE_CONTENT)
source_folder = source_folder.strpath

archive_path = archive_safely(source_folder, excluded_paths=['*.txt'])
archive_file = ZipFile(archive_path)
file_paths = archive_file.namelist()
assert FILE_NAME not in file_paths
assert FILE_NAME + '_' in file_paths

archive_path = archive_safely(EXAMPLES_FOLDER, tmpdir.join(
'examples.tar.gz').strpath)
archive_file = tarfile.open(archive_path, 'r:gz')
file_paths = archive_file.getnames()
assert join('a', FILE_NAME) in file_paths
assert join('b', FILE_NAME) not in file_paths

with raises(FileExtensionError):
archive_safely(source_folder, source_folder + '.x')


def test_archive_zip_safely(tmpdir):
check_archive_functionality(
archive_zip_safely, tmpdir,
bad_extension=ARCHIVE_TAR_EXTENSIONS[0],
good_extension=ARCHIVE_ZIP_EXTENSIONS[0])
source_folder = tmpdir.mkdir('x')
source_folder.join(FILE_NAME).write(FILE_CONTENT)
source_folder = source_folder.strpath

archive_path = archive_zip_safely(source_folder)
archive_file = ZipFile(archive_path)
assert FILE_NAME in archive_file.namelist()

with raises(FileExtensionError):
archive_zip_safely(
source_folder, source_folder + ARCHIVE_TAR_EXTENSIONS[0])


def test_archive_tar_safely(tmpdir):
check_archive_functionality(
archive_tar_safely, tmpdir,
bad_extension=ARCHIVE_ZIP_EXTENSIONS[0],
good_extension=ARCHIVE_TAR_EXTENSIONS[0])
source_folder = tmpdir.mkdir('x')
source_folder.join(FILE_NAME).write(FILE_CONTENT)
source_folder = source_folder.strpath

archive_path = archive_tar_safely(source_folder)
archive_file = tarfile.open(archive_path)
assert FILE_NAME in archive_file.getnames()

with raises(FileExtensionError):
archive_tar_safely(
source_folder, source_folder + ARCHIVE_ZIP_EXTENSIONS[0])


def test_unarchive_safely(tmpdir):
with raises(FileExtensionError):
unarchive_safely(tmpdir.join('x.x').strpath)
archive_basename = tmpdir.join('b').strpath
make_archive(archive_basename, 'gztar', B_FOLDER)
with raises(BadArchiveError):
archive_path_object = tmpdir.join('x.zip')
archive_path_object.write(FILE_CONTENT)
unarchive_safely(archive_path_object.strpath)
with raises(BadArchiveError):
archive_path_object = tmpdir.join('x.tar.gz')
archive_path_object.write(FILE_CONTENT)
unarchive_safely(archive_path_object.strpath)

archive_path = EXAMPLES_FOLDER + '.zip'
archive_file = ZipFile(archive_path)
assert 'b/file.txt' in archive_file.namelist()
archive_folder = unarchive_safely(archive_path, tmpdir.join(
'examples').strpath)
assert open(join(archive_folder, 'a', FILE_NAME)).read() == FILE_CONTENT
assert not exists(join(archive_folder, 'b', FILE_NAME))

archive_basename = tmpdir.join('examples').strpath
make_archive(archive_basename, 'gztar', EXAMPLES_FOLDER)
archive_path = archive_basename + '.tar.gz'
archive_folder = unarchive_safely(archive_path)
assert islink(join(B_FOLDER, FILE_NAME))
assert not exists(join(archive_folder, FILE_NAME))
assert open(join(archive_folder, 'a', FILE_NAME)).read() == FILE_CONTENT
assert not exists(join(archive_folder, 'b', FILE_NAME))


def test_temporary_storage():
Expand Down Expand Up @@ -82,27 +132,3 @@ def test_remove_path():
temporary_path = mkstemp()[1]
remove_path(temporary_path)
remove_path(temporary_path)


def check_archive_functionality(
archive, tmpdir, bad_extension, good_extension):
source_folder = tmpdir.mkdir('x')
source_folder.join(FILE_NAME).write(FILE_CONTENT)
source_folder = source_folder.strpath

with raises(FileExtensionError):
archive(source_folder, source_folder + bad_extension)

archive_path = archive(source_folder)
archive_folder = unarchive_safely(archive_path)
assert open(join(archive_folder, FILE_NAME)).read() == FILE_CONTENT

archive_path = archive(A_FOLDER, tmpdir.join(
'a' + good_extension).strpath, excluded_paths=['*.txt'])
archive_folder = unarchive_safely(archive_path)
assert not exists(join(archive_folder, FILE_NAME))

archive_path = archive(B_FOLDER, tmpdir.join(
'b' + good_extension).strpath)
archive_folder = unarchive_safely(archive_path)
assert not exists(join(archive_folder, FILE_NAME))

0 comments on commit b142edc

Please sign in to comment.