Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP

Comparing changes

Choose two branches to see what's changed or to start a new pull request. If you need to, you can also compare across forks.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also compare across forks.
base fork: tv42/fs
base: master
...
head fork: tv42/fs
compare: py3k-compatibility
Checking mergeability… Don't worry, you can still create the pull request.
  • 3 commits
  • 15 files changed
  • 0 commit comments
  • 1 contributor
Commits on Jul 05, 2009
Jyrki Pulliainen 2to3 automatic conversion 2f23dff
Jyrki Pulliainen Python 2to3: Refactor inmem storage to use io.BytesIO
_VirtualByteFile is a wrapper for io._BytesIO it enables individual
cursors for multiple instances. _VirtualTextFile is a replacement for
traditional text mode file. It provides wrapper functions, which
transfer inserted text to bytes ready for saving to _BytesIO.

There are still some limitations in this implementation:

- Only supported encoding is the platform specific encoding

- _VirtualByteFile works only with io._BytesIO, which is the pure
  Python implementation of io.BytesIO. io._BytesIO contains a special
  _buffer attribute which holds the inserted data. This _buffer can
  not be found in C-implementation of io.BytesIO
14fb89b
Jyrki Pulliainen Support custom encoding for inmem file system 1d2662f
View
27 filesystem/_base.py
@@ -27,11 +27,11 @@ class CrossDeviceRenameError(Exception):
## utility function for checking for safe file names.
## TODO: should it be encapsulated within a class?
def raise_on_insecure_file_name(name):
- if u'/' in name:
+ if '/' in name:
raise InsecurePathError(
'child name contains directory separator')
# this may be too naive
- if name == u'..':
+ if name == '..':
raise InsecurePathError(
'child trying to climb out of directory')
@@ -70,7 +70,7 @@ def exists(self):
"""
try:
self.stat()
- except OSError, e:
+ except OSError as e:
# ENOENT means the path wasn't found
if e.errno == errno.ENOENT:
return False
@@ -174,7 +174,7 @@ def __str__(self):
return str(self._pathname)
def __unicode__(self):
- return unicode(self._pathname)
+ return str(self._pathname)
def __repr__(self):
return '%s(%r)' % (self.__class__.__name__, self._pathname)
@@ -191,7 +191,7 @@ def join(self, relpath):
The appended path has to be a relative path. Otherwise, an
``InsecurePathError`` is raised.
"""
- if relpath.startswith(u'/'):
+ if relpath.startswith('/'):
raise InsecurePathError('path name to join must be relative')
return self.__class__(os.path.join(self._pathname, relpath))
@@ -219,7 +219,13 @@ def child(self, *segments):
"""
p = self
for segment in segments:
- raise_on_insecure_file_name(segment)
+ if '/' in segment:
+ raise InsecurePathError(
+ 'child name contains directory separator')
+ # this may be too naive
+ if segment == '..':
+ raise InsecurePathError(
+ 'child trying to climb out of directory')
p = p.join(segment)
return p
@@ -330,13 +336,10 @@ def __le__(self, other):
def __ge__(self, other):
return not self < other
- def __unicode__(self):
- if self.parent() == self:
- return u'/'
- return str(self.parent())+ u'/' + self.name()
-
def __str__(self):
- return unicode(self).encode('utf8')
+ if self.parent() == self:
+ return '/'
+ return str(self.parent())+ '/' + self.name()
def __repr__(self):
return '%s(%r)' % (self.__class__.__name__, str(self))
View
6 filesystem/_localfs.py
@@ -51,7 +51,7 @@ def rename(self, new_path):
raise CrossDeviceRenameError()
try:
os.rename(self._pathname, new_path._pathname)
- except OSError, e:
+ except OSError as e:
if e.errno == errno.EXDEV:
raise CrossDeviceRenameError()
else:
@@ -135,7 +135,7 @@ def mkdir(self, may_exist=False, create_parents=False):
try:
os.mkdir(self._pathname)
- except OSError, e:
+ except OSError as e:
if may_exist and e.errno == errno.EEXIST:
pass
else:
@@ -144,6 +144,6 @@ def mkdir(self, may_exist=False, create_parents=False):
def rmdir(self):
os.rmdir(self._pathname)
-root = path(u'/')
+root = path('/')
## RFC: I want every path for every file system to have a root object for identification purposes.
path.root = root
View
112 filesystem/inmem.py
@@ -7,47 +7,72 @@
import filesystem
import stat
import posix
-import StringIO
+import io
import errno
+import sys
-class _VirtualFile(StringIO.StringIO):
+class _VirtualByteFile(io._BytesIO):
"""
- A StringIO class that is specialized to be used for the inmem fs.
+ A BytesIO class that is specialized to be used for the inmem fs.
"""
## This class is a wrapper class, the real content is stored in
## self._path._file, which we for the convenience copy to
## self._path. Except for self._path and self._file, we keep
## self.pos in this wrapper, since that's the only thing that is
## different between the different open files.
- def __init__(self, path, mode=u''):
- ## we'll have to use __dict__ here to get around __setattr__
- self.__dict__['_path'] = path
- self.__dict__['_file'] = path._file
- self.__dict__['pos'] = 0
-
- if mode.startswith(u'w'):
+
+ ## TODO: implement so that it works with cBytesIO too (ie.
+ ## io.BytesIO)
+ def __init__(self, path, mode='', *args, **kwargs):
+ super().__init__()
+ self._buffer = path._file._buffer
+
+ if 'encoding' in kwargs:
+ # TODO: Better argument passing is needed, since encoding
+ # can be a positional argument
+ self._encoding = kwargs['encoding']
+ else:
+ self._encoding = sys.getdefaultencoding()
+
+ if mode.startswith('w'):
self.truncate()
- if mode.startswith(u'a'):
- self.seek(self.len)
+ if mode.startswith('a'):
+ self.seek(path.size())
+
+class _VirtualTextFile(_VirtualByteFile):
+ """
+ A text-mode file wrapper class that is specialized to be used for
+ the inmem fs.
+ """
+ def __next__(self):
+ return self._btot(super().next())
+
+ def _ttob(self, text):
+ """Encode given text to a bytestring using given encoding (or
+ platform specific default, if no encoding is given.)
+ """
+ return bytes(text, self._encoding)
- def __getattr__(self, item):
- return getattr(self._file, item)
+ def _btot(self, bytes):
+ return bytes.decode(self._encoding)
- def __setattr__(self, item, newvalue):
- if item == 'pos':
- self.__dict__[item] = newvalue
- else:
- return setattr(self._file, item, newvalue)
+ def write(self, text):
+ text = self._ttob(text)
+ super().write(text)
- def __exit__(self, a, b, c):
- pass
+ def writelines(self, lines):
+ lines = (self._ttob(text) for text in lines)
+ super().writelines(lines)
- def close(self):
- pass
-
- def __enter__(self):
- return self
+ def read(self, size=None):
+ return self._btot(super().read(size))
+
+ def readline(self, size=None):
+ return self._btot(super().readline(size))
+
+ def readlines(self, sizehint=None):
+ return [self._btot(text) for text in super().readlines()]
class path(filesystem.WalkMixin, filesystem.StatWrappersMixin, filesystem.SimpleComparitionMixin):
"""
@@ -61,11 +86,11 @@ class path(filesystem.WalkMixin, filesystem.StatWrappersMixin, filesystem.Simple
creating a new distinct file system - the file system has to be
traversed through .child() or .join()
"""
- def __init__(self, name=u'', parent=None):
- if u'/' in name:
+ def __init__(self, name='', parent=None):
+ if '/' in name:
## TODO: untested code line
raise filesystem.InsecurePathError(
- u'use child() or join() to traverse the inmem fs')
+ 'use child() or join() to traverse the inmem fs')
if parent is None:
self._parent = self
@@ -74,11 +99,7 @@ def __init__(self, name=u'', parent=None):
self._name = name
self._children = {}
self._stat = ()
- self._file = StringIO.StringIO()
-
- #def __eq__(self, other):
- ## as said above, two equal paths should always be same object.
- #return self is other
+ self._file = io._BytesIO()
def stat(self):
if not self._stat:
@@ -86,9 +107,8 @@ def stat(self):
e.errno = errno.ENOENT
raise e
if self._file:
- self._stat[6] = self._file.len
+ self._stat[6] = len(self._file.getvalue())
## bloat ... turning a list to a sequence to a stat_result object ;-)
- ## Well, StringIO implementation is not quite optimal anyway ;-)
return posix.stat_result(list(self._stat))
def parent(self):
@@ -125,15 +145,15 @@ def rmdir(self):
self.unlink()
def join(self, relpath):
- if relpath.startswith(u'/'):
- raise filesystem.InsecurePathError(u'path name to join must be relative')
- return self.child(*relpath.split(u'/'))
+ if relpath.startswith('/'):
+ raise filesystem.InsecurePathError('path name to join must be relative')
+ return self.child(*relpath.split('/'))
def child(self, segment=None, *segments):
if not segment:
return self
- if not self._children.has_key(segment):
+ if segment not in self._children:
filesystem.raise_on_insecure_file_name(segment)
child = self.__class__(name=segment, parent=self)
self._children[segment] = child
@@ -145,10 +165,12 @@ def child(self, segment=None, *segments):
else:
return ret
- def open(self, mode=u'r', *args, **kwargs):
+ def open(self, mode='r', *args, **kwargs):
if not self.exists():
- self._stat = [stat.S_IFREG + 0777, 0,0,0,0,0,0,0,0,0]
- return _VirtualFile(self, mode)
+ self._stat = [stat.S_IFREG + 0o777, 0,0,0,0,0,0,0,0,0]
+ if 'b' in mode:
+ return _VirtualByteFile(self, mode, *args, **kwargs)
+ return _VirtualTextFile(self, mode, *args, **kwargs)
def mkdir(self, may_exist=False, create_parents=False):
## TODO: those lines are copied from _localfs.py, consider refactoring
@@ -167,7 +189,7 @@ def mkdir(self, may_exist=False, create_parents=False):
err.errno = errno.EEXIST
raise err
else:
- self._stat = [stat.S_IFDIR+0777, 0,0,0,0,0,0,0,0,0]
+ self._stat = [stat.S_IFDIR+0o777, 0,0,0,0,0,0,0,0,0]
def __iter__(self):
if not self.isdir():
@@ -177,7 +199,7 @@ def __iter__(self):
else:
e.errno = errno.ENOENT
raise e
- return [x for x in self._children.values() if x.exists()].__iter__()
+ return [x for x in list(self._children.values()) if x.exists()].__iter__()
root = path()
root.mkdir(may_exist=True, create_parents=True)
View
2  filesystem/multiplexing.py
@@ -101,7 +101,7 @@ def rename(self, new_path):
real_ancestor = _has_common_ancestor(self._bound, new_path._bound)
if not real_ancestor:
raise filesystem.CrossDeviceRenameError()
- if ancestor._bound <> real_ancestor:
+ if ancestor._bound != real_ancestor:
raise filesystem.CrossDeviceRenameError()
## TODO: all children are corrupt, since they are bound to
View
8 filesystem/test/test_child.py
@@ -9,8 +9,8 @@
import filesystem
def test_new_object():
- p = filesystem.path(u"/")
- c = p.child(u"segment1", u"segment2")
+ p = filesystem.path("/")
+ c = p.child("segment1", "segment2")
assert p is not c
- eq(unicode(p), u"/")
- eq(unicode(c), u"/segment1/segment2")
+ eq(str(p), "/")
+ eq(str(c), "/segment1/segment2")
View
2  filesystem/test/test_copyonwrite_iter.py
@@ -1,4 +1,4 @@
-from __future__ import with_statement
+
import tempfile
import os
View
2  filesystem/test/test_copyonwrite_mkdir.py
@@ -1,4 +1,4 @@
-from __future__ import with_statement
+
from nose.tools import (
eq_ as eq,
View
8 filesystem/test/test_copyonwrite_open.py
@@ -1,4 +1,4 @@
-from __future__ import with_statement
+
from nose.tools import (
eq_ as eq,
@@ -16,13 +16,13 @@
import filesystem.copyonwrite
def test_open_nonexisting():
- p = filesystem.copyonwrite.path(filesystem.path(u'/does-not-exist'))
+ p = filesystem.copyonwrite.path(filesystem.path('/does-not-exist'))
e = assert_raises(IOError, p.open)
eq(e.errno, errno.ENOENT)
def test_open_for_reading():
tmp = maketemp()
- foo = os.path.join(tmp, u'foo')
+ foo = os.path.join(tmp, 'foo')
# write file with Python's standard API ...
with open(foo, 'w') as f:
f.write('bar')
@@ -34,7 +34,7 @@ def test_open_for_reading():
def test_open_for_writing():
tmp = maketemp()
- foo = os.path.join(tmp, u'foo')
+ foo = os.path.join(tmp, 'foo')
# write test content
p = filesystem.copyonwrite.path(filesystem.path(foo))
assert_raises(IOError, open, foo)
View
2  filesystem/test/test_copyonwrite_rmdir.py
@@ -1,4 +1,4 @@
-from __future__ import with_statement
+
from nose.tools import (
eq_ as eq,
View
30 filesystem/test/test_localfs.py
@@ -14,32 +14,32 @@
## TODO: move those two tests to a misc test
def test_str_simple():
- p = filesystem.path(u'/foo')
+ p = filesystem.path('/foo')
got = str(p)
- eq(got, u'/foo')
+ eq(got, '/foo')
def test_str_root():
root = filesystem.root
slash = str(root)
- eq(slash, u'/')
+ eq(slash, '/')
def test_str_child_of_root():
- path = filesystem.root.child(u'tmp')
- eq(str(path), u'/tmp')
+ path = filesystem.root.child('tmp')
+ eq(str(path), '/tmp')
def test_join_without_slash():
"""
simple test of joining / and tmp
"""
- path = filesystem.root.join(u'tmp')
- eq(str(path), u'/tmp')
+ path = filesystem.root.join('tmp')
+ eq(str(path), '/tmp')
def test_join_with_leading_slash():
"""
join should raise an exception if one tries to escape from the
path by giving an absolute path
"""
- e = assert_raises(filesystem.InsecurePathError, filesystem.path(u'/tmp').join, u'/usr')
+ e = assert_raises(filesystem.InsecurePathError, filesystem.path('/tmp').join, '/usr')
eq(str(e), 'path name to join must be relative')
def test_join_with_trailing_slash():
@@ -48,14 +48,14 @@ def test_join_with_trailing_slash():
the parameters are with or without trailing slashes. join
should not discard the last trailing slash.
"""
- path = filesystem.root.join(u'usr/').join(u'lib').join(u'python/')
+ path = filesystem.root.join('usr/').join('lib').join('python/')
eq(str(path), "/usr/lib/python/")
def test_join_side_effects():
"""join should return a new object, and not modify the existing"""
path = filesystem.root
- ret = path.join(u'tmp')
- eq(str(path), u'/')
+ ret = path.join('tmp')
+ eq(str(path), '/')
assert path is not ret
def test_join_wrong_parameters():
@@ -67,7 +67,7 @@ def test_join_wrong_parameters():
def test_rename_bad_string():
tmp = maketemp()
parent = filesystem.path(tmp)
- old = parent.join(u'foo')
+ old = parent.join('foo')
assert_raises(
filesystem.CrossDeviceRenameError,
old.rename, 'foo')
@@ -92,13 +92,13 @@ def test_name_simple():
eq(filesystem.path("foo/bar").name(), "bar")
def test_open_nonexisting():
- p = filesystem.path(u'/does-not-exist')
+ p = filesystem.path('/does-not-exist')
e = assert_raises(IOError, p.open)
eq(e.errno, errno.ENOENT)
def test_open_for_reading():
tmp = maketemp()
- foo = os.path.join(tmp, u'foo')
+ foo = os.path.join(tmp, 'foo')
# write file with Python's standard API ...
with open(foo, 'w') as f:
f.write('bar')
@@ -110,7 +110,7 @@ def test_open_for_reading():
def test_open_for_writing():
tmp = maketemp()
- foo = os.path.join(tmp, u'foo')
+ foo = os.path.join(tmp, 'foo')
# write test content
p = filesystem.path(foo)
with p.open('w') as f:
View
10 filesystem/test/test_readlink.py
@@ -1,4 +1,4 @@
-from __future__ import with_statement
+
import os
import stat
import errno
@@ -12,11 +12,11 @@
def set_up(absolute):
temp_dir = maketemp()
os.chdir(temp_dir)
- source_name = os.path.join(temp_dir, u"link_source")
+ source_name = os.path.join(temp_dir, "link_source")
if absolute:
- target_name = os.path.join(temp_dir, u"link_target")
+ target_name = os.path.join(temp_dir, "link_target")
else:
- target_name = u"link_target"
+ target_name = "link_target"
# create the file the link will point to
f = open(target_name, "w")
f.close()
@@ -40,5 +40,5 @@ def test_readlink_on_regular_file():
assert_raises(OSError, p.readlink)
def test_readlink_on_nonexistent_file():
- p = filesystem.path(u"non-existent-file")
+ p = filesystem.path("non-existent-file")
assert_raises(OSError, p.readlink)
View
246 filesystem/test/test_roundtrip.py
@@ -5,7 +5,7 @@
any reasonably normal fs API implementation.
"""
-from __future__ import with_statement
+
import nose
@@ -31,12 +31,12 @@ class OperationsMixin(object):
# the actual tests; subclass this and provide a setUp method that
# gives it a empty self.path for every method
- def _create_file(self, name=u'foo', content='bar'):
+ def _create_file(self, name='foo', content='bar'):
"""
utility function - creates a file and writes some data to it.
"""
file = self.path.child(name)
- with file.open(u'w') as f:
+ with file.open('w') as f:
f.write(content)
return file
@@ -50,7 +50,7 @@ def test_file(self):
# set foo to a path object pointing to a new file with content "bar"
foo = self._create_file('foo', 'bar')
# test that the file can be accessed
- p = self.path.child(u'foo')
+ p = self.path.child('foo')
assert(p.exists() is True)
assert(p.isfile() is True)
@@ -116,34 +116,34 @@ def test_open_read_write(self):
"""
## Make sure write-read is repeatable
for dummy in (0,1):
- p = self._create_file(u'foo', 'barfoo')
+ p = self._create_file('foo', 'barfoo')
with p.open() as f:
got = f.read(3)
- eq(got, u'bar')
+ eq(got, 'bar')
## assert that a newly opened file towards same
## path starts at the beginning of the file
with p.open() as f2:
got = f2.read(3)
- eq(got, u'bar')
+ eq(got, 'bar')
## assert that the previous read on another file
## descriptor won't affect the file position of f
got = f.read(3)
- eq(got, u'foo')
+ eq(got, 'foo')
## make sure read is repeatable also by accessing the file
## by name:
- p = self.path.child(u'foo')
+ p = self.path.child('foo')
with p.open() as f:
got = f.read(3)
- eq(got, u'bar')
+ eq(got, 'bar')
def test_iter(self):
temp_files = ['file1', 'file2', 'file3']
# put some files in the temporary directory
for i in temp_files:
- f = self.path.child(i).open(u'w')
+ f = self.path.child(i).open('w')
f.close()
# see whether we actually get the file names with the iterator
files = sorted(x.name() for x in self.path)
@@ -151,13 +151,13 @@ def test_iter(self):
def test_not_directory(self):
# prepare a file on which to call the iterator
- f = self.path.child("some_file").open(u"w")
+ f = self.path.child("some_file").open("w")
f.close()
# check reaction on getting the iterator
p = self.path.child("some_file")
try:
iterator = iter(p)
- iterator.next()
+ next(iterator)
assert False ## TODO: better error handling?
except OSError:
pass
@@ -167,61 +167,61 @@ def test_child_no_segments(self):
assert got is self.path
def test_child_bad_slash(self):
- e = assert_raises(filesystem.InsecurePathError, self.path.child, u'ev/il')
+ e = assert_raises(filesystem.InsecurePathError, self.path.child, 'ev/il')
eq(
str(e),
'child name contains directory separator',
)
## Exception should be raised even if it's not evil (?)
- e = assert_raises(filesystem.InsecurePathError, self.path.child, u'notsoevil/')
+ e = assert_raises(filesystem.InsecurePathError, self.path.child, 'notsoevil/')
def test_child_bad_dotdot(self):
- e = assert_raises(filesystem.InsecurePathError, self.path.child, u'..')
+ e = assert_raises(filesystem.InsecurePathError, self.path.child, '..')
eq(
str(e),
'child trying to climb out of directory',
)
## of course, those should also raise errors
- assert_raises(filesystem.InsecurePathError, self.path.child, u'../')
- assert_raises(filesystem.InsecurePathError, self.path.child, u'..//')
- assert_raises(filesystem.InsecurePathError, self.path.child, u'..//..')
+ assert_raises(filesystem.InsecurePathError, self.path.child, '../')
+ assert_raises(filesystem.InsecurePathError, self.path.child, '..//')
+ assert_raises(filesystem.InsecurePathError, self.path.child, '..//..')
def test_flush(self):
"""
Opens two files, writes to one of them, flushes, and asserts
the content can be read by the other file.
"""
- p = self.path.child(u'foo')
- with p.open(u'w') as fw:
+ p = self.path.child('foo')
+ with p.open('w') as fw:
with p.open() as fr:
fw.write('barfoo')
fw.flush()
got = fr.read(3)
- eq(got, u'bar')
+ eq(got, 'bar')
def test_flush_independent(self):
- p1 = self.path.child(u'foo')
- p2 = self.path.child(u'foo')
+ p1 = self.path.child('foo')
+ p2 = self.path.child('foo')
# most often p1 is not p2, but that's not required;
# however, this test is embarassingly easy for any
# fs where that is true..
- with p1.open(u'w') as fw:
+ with p1.open('w') as fw:
with p2.open() as fr:
fw.write('barfoo')
if hasattr(fw, 'flush'):
fw.flush()
got = fr.read(3)
- eq(got, u'bar')
+ eq(got, 'bar')
def test_append(self):
"""
Tests that appending to an existing file works
"""
- p = self.path.child(u'foo')
- with p.open(u'w') as f:
+ p = self.path.child('foo')
+ with p.open('w') as f:
f.write('foo')
- with p.open(u'a') as f:
+ with p.open('a') as f:
f.write('bar')
with p.open() as f:
got = f.read()
@@ -231,10 +231,10 @@ def test_overwrite(self):
"""
Tests that appending to an existing file works
"""
- p = self.path.child(u'foo')
- with p.open(u'w') as f:
+ p = self.path.child('foo')
+ with p.open('w') as f:
f.write('foo')
- with p.open(u'w') as f:
+ with p.open('w') as f:
pass
with p.open() as f:
got = f.read()
@@ -249,10 +249,10 @@ def test_size(self):
Test that will write a fixed length byte string to a file,
close it and check that the file size is correct.
"""
- bytestring = 'abcd' * 128
+ bytestring = b'abcd' * 128
eq(len(bytestring), 512)
- p = self.path.child(u'foo')
- with p.open(u'wb') as f:
+ p = self.path.child('foo')
+ with p.open('wb') as f:
f.write(bytestring)
filesize = p.size()
eq(filesize, 512)
@@ -260,86 +260,86 @@ def test_size(self):
eq(p.stat().st_size, 512)
def test_size_of_nonexisting_item(self):
- p = self.path.child(u"non-existent-item")
+ p = self.path.child("non-existent-item")
e = assert_raises(OSError, p.size)
eq(e.errno, errno.ENOENT)
def test_eq_positive(self):
- a = self.path.child(u'foo')
- b = self.path.child(u'foo')
+ a = self.path.child('foo')
+ b = self.path.child('foo')
eq(a, b)
def test_eq_negative(self):
- a = self.path.child(u'foo')
- b = self.path.child(u'bar')
- assert not a == b, u'%r should not equal %r' % (a, b)
+ a = self.path.child('foo')
+ b = self.path.child('bar')
+ assert not a == b, '%r should not equal %r' % (a, b)
def test_eq_weird(self):
- a = self.path.child(u'foo')
- b = u'foo'
- assert not a == b, u'%r should not equal %r' % (a, b)
+ a = self.path.child('foo')
+ b = 'foo'
+ assert not a == b, '%r should not equal %r' % (a, b)
def test_ne_positive(self):
- a = self.path.child(u'foo')
- b = self.path.child(u'bar')
+ a = self.path.child('foo')
+ b = self.path.child('bar')
ne(a, b)
def test_ne_negative(self):
- a = self.path.child(u'foo')
- b = self.path.child(u'foo')
- assert not a != b, u'%r should be equal to %r' % (a, b)
+ a = self.path.child('foo')
+ b = self.path.child('foo')
+ assert not a != b, '%r should be equal to %r' % (a, b)
def test_ne_weird(self):
- a = self.path.child(u'foo')
- b = u'foo'
- assert a != b, u'%r should not be equal to %r' % (a, b)
+ a = self.path.child('foo')
+ b = 'foo'
+ assert a != b, '%r should not be equal to %r' % (a, b)
def test_lt_positive(self):
- assert self.path.child(u'a') < self.path.child(u'b')
+ assert self.path.child('a') < self.path.child('b')
def test_lt_negative(self):
- assert not self.path.child(u'b') < self.path.child(u'a')
+ assert not self.path.child('b') < self.path.child('a')
def test_lt_negative_equal(self):
- assert not self.path.child(u'a') < self.path.child(u'a')
+ assert not self.path.child('a') < self.path.child('a')
def test_le_positive(self):
- assert self.path.child(u'a') <= self.path.child(u'b')
+ assert self.path.child('a') <= self.path.child('b')
def test_le_positive_equal(self):
- assert self.path.child(u'a') <= self.path.child(u'a')
+ assert self.path.child('a') <= self.path.child('a')
def test_le_negative(self):
- assert not self.path.child(u'b') <= self.path.child(u'a')
+ assert not self.path.child('b') <= self.path.child('a')
def test_gt_positive(self):
- assert self.path.child(u'b') > self.path.child(u'a')
+ assert self.path.child('b') > self.path.child('a')
def test_gt_negative(self):
- assert not self.path.child(u'a') > self.path.child(u'b')
+ assert not self.path.child('a') > self.path.child('b')
def test_gt_negative_equal(self):
- assert not self.path.child(u'a') > self.path.child(u'a')
+ assert not self.path.child('a') > self.path.child('a')
def test_ge_positive(self):
- assert self.path.child(u'b') >= self.path.child(u'a')
+ assert self.path.child('b') >= self.path.child('a')
def test_ge_positive_equal(self):
- assert self.path.child(u'a') >= self.path.child(u'a')
+ assert self.path.child('a') >= self.path.child('a')
def test_ge_negative(self):
- assert not self.path.child(u'a') >= self.path.child(u'b')
+ assert not self.path.child('a') >= self.path.child('b')
def test_parent(self):
- p = self.path.child(u'foo')
- c = p.child(u'bar')
+ p = self.path.child('foo')
+ c = p.child('bar')
eq(c.parent(), p)
def test_rename_simple(self):
- a = self.path.child(u'foo')
- with a.open(u'w') as f:
+ a = self.path.child('foo')
+ with a.open('w') as f:
f.write('bar')
- b = self.path.child(u'quux')
+ b = self.path.child('quux')
got = a.rename(b)
assert got is None, \
'Rename should not return anything, got: %r' % got
@@ -347,21 +347,21 @@ def test_rename_simple(self):
eq(a, b)
# create a new object, just in case a.rename did something
# weird to b
- c = self.path.child(u'quux')
+ c = self.path.child('quux')
eq(a, c)
with c.open() as f:
got = f.read()
- eq(got, u'bar')
- assert not self.path.child(u'foo').exists()
+ eq(got, 'bar')
+ assert not self.path.child('foo').exists()
def test_rename_overwrite(self):
- old = self.path.child(u'quux')
- with old.open(u'w') as f:
+ old = self.path.child('quux')
+ with old.open('w') as f:
f.write('old')
- a = self.path.child(u'foo')
- with a.open(u'w') as f:
+ a = self.path.child('foo')
+ with a.open('w') as f:
f.write('bar')
- b = self.path.child(u'quux')
+ b = self.path.child('quux')
got = a.rename(b)
assert got is None, \
'Rename should not return anything, got: %r' % got
@@ -369,34 +369,37 @@ def test_rename_overwrite(self):
eq(a, b)
# create a new object, just in case a.rename did something
# weird to b
- c = self.path.child(u'quux')
+ c = self.path.child('quux')
eq(a, c)
with c.open() as f:
got = f.read()
- eq(got, u'bar')
+ eq(got, 'bar')
def test_rename_dir(self):
a = self.path.child('foo')
a.mkdir()
- with a.child(u'thud').open(u'w') as f:
+ with a.child('thud').open('w') as f:
f.write('bar')
b = self.path.child('quux')
a.rename(b)
# create a new object, just in case a.rename did something
# weird to b
- c = self.path.child(u'quux')
+ c = self.path.child('quux')
with c.child('thud').open() as f:
got = f.read()
- eq(got, u'bar')
+ eq(got, 'bar')
def test_unlink_simple(self):
a = self._create_file()
+ a = self.path.child('foo')
+ with a.open('w') as f:
+ f.write('bar')
eq(list(self.path), [a])
a.unlink()
eq(list(self.path), [])
def test_unlink_notfound(self):
- p = self.path.child(u'foo')
+ p = self.path.child('foo')
e = assert_raises(
OSError,
p.unlink,
@@ -410,7 +413,7 @@ def test_remove_simple(self):
eq(list(self.path), [])
def test_remove_notfound(self):
- p = self.path.child(u'foo')
+ p = self.path.child('foo')
e = assert_raises(
OSError,
p.remove,
@@ -419,38 +422,38 @@ def test_remove_notfound(self):
def test_mkdir(self):
eq(list(self.path), [])
- self.path.child(u'foo').mkdir()
- eq(list(self.path), [self.path.child(u'foo')])
+ self.path.child('foo').mkdir()
+ eq(list(self.path), [self.path.child('foo')])
# create a new object, just in case .mkdir() stored something
# in p
- eq(self.path.child(u'foo').isdir(), True)
+ eq(self.path.child('foo').isdir(), True)
## if the dir already exists, an error should be raised
- e = assert_raises(OSError, self.path.child(u'foo').mkdir)
+ e = assert_raises(OSError, self.path.child('foo').mkdir)
eq(e.errno, errno.EEXIST)
## but one can ask for this error not to be raised
- p = self.path.child(u'foo').mkdir(may_exist=True)
+ p = self.path.child('foo').mkdir(may_exist=True)
## mkdir will raise errors if the parent dirs doesnu't exist
e = assert_raises(
- OSError, self.path.join(u'newdir/subdir1/subdir2').mkdir)
+ OSError, self.path.join('newdir/subdir1/subdir2').mkdir)
eq(e.errno, errno.ENOENT)
## but one can ask for this error not to be raised
- self.path.join(u'newdir/subdir1/subdir2').mkdir(create_parents=True)
- eq(self.path.join(u'newdir/subdir1/subdir2').isdir(), True)
+ self.path.join('newdir/subdir1/subdir2').mkdir(create_parents=True)
+ eq(self.path.join('newdir/subdir1/subdir2').isdir(), True)
def test_mkdir_bad_exists_file(self):
- with self.path.child(u'foo').open('w') as f:
+ with self.path.child('foo').open('w') as f:
f.write('FOO')
- e = assert_raises(OSError, self.path.child(u'foo').mkdir)
+ e = assert_raises(OSError, self.path.child('foo').mkdir)
eq(e.errno, errno.EEXIST)
def test_rmdir(self):
assert self.path.exists()
- p = self.path.child(u'foo')
+ p = self.path.child('foo')
assert not p.exists()
p.mkdir()
assert p.exists()
@@ -459,10 +462,13 @@ def test_rmdir(self):
eq(list(self.path), [])
# create a new object, just in case .rmdir() stored something
# in p
- eq(self.path.child(u'foo').exists(), False)
+ eq(self.path.child('foo').exists(), False)
def test_rmdir_bad_notdir(self):
p = self._create_file()
+ p = self.path.child('foo')
+ with p.open('w') as f:
+ f.write('bar')
e = assert_raises(
OSError,
p.rmdir,
@@ -470,7 +476,7 @@ def test_rmdir_bad_notdir(self):
eq(e.errno, errno.ENOTDIR)
def test_rmdir_bad_notfound(self):
- p = self.path.child(u'foo')
+ p = self.path.child('foo')
e = assert_raises(
OSError,
p.rmdir,
@@ -479,12 +485,12 @@ def test_rmdir_bad_notfound(self):
def test_walk(self):
## File tree copied from /usr/lib/python2.5/test/test_os.py, class WalkTests
- sub1_path = self.path.join(u"SUB1")
- sub11_path = sub1_path.join(u"SUB11")
- sub2_path = self.path.join(u"SUB2")
- tmp1_path = self.path.join(u"tmp1")
- tmp2_path = sub1_path.join(u"tmp2")
- tmp3_path = sub2_path.join(u"tmp3")
+ sub1_path = self.path.join("SUB1")
+ sub11_path = sub1_path.join("SUB11")
+ sub2_path = self.path.join("SUB2")
+ tmp1_path = self.path.join("tmp1")
+ tmp2_path = sub1_path.join("tmp2")
+ tmp3_path = sub2_path.join("tmp3")
## Create dirs
sub11_path.mkdir(may_exist=True, create_parents=True)
@@ -501,7 +507,7 @@ def test_walk(self):
## Create some files
for path in tmp1_path, tmp2_path, tmp3_path:
- f = path.open(u'w')
+ f = path.open('w')
f.write("I'm %s and cloned from test_os. Blame test_roundtrip.py")
f.close()
@@ -582,27 +588,27 @@ def test_symlink_readlink_islink(self):
p = self.path
## create a mock file
- src = self._create_file(u'testfile', 'bar')
+ src = self._create_file('testfile', 'bar')
## create a symlink to it
- src.symlink(p.child(u'testlink'))
+ src.symlink(p.child('testlink'))
## assert that the link can be read
## TODO: readlink seems to return a string instead of a path object
## that's probably a bug?
- eq(str(p.child(u'testlink').readlink()), str(p.child(u'testfile')))
+ eq(str(p.child('testlink').readlink()), str(p.child('testfile')))
## assert that the file cannot be read as a link
- assert_raises(OSError, p.child(u'testfile').readlink)
+ assert_raises(OSError, p.child('testfile').readlink)
## assert that islink works
- assert(p.child(u'testfile').islink() is False)
- assert(p.child(u'testlink').islink() is True)
+ assert(p.child('testfile').islink() is False)
+ assert(p.child('testlink').islink() is True)
## assert that the link can be opened as a file,
## and that we'll get the file content by doing so
- with p.child(u'testlink').open(u'r') as f:
- eq(f.read(3), u'bar')
+ with p.child('testlink').open('r') as f:
+ eq(f.read(3), 'bar')
class PosixOpMixin(LinkOpMixin):
@@ -621,7 +627,7 @@ def test_methods_exists(self):
def _verify_stats(self, stats):
"""common tests from test_stat and test_lstat"""
## read mode for user ... is this reasonable to assert?
- assert(stats.st_mode & 0400)
+ assert(stats.st_mode & 0o400)
## testing st_ino and st_dev - they should exist
@@ -658,7 +664,7 @@ def _verify_stats(self, stats):
def test_stat(self):
## create a file to play with
- file = self._create_file(u'stat_test_file')
+ file = self._create_file('stat_test_file')
stats = file.stat()
## testing st_mode
@@ -675,8 +681,8 @@ def test_stat(self):
self._verify_stats(stats)
def test_lstat(self):
- link = self.path.child(u'stat_test_link')
- self.path.child(u'doesnexist').symlink(link)
+ link = self.path.child('stat_test_link')
+ self.path.child('doesnexist').symlink(link)
lstats = link.lstat()
assert(stat.S_ISLNK(lstats.st_mode))
## assert a "sane" size (is this sane?)
@@ -688,7 +694,7 @@ def test_lstat(self):
def test_chown_lchown_1(self):
## create a file and a link
- file = self._create_file(u'chown_test_file1')
+ file = self._create_file('chown_test_file1')
link = self.path.child('chown_test_link1')
file.symlink(link)
@@ -717,7 +723,7 @@ def test_chown_lchown_root(self):
raise nose.SkipTest("this test relies on user nobody to exist")
## setup - create a file and a link
- file = self._create_file(u'chown_test_file1')
+ file = self._create_file('chown_test_file1')
link = self.path.child('chown_test_link1')
file.symlink(link)
@@ -746,7 +752,7 @@ def test_chown_lchown_group(self):
if len(gids)<2:
raise SkipTest("need to be member of at least two groups to run this test")
## setup, part 2 - create a file and a link
- file = self._create_file(u'chown_test_file1')
+ file = self._create_file('chown_test_file1')
link = self.path.child('chown_test_link1')
file.symlink(link)
@@ -775,7 +781,7 @@ def _chk_grps(reverse=0):
def test_chown_lchown_raises_error(self):
## create a file and a link
- file = self._create_file(u'chown_test_file1')
+ file = self._create_file('chown_test_file1')
link = self.path.child('chown_test_link1')
file.symlink(link)
View
796 filesystem/test/test_roundtrip.py.bak
@@ -0,0 +1,796 @@
+"""
+Unit tests that set up and test operations through the API.
+
+The intent is that you could run these unit tests against
+any reasonably normal fs API implementation.
+"""
+
+
+
+import nose
+
+from nose.tools import (
+ eq_ as eq,
+ )
+
+from filesystem.test.util import (
+ ne,
+ assert_raises
+ )
+
+import filesystem
+
+import errno
+import stat
+import time
+import os
+import pwd
+import grp
+
+class OperationsMixin(object):
+ # the actual tests; subclass this and provide a setUp method that
+ # gives it a empty self.path for every method
+
+ def _create_file(self, name=u'foo', content='bar'):
+ """
+ utility function - creates a file and writes some data to it.
+ """
+ file = self.path.child(name)
+ with file.open(u'w') as f:
+ f.write(content)
+
+ return file
+
+
+ def test_file(self):
+ """
+ Test that writes to a file and assert that it's a file and not
+ a dir or link.
+ """
+ # set foo to a path object pointing to a new file with content "bar"
+ foo = self._create_file('foo', 'bar')
+ # test that the file can be accessed
+ p = self.path.child(u'foo')
+
+ assert(p.exists() is True)
+ assert(p.isfile() is True)
+ assert(p.isdir() is False)
+
+ assert(foo.exists() is True)
+ assert(foo.isfile() is True)
+ assert(foo.isdir() is False)
+
+ ## TODO: equality is not part of the mandatory API?
+ eq(p, foo)
+ if hasattr(p, 'islink'):
+ assert(p.islink() is False)
+ assert(foo.islink() is False)
+ if hasattr(p, 'stat'):
+ s = p.stat()
+ assert(stat.S_ISREG(s.st_mode) is True)
+ eq(foo.stat(), p.stat())
+
+
+ def test_stat_missing_file(self):
+ p = self.path.child('inexistent_file')
+ if hasattr(p, 'stat'):
+ e = assert_raises(OSError, p.stat)
+ eq(e.errno, errno.ENOENT)
+ assert(p.exists() is False)
+
+ def test_join_with_leading_slash(self):
+ """
+ join with a leading slash should normally throw an
+ InsecurePathError - though it may be differently implemented
+ from fs to fs.
+
+ Rationale for this test is just to excersise this condition;
+ there was a bug in the in-memory fs.
+ """
+ try:
+ self.path.join('/tmp')
+ except filesystem.InsecurePathError:
+ pass
+
+ def test_dir(self):
+ """
+ Test that takes our testing root dir and asserts it's a dir
+ and not a file. Copied from test_stat.py.
+ """
+ p = self.path
+ assert(p.exists() is True)
+ assert(p.isdir() is True)
+ assert(p.isfile() is False)
+ if hasattr(p, 'islink'):
+ assert(p.islink() is False)
+ if hasattr(p, 'stat'):
+ s = p.stat()
+ assert(stat.S_ISDIR(s.st_mode) is True)
+
+ def test_open_read_write(self):
+ """
+ This will attempt to write to a file and then read the same
+ file. The test also tests that two file handles won't share
+ file location, and that the same file can be written to
+ several times.
+ """
+ ## Make sure write-read is repeatable
+ for dummy in (0,1):
+ p = self._create_file(u'foo', 'barfoo')
+ with p.open() as f:
+ got = f.read(3)
+ eq(got, 'bar')
+
+ ## assert that a newly opened file towards same
+ ## path starts at the beginning of the file
+ with p.open() as f2:
+ got = f2.read(3)
+ eq(got, 'bar')
+
+ ## assert that the previous read on another file
+ ## descriptor won't affect the file position of f
+ got = f.read(3)
+ eq(got, 'foo')
+
+ ## make sure read is repeatable also by accessing the file
+ ## by name:
+ p = self.path.child('foo')
+ with p.open() as f:
+ got = f.read(3)
+ eq(got, 'bar')
+
+ def test_iter(self):
+ temp_files = ['file1', 'file2', 'file3']
+ # put some files in the temporary directory
+ for i in temp_files:
+ f = self.path.child(i).open('w')
+ f.close()
+ # see whether we actually get the file names with the iterator
+ files = sorted(x.name() for x in self.path)
+ eq(files, temp_files)
+
+ def test_not_directory(self):
+ # prepare a file on which to call the iterator
+ f = self.path.child("some_file").open("w")
+ f.close()
+ # check reaction on getting the iterator
+ p = self.path.child("some_file")
+ try:
+ iterator = iter(p)
+ next(iterator)
+ assert False ## TODO: better error handling?
+ except OSError:
+ pass
+
+ def test_child_no_segments(self):
+ got = self.path.child()
+ assert got is self.path
+
+ def test_child_bad_slash(self):
+ e = assert_raises(filesystem.InsecurePathError, self.path.child, 'ev/il')
+ eq(
+ str(e),
+ 'child name contains directory separator',
+ )
+ ## Exception should be raised even if it's not evil (?)
+ e = assert_raises(filesystem.InsecurePathError, self.path.child, 'notsoevil/')
+
+ def test_child_bad_dotdot(self):
+ e = assert_raises(filesystem.InsecurePathError, self.path.child, '..')
+ eq(
+ str(e),
+ 'child trying to climb out of directory',
+ )
+
+ ## of course, those should also raise errors
+ assert_raises(filesystem.InsecurePathError, self.path.child, '../')
+ assert_raises(filesystem.InsecurePathError, self.path.child, '..//')
+ assert_raises(filesystem.InsecurePathError, self.path.child, '..//..')
+
+ def test_flush(self):
+ """
+ Opens two files, writes to one of them, flushes, and asserts
+ the content can be read by the other file.
+ """
+ p = self.path.child('foo')
+ with p.open('w') as fw:
+ with p.open() as fr:
+ fw.write('barfoo')
+ fw.flush()
+ got = fr.read(3)
+ eq(got, 'bar')
+
+ def test_flush_independent(self):
+ p1 = self.path.child('foo')
+ p2 = self.path.child('foo')
+ # most often p1 is not p2, but that's not required;
+ # however, this test is embarassingly easy for any
+ # fs where that is true..
+ with p1.open('w') as fw:
+ with p2.open() as fr:
+ fw.write('barfoo')
+ if hasattr(fw, 'flush'):
+ fw.flush()
+ got = fr.read(3)
+ eq(got, 'bar')
+
+ def test_append(self):
+ """
+ Tests that appending to an existing file works
+ """
+ p = self.path.child('foo')
+ with p.open('w') as f:
+ f.write('foo')
+ with p.open('a') as f:
+ f.write('bar')
+ with p.open() as f:
+ got = f.read()
+ eq(got, 'foobar')
+
+ def test_overwrite(self):
+ """
+ Tests that appending to an existing file works
+ """
+ p = self.path.child('foo')
+ with p.open('w') as f:
+ f.write('foo')
+ with p.open('w') as f:
+ pass
+ with p.open() as f:
+ got = f.read()
+ eq(got, '')
+
+ # some implementation might have p1 and p2 be the same object, but
+ # that is not required, so identity is not tested in either
+ # direction
+
+ def test_size(self):
+ """
+ Test that will write a fixed length byte string to a file,
+ close it and check that the file size is correct.
+ """
+ bytestring = 'abcd' * 128
+ eq(len(bytestring), 512)
+ p = self.path.child('foo')
+ with p.open('wb') as f:
+ f.write(bytestring)
+ filesize = p.size()
+ eq(filesize, 512)
+ if hasattr(p, 'stat'):
+ eq(p.stat().st_size, 512)
+
+ def test_size_of_nonexisting_item(self):
+ p = self.path.child("non-existent-item")
+ e = assert_raises(OSError, p.size)
+ eq(e.errno, errno.ENOENT)
+
+ def test_eq_positive(self):
+ a = self.path.child('foo')
+ b = self.path.child('foo')
+ eq(a, b)
+
+ def test_eq_negative(self):
+ a = self.path.child('foo')
+ b = self.path.child('bar')
+ assert not a == b, '%r should not equal %r' % (a, b)
+
+ def test_eq_weird(self):
+ a = self.path.child('foo')
+ b = 'foo'
+ assert not a == b, '%r should not equal %r' % (a, b)
+
+ def test_ne_positive(self):
+ a = self.path.child('foo')
+ b = self.path.child('bar')
+ ne(a, b)
+
+ def test_ne_negative(self):
+ a = self.path.child('foo')
+ b = self.path.child('foo')
+ assert not a != b, '%r should be equal to %r' % (a, b)
+
+ def test_ne_weird(self):
+ a = self.path.child('foo')
+ b = 'foo'
+ assert a != b, '%r should not be equal to %r' % (a, b)
+
+ def test_lt_positive(self):
+ assert self.path.child('a') < self.path.child('b')
+
+ def test_lt_negative(self):
+ assert not self.path.child('b') < self.path.child('a')
+
+ def test_lt_negative_equal(self):
+ assert not self.path.child('a') < self.path.child('a')
+
+ def test_le_positive(self):
+ assert self.path.child('a') <= self.path.child('b')
+
+ def test_le_positive_equal(self):
+ assert self.path.child('a') <= self.path.child('a')
+
+ def test_le_negative(self):
+ assert not self.path.child('b') <= self.path.child('a')
+
+ def test_gt_positive(self):
+ assert self.path.child('b') > self.path.child('a')
+
+ def test_gt_negative(self):
+ assert not self.path.child('a') > self.path.child('b')
+
+ def test_gt_negative_equal(self):
+ assert not self.path.child('a') > self.path.child('a')
+
+ def test_ge_positive(self):
+ assert self.path.child('b') >= self.path.child('a')
+
+ def test_ge_positive_equal(self):
+ assert self.path.child('a') >= self.path.child('a')
+
+ def test_ge_negative(self):
+ assert not self.path.child('a') >= self.path.child('b')
+
+ def test_parent(self):
+ p = self.path.child('foo')
+ c = p.child('bar')
+ eq(c.parent(), p)
+
+ def test_rename_simple(self):
+ a = self.path.child('foo')
+ with a.open('w') as f:
+ f.write('bar')
+ b = self.path.child('quux')
+ got = a.rename(b)
+ assert got is None, \
+ 'Rename should not return anything, got: %r' % got
+ # a should have mutated to equal b
+ eq(a, b)
+ # create a new object, just in case a.rename did something
+ # weird to b
+ c = self.path.child('quux')
+ eq(a, c)
+ with c.open() as f:
+ got = f.read()
+ eq(got, 'bar')
+ assert not self.path.child('foo').exists()
+
+ def test_rename_overwrite(self):
+ old = self.path.child('quux')
+ with old.open('w') as f:
+ f.write('old')
+ a = self.path.child('foo')
+ with a.open('w') as f:
+ f.write('bar')
+ b = self.path.child('quux')
+ got = a.rename(b)
+ assert got is None, \
+ 'Rename should not return anything, got: %r' % got
+ # a should have mutated to equal b
+ eq(a, b)
+ # create a new object, just in case a.rename did something
+ # weird to b
+ c = self.path.child('quux')
+ eq(a, c)
+ with c.open() as f:
+ got = f.read()
+ eq(got, 'bar')
+
+ def test_rename_dir(self):
+ a = self.path.child('foo')
+ a.mkdir()
+ with a.child('thud').open('w') as f:
+ f.write('bar')
+ b = self.path.child('quux')
+ a.rename(b)
+ # create a new object, just in case a.rename did something
+ # weird to b
+ c = self.path.child('quux')
+ with c.child('thud').open() as f:
+ got = f.read()
+ eq(got, 'bar')
+
+ def test_unlink_simple(self):
+ a = self._create_file()
+ a = self.path.child('foo')
+ with a.open('w') as f:
+ f.write('bar')
+ eq(list(self.path), [a])
+ a.unlink()
+ eq(list(self.path), [])
+
+ def test_unlink_notfound(self):
+ p = self.path.child('foo')
+ e = assert_raises(
+ OSError,
+ p.unlink,
+ )
+ eq(e.errno, errno.ENOENT)
+
+ def test_remove_simple(self):
+ a = self._create_file()
+ eq(list(self.path), [a])
+ a.remove()
+ eq(list(self.path), [])
+
+ def test_remove_notfound(self):
+ p = self.path.child('foo')
+ e = assert_raises(
+ OSError,
+ p.remove,
+ )
+ eq(e.errno, errno.ENOENT)
+
+ def test_mkdir(self):
+ eq(list(self.path), [])
+ self.path.child('foo').mkdir()
+ eq(list(self.path), [self.path.child('foo')])
+ # create a new object, just in case .mkdir() stored something
+ # in p
+ eq(self.path.child('foo').isdir(), True)
+
+ ## if the dir already exists, an error should be raised
+ e = assert_raises(OSError, self.path.child('foo').mkdir)
+ eq(e.errno, errno.EEXIST)
+
+
+ ## but one can ask for this error not to be raised
+ p = self.path.child('foo').mkdir(may_exist=True)
+
+ ## mkdir will raise errors if the parent dirs doesnu't exist
+ e = assert_raises(
+ OSError, self.path.join('newdir/subdir1/subdir2').mkdir)
+ eq(e.errno, errno.ENOENT)
+
+ ## but one can ask for this error not to be raised
+ self.path.join('newdir/subdir1/subdir2').mkdir(create_parents=True)
+ eq(self.path.join('newdir/subdir1/subdir2').isdir(), True)
+
+ def test_mkdir_bad_exists_file(self):
+ with self.path.child('foo').open('w') as f:
+ f.write('FOO')
+ e = assert_raises(OSError, self.path.child('foo').mkdir)
+ eq(e.errno, errno.EEXIST)
+
+ def test_rmdir(self):
+ assert self.path.exists()
+ p = self.path.child('foo')
+ assert not p.exists()
+ p.mkdir()
+ assert p.exists()
+ p.rmdir()
+ assert not p.exists()
+ eq(list(self.path), [])
+ # create a new object, just in case .rmdir() stored something
+ # in p
+ eq(self.path.child('foo').exists(), False)
+
+ def test_rmdir_bad_notdir(self):
+ p = self._create_file()
+ p = self.path.child('foo')
+ with p.open('w') as f:
+ f.write('bar')
+ e = assert_raises(
+ OSError,
+ p.rmdir,
+ )
+ eq(e.errno, errno.ENOTDIR)
+
+ def test_rmdir_bad_notfound(self):
+ p = self.path.child('foo')
+ e = assert_raises(
+ OSError,
+ p.rmdir,
+ )
+ eq(e.errno, errno.ENOENT)
+
+ def test_walk(self):
+ ## File tree copied from /usr/lib/python2.5/test/test_os.py, class WalkTests
+ sub1_path = self.path.join("SUB1")
+ sub11_path = sub1_path.join("SUB11")
+ sub2_path = self.path.join("SUB2")
+ tmp1_path = self.path.join("tmp1")
+ tmp2_path = sub1_path.join("tmp2")
+ tmp3_path = sub2_path.join("tmp3")
+
+ ## Create dirs
+ sub11_path.mkdir(may_exist=True, create_parents=True)
+ sub2_path.mkdir(may_exist=True, create_parents=True)
+
+ ## First test, walk on an empty dir should return a list with one item
+ empty_dir_walk = list(sub11_path.walk())
+ eq(len(empty_dir_walk), 1)
+ ## Each list item is a three-element tuple just like with os.path:
+ ## (directory, subdirlist, filelist)
+ eq(empty_dir_walk[0][0], sub11_path)
+ eq(empty_dir_walk[0][1], [])
+ eq(empty_dir_walk[0][2], [])
+
+ ## Create some files
+ for path in tmp1_path, tmp2_path, tmp3_path:
+ f = path.open('w')
+ f.write("I'm %s and cloned from test_os. Blame test_roundtrip.py")
+ f.close()
+
+ ## Walk top-down
+ all = list(self.path.walk())
+
+ ## all should now be a list of four tuples, one for each
+ ## non-empty directory encountered. It can either be [self.path, SUB1, SUB11, SUB2]
+ ## or [self.path, SUB2, SUB1, SUB11].
+ ## TODO: refactor this test code, it should check the full return structure
+ eq(len(all), 4)
+ dir_list = [path_tuple[0].name() for path_tuple in all]
+ expected1 = [self.path.name(), 'SUB1', 'SUB11', 'SUB2']
+ expected2 = [self.path.name(), 'SUB2', 'SUB1', 'SUB11']
+ assert dir_list in (expected1, expected2)
+
+ ## FIRST in the list is self.path.
+ ## First element in the self.path-tuple is self.path
+ eq(all[0][0], self.path)
+
+ ## Second element is subdirlist - but the ordering is not guaranteed
+ subdirs_returned = list(all[0][1])
+ subdirs_expected = [sub1_path, sub2_path]
+ subdirs_returned.sort()
+ subdirs_expected.sort()
+ eq(subdirs_returned, subdirs_expected)
+
+ ## third element is the file list
+ eq(all[0][2], [tmp1_path])
+
+ ## TODO: the list is not checked completely yet. Not sure how
+ ## to write "clean" test code. In test_os an attribute
+ ## "flipped" is created to indicate the order sub1 and sub2
+ ## came out.
+
+ ## Prune the search.
+ all = []
+ for root, dirs, files in self.path.walk():
+ all.append((root, dirs, files))
+ # Don't descend into SUB1.
+ if sub1_path in dirs:
+ # Note that this also mutates the dirs we appended to all!
+ dirs.remove(sub1_path)
+ eq(len(all), 2)
+ eq(all[0], (self.path, [sub2_path], [tmp1_path]))
+ eq(all[1], (sub2_path, [], [tmp3_path]))
+
+ ## Inject out-of-tree path in dirs
+ try:
+ for (root, dirs, files) in self.path.walk():
+ eq(root, self.path) ## we shouldn't recurse into parent
+ dirs[0] = self.path.parent()
+ except filesystem.InsecurePathError:
+ pass
+
+
+ # Walk bottom-up.
+ all = list(self.path.walk(topdown=False))
+ eq(len(all), 4)
+ dir_list = [path_tuple[0].name() for path_tuple in all]
+ expected1 = ['SUB11', 'SUB1', 'SUB2', self.path.name()]
+ expected2 = ['SUB2', 'SUB11', 'SUB1', self.path.name()]
+ assert dir_list in (expected1, expected2)
+
+ ## TODO: we should do a complete test, just the reverse of
+ ## further above - and it should be refactored to avoid
+ ## duplicated code.
+
+
+class LinkOpMixin(OperationsMixin):
+ """
+ Test suite for file system objects that implements symlinks.
+
+ Subclass this and provide a setUp method that
+ gives it a empty self.path for every method
+ """
+ def test_symlink_readlink_islink(self):
+ p = self.path
+
+ ## create a mock file
+ src = self._create_file(u'testfile', 'bar')
+
+ ## create a symlink to it
+ src.symlink(p.child('testlink'))
+
+ ## assert that the link can be read
+ ## TODO: readlink seems to return a string instead of a path object
+ ## that's probably a bug?
+ eq(str(p.child('testlink').readlink()), str(p.child('testfile')))
+
+ ## assert that the file cannot be read as a link
+ assert_raises(OSError, p.child('testfile').readlink)
+
+ ## assert that islink works
+ assert(p.child('testfile').islink() is False)
+ assert(p.child('testlink').islink() is True)
+
+ ## assert that the link can be opened as a file,
+ ## and that we'll get the file content by doing so
+ with p.child('testlink').open('r') as f:
+ eq(f.read(3), 'bar')
+
+
+class PosixOpMixin(LinkOpMixin):
+ """
+ Mixin that tests all the stuff a posix file system should implement,
+ including ordinary file handling, symlinks, etc
+ (TODO - not complete)
+ """
+ def test_methods_exists(self):
+ for method in ('stat', 'lstat', 'readlink', 'symlink',
+ 'islink', 'chown'):
+ ## TODO: add .... 'chmod', 'lchown', 'access', 'major', 'minor', 'makedev', 'mkfifo', 'mknod', 'utime')
+ assert(hasattr(self.path, method))
+ assert(hasattr(getattr(self.path, method), '__call__'))
+
+ def _verify_stats(self, stats):
+ """common tests from test_stat and test_lstat"""
+ ## read mode for user ... is this reasonable to assert?
+ assert(stats.st_mode & 0o400)
+
+
+ ## testing st_ino and st_dev - they should exist
+ stats.st_ino
+ stats.st_dev
+
+ ## number of hard links to this inode
+ eq(stats.st_nlink, 1)
+
+ ## ownership should match our efficient uid
+ eq(stats.st_uid, os.geteuid())
+ ## don't assume too much about the gid through
+ stats.st_gid
+
+ ## this assert is a bit unstable - of course the clock can be
+ ## modified while we're running tests, or we could be paused
+ ## or running pdb or whatever ... but 12 seconds ought to be
+ ## sufficient.
+ assert(time.time()-stats.st_mtime<12)
+ assert(time.time()-stats.st_ctime<12)
+ ## is this a sane assert?
+ eq(stats.st_mtime, stats.st_ctime)
+
+ ## I wouldn't like to add sleeps into the test ...
+ ## but we ought to test whether st_mtime, st_ctime and st_atime is
+ ## updated correctly. Well, let's do that in the utime test ...
+
+ ## the stat should be available both as a dict and a list. I
+ ## think the positions and number of elements are fixed in the
+ ## standard.
+ eq(len(stats), 10)
+ eq(stats[0], stats.st_mode)
+ eq(stats[6], stats.st_size)
+
+ def test_stat(self):
+ ## create a file to play with
+ file = self._create_file(u'stat_test_file')
+ stats = file.stat()
+
+ ## testing st_mode
+ assert(stat.S_ISREG(stats.st_mode))
+ assert(file.isfile())
+
+ ## size should be 3
+ eq(stats.st_size, file.size())
+ eq(stats.st_size, 3)
+
+ ## for a non-link, lstat and stat should yield same result
+ eq(stats, file.lstat())
+
+ self._verify_stats(stats)
+
+ def test_lstat(self):
+ link = self.path.child('stat_test_link')
+ self.path.child('doesnexist').symlink(link)
+ lstats = link.lstat()
+ assert(stat.S_ISLNK(lstats.st_mode))
+ ## assert a "sane" size (is this sane?)
+ assert(lstats.st_size>0 and lstats.st_size<1024)
+
+ self._verify_stats(lstats)
+
+ assert_raises(OSError, link.stat)
+
+ def test_chown_lchown_1(self):
+ ## create a file and a link
+ file = self._create_file(u'chown_test_file1')
+ link = self.path.child('chown_test_link1')
+ file.symlink(link)
+
+ ## chown should accept both numeric and string arguments
+ ## ... batteries included!
+ ## gid is optional
+
+ ## should pass without problems, changing nothing
+ ## (assuming group ownership is the same for the file and the link)
+ for chown in (file.chown, file.lchown, link.chown, link.lchown):
+ chown(os.geteuid())
+ chown(os.getlogin())
+ chown(new_group=file.stat().st_gid)
+ chown(new_group=grp.getgrgid(file.stat().st_gid).gr_name)
+ chown(os.geteuid(), file.stat().st_gid)
+ chown(os.getlogin(), grp.getgrgid(file.stat().st_gid).gr_name)
+
+ def test_chown_lchown_root(self):
+ ## this test has some requirements ...
+ if os.geteuid():
+ raise nose.SkipTest("this test can only be run as root ... :/")
+
+ try:
+ pwd.getpwnam('nobody')
+ except KeyError:
+ raise nose.SkipTest("this test relies on user nobody to exist")
+
+ ## setup - create a file and a link
+ file = self._create_file(u'chown_test_file1')
+ link = self.path.child('chown_test_link1')
+ file.symlink(link)
+
+ ## Hm ... TODO: fix this neater, copy logics from the test_chown_lchown_group test
+ link.lchown(0)
+ link.chown('nobody')
+ eq(file.stat().st_uid, pwd.getpwnam('nobody').pw_uid)
+ eq(link.stat().st_uid, pwd.getpwnam('nobody').pw_uid)
+ eq(link.lstat().st_uid, 0)
+ link.chown(0)
+ eq(file.stat().st_uid, 0)
+ file.chown('nobody')
+ eq(file.stat().st_uid, pwd.getpwnam('nobody').pw_uid)
+ file.chown(0)
+ eq(file.stat().st_uid, 0)
+ link.lchown('nobody')
+ eq(file.stat().st_uid, 0)
+ eq(link.stat().st_uid, 0)
+ eq(link.lstat().st_uid, pwd.getpwnam('nobody').pw_uid)
+
+ def test_chown_lchown_group(self):
+ """user should be able to change group of file/link"""
+
+ ## setup part 1, find two user group ids we can use
+ gids = os.getgroups()
+ if len(gids)<2:
+ raise SkipTest("need to be member of at least two groups to run this test")
+ ## setup, part 2 - create a file and a link
+ file = self._create_file(u'chown_test_file1')
+ link = self.path.child('chown_test_link1')
+ file.symlink(link)
+
+ ## helper function - check that file has gid0 and link has
+ ## gid1 or opposite
+ def _chk_grps(reverse=0):
+ eq(file.stat().st_gid, gids[reverse])
+ eq(link.stat().st_gid, gids[reverse])
+ eq(link.lstat().st_gid, gids[not reverse])
+
+ uid = os.geteuid()
+
+ ## setting file to gid0 and link to gid1
+ file.lchown(uid, gids[0])
+ link.lchown(uid, gids[1])
+ _chk_grps()
+ ## reversing
+ file.chown(uid, gids[1])
+ link.lchown(uid, gids[0])
+ _chk_grps(1)
+ ## setting link to gid0 and file to gid1
+ link.chown(uid, gids[0])
+ link.lchown(uid, gids[1])
+ _chk_grps(0)
+
+
+ def test_chown_lchown_raises_error(self):
+ ## create a file and a link
+ file = self._create_file(u'chown_test_file1')
+ link = self.path.child('chown_test_link1')
+ file.symlink(link)
+
+ if not os.geteuid():
+ raise nose.SkipTest("this test can not be run as root")
+
+ ## should give permission denied
+ assert_raises(OSError, file.chown, 0)
+ assert_raises(OSError, file.lchown, 0)
+
+ ## TODO: write tests for all the other posix-required methods...
+
View
8 filesystem/test/test_unicode.py
@@ -13,14 +13,14 @@
def test_path_str_argument():
p = filesystem.path("/")
assert isinstance(str(p), str)
- assert isinstance(unicode(p), unicode)
+ assert isinstance(str(p), str)
def test_path_unicode_argument():
- p = filesystem.path(u"/")
+ p = filesystem.path("/")
+ assert isinstance(str(p), str)
assert isinstance(str(p), str)
- assert isinstance(unicode(p), unicode)
def test_repr():
- p = filesystem.path(u"/test\xe4")
+ p = filesystem.path("/test\xe4")
assert isinstance(repr(p), str)
View
6 filesystem/test/util.py
@@ -6,7 +6,7 @@
def maybe_mkdir(*a, **kw):
try:
os.mkdir(*a, **kw)
- except OSError, e:
+ except OSError as e:
if e.errno == errno.EEXIST:
pass
else:
@@ -61,7 +61,7 @@ def maketemp():
tmp = os.path.join(tmp, name)
try:
shutil.rmtree(tmp)
- except OSError, e:
+ except OSError as e:
if e.errno == errno.ENOENT:
pass
else:
@@ -75,7 +75,7 @@ def assert_raises(excClass, callableObj, *args, **kwargs):
"""
try:
callableObj(*args, **kwargs)
- except excClass, e:
+ except excClass as e:
return e
else:
if hasattr(excClass,'__name__'): excName = excClass.__name__

No commit comments for this range

Something went wrong with that request. Please try again.