Skip to content

Commit

Permalink
tempdir tests and bug fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
jdidion committed Nov 3, 2016
1 parent 85cdac3 commit d9267dc
Show file tree
Hide file tree
Showing 2 changed files with 129 additions and 81 deletions.
64 changes: 40 additions & 24 deletions tests/testpaths.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,43 +4,59 @@
from . import *

class TempDirTests(TestCase):
def test_descriptor(self):
with self.assertRaises(ValueError):
TempPathDescriptor(path_type='d', contents='foo')
with TempDir(mode='rwx') as temp:
f = temp.make_file(name='foo', mode=None)
self.assertTrue('foo' in temp)
self.assertTrue(temp[f].exists)
self.assertEqual('foo', temp[f].relative_path)
self.assertEqual(os.path.join(temp.absolute_path, 'foo'), temp[f].absolute_path)
self.assertEqual('rwx', temp[f].mode)
temp[f].set_access('r')
with self.assertRaises(PermissionError):
open(f, 'w')

def test_context_manager(self):
with TempDir() as temp:
with open(temp.make_file(name='foo'), 'wt') as o:
o.write('foo')
self.assertFalse(os.path.exists(temp.path))
self.assertFalse(os.path.exists(temp.absolute_path))

def test_dir(self):
temp = TempDir()
foo = temp.make_directory(name='foo')
self.assertEqual(foo, os.path.join(temp.path, 'foo'))
bar = temp.make_directory(name='bar', subdir=foo)
self.assertEqual(bar, os.path.join(temp.path, 'foo', 'bar'))
self.assertTrue(os.path.exists(os.path.join(temp.path, 'foo', 'bar')))
self.assertEqual(foo, os.path.join(temp.absolute_path, 'foo'))
bar = temp.make_directory(name='bar', parent=foo)
self.assertEqual(bar, os.path.join(temp.absolute_path, 'foo', 'bar'))
self.assertTrue(os.path.exists(os.path.join(temp.absolute_path, 'foo', 'bar')))
temp.close()
self.assertFalse(os.path.exists(temp.path))
self.assertFalse(os.path.exists(temp.absolute_path))

def test_tree(self):
temp = TempDir()
foo = temp.make_directory(name='foo')
bar = temp.make_directory(name='bar', subdir=foo)
f = temp.make_file(name='baz', subdir=bar)
self.assertEqual(f, os.path.join(temp.path, 'foo', 'bar', 'baz'))
bar = temp.make_directory(name='bar', parent=foo)
f = temp.make_file(name='baz', parent=bar)
self.assertEqual(f, os.path.join(temp.absolute_path, 'foo', 'bar', 'baz'))
temp.close()
self.assertFalse(os.path.exists(f))

# def test_mode(self):
# with TempDir('r') as temp:
# # Raises error because the tempdir is read-only
# with self.assertRaises(PermissionError):
# f = temp.get_file('bar', 'foo', True)
#
#
# check_access(f, 'r')
# with self.assertRaises(IOError):
# check_access(f, 'w')
# with self.assertRaises(PermissionError):
# f.write('foo')
def test_mode(self):
with TempDir('r') as temp:
# Raises error because the tempdir is read-only
with self.assertRaises(PermissionError):
temp.make_file(name='bar')
# Should be able to create the tempdir with existing read-only files
with TempDir('r', [TempPathDescriptor(name='foo', contents='foo')]) as d:
self.assertTrue(os.path.exists(d.absolute_path))
self.assertTrue(os.path.exists(os.path.join(d.absolute_path, 'foo')))
with open(os.path.join(d.absolute_path, 'foo'), 'rt') as i:
self.assertEqual('foo', i.read())

def test_fifo(self):
pass

class PathTests(TestCase):
def setUp(self):
Expand Down Expand Up @@ -120,7 +136,7 @@ def test_resolve_file(self):

