-
Notifications
You must be signed in to change notification settings - Fork 246
/
sftpd.py
2005 lines (1647 loc) · 88.9 KB
/
sftpd.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
import six
import heapq, traceback, array, stat, struct
from types import NoneType
from stat import S_IFREG, S_IFDIR
from time import time, strftime, localtime
from zope.interface import implementer
from twisted.python import components
from twisted.application import service, strports
from twisted.conch.ssh import factory, keys, session
from twisted.conch.ssh.filetransfer import FileTransferServer, SFTPError, \
FX_NO_SUCH_FILE, FX_OP_UNSUPPORTED, FX_PERMISSION_DENIED, FX_EOF, \
FX_BAD_MESSAGE, FX_FAILURE, FX_OK
from twisted.conch.ssh.filetransfer import FXF_READ, FXF_WRITE, FXF_APPEND, \
FXF_CREAT, FXF_TRUNC, FXF_EXCL
from twisted.conch.interfaces import ISFTPServer, ISFTPFile, IConchUser, ISession
from twisted.conch.avatar import ConchUser
from twisted.conch.openssh_compat import primes
from twisted.cred import portal
from twisted.internet.error import ProcessDone, ProcessTerminated
from twisted.python.failure import Failure
from twisted.internet.interfaces import ITransport
from twisted.internet import defer
from twisted.internet.interfaces import IConsumer
from foolscap.api import eventually
from allmydata.util import deferredutil
from allmydata.util.assertutil import _assert, precondition
from allmydata.util.consumer import download_to_data
from allmydata.util.encodingutil import get_filesystem_encoding
from allmydata.interfaces import IFileNode, IDirectoryNode, ExistingChildError, \
NoSuchChildError, ChildOfWrongTypeError
from allmydata.mutable.common import NotWriteableError
from allmydata.mutable.publish import MutableFileHandle
from allmydata.immutable.upload import FileHandle
from allmydata.dirnode import update_metadata
from allmydata.util.fileutil import EncryptedTemporaryFile
# Module-wide verbosity flag: when True, NOISY-level log messages are emitted.
noisy = True
from allmydata.util.log import NOISY, OPERATIONAL, WEIRD, \
msg as logmsg, PrefixingLogMixin
# Python 3 has no 'long' type; alias it to int so the long(...) conversions
# below work on both Python 2 and 3.
if six.PY3:
    long = int
def eventually_callback(d):
    """Return a one-argument callable that fires d.callback(res) via
    foolscap's eventual-send queue (i.e. on a later reactor turn)."""
    def _fire_callback(res):
        return eventually(d.callback, res)
    return _fire_callback
def eventually_errback(d):
    """Return a one-argument callable that fires d.errback(err) via
    foolscap's eventual-send queue (i.e. on a later reactor turn)."""
    def _fire_errback(err):
        return eventually(d.errback, err)
    return _fire_errback
def _utf8(x):
    """Best-effort conversion of x to a UTF-8 byte string: encode text,
    pass byte strings through unchanged, and fall back to repr() for
    anything else."""
    if isinstance(x, unicode):
        return x.encode('utf-8')
    return x if isinstance(x, str) else repr(x)
def _to_sftp_time(t):
    """SFTP times are unsigned 32-bit integers representing UTC seconds
    (ignoring leap seconds) since the Unix epoch, January 1 1970 00:00 UTC.
    A Tahoe time is the corresponding float."""
    # Truncate to an integer, then keep only the low 32 bits.
    mask = long(0xFFFFFFFF)
    return long(t) & mask
def _convert_error(res, request):
    """If res is not a Failure, return it, otherwise reraise the appropriate
    SFTPError.

    request: a repr-able description of the original request, used only in
    log messages.

    Each check below maps a Tahoe/Twisted exception wrapped in the Failure
    to the closest SFTP v3 status code; anything unrecognized falls through
    to FX_FAILURE at the end.
    """
    if not isinstance(res, Failure):
        logged_res = res
        # Avoid logging whole file contents on successful reads.
        if isinstance(res, str): logged_res = "<data of length %r>" % (len(res),)
        logmsg("SUCCESS %r %r" % (request, logged_res,), level=OPERATIONAL)
        return res

    err = res
    logmsg("RAISE %r %r" % (request, err.value), level=OPERATIONAL)
    # Best-effort traceback logging only; format_exc's argument here is
    # dubious (its parameter is a limit, not an exception), hence the
    # defensive try/except around it.
    try:
        if noisy: logmsg(traceback.format_exc(err.value), level=NOISY)
    except Exception:  # pragma: no cover
        pass

    # The message argument to SFTPError must not reveal information that
    # might compromise anonymity, if we are running over an anonymous network.

    if err.check(SFTPError):
        # original raiser of SFTPError has responsibility to ensure anonymity
        raise err

    if err.check(NoSuchChildError):
        childname = _utf8(err.value.args[0])
        raise SFTPError(FX_NO_SUCH_FILE, childname)
    if err.check(NotWriteableError) or err.check(ChildOfWrongTypeError):
        msg = _utf8(err.value.args[0])
        raise SFTPError(FX_PERMISSION_DENIED, msg)
    if err.check(ExistingChildError):
        # Versions of SFTP after v3 (which is what twisted.conch implements)
        # define a specific error code for this case: FX_FILE_ALREADY_EXISTS.
        # However v3 doesn't; instead, other servers such as sshd return
        # FX_FAILURE. The gvfs SFTP backend, for example, depends on this
        # to translate the error to the equivalent of POSIX EEXIST, which is
        # necessary for some picky programs (such as gedit).
        msg = _utf8(err.value.args[0])
        raise SFTPError(FX_FAILURE, msg)
    if err.check(NotImplementedError):
        raise SFTPError(FX_OP_UNSUPPORTED, _utf8(err.value))
    if err.check(EOFError):
        raise SFTPError(FX_EOF, "end of file reached")
    if err.check(defer.FirstError):
        # Unwrap DeferredList-style failures and convert the inner one.
        _convert_error(err.value.subFailure, request)

    # We assume that the error message is not anonymity-sensitive.
    raise SFTPError(FX_FAILURE, _utf8(err.value))
