Skip to content

Commit

Permalink
100% coverage for ftp/tests/demofs.py
Browse files Browse the repository at this point in the history
  • Loading branch information
jamadden committed Oct 26, 2017
1 parent f00a314 commit c112422
Showing 1 changed file with 34 additions and 44 deletions.
78 changes: 34 additions & 44 deletions src/zope/server/ftp/tests/demofs.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,17 @@

class File(object):
type = 'f'
modified=None
modified = None

def __init__(self):
self.access = {'anonymous': read}

def accessable(self, user, access=read):
return (user == 'root'
or (self.access.get(user, 0) & access)
or (self.access.get('anonymous', 0) & access)
)
return (
user == 'root'
or (self.access.get(user, 0) & access)
or (self.access.get('anonymous', 0) & access)
)

def grant(self, user, access):
self.access[user] = self.access.get(user, 0) | access
Expand All @@ -57,7 +58,7 @@ def __getitem__(self, name):
def __setitem__(self, name, v):
self.files[name] = v

def __delitem__(self, name):
def __delitem__(self, name): # pragma: no cover
del self.files[name]

def __contains__(self, name):
Expand Down Expand Up @@ -86,10 +87,8 @@ def get(self, path, default=None):
d = self.files
if path:
for name in path.split('/'):
if d.type is not 'd':
return default
if not d.accessable(self.user):
raise Unauthorized
assert d.type == 'd'
assert d.accessable(self.user)
d = d.get(name)
if d is None:
break
Expand All @@ -105,13 +104,13 @@ def getany(self, path):
def getdir(self, path):
d = self.getany(path)
if d.type != 'd':
raise OSError("Not a directory:", path)
raise AssertionError("Not a directory:", path)
return d

def getfile(self, path):
d = self.getany(path)
if d.type != 'f':
raise OSError("Not a file:", path)
raise AssertionError("Not a file:", path)
return d

def getwdir(self, path):
Expand All @@ -128,9 +127,7 @@ def type(self, path):
def names(self, path, filter=None):
"See zope.server.interfaces.ftp.IFileSystem"
f = list(self.getdir(path))
if filter is not None:
f = [name for name in f if filter(name)]

assert filter is None
return f

def _lsinfo(self, name, file):
Expand All @@ -142,22 +139,18 @@ def _lsinfo(self, name, file):
}
if file.type == 'f':
info['size'] = len(file.data)
if file.modified is not None:
info['mtime'] = file.modified
assert file.modified is None

return info

def ls(self, path, filter=None):
"See zope.server.interfaces.ftp.IFileSystem"
f = self.getdir(path)
if filter is None:
return [self._lsinfo(name, f.files[name])
for name in f
]

return [self._lsinfo(name, f.files[name])
for name in f
if filter(name)]
assert filter is None
return [
self._lsinfo(name, f.files[name])
for name in f
]

def readfile(self, path, outstream, start=0, end=None):
"See zope.server.interfaces.ftp.IFileSystem"
Expand All @@ -178,20 +171,18 @@ def lsinfo(self, path):

def mtime(self, path):
"See zope.server.interfaces.ftp.IFileSystem"
f = self.getany(path)
return f.modified
raise AssertionError("Not implemented or called")

def size(self, path):
"See zope.server.interfaces.ftp.IFileSystem"
f = self.getany(path)
return len(getattr(f, 'data', b''))
raise AssertionError("Not implemented or called")

def mkdir(self, path):
"See zope.server.interfaces.ftp.IFileSystem"
path, name = posixpath.split(path)
d = self.getwdir(path)
if name in d.files:
raise OSError("Already exists:", name)
raise AssertionError("Already exists:", name)
newdir = self.Directory()
newdir.grant(self.user, read | write)
d.files[name] = newdir
Expand All @@ -201,21 +192,21 @@ def remove(self, path):
path, name = posixpath.split(path)
d = self.getwdir(path)
if name not in d.files:
raise OSError("Not exists:", name)
raise AssertionError("Not exists:", name)
f = d.files[name]
if f.type == 'd':
raise OSError('Is a directory:', name)
raise AssertionError('Is a directory:', name)
del d.files[name]

def rmdir(self, path):
"See zope.server.interfaces.ftp.IFileSystem"
path, name = posixpath.split(path)
d = self.getwdir(path)
if name not in d.files:
raise OSError("Not exists:", name)
raise AssertionError("Not exists:", name)
f = d.files[name]
if f.type != 'd':
raise OSError('Is not a directory:', name)
raise AssertionError('Is not a directory:', name)
del d.files[name]

def rename(self, old, new):
Expand All @@ -227,9 +218,9 @@ def rename(self, old, new):
newdir = self.getwdir(newpath)

if oldname not in olddir.files:
raise OSError("Not exists:", oldname)
raise AssertionError("Not exists:", oldname)
if newname in newdir.files:
raise OSError("Already exists:", newname)
raise AssertionError("Already exists:", newname)

newdir.files[newname] = olddir.files[oldname]
del olddir.files[oldname]
Expand All @@ -244,32 +235,31 @@ def writefile(self, path, instream, start=None, end=None, append=False):
f = d.files[name] = self.File()
f.grant(self.user, read | write)
elif f.type != 'f':
raise OSError("Can't overwrite a directory")
raise AssertionError("Can't overwrite a directory")

if not f.accessable(self.user, write):
raise OSError("Permission denied")
raise AssertionError("Permission denied")

if append:
f.data += instream.read()
else:

if start:
if start < 0:
raise ValueError("Negative starting file position")
raise AssertionError("Negative starting file position")
prefix = f.data[:start]
if len(prefix) < start:
prefix += b'\0' * (start - len(prefix))
assert len(prefix) >= start
else:
prefix = b''
start=0
start = 0

if end:
if end < 0:
raise ValueError("Negative ending file position")
raise AssertionError("Negative ending file position")
l = end - start
newdata = instream.read(l)

f.data = prefix+newdata+f.data[start+len(newdata):]
f.data = prefix + newdata + f.data[start + len(newdata):]
else:
f.data = prefix + instream.read()

Expand Down

0 comments on commit c112422

Please sign in to comment.