def test_resolve_with_parent(self):
self.root.make_directory(name='foo')
path = self.root.make_file(subdir='foo')
path = self.root.make_file(parent='foo')
name = os.path.basename(path)
parent = os.path.dirname(path)
self.assertEqual(path, resolve_path(name, parent))
Expand Down Expand Up @@ -173,8 +189,8 @@ def test_safe_checks(self):

def test_find(self):
level1 = self.root.make_directory()
level2 = self.root.make_directory(prefix='foo', subdir=level1)
paths = self.root.make_empty_files(3, prefix='bar', subdir=level2)
level2 = self.root.make_directory(prefix='foo', parent=level1)
paths = self.root.make_empty_files(3, prefix='bar', parent=level2)
x = find(level1, 'foo.*', 'd')
self.assertEqual(1, len(x))
self.assertEqual(level2, x[0])
Expand Down
146 changes: 89 additions & 57 deletions xphyle/paths.py
Original file line number Diff line number Diff line change
Expand Up @@ -292,24 +292,72 @@ def check_executable(fpath):

# tempfiles

class TempPathDescriptor(object):
class TempPath(object):
def __init__(self, parent=None, mode='rwx', path_type='d'):
self.parent = parent
self.path_type = path_type
self._mode = mode

@property
def mode(self):
"""Get the access mode of the path. Defaults to the parent's mode.
"""
if not self._mode:
if self.parent:
self._mode = self.parent.mode
else:
raise Exception("Cannot determine mode without 'parent'")
return self._mode

def set_access(self, mode=None, set_parent=False, additive=False):
"""Set the access mode for the path.
Args:
mode: The new mode to set. If node, the existing mode is used
set_parent: Whether to recursively set the mode of all parents.
This is done additively.
additive: Whether permissions should be additive (e.g.
if ``mode == 'w'`` and ``self.mode == 'r'``, the new mode
is 'rw')
"""
if not self.exists:
return None
if mode:
if additive:
self._mode = ''.join(set(self.mode) | set(mode))
else:
self._mode = mode
else:
mode = self.mode
if set_parent and self.parent:
self.parent.set_access(mode, True, True)
# Always set 'x' on directories, otherwise they are not listable
if self.path_type in ('d', 'dir') and 'x' not in mode:
mode += 'x'
set_access(self.absolute_path, mode)
return mode