def _repr_flags(flags):
    """Render an SFTP open-flags bitmask as a '|'-separated string of the
    FXF_* names whose bits are set (empty string if none are)."""
    flag_names = [
        (FXF_READ,   "FXF_READ"),
        (FXF_WRITE,  "FXF_WRITE"),
        (FXF_APPEND, "FXF_APPEND"),
        (FXF_CREAT,  "FXF_CREAT"),
        (FXF_TRUNC,  "FXF_TRUNC"),
        (FXF_EXCL,   "FXF_EXCL"),
    ]
    return "|".join([name for bit, name in flag_names if flags & bit])
def _lsLine(name, attrs):
st_uid = "tahoe"
st_gid = "tahoe"
st_mtime = attrs.get("mtime", 0)
st_mode = attrs["permissions"]
# Some clients won't tolerate '?' in the size field (#1337).
st_size = attrs.get("size", 0)
# We don't know how many links there really are to this object.
st_nlink = 1
# Based on <https://twistedmatrix.com/trac/browser/trunk/twisted/conch/ls.py?rev=25412>.
# We previously could not call the version in Twisted because we needed the change
# <https://twistedmatrix.com/trac/changeset/25412> (released in Twisted v8.2).
# Since we now depend on Twisted v10.1, consider calling Twisted's version.
mode = st_mode
perms = array.array('c', '-'*10)
ft = stat.S_IFMT(mode)
if stat.S_ISDIR(ft): perms[0] = 'd'
elif stat.S_ISREG(ft): perms[0] = '-'
else: perms[0] = '?'
# user
if mode&stat.S_IRUSR: perms[1] = 'r'
if mode&stat.S_IWUSR: perms[2] = 'w'
if mode&stat.S_IXUSR: perms[3] = 'x'
# group
if mode&stat.S_IRGRP: perms[4] = 'r'
if mode&stat.S_IWGRP: perms[5] = 'w'
if mode&stat.S_IXGRP: perms[6] = 'x'
# other
if mode&stat.S_IROTH: perms[7] = 'r'
if mode&stat.S_IWOTH: perms[8] = 'w'
if mode&stat.S_IXOTH: perms[9] = 'x'
# suid/sgid never set
l = perms.tostring()
l += str(st_nlink).rjust(5) + ' '
un = str(st_uid)
l += un.ljust(9)
gr = str(st_gid)
l += gr.ljust(9)
sz = str(st_size)
l += sz.rjust(8)
l += ' '
day = 60 * 60 * 24
sixmo = day * 7 * 26
now = time()
if st_mtime + sixmo < now or st_mtime > now + day:
# mtime is more than 6 months ago, or more than one day in the future
l += strftime("%b %d %Y ", localtime(st_mtime))
else:
l += strftime("%b %d %H:%M ", localtime(st_mtime))
l += name
return l
def _no_write(parent_readonly, child, metadata=None):
"""Whether child should be listed as having read-only permissions in parent."""
if child.is_unknown():
return True
elif child.is_mutable():
return child.is_readonly()
elif parent_readonly or IDirectoryNode.providedBy(child):
return True
else:
return metadata is not None and metadata.get('no-write', False)
def _populate_attrs(childnode, metadata, size=None):
    """Build an SFTP v3 attribute dict for childnode.

    childnode: an IFileNode/IDirectoryNode/unknown node, or None meaning the
    file doesn't exist yet but we're going to write it later.
    metadata: the link metadata dict, or a falsy value if none is available.
    size: a known size in bytes to report for a file, overriding asking the
    node; None means ask the node (if any).
    """
    attrs = {}

    # The permissions must have the S_IFDIR (040000) or S_IFREG (0100000)
    # bits, otherwise the client may refuse to open a directory.
    # Also, sshfs run as a non-root user requires files and directories
    # to be world-readable/writeable.
    # It is important that we never set the executable bits on files.
    #
    # Directories and unknown nodes have no size, and SFTP doesn't
    # require us to make one up.
    #
    # childnode might be None, meaning that the file doesn't exist yet,
    # but we're going to write it later.

    if childnode and childnode.is_unknown():
        perms = 0
    elif childnode and IDirectoryNode.providedBy(childnode):
        perms = S_IFDIR | 0o777
    else:
        # For files, omit the size if we don't immediately know it.
        if childnode and size is None:
            size = childnode.get_size()
        if size is not None:
            _assert(isinstance(size, (int, long)) and not isinstance(size, bool), size=size)
            attrs['size'] = size
        perms = S_IFREG | 0o666

    if metadata:
        if metadata.get('no-write', False):
            perms &= S_IFDIR | S_IFREG | 0o555  # clear 'w' bits

        # See webapi.txt for what these times mean.
        # We would prefer to omit atime, but SFTP version 3 can only
        # accept mtime if atime is also set.
        if 'linkmotime' in metadata.get('tahoe', {}):
            attrs['ctime'] = attrs['mtime'] = attrs['atime'] = _to_sftp_time(metadata['tahoe']['linkmotime'])
        elif 'mtime' in metadata:
            attrs['ctime'] = attrs['mtime'] = attrs['atime'] = _to_sftp_time(metadata['mtime'])

        if 'linkcrtime' in metadata.get('tahoe', {}):
            attrs['createtime'] = _to_sftp_time(metadata['tahoe']['linkcrtime'])

    attrs['permissions'] = perms

    # twisted.conch.ssh.filetransfer only implements SFTP version 3,
    # which doesn't include SSH_FILEXFER_ATTR_FLAGS.
    return attrs
