/
storage.py
159 lines (132 loc) · 4.92 KB
/
storage.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
"""General SFTP storage. Subclass it the way you want!"""
import os
import itertools
from pysftpserver.abstractstorage import SFTPAbstractServerStorage
from pysftpserver.futimes import futimes
from pysftpserver.stat_helpers import stat_to_longname
class SFTPServerStorage(SFTPAbstractServerStorage):
"""Simple storage class. Subclass it and override the methods."""
def __init__(self, home, umask=None):
"""Home sweet home.
Set your home to something comfortable and chdir to it.
You should support umask changing too.
"""
self.home = os.path.realpath(home)
os.chdir(self.home)
if umask:
os.umask(umask)
def verify(self, filename):
"""Verify that requested filename is accessible.
In this simple storage class this is always True
(and thus possibly insecure).
"""
return True
def stat(self, filename, lstat=False, fstat=False, parent=None):
"""stat, lstat and fstat requests.
Return a dictionary of stats.
Filename is an handle in the fstat variant.
If parent is not None, then filename is inside parent,
and a join is needed.
This happens in case of readdir responses:
a filename (not a path) has to be returned,
but the stat call need (obviously) a full path.
"""
if not lstat and fstat:
# filename is an handle
_stat = os.fstat(filename)
elif lstat:
_stat = os.lstat(filename)
else:
try:
_stat = os.stat(
filename if not parent
else os.path.join(parent, filename)
)
except:
# we could have a broken symlink
# but lstat could be false:
# this happens in case of readdir responses
_stat = os.lstat(
filename if not parent
else os.path.join(parent, filename)
)
if fstat:
longname = None # not needed in case of fstat
else:
longname = stat_to_longname( # see stat_helpers.py
_stat, filename
)
return {
b'size': _stat.st_size,
b'uid': _stat.st_uid,
b'gid': _stat.st_gid,
b'perm': _stat.st_mode,
b'atime': _stat.st_atime,
b'mtime': _stat.st_mtime,
b'longname': longname
}
def setstat(self, filename, attrs, fsetstat=False):
"""setstat and fsetstat requests.
Filename is an handle in the fstat variant.
If you're using Python < 3.3,
you could find useful the futimes file / function.
"""
if not fsetstat:
f = os.open(filename, os.O_WRONLY)
chown = os.chown
chmod = os.chmod
else: # filename is a fd
f = filename
chown = os.fchown
chmod = os.fchmod
if b'size' in attrs:
os.ftruncate(f, attrs[b'size'])
if all(k in attrs for k in (b'uid', b'gid')):
chown(filename, attrs[b'uid'], attrs[b'gid'])
if b'perm' in attrs:
chmod(filename, attrs[b'perm'])
if all(k in attrs for k in (b'atime', b'mtime')):
if not fsetstat:
os.utime(filename, (attrs[b'atime'], attrs[b'mtime']))
else:
futimes(filename, (attrs[b'atime'], attrs[b'mtime']))
def opendir(self, filename):
"""Return an iterator over the files in filename."""
return itertools.chain(iter([b'.', b'..']), iter(os.listdir(filename)))
def open(self, filename, flags, mode):
"""Return the file handle."""
return os.open(filename, flags, mode)
def mkdir(self, filename, mode):
"""Create directory with given mode."""
os.mkdir(filename, mode)
def rmdir(self, filename):
"""Remove directory."""
os.rmdir(filename)
def rm(self, filename):
"""Remove file."""
os.remove(filename)
def rename(self, oldpath, newpath):
"""Move/rename file."""
os.rename(oldpath, newpath)
def symlink(self, linkpath, targetpath):
"""Symlink file."""
os.symlink(targetpath, linkpath)
def readlink(self, filename):
"""Readlink of filename."""
return os.readlink(filename)
def write(self, handle, off, chunk):
"""Write chunk at offset of handle."""
os.lseek(handle, off, os.SEEK_SET)
rlen = os.write(handle, chunk)
if rlen == len(chunk):
return True
def read(self, handle, off, size):
"""Read from the handle size, starting from offset off."""
os.lseek(handle, off, os.SEEK_SET)
return os.read(handle, size)
def close(self, handle):
"""Close the file handle."""
try:
handle.close()
except AttributeError:
pass