class TempPathDescriptor(TempPath):
"""Describes a temporary file or directory within a TempDir.
Args:
name: The file/direcotry name
subdir: A TempPathDescriptor
parent: The parent directory, a TempPathDescriptor
mode: The access mode
suffix, prefix: The suffix and prefix to use when calling
``mkstemp`` or ``mkdtemp``
path_type: 'f' or 'file' (for file), 'd' or 'dir' (for directory),
or 'fifo' (for FIFO)
"""
def __init__(self, name=None, subdir=None, mode=None,
def __init__(self, name=None, parent=None, mode=None,
suffix='', prefix='', contents='', path_type='f'):
if contents and path_type != 'f':
raise ValueError("'contents' only valid for files")
self.path_type = path_type
self.subdir = subdir
super(TempPathDescriptor, self).__init__(parent, mode, path_type)
self.name = name
self.prefix = prefix
self.suffix = suffix
self.contents = contents
self.root = None
self._mode = mode
self._abspath = None
self._relpath = None
Expand All @@ -330,48 +378,27 @@ def relative_path(self):
self._init_path()
return self._relpath

@property
def mode(self):
if not self._mode:
if not self.root:
raise Exception("Cannot determine mode without 'root'")
self._mode = self.root.mode
return self._mode

def _init_path(self):
if self.root is None:
if self.parent is None:
raise Exception("Cannot determine absolute path without 'root'")
if self.subdir:
self._relpath = os.path.join(self.subdir.relative_path, self.name)
else:
self._relpath = self.name
self._abspath = os.path.join(self.root.path, self._relpath)

def set_root(self, tempdir):
self.root = tempdir
if self.mode is None:
self.mode = tempdir.mode
self._relpath = os.path.join(self.parent.relative_path, self.name)
self._abspath = os.path.join(self.parent.absolute_path, self.name)

def set_access(self, mode=None):
if mode:
self.mode = mode
if self.root is None or self.mode is None:
raise Exception("Both 'root' and 'mode' must be set before setting "
"access permissions")
set_access(self.absolute_path, self.mode)

def create(self):
if self.path_type in ('d', 'dir'):
os.mkdir(self.absolute_path)
else:
def create(self, apply_permissions=True):
if self.path_type not in ('d', 'dir'):
if self.path_type == 'fifo':
if os.path.exists(self.absolute_path):
os.remove(self.absolute_path)
os.mkfifo(self.absolute_path)

if self.contents or self.path_type != 'fifo':
with open(self.absolute_path, 'wt') as fh:
fh.write(self.contents or '')
elif not os.path.exists(self.absolute_path):
os.mkdir(self.absolute_path)
if apply_permissions:
self.set_access()

class TempDir(object):
class TempDir(TempPath):
"""Context manager that creates a temporary directory and cleans it up
upon exit.
Expand All @@ -387,13 +414,14 @@ class TempDir(object):
system.
"""
def __init__(self, mode='rwx', path_descriptors=None, **kwargs):
self.path = abspath(tempfile.mkdtemp(**kwargs))
self.mode = mode
super(TempDir, self).__init__(mode=mode)
self.absolute_path = abspath(tempfile.mkdtemp(**kwargs))
self.relative_path = ''
self.paths = {}
if path_descriptors:
self.make_paths(*path_descriptors)
set_access(self.path, mode)

self.set_access()
def __enter__(self):
return self

Expand All @@ -405,11 +433,20 @@ def __getitem__(self, path):

def __contains__(self, path):
return path in self.paths


@property
def exists(self):
return os.path.exists(self.absolute_path)

def close(self):
"""Delete the temporary directory and all files/subdirectories within.
"""
shutil.rmtree(self.path)
# First need to make all paths removable
if not self.exists:
return
for path in self.paths.values():
path.set_access('rwx', True)
shutil.rmtree(self.absolute_path)

def make_path(self, desc=None, apply_permissions=True, **kwargs):
"""Create a file or directory within the TempDir.
Expand All @@ -428,29 +465,24 @@ def make_path(self, desc=None, apply_permissions=True, **kwargs):
desc = TempPathDescriptor(**kwargs)

# If the subdirectory is given as a path, resolve it
if isinstance(desc.subdir, str):
desc.subdir = self[desc.subdir]
if not desc.parent:
desc.parent = self
elif isinstance(desc.parent, str):
desc.parent = self[desc.parent]

# Determine the name of the new file/directory
create = True
if not desc.name:
parent = desc.subdir.absolute_path if desc.subdir else self.path
parent = desc.parent.absolute_path
if desc.path_type in ('d', 'dir'):
path = tempfile.mkdtemp(
prefix=desc.prefix, suffix=desc.suffix, dir=parent)
desc.name = os.path.basename(path)
# mkdtemp creates the directory for us
create = False
else:
path = tempfile.mkstemp(
prefix=desc.prefix, suffix=desc.suffix, dir=parent)[1]
desc.name = os.path.basename(path)

desc.set_root(self)
if create:
desc.create()
if apply_permissions:
desc.set_access()
desc.create(apply_permissions)

self.paths[desc.absolute_path] = desc
self.paths[desc.relative_path] = desc
Expand Down Expand Up @@ -482,7 +514,7 @@ def make_paths(self, *path_descriptors):
for desc in path_descriptors]
# Now apply permissions after all paths are created
for desc in path_descriptors:
desc.set_access()
result = desc.set_access()
return paths

def make_empty_files(self, n, **kwargs):
Expand Down

0 comments on commit d9267dc

Please sign in to comment.