def _attrs_to_metadata(attrs):
metadata = {}
for key in attrs:
if key == "mtime" or key == "ctime" or key == "createtime":
metadata[key] = long(attrs[key])
elif key.startswith("ext_"):
metadata[key] = str(attrs[key])
perms = attrs.get('permissions', stat.S_IWUSR)
if not (perms & stat.S_IWUSR):
metadata['no-write'] = True
return metadata
def _direntry_for(filenode_or_parent, childname, filenode=None):
    """Return the 'direntry' key identifying a file: the write URI of the
    parent plus '/' plus the UTF-8 childname, or just the node's write URI
    when childname is None (using filenode instead), or None if no node or
    no write URI is available."""
    precondition(isinstance(childname, (unicode, NoneType)), childname=childname)

    node = filenode_or_parent if childname is not None else filenode
    if not node:
        return None

    rw_uri = node.get_write_uri()
    if rw_uri and childname:
        return rw_uri + "/" + childname.encode('utf-8')
    return rw_uri
@implementer(IConsumer)
class OverwriteableFileConsumer(PrefixingLogMixin):
    """I act both as a consumer for the download of the original file contents, and as a
    wrapper for a temporary file that records the downloaded data and any overwrites.
    I use a priority queue to keep track of which regions of the file have been overwritten
    but not yet downloaded, so that the download does not clobber overwritten data.
    I use another priority queue to record milestones at which to make callbacks
    indicating that a given number of bytes have been downloaded.

    The temporary file reflects the contents of the file that I represent, except that:
     - regions that have neither been downloaded nor overwritten, if present,
       contain garbage.
     - the temporary file may be shorter than the represented file (it is never longer).
    The latter's current size is stored in self.current_size.

    This abstraction is mostly independent of SFTP. Consider moving it, if it is found
    useful for other frontends."""

    def __init__(self, download_size, tempfile_maker):
        """download_size: the number of bytes the download is expected to supply.
        tempfile_maker: a zero-argument callable returning a new file-like object
        to back the contents (e.g. EncryptedTemporaryFile)."""
        PrefixingLogMixin.__init__(self, facility="tahoe.sftp")
        if noisy: self.log(".__init__(%r, %r)" % (download_size, tempfile_maker), level=NOISY)
        self.download_size = download_size
        self.current_size = download_size
        self.f = tempfile_maker()
        self.downloaded = 0
        self.milestones = []  # empty heap of (offset, d)
        self.overwrites = []  # empty heap of (start, end)
        self.is_closed = False

        self.done = defer.Deferred()
        self.done_status = None  # None -> not complete, Failure -> download failed, str -> download succeeded
        self.producer = None

    def get_file(self):
        """Return the backing temporary file object."""
        return self.f

    def get_current_size(self):
        """Return the current size of the represented file (which may exceed
        the amount of data actually present in the temporary file)."""
        return self.current_size

    def set_current_size(self, size):
        """Change the represented file's size: truncate the temporary file if
        shrinking, or zero-extend (recorded as an overwrite) if growing.
        May declare the download done early if it is no longer needed."""
        if noisy: self.log(".set_current_size(%r), current_size = %r, downloaded = %r" %
                           (size, self.current_size, self.downloaded), level=NOISY)
        if size < self.current_size or size < self.downloaded:
            self.f.truncate(size)
        if size > self.current_size:
            # Extension is implemented as an overwrite of zero bytes, so the
            # new region is recorded as not needing download.
            self.overwrite(self.current_size, "\x00" * (size - self.current_size))
        self.current_size = size

        # make the invariant self.download_size <= self.current_size be true again
        if size < self.download_size:
            self.download_size = size

        if self.downloaded >= self.download_size:
            self.download_done("size changed")

    def registerProducer(self, p, streaming):
        """IConsumer: attach the download's producer. For a non-streaming
        (pull) producer, keep scheduling resumeProducing via eventual-send
        until the download completes."""
        if noisy: self.log(".registerProducer(%r, streaming=%r)" % (p, streaming), level=NOISY)
        if self.producer is not None:
            raise RuntimeError("producer is already registered")

        self.producer = p
        if streaming:
            # call resumeProducing once to start things off
            p.resumeProducing()
        else:
            def _iterate():
                if self.done_status is None:
                    p.resumeProducing()
                    eventually(_iterate)
            _iterate()

    def write(self, data):
        """IConsumer: receive the next chunk of downloaded data. Writes it
        into the temporary file, but skips over any regions that have already
        been overwritten (consulting the self.overwrites heap), so the
        download never clobbers newer data."""
        if noisy: self.log(".write(<data of length %r>)" % (len(data),), level=NOISY)
        if self.is_closed:
            return

        if self.downloaded >= self.download_size:
            return

        next_downloaded = self.downloaded + len(data)
        if next_downloaded > self.download_size:
            data = data[:(self.download_size - self.downloaded)]

        while len(self.overwrites) > 0:
            (start, end) = self.overwrites[0]
            if start >= next_downloaded:
                # This and all remaining overwrites are after the data we just downloaded.
                break
            if start > self.downloaded:
                # The data we just downloaded has been partially overwritten.
                # Write the prefix of it that precedes the overwritten region.
                self.f.seek(self.downloaded)
                self.f.write(data[:(start - self.downloaded)])

            # This merges consecutive overwrites if possible, which allows us to detect the
            # case where the download can be stopped early because the remaining region
            # to download has already been fully overwritten.
            heapq.heappop(self.overwrites)
            while len(self.overwrites) > 0:
                (start1, end1) = self.overwrites[0]
                if start1 > end:
                    break
                end = end1
                heapq.heappop(self.overwrites)

            if end >= next_downloaded:
                # This overwrite extends past the downloaded data, so there is no
                # more data to consider on this call.
                heapq.heappush(self.overwrites, (next_downloaded, end))
                self._update_downloaded(next_downloaded)
                return
            elif end >= self.downloaded:
                data = data[(end - self.downloaded):]
                self._update_downloaded(end)

        self.f.seek(self.downloaded)
        self.f.write(data)
        self._update_downloaded(next_downloaded)

    def _update_downloaded(self, new_downloaded):
        """Advance self.downloaded to new_downloaded, fire any milestone
        Deferreds that are now satisfied, and finish the download if the
        (possibly overwrite-extended) high-water mark reaches download_size."""
        self.downloaded = new_downloaded
        milestone = new_downloaded
        # An overwrite contiguous with the downloaded region effectively
        # extends the region that counts as 'reached'.
        if len(self.overwrites) > 0:
            (start, end) = self.overwrites[0]
            if start <= new_downloaded and end > milestone:
                milestone = end

        while len(self.milestones) > 0:
            (next, d) = self.milestones[0]
            if next > milestone:
                return
            if noisy: self.log("MILESTONE %r %r" % (next, d), level=NOISY)
            heapq.heappop(self.milestones)
            eventually_callback(d)("reached")

        if milestone >= self.download_size:
            self.download_done("reached download size")

    def overwrite(self, offset, data):
        """Write data at offset in the temporary file, zero-filling any gap
        past the current EOF first, and record the written region so the
        in-progress download will not clobber it. Raises SFTPError if this
        consumer has been closed."""
        if noisy: self.log(".overwrite(%r, <data of length %r>)" % (offset, len(data)), level=NOISY)
        if self.is_closed:
            self.log("overwrite called on a closed OverwriteableFileConsumer", level=WEIRD)
            raise SFTPError(FX_BAD_MESSAGE, "cannot write to a closed file handle")

        if offset > self.current_size:
            # Normally writing at an offset beyond the current end-of-file
            # would leave a hole that appears filled with zeroes. However, an
            # EncryptedTemporaryFile doesn't behave like that (if there is a
            # hole in the file on disk, the zeroes that are read back will be
            # XORed with the keystream). So we must explicitly write zeroes in
            # the gap between the current EOF and the offset.
            self.f.seek(self.current_size)
            self.f.write("\x00" * (offset - self.current_size))
            start = self.current_size
        else:
            self.f.seek(offset)
            start = offset

        self.f.write(data)
        end = offset + len(data)
        self.current_size = max(self.current_size, end)

        # Only regions extending into not-yet-downloaded territory need to be
        # protected from the download.
        if end > self.downloaded:
            heapq.heappush(self.overwrites, (start, end))

    def read(self, offset, length):
        """When the data has been read, callback the Deferred that we return with this data.
        Otherwise errback the Deferred that we return.
        The caller must perform no more overwrites until the Deferred has fired."""
        if noisy: self.log(".read(%r, %r), current_size = %r" % (offset, length, self.current_size), level=NOISY)
        if self.is_closed:
            self.log("read called on a closed OverwriteableFileConsumer", level=WEIRD)
            raise SFTPError(FX_BAD_MESSAGE, "cannot read from a closed file handle")

        # Note that the overwrite method is synchronous. When a write request is processed
        # (e.g. a writeChunk request on the async queue of GeneralSFTPFile), overwrite will
        # be called and will update self.current_size if necessary before returning. Therefore,
        # self.current_size will be up-to-date for a subsequent call to this read method, and
        # so it is correct to do the check for a read past the end-of-file here.
        if offset >= self.current_size:
            def _eof(): raise EOFError("read past end of file")
            return defer.execute(_eof)

        if offset + length > self.current_size:
            length = self.current_size - offset
            if noisy: self.log("truncating read to %r bytes" % (length,), level=NOISY)

        needed = min(offset + length, self.download_size)

        # If we fail to reach the needed number of bytes, the read request will fail.
        d = self.when_reached_or_failed(needed)
        def _reached_in_read(res):
            # It is not necessarily the case that self.downloaded >= needed, because
            # the file might have been truncated (thus truncating the download) and
            # then extended.

            _assert(self.current_size >= offset + length,
                    current_size=self.current_size, offset=offset, length=length)
            if noisy: self.log("_reached_in_read(%r), self.f = %r" % (res, self.f,), level=NOISY)
            self.f.seek(offset)
            return self.f.read(length)
        d.addCallback(_reached_in_read)
        return d

    def when_reached_or_failed(self, index):
        """Return a Deferred that fires when the download has reached at least
        'index' bytes (possibly immediately), or with the failure status if
        the download has already finished unsuccessfully."""
        if noisy: self.log(".when_reached_or_failed(%r)" % (index,), level=NOISY)
        def _reached(res):
            if noisy: self.log("reached %r with result %r" % (index, res), level=NOISY)
            return res

        if self.done_status is not None:
            return defer.execute(_reached, self.done_status)
        if index <= self.downloaded:  # already reached successfully
            if noisy: self.log("already reached %r successfully" % (index,), level=NOISY)
            return defer.succeed("already reached successfully")
        d = defer.Deferred()
        d.addCallback(_reached)
        heapq.heappush(self.milestones, (index, d))
        return d

    def when_done(self):
        """Return a Deferred that fires with self.done_status once the
        download has completed (the status may be a str or a Failure)."""
        d = defer.Deferred()
        self.done.addCallback(lambda ign: eventually_callback(d)(self.done_status))
        return d

    def download_done(self, res):
        """Record the outcome of the download (a str on success, a Failure on
        error), fire self.done, and flush all outstanding milestones."""
        _assert(isinstance(res, (str, Failure)), res=res)
        # Only the first call to download_done counts, but we log subsequent calls
        # (multiple calls are normal).
        if self.done_status is not None:
            self.log("IGNORING extra call to download_done with result %r; previous result was %r"
                     % (res, self.done_status), level=OPERATIONAL)
            return

        self.log("DONE with result %r" % (res,), level=OPERATIONAL)

        # We avoid errbacking self.done so that we are not left with an 'Unhandled error in Deferred'
        # in case when_done() is never called. Instead we stash the failure in self.done_status,
        # from where the callback added in when_done() can retrieve it.
        self.done_status = res
        eventually_callback(self.done)(None)

        while len(self.milestones) > 0:
            (next, d) = self.milestones[0]
            if noisy: self.log("MILESTONE FINISH %r %r %r" % (next, d, res), level=NOISY)
            heapq.heappop(self.milestones)
            # The callback means that the milestone has been reached if
            # it is ever going to be. Note that the file may have been
            # truncated to before the milestone.
            eventually_callback(d)(res)

    def close(self):
        """Close the temporary file (suppressing any error from doing so),
        mark the download done if it isn't already, and return done_status."""
        if not self.is_closed:
            self.is_closed = True
            try:
                self.f.close()
            except Exception as e:
                self.log("suppressed %r from close of temporary file %r" % (e, self.f), level=WEIRD)
        self.download_done("closed")
        return self.done_status

    def unregisterProducer(self):
        """IConsumer: detach the producer."""
        # This will happen just before our client calls download_done, which will tell
        # us the outcome of the download; we don't know the outcome at this point.
        self.producer = None
        self.log("producer unregistered", level=NOISY)
# Size cutoff in bytes; presumably files no larger than this, opened
# read-only, are served via ShortReadOnlySFTPFile (downloaded whole to
# memory) rather than via OverwriteableFileConsumer — usage is later in
# this file; confirm there.
SIZE_THRESHOLD = 1000
@implementer(ISFTPFile)
class ShortReadOnlySFTPFile(PrefixingLogMixin):
    """I represent a file handle to a particular file on an SFTP connection.
    I am used only for short immutable files opened in read-only mode.
    When I am created, the file contents start to be downloaded to memory.
    self._async is used to delay read requests until the download has finished."""

    def __init__(self, userpath, filenode, metadata):
        """userpath: the path string for this handle, used as the log prefix.
        filenode: an IFileNode whose contents are downloaded immediately.
        metadata: link metadata dict (or None) used when reporting attributes."""
        PrefixingLogMixin.__init__(self, facility="tahoe.sftp", prefix=userpath)
        if noisy: self.log(".__init__(%r, %r, %r)" % (userpath, filenode, metadata), level=NOISY)

        precondition(isinstance(userpath, str) and IFileNode.providedBy(filenode),
                     userpath=userpath, filenode=filenode)

        self.filenode = filenode
        self.metadata = metadata
        # Named _async rather than async because 'async' became a reserved
        # keyword in Python 3.7.
        self._async = download_to_data(filenode)
        self.closed = False

    def readChunk(self, offset, length):
        """Return a Deferred firing with up to 'length' bytes at 'offset' of
        the downloaded contents, or an FX_EOF error if offset is at or past
        the end of file."""
        request = ".readChunk(%r, %r)" % (offset, length)
        self.log(request, level=OPERATIONAL)

        if self.closed:
            def _closed(): raise SFTPError(FX_BAD_MESSAGE, "cannot read from a closed file handle")
            return defer.execute(_closed)

        d = defer.Deferred()
        def _read(data):
            if noisy: self.log("_read(<data of length %r>) in readChunk(%r, %r)" % (len(data), offset, length), level=NOISY)

            # "In response to this request, the server will read as many bytes as it
            #  can from the file (up to 'len'), and return them in a SSH_FXP_DATA
            #  message.  If an error occurs or EOF is encountered before reading any
            #  data, the server will respond with SSH_FXP_STATUS.  For normal disk
            #  files, it is guaranteed that this will read the specified number of
            #  bytes, or up to end of file."
            #
            # i.e. we respond with an EOF error iff offset is already at EOF.

            if offset >= len(data):
                eventually_errback(d)(Failure(SFTPError(FX_EOF, "read at or past end of file")))
            else:
                eventually_callback(d)(data[offset:offset+length])  # truncated if offset+length > len(data)
            return data
        self._async.addCallbacks(_read, eventually_errback(d))
        d.addBoth(_convert_error, request)
        return d

    def writeChunk(self, offset, data):
        """Always denied: this handle is read-only."""
        self.log(".writeChunk(%r, <data of length %r>) denied" % (offset, len(data)), level=OPERATIONAL)

        def _denied(): raise SFTPError(FX_PERMISSION_DENIED, "file handle was not opened for writing")
        return defer.execute(_denied)

    def close(self):
        """Mark this handle as closed; always succeeds."""
        self.log(".close()", level=OPERATIONAL)

        self.closed = True
        return defer.succeed(None)

    def getAttrs(self):
        """Return a Deferred firing with the SFTP attribute dict for this file."""
        request = ".getAttrs()"
        self.log(request, level=OPERATIONAL)

        if self.closed:
            def _closed(): raise SFTPError(FX_BAD_MESSAGE, "cannot get attributes for a closed file handle")
            return defer.execute(_closed)

        d = defer.execute(_populate_attrs, self.filenode, self.metadata)
        d.addBoth(_convert_error, request)
        return d

    def setAttrs(self, attrs):
        """Always denied: this handle is read-only."""
        self.log(".setAttrs(%r) denied" % (attrs,), level=OPERATIONAL)
        def _denied(): raise SFTPError(FX_PERMISSION_DENIED, "file handle was not opened for writing")
        return defer.execute(_denied)
@implementer(ISFTPFile)
class GeneralSFTPFile(PrefixingLogMixin):
    """I represent a file handle to a particular file on an SFTP connection.
    I wrap an instance of OverwriteableFileConsumer, which is responsible for
    storing the file contents. In order to allow write requests to be satisfied
    immediately, there is effectively a FIFO queue between requests made to this
    file handle, and requests to my OverwriteableFileConsumer. This queue is
    implemented by the callback chain of self.async.

    When first constructed, I am in an 'unopened' state that causes most
    operations to be delayed until 'open' is called."""
    def __init__(self, userpath, flags, close_notify, convergence):
        """userpath: the path string for this handle, used as the log prefix
        and passed to close_notify.
        flags: SFTP FXF_* open-flags bitmask.
        close_notify: callable invoked as (userpath, parent, childname, self)
        when this handle is closed.
        convergence: presumably the convergence secret used for uploads — it
        is stored but not used in this method (censored from logs)."""
        PrefixingLogMixin.__init__(self, facility="tahoe.sftp", prefix=userpath)
        if noisy: self.log(".__init__(%r, %r = %r, %r, <convergence censored>)" %
                           (userpath, flags, _repr_flags(flags), close_notify), level=NOISY)

        precondition(isinstance(userpath, str), userpath=userpath)
        self.userpath = userpath
        self.flags = flags
        self.close_notify = close_notify
        self.convergence = convergence
        self.async = defer.Deferred()
        # Creating or truncating the file is a change, but if FXF_EXCL is set, a zero-length file has already been created.
        self.has_changed = (flags & (FXF_CREAT | FXF_TRUNC)) and not (flags & FXF_EXCL)
        self.closed = False
        self.abandoned = False
        self.parent = None
        self.childname = None
        self.filenode = None
        self.metadata = None

        # self.consumer should only be relied on in callbacks for self.async, since it might
        # not be set before then.
        self.consumer = None
    def open(self, parent=None, childname=None, filenode=None, metadata=None):
        """Move this handle into the 'opened' state and start any needed download.

        If the open flags include FXF_TRUNC, or there is no existing filenode,
        an empty consumer is created and immediately marked as not needing a
        download; otherwise the existing contents are downloaded into the
        consumer via the self.async queue.

        Returns self, so the caller can chain on the newly opened handle.
        """
        self.log(".open(parent=%r, childname=%r, filenode=%r, metadata=%r)" %
                 (parent, childname, filenode, metadata), level=OPERATIONAL)

        precondition(isinstance(childname, (unicode, NoneType)), childname=childname)
        precondition(filenode is None or IFileNode.providedBy(filenode), filenode=filenode)
        precondition(not self.closed, sftpfile=self)

        # If the file has been renamed, the new (parent, childname) takes precedence.
        if self.parent is None:
            self.parent = parent
        if self.childname is None:
            self.childname = childname
        self.filenode = filenode
        self.metadata = metadata

        tempfile_maker = EncryptedTemporaryFile

        if (self.flags & FXF_TRUNC) or not filenode:
            # We're either truncating or creating the file, so we don't need the old contents.
            self.consumer = OverwriteableFileConsumer(0, tempfile_maker)
            self.consumer.download_done("download not needed")
        else:
            self.async.addCallback(lambda ignored: filenode.get_best_readable_version())

            def _read(version):
                if noisy: self.log("_read", level=NOISY)
                download_size = version.get_size()
                _assert(download_size is not None)

                self.consumer = OverwriteableFileConsumer(download_size, tempfile_maker)

                d = version.read(self.consumer, 0, None)
                def _finished(res):
                    if not isinstance(res, Failure):
                        res = "download finished"
                    self.consumer.download_done(res)
                d.addBoth(_finished)
                # It is correct to drop d here.
            self.async.addCallback(_read)

        eventually_callback(self.async)(None)

        if noisy: self.log("open done", level=NOISY)
        return self
    def get_userpath(self):
        """Return this handle's path string (updated by rename())."""
        return self.userpath
    def get_direntry(self):
        """Return the direntry key for the current (parent, childname), or
        None if the parent has no write URI (see _direntry_for)."""
        return _direntry_for(self.parent, self.childname)
    def rename(self, new_userpath, new_parent, new_childname):
        """Record that this open file now lives at (new_parent, new_childname),
        which affects subsequent close notification and direntry computation."""
        self.log(".rename(%r, %r, %r)" % (new_userpath, new_parent, new_childname), level=OPERATIONAL)

        precondition(isinstance(new_userpath, str) and isinstance(new_childname, unicode),
                     new_userpath=new_userpath, new_childname=new_childname)
        self.userpath = new_userpath
        self.parent = new_parent
        self.childname = new_childname
    def abandon(self):
        """Mark this handle as abandoned. The flag is captured at close time;
        presumably it suppresses committing changes — confirm in close()."""
        self.log(".abandon()", level=OPERATIONAL)

        self.abandoned = True
    def sync(self, ign=None):
        """Return a Deferred that fires once every operation queued so far on
        self.async has completed, with whatever result (possibly a Failure)
        the queue produced."""
        # The ign argument allows some_file.sync to be used as a callback.
        self.log(".sync()", level=OPERATIONAL)

        d = defer.Deferred()
        self.async.addBoth(eventually_callback(d))
        def _done(res):
            if noisy: self.log("_done(%r) in .sync()" % (res,), level=NOISY)
            return res
        d.addBoth(_done)
        return d
    def readChunk(self, offset, length):
        """Queue a read of up to 'length' bytes at 'offset'. Returns a
        Deferred firing with the data once all previously queued operations
        and the needed portion of the download have completed. Fails with
        FX_PERMISSION_DENIED if the handle was not opened for reading, or
        FX_BAD_MESSAGE if it is closed."""
        request = ".readChunk(%r, %r)" % (offset, length)
        self.log(request, level=OPERATIONAL)

        if not (self.flags & FXF_READ):
            def _denied(): raise SFTPError(FX_PERMISSION_DENIED, "file handle was not opened for reading")
            return defer.execute(_denied)

        if self.closed:
            def _closed(): raise SFTPError(FX_BAD_MESSAGE, "cannot read from a closed file handle")
            return defer.execute(_closed)

        d = defer.Deferred()
        def _read(ign):
            if noisy: self.log("_read in readChunk(%r, %r)" % (offset, length), level=NOISY)
            d2 = self.consumer.read(offset, length)
            d2.addBoth(eventually_callback(d))
            # It is correct to drop d2 here.
            return None
        self.async.addCallbacks(_read, eventually_errback(d))
        d.addBoth(_convert_error, request)
        return d
    def writeChunk(self, offset, data):
        """Queue a write of 'data' at 'offset' (or at the current EOF when
        FXF_APPEND is set) and return an already-fired Deferred; the write
        itself happens later on the self.async queue. Fails immediately with
        FX_PERMISSION_DENIED if not opened for writing, or FX_BAD_MESSAGE if
        the handle is closed."""
        self.log(".writeChunk(%r, <data of length %r>)" % (offset, len(data)), level=OPERATIONAL)

        if not (self.flags & FXF_WRITE):
            def _denied(): raise SFTPError(FX_PERMISSION_DENIED, "file handle was not opened for writing")
            return defer.execute(_denied)

        if self.closed:
            def _closed(): raise SFTPError(FX_BAD_MESSAGE, "cannot write to a closed file handle")
            return defer.execute(_closed)

        self.has_changed = True

        # Note that we return without waiting for the write to occur. Reads and
        # close wait for prior writes, and will fail if any prior operation failed.
        # This is ok because SFTP makes no guarantee that the write completes
        # before the request does. In fact it explicitly allows write errors to be
        # delayed until close:
        #   "One should note that on some server platforms even a close can fail.
        #    This can happen e.g. if the server operating system caches writes,
        #    and an error occurs while flushing cached writes during the close."
        def _write(ign):
            if noisy: self.log("_write in .writeChunk(%r, <data of length %r>), current_size = %r" %
                               (offset, len(data), self.consumer.get_current_size()), level=NOISY)
            # FXF_APPEND means that we should always write at the current end of file.
            write_offset = offset
            if self.flags & FXF_APPEND:
                write_offset = self.consumer.get_current_size()

            self.consumer.overwrite(write_offset, data)
            if noisy: self.log("overwrite done", level=NOISY)
            return None
        self.async.addCallback(_write)

        # don't addErrback to self.async, just allow subsequent async ops to fail.
        return defer.succeed(None)
def _do_close(self, res, d=None):
    """Close the consumer, send the close notification, then complete 'd'
    (when given) with the combined result of 'res' and the consumer close.

    'res' is the result (or Failure) propagated along self.async; a Failure
    with no deferred to deliver it to is logged and suppressed.
    """
    if noisy: self.log("_do_close(%r)" % (res,), level=NOISY)

    status = self.consumer.close() if self.consumer else None

    # We must close_notify before re-firing self.async.
    if self.close_notify:
        self.close_notify(self.userpath, self.parent, self.childname, self)

    # A Failure from closing the consumer wins only if the incoming
    # result was not itself already a Failure.
    if isinstance(status, Failure) and not isinstance(res, Failure):
        res = status

    if d:
        eventually_callback(d)(res)
    elif isinstance(res, Failure):
        self.log("suppressing %r" % (res,), level=OPERATIONAL)
def close(self):
    """SFTP close request: commit buffered writes (if any) and release the handle.

    Returns a deferred that fires once the close completes for writable
    handles that changed the file, or immediately for read-only, unchanged,
    abandoned, or already-closed handles.
    """
    request = ".close()"
    self.log(request, level=OPERATIONAL)

    if self.closed:
        # Closing an already-closed handle is a no-op.
        return defer.succeed(None)

    # This means that close has been called, not that the close has succeeded.
    self.closed = True

    if not (self.flags & (FXF_WRITE | FXF_CREAT)):
        # We never fail a close of a handle opened only for reading, even if the file
        # failed to download. (We could not do so deterministically, because it would
        # depend on whether we reached the point of failure before abandoning the
        # download.) Any reads that depended on file content that could not be downloaded
        # will have failed. It is important that we don't close the consumer until
        # previous read operations have completed.
        self.async.addBoth(self._do_close)
        return defer.succeed(None)

    # We must capture the abandoned, parent, and childname variables synchronously
    # at the close call. This is needed by the correctness arguments in the comments
    # for _abandon_any_heisenfiles and _rename_heisenfiles.
    # Note that the file must have been opened before it can be closed.
    abandoned = self.abandoned
    parent = self.parent
    childname = self.childname

    # has_changed is set when writeChunk is called, not when the write occurs, so
    # it is correct to optimize out the commit if it is False at the close call.
    has_changed = self.has_changed

    def _commit(ign):
        # Wait for all pending writes, then persist the result: overwrite in
        # place for a mutable file, or upload-and-link for an immutable one.
        d2 = self.consumer.when_done()
        if self.filenode and self.filenode.is_mutable():
            self.log("update mutable file %r childname=%r metadata=%r"
                     % (self.filenode, childname, self.metadata), level=OPERATIONAL)
            if self.metadata.get('no-write', False) and not self.filenode.is_readonly():
                _assert(parent and childname, parent=parent, childname=childname, metadata=self.metadata)
                d2.addCallback(lambda ign: parent.set_metadata_for(childname, self.metadata))
            d2.addCallback(lambda ign: self.filenode.overwrite(MutableFileHandle(self.consumer.get_file())))
        else:
            def _add_file(ign):
                self.log("_add_file childname=%r" % (childname,), level=OPERATIONAL)
                u = FileHandle(self.consumer.get_file(), self.convergence)
                return parent.add_file(childname, u, metadata=self.metadata)
            d2.addCallback(_add_file)
        return d2

    # If the file has been abandoned, we don't want the close operation to get "stuck",
    # even if self.async fails to re-fire. Completing the close independently of self.async
    # in that case should ensure that dropping an ssh connection is sufficient to abandon
    # any heisenfiles that were not explicitly closed in that connection.
    if abandoned or not has_changed:
        d = defer.succeed(None)
        self.async.addBoth(self._do_close)
    else:
        d = defer.Deferred()
        self.async.addCallback(_commit)
        self.async.addBoth(self._do_close, d)
    d.addBoth(_convert_error, request)
    return d
def getAttrs(self):
    """SFTP fstat request: return the attributes of this open file.

    The size is taken from the consumer (which reflects queued writes),
    so the request is chained on self.async to run after prior operations.
    """
    request = ".getAttrs()"
    self.log(request, level=OPERATIONAL)

    if self.closed:
        def _closed(): raise SFTPError(FX_BAD_MESSAGE, "cannot get attributes for a closed file handle")
        return defer.execute(_closed)

    # Optimization for read-only handles, when we already know the metadata.
    if not (self.flags & (FXF_WRITE | FXF_CREAT)) and self.metadata and self.filenode and not self.filenode.is_mutable():
        return defer.succeed(_populate_attrs(self.filenode, self.metadata))

    d = defer.Deferred()
    def _get(ign):
        if noisy: self.log("_get(%r) in %r, filenode = %r, metadata = %r" % (ign, request, self.filenode, self.metadata), level=NOISY)

        # self.filenode might be None, but that's ok.
        attrs = _populate_attrs(self.filenode, self.metadata, size=self.consumer.get_current_size())
        eventually_callback(d)(attrs)
        return None
    self.async.addCallbacks(_get, eventually_errback(d))
    d.addBoth(_convert_error, request)
    return d
def setAttrs(self, attrs, only_if_at=None):
    """SFTP fsetstat request: update metadata and optionally resize the file.

    If 'only_if_at' is given, the attributes are applied only while this
    handle's current direntry still matches it (used by the heisenfile
    machinery to avoid racing with renames).
    """
    request = ".setAttrs(%r, only_if_at=%r)" % (attrs, only_if_at)
    self.log(request, level=OPERATIONAL)

    if not (self.flags & FXF_WRITE):
        def _denied(): raise SFTPError(FX_PERMISSION_DENIED, "file handle was not opened for writing")
        return defer.execute(_denied)

    if self.closed:
        def _closed(): raise SFTPError(FX_BAD_MESSAGE, "cannot set attributes for a closed file handle")
        return defer.execute(_closed)

    size = attrs.get("size", None)
    # int/long check (Python 2): a size attribute must be a nonnegative integer.
    if size is not None and (not isinstance(size, (int, long)) or size < 0):
        def _bad(): raise SFTPError(FX_BAD_MESSAGE, "new size is not a valid nonnegative integer")
        return defer.execute(_bad)

    d = defer.Deferred()
    def _set(ign):
        if noisy: self.log("_set(%r) in %r" % (ign, request), level=NOISY)
        current_direntry = _direntry_for(self.parent, self.childname, self.filenode)
        if only_if_at and only_if_at != current_direntry:
            if noisy: self.log("not setting attributes: current_direntry=%r in %r" %
                               (current_direntry, request), level=NOISY)
            return None

        now = time()
        self.metadata = update_metadata(self.metadata, _attrs_to_metadata(attrs), now)
        if size is not None:
            # TODO: should we refuse to truncate a file opened with FXF_APPEND?
            # <http://allmydata.org/trac/tahoe-lafs/ticket/1037#comment:20>
            self.consumer.set_current_size(size)
        eventually_callback(d)(None)
        return None
    self.async.addCallbacks(_set, eventually_errback(d))
    d.addBoth(_convert_error, request)
    return d
class StoppableList:
    """Wrap a sequence as an iterable with a no-op close() method,
    matching the 'stoppable' interface expected of directory listings."""

    def __init__(self, items):
        self.items = items

    def __iter__(self):
        # Delegate directly to the underlying sequence's iterator.
        return iter(self.items)

    def close(self):
        # Nothing to release; the interface merely requires the method.
        pass
class Reason:
    """Minimal wrapper holding a result in its 'value' attribute
    (the shape Twisted callbacks expect of a reason object)."""

    def __init__(self, value):
        # Store the wrapped result for later inspection.
        self.value = value
# A "heisenfile" is a file that has been opened with write flags
# (FXF_WRITE and/or FXF_CREAT) and not yet close-notified.
# 'all_heisenfiles' maps from a direntry string to a list of
# GeneralSFTPFile.
#
# A direntry string is parent_write_uri + "/" + childname_utf8 for
# an immutable file, or file_write_uri for a mutable file.
# Updates to this dict are single-threaded.

all_heisenfiles = {}

def _reload():
    """Reset the module-level heisenfile registry (rebinds the dict,
    so stale references held elsewhere are not mutated)."""
    global all_heisenfiles
    all_heisenfiles = {}
@implementer(ISFTPServer)
class SFTPUserHandler(ConchUser, PrefixingLogMixin):
def __init__(self, client, rootnode, username):
ConchUser.__init__(self)
PrefixingLogMixin.__init__(self, facility="tahoe.sftp", prefix=username)
if noisy: self.log(".__init__(%r, %r, %r)" % (client, rootnode, username), level=NOISY)
self.channelLookup["session"] = session.SSHSession
self.subsystemLookup["sftp"] = FileTransferServer
self._client = client
self._root = rootnode
self._username = username
self._convergence = client.convergence
# maps from UTF-8 paths for this user, to files written and still open
self._heisenfiles = {}