-
-
Notifications
You must be signed in to change notification settings - Fork 32.6k
bpo-2628: support BLOCK mode for retrbinary #29337
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -34,6 +34,7 @@ | |
# Modified by Siebren to support docstrings and PASV. | ||
# Modified by Phil Schwartz to add storbinary and storlines callbacks. | ||
# Modified by Giampaolo Rodola' to add TLS support. | ||
# Modified by Jonathan Bell to support block transmission mode in retrbinary. | ||
# | ||
|
||
import sys | ||
|
@@ -105,6 +106,8 @@ class FTP: | |
passiveserver = True | ||
# Disables https://bugs.python.org/issue43285 security if set to True. | ||
trust_server_pasv_ipv4_address = False | ||
transmissionmode = 'S' | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
dataconn = None | ||
|
||
def __init__(self, host='', user='', passwd='', acct='', | ||
timeout=_GLOBAL_DEFAULT_TIMEOUT, source_address=None, *, | ||
|
@@ -349,7 +352,14 @@ def ntransfercmd(self, cmd, rest=None): | |
given marker. | ||
""" | ||
size = None | ||
if self.passiveserver: | ||
if self.dataconn is not None: | ||
if rest is not None: | ||
self.sendcmd("REST %s" % rest) | ||
resp = self.sendcmd(cmd) | ||
if resp[0] != '1': | ||
raise error_reply(resp) | ||
conn = self.dataconn | ||
elif self.passiveserver: | ||
host, port = self.makepasv() | ||
conn = socket.create_connection((host, port), self.timeout, | ||
source_address=self.source_address) | ||
|
@@ -367,6 +377,8 @@ def ntransfercmd(self, cmd, rest=None): | |
resp = self.getresp() | ||
if resp[0] != '1': | ||
raise error_reply(resp) | ||
if self.transmissionmode == 'B': | ||
self.dataconn = conn | ||
except: | ||
conn.close() | ||
raise | ||
|
@@ -381,6 +393,8 @@ def ntransfercmd(self, cmd, rest=None): | |
if resp[0] != '1': | ||
raise error_reply(resp) | ||
conn, sockaddr = sock.accept() | ||
if self.transmissionmode == 'B': | ||
self.dataconn = conn | ||
if self.timeout is not _GLOBAL_DEFAULT_TIMEOUT: | ||
conn.settimeout(self.timeout) | ||
if resp[:3] == '150': | ||
|
@@ -432,16 +446,95 @@ def retrbinary(self, cmd, callback, blocksize=8192, rest=None): | |
Returns: | ||
The response code. | ||
""" | ||
import struct | ||
|
||
self.voidcmd('TYPE I') | ||
with self.transfercmd(cmd, rest) as conn: | ||
while 1: | ||
data = conn.recv(blocksize) | ||
if not data: | ||
break | ||
callback(data) | ||
# shutdown ssl layer | ||
if _SSLSocket is not None and isinstance(conn, _SSLSocket): | ||
conn.unwrap() | ||
self.voidcmd('MODE %s' % self.transmissionmode) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Since I expect few servers support BLOCK mode, I would explicitly send "MODE" command only if |
||
|
||
if self.transmissionmode == 'S': | ||
with self.transfercmd(cmd, rest) as conn: | ||
while 1: | ||
data = conn.recv(blocksize) | ||
if not data: | ||
break | ||
callback(data) | ||
# shutdown ssl layer | ||
if _SSLSocket is not None and isinstance(conn, _SSLSocket): | ||
conn.unwrap() | ||
elif self.transmissionmode == 'B': | ||
with self.transfercmd(cmd, rest) as conn: | ||
while 1: | ||
# Receive one byte at a time, not all 3 at once -- seriously | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why? Is it because returned data may be < 3 bytes? |
||
header = bytes() | ||
for i in range(0, 3): | ||
header += conn.recv(1) | ||
|
||
(descriptor, blocklength) = struct.unpack('!BH', header) | ||
if self.debugging: | ||
print("*header* %d\t%d" % (descriptor, blocklength)) | ||
|
||
if (0 <= descriptor <= 240) and descriptor % 16 == 0: | ||
pass | ||
else: | ||
# Abort the transfer. Expect 426 response code. | ||
self.abort() | ||
# Exception so close the data connection. | ||
self.close_dataconn() | ||
# Catch the 226 response code. | ||
self.voidresp() | ||
raise error_proto("Unexpected header descriptor. Data block is invalid.") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
|
||
data = bytes() | ||
dtl = [] | ||
sread = 0 | ||
while 1: | ||
if blocklength == 0: | ||
break | ||
try: | ||
buff = conn.recv(blocklength-sread, 0) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. you can omit passing |
||
except socket.error as se: | ||
if self.debugging: | ||
print("*got socket error: %s*" % se) | ||
raise se | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It looks like this whole |
||
if not buff: | ||
if self.debugging: | ||
print("*got no data on recv* final size: %d" % sread) | ||
break | ||
if len(buff) > blocklength-sread: | ||
if self.debugging: | ||
print("*got more data than desired!* %d vs %d" % (len(buff), blocklength-sread)) | ||
sread = sread + len(buff) | ||
data = data + buff | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Adding (+) chunks of buff = bytearray()
curr_length = blocklength
while len(buff) < blocklength:
chunk = conn.recv(curr_length)
buff.extend(chunk)
curr_length -= len(chunk)
data = bytes(buff) In order to avoid making this method too complex, perhaps it makes sense to encapsulate this logic in a utility function (again, not tested): def _read_until(sock, length):
if length <= 0:
return b""
buff = bytearray()
curr_length = length
while len(buff) < length:
chunk = sock.recv(curr_length)
buff.extend(chunk)
curr_length -= len(chunk)
return bytes(buff) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Also, if the server sends us a big number (say size = 1G), we will hog RAM. I wonder if it would make sense to implement an additional, internal buffer, that calls |
||
dtl.append(buff) | ||
if sread >= blocklength: | ||
break | ||
|
||
if descriptor & 16: | ||
# Data block is a restart. Not implemented. | ||
self.abort() | ||
if self.debugging: | ||
print('*restart marker*') | ||
raise NotImplementedError("Remote server sent a restart marker. Operation unsupported.") | ||
else: | ||
callback(data) | ||
|
||
if descriptor & 128: | ||
# End of "record" as defined by file type. | ||
if self.debugging: | ||
print('*end-of-record*') | ||
if descriptor & 64: | ||
# End of file. | ||
if self.debugging: | ||
print('*EOF*') | ||
break | ||
if descriptor & 32: | ||
# Data is suspect (i.e. "magnetic tape read errors") | ||
if self.debugging: | ||
print('*suspect data*') | ||
else: | ||
self.close_dataconn() | ||
raise NotImplementedError | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I suggest |
||
|
||
return self.voidresp() | ||
|
||
def retrlines(self, cmd, callback = None): | ||
|
@@ -495,6 +588,8 @@ def storbinary(self, cmd, fp, blocksize=8192, callback=None, rest=None): | |
The response code. | ||
""" | ||
self.voidcmd('TYPE I') | ||
self.voidcmd('MODE S') | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Since I expect few servers to implement BLOCK mode, I would avoid sending MODE command unless it's really necessary. I would change this to:
|
||
|
||
with self.transfercmd(cmd, rest) as conn: | ||
while 1: | ||
buf = fp.read(blocksize) | ||
|
@@ -673,6 +768,27 @@ def close(self): | |
if sock is not None: | ||
sock.close() | ||
|
||
def set_transmissionmode(self, mode):
    """Set the transmission mode used for subsequent transfers.

    Args:
        mode: Mode in which the FTP server should transmit files.
            S indicates Stream mode, the default mode.
            B indicates Block mode, in which data is sent as a series
              of data blocks, each preceded by a header specifying
              the block length and a descriptor.
            C indicates Compressed mode. Not implemented by this library;
              the value is accepted here, but a transfer attempted in
              this mode is expected to fail.

    Raises:
        ValueError: if mode is not one of 'S', 'B' or 'C'.

    Currently, the mode set is applied only when a client calls retrbinary
    (downloads); uploads are unaffected.
    """
    # Reject typos immediately instead of letting them surface later as a
    # confusing failure in the middle of a transfer.
    if mode not in ('S', 'B', 'C'):
        raise ValueError(
            "invalid transmission mode %r; expected 'S', 'B' or 'C'" % (mode,))
    self.transmissionmode = mode
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It probably makes sense to raise There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Also, does the RFC say anything about what should happen if you switch back to STREAM mode from BLOCK mode (knowing RFC-959 myself, I imagine not)? Should the |
||
|
||
def close_dataconn(self):
    """Close the persistent data connection, if one is open.

    The reference held in self.dataconn is dropped before the socket is
    closed, so the instance never keeps a stale connection object even
    when close() raises. Calling this with no open connection is a no-op.
    """
    conn = self.dataconn
    self.dataconn = None
    # Compare against None explicitly rather than relying on the
    # truthiness of the socket object.
    if conn is not None:
        conn.close()
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You should call this also from Also, "data connection" is a very well established concept in FTP, but 99% of the times applies to STREAMing mode. I would be more explicit and call this method |
||
|
||
try: | ||
import ssl | ||
except ImportError: | ||
|
@@ -805,10 +921,10 @@ def abort(self): | |
raise error_proto(resp) | ||
return resp | ||
|
||
|
||
__all__.append('FTP_TLS') | ||
all_errors = (Error, OSError, EOFError, ssl.SSLError) | ||
|
||
|
||
_150_re = None | ||
|
||
def parse150(resp): | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -8,6 +8,8 @@ | |
import io | ||
import errno | ||
import os | ||
import random | ||
import struct | ||
import threading | ||
import time | ||
import unittest | ||
|
@@ -56,6 +58,37 @@ | |
"type=file;perm=r;unique==SGP2; file \xAE non-ascii char\r\n") | ||
|
||
|
||
def data_to_blocks(source_data, block_size=2048):
    """Encode *source_data* as a stream of FTP block-mode data blocks.

    Every block is a 3-byte header (a one-byte descriptor followed by a
    two-byte big-endian payload length, packed as "!BH") and then the
    payload itself. Non-final payloads are block_size bytes plus a random
    jitter of 0-32 bytes, so consumers are exercised with uneven block
    lengths. The last block, which may be empty, carries the end-of-file
    descriptor (64); all other blocks carry descriptor 0.

    Args:
        source_data: Bytes-like payload to split into blocks.
        block_size: Nominal payload size of each non-final block.

    Returns:
        The concatenated headers and payloads as a bytes object.
    """
    descriptor_eof = 64
    descriptor_none = 0
    max_block_length = 0xFFFF  # largest value the 16-bit length field holds

    total = len(source_data)
    pieces = []
    position = 0

    while True:
        # Clamp so the length always fits in the header and every
        # non-final block consumes at least one byte.
        chunk_size = block_size + random.randint(0, 32)
        chunk_size = max(1, min(chunk_size, max_block_length))
        remainder = total - position

        if remainder > chunk_size:
            descriptor = descriptor_none
            read_size = chunk_size
        else:
            descriptor = descriptor_eof
            read_size = remainder

        pieces.append(struct.pack("!BH", descriptor, read_size))
        pieces.append(bytes(source_data[position:position + read_size]))
        position += read_size

        if descriptor == descriptor_eof:
            break

    # A single join avoids the quadratic cost of repeated bytes "+=".
    return b"".join(pieces)
|
||
|
||
BLOCK_RETR_DATA = data_to_blocks(bytes(RETR_DATA.encode("utf-8"))) | ||
|
||
|
||
class DummyDTPHandler(asynchat.async_chat): | ||
dtp_conn_closed = False | ||
|
||
|
@@ -84,7 +117,11 @@ def push(self, what): | |
self.baseclass.next_data = None | ||
if not what: | ||
return self.close_when_done() | ||
super(DummyDTPHandler, self).push(what.encode(self.encoding)) | ||
|
||
if type(what) != bytes: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. suggested change: |
||
super(DummyDTPHandler, self).push(what.encode(self.encoding)) | ||
else: | ||
super(DummyDTPHandler, self).push(what) | ||
|
||
def handle_error(self): | ||
raise Exception | ||
|
@@ -107,12 +144,14 @@ def __init__(self, conn, encoding=DEFAULT_ENCODING): | |
self.next_data = None | ||
self.rest = None | ||
self.next_retr_data = RETR_DATA | ||
self.next_block_retr_data = BLOCK_RETR_DATA | ||
self.push('220 welcome') | ||
self.encoding = encoding | ||
# We use this as the string IPv4 address to direct the client | ||
# to in response to a PASV command. To test security behavior. | ||
# https://bugs.python.org/issue43285/. | ||
self.fake_pasv_server_ip = '252.253.254.255' | ||
self.mode = "S" | ||
|
||
def collect_incoming_data(self, data): | ||
self.in_buffer.append(data) | ||
|
@@ -239,7 +278,13 @@ def cmd_retr(self, arg): | |
offset = int(self.rest) | ||
else: | ||
offset = 0 | ||
self.dtp.push(self.next_retr_data[offset:]) | ||
|
||
push_data = { | ||
"S": self.next_retr_data, | ||
"B": self.next_block_retr_data, | ||
}.get(self.mode, self.next_retr_data) | ||
|
||
self.dtp.push(push_data[offset:]) | ||
self.dtp.close_when_done() | ||
self.rest = None | ||
|
||
|
@@ -266,6 +311,10 @@ def cmd_setlongretr(self, arg): | |
self.next_retr_data = 'x' * int(arg) | ||
self.push('125 setlongretr ok') | ||
|
||
def cmd_mode(self, arg):
    """Handle the MODE command sent by the client.

    The mode code is matched case-insensitively. Only stream ("S") and
    block ("B") are accepted, since those are the only modes this dummy
    server has retrieval data for; the accepted code is stored upper-cased
    in self.mode. Any other argument leaves self.mode unchanged and is
    answered with a 504 reply instead of a misleading 200.
    """
    mode = arg.strip().upper()
    if mode not in ("S", "B"):
        self.push("504 mode %s not implemented" % arg)
        return
    self.mode = mode
    self.push("200 mode %s ok" % arg)
|
||
|
||
class DummyFTPServer(asyncore.dispatcher, threading.Thread): | ||
|
||
|
@@ -599,6 +648,15 @@ def callback(data): | |
self.client.retrbinary('retr', callback, rest=rest) | ||
self.check_data(''.join(received), RETR_DATA[rest:]) | ||
|
||
def test_retrbinary_block(self):
    """retrbinary() in block mode delivers the same payload as stream mode."""
    received = []

    def callback(data):
        received.append(data.decode(self.client.encoding))

    self.client.set_transmissionmode("B")
    try:
        self.client.retrbinary('retr', callback)
    finally:
        # Restore stream mode even if the transfer fails, so the client is
        # never left in block mode.
        self.client.set_transmissionmode("S")
    self.check_data(''.join(received), RETR_DATA)
|
||
def test_retrlines(self): | ||
received = [] | ||
self.client.retrlines('retr', received.append) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You should probably mention that BLOCK is supported for RETR (download) only.