Skip to content

Commit

Permalink
Merged from trunk:
Browse files Browse the repository at this point in the history
r27460 | shane | 2004-09-06 21:52:16 -0400 (Mon, 06 Sep 2004) | 13 lines

Call close_when_done() rather than close() in FTP data connections.

ftp/server.py: a lot of rearranging was necessary to fix code that 
called close() with arguments.  It's important to send a report at the 
end of FTP data connections, but the old way relied on close() with 
arguments and we sometimes need to call close_when_done() instead of 
close().

dualmodechannel.py: added an assertion that verifies close() is always 
called in asynchronous mode.  See the comment.



------------------------------------------------------------------------
r27459 | shane | 2004-09-06 21:45:52 -0400 (Mon, 06 Sep 2004) | 2 lines

If the file does not exist, it is simply not writable.  No OSError needed.

------------------------------------------------------------------------
r27458 | shane | 2004-09-06 21:32:54 -0400 (Mon, 06 Sep 2004) | 2 lines

close_when_done() when handling errors, rather than close()

------------------------------------------------------------------------
r27457 | shane | 2004-09-06 21:32:04 -0400 (Mon, 06 Sep 2004) | 2 lines

Let Ctrl-C kill the thread
  • Loading branch information
Jim Fulton committed Sep 13, 2004
1 parent 4dc1de1 commit 57dfda2
Show file tree
Hide file tree
Showing 6 changed files with 117 additions and 73 deletions.
9 changes: 8 additions & 1 deletion dualmodechannel.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ def readable(self):
return not self.will_close

def handle_read(self):
if not self.async_mode:
if not self.async_mode or self.will_close:
return
try:
data = self.recv(self.adj.recv_bytes)
Expand Down Expand Up @@ -189,3 +189,10 @@ def close_when_done(self):
# main thread calls handle_write().
self.async_mode = 1
self.pull_trigger()

def close(self):
# Always close in asynchronous mode. If the connection is
# closed in a thread, the main loop can end up with a bad file
# descriptor.
assert self.async_mode
asyncore.dispatcher.close(self)
149 changes: 79 additions & 70 deletions ftp/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,10 +147,11 @@ def _getFileSystem(self):

def cmd_abor(self, args):
'See IFTPCommandHandler'
assert self.async_mode
self.reply('TRANSFER_ABORTED')
if self.client_dc is not None:
self.client_dc.close('TRANSFER_ABORTED')
else:
self.reply('TRANSFER_ABORTED')
self.client_dc.reported = True
self.client_dc.close()

def cmd_appe (self, args):
'See IFTPCommandHandler'
Expand Down Expand Up @@ -208,14 +209,11 @@ def cmd_list(self, args, long=1):
except GetoptError:
self.reply('ERR_ARGS')
return

if len(args) > 1:
self.reply('ERR_ARGS')
return

args = args and args[0] or ''


fs = self._getFileSystem()
path = self._generatePath(args)
if not fs.type(path):
Expand All @@ -237,7 +235,9 @@ def cmd_list(self, args, long=1):
cdc.write(s)
cdc.close_when_done()
except OSError, err:
cdc.close('ERR_NO_LIST', str(err))
self.reply('ERR_NO_LIST', str(err))
cdc.reported = True
cdc.close_when_done()

def getList(self, args, long=0, directory=0):
# we need to scan the command line for arguments to '/bin/ls'...
Expand Down Expand Up @@ -271,7 +271,6 @@ def getList(self, args, long=0, directory=0):
return '\r\n'.join(file_list) + '\r\n'



def cmd_mdtm(self, args):
'See IFTPCommandHandler'
fs = self._getFileSystem()
Expand Down Expand Up @@ -401,9 +400,13 @@ def cmd_retr(self, args):
fs.readfile(path, outstream, start)
cdc.close_when_done()
except OSError, err:
cdc.close('ERR_OPEN_READ', str(err))
self.reply('ERR_OPEN_READ', str(err))
cdc.reported = True
cdc.close_when_done()
except IOError, err:
cdc.close('ERR_IO', str(err))
self.reply('ERR_IO', str(err))
cdc.reported = True
cdc.close_when_done()


def cmd_rest(self, args):
Expand Down Expand Up @@ -489,7 +492,7 @@ def cmd_stor(self, args, write_mode='w'):

def finishedRecv(self, buffer, (path, mode, start)):
"""Called by RecvChannel when the transfer is finished."""
# Always called in a task.
assert not self.async_mode
try:
infile = buffer.getfile()
infile.seek(0)
Expand Down Expand Up @@ -555,18 +558,17 @@ def _generatePath(self, args):
path = posixpath.join(self.cwd, args)
return posixpath.normpath(path)


def newPassiveAcceptor(self):
# ensure that only one of these exists at a time.
assert self.async_mode
if self.passive_acceptor is not None:
self.passive_acceptor.close()
self.passive_acceptor = None
self.passive_acceptor = PassiveAcceptor(self)
return self.passive_acceptor



def connectDataChannel(self, cdc):
"""Attempt to connect the data channel."""
pa = self.passive_acceptor
if pa:
# PASV mode.
Expand All @@ -575,7 +577,6 @@ def connectDataChannel(self, cdc):
conn, addr = pa.ready
cdc.set_socket (conn)
cdc.connected = 1
self.passive_acceptor.close()
self.passive_acceptor = None
# else we're still waiting for a connect to the PASV port.
# FTP Explorer is known to do this.
Expand All @@ -588,15 +589,12 @@ def connectDataChannel(self, cdc):
try:
cdc.connect((ip, port))
except socket.error:
cdc.close('NO_DATA_CONN')


def notifyClientDCClosing(self, *reply_args):
if self.client_dc is not None:
self.client_dc = None
if reply_args:
self.reply(*reply_args)
self.reply('NO_DATA_CONN')
cdc.reported = True
cdc.close_when_done()

def closedData(self):
self.client_dc = None

def close(self):
LineServerChannel.close(self)
Expand Down Expand Up @@ -703,11 +701,9 @@ def __init__ (self, control_channel):
self.addr = self.getsockname()
self.listen(1)


def log (self, *ignore):
pass


def handle_accept (self):
conn, addr = self.accept()
conn.setblocking(0)
Expand All @@ -722,18 +718,48 @@ def handle_accept (self):
self.close()


class RecvChannel(DualModeChannel):
""" """
class FTPDataChannel(DualModeChannel):
"""Base class for FTP data connections"""

def __init__ (self, control_channel):
self.control_channel = control_channel
self.reported = False
DualModeChannel.__init__(self, None, None, control_channel.adj)

def report(self, *reply_args):
"""Reports the result of the data transfer."""
self.reported = True
if self.control_channel is not None:
self.control_channel.reply(*reply_args)

def reportDefault(self):
"""Provide a default report on close."""
pass

def close(self):
"""Notifies the control channel when the data connection closes."""
c = self.control_channel
try:
if c is not None and not self.reported:
self.reportDefault()
finally:
self.control_channel = None
DualModeChannel.close(self)
if c is not None:
c.closedData()


class RecvChannel(FTPDataChannel):
"""FTP data receive channel"""

complete_transfer = 0
_fileno = None # provide a default for asyncore.dispatcher._fileno

def __init__ (self, control_channel, finish_args):
self.control_channel = control_channel
self.finish_args = finish_args
self.inbuf = OverflowableBuffer(control_channel.adj.inbuf_overflow)
DualModeChannel.__init__(self, None, None, control_channel.adj)
# Note that this channel starts out in async mode.
FTPDataChannel.__init__(self, control_channel)
# Note that this channel starts in async mode.

def writable (self):
return 0
Expand All @@ -753,20 +779,11 @@ def handle_close (self):
self.close()
c.queue_task(task)

def close(self, *reply_args):
try:
c = self.control_channel
if c is not None:
self.control_channel = None
if not self.complete_transfer and not reply_args:
# Not all data transferred
reply_args = ('TRANSFER_ABORTED',)
c.notifyClientDCClosing(*reply_args)
finally:
if self.socket is not None:
# XXX asyncore.dispatcher.close() doesn't like socket == None
DualModeChannel.close(self)

def reportDefault(self):
if not self.complete_transfer:
self.report('TRANSFER_ABORTED')
# else the transfer completed and FinishedRecvTask will
# provide a complete reply through finishedRecv().


class FinishedRecvTask(object):
Expand Down Expand Up @@ -803,16 +820,16 @@ def defer(self):
pass


class XmitChannel(DualModeChannel):
class XmitChannel(FTPDataChannel):
"""FTP data send channel"""

opened = 0
_fileno = None # provide a default for asyncore.dispatcher._fileno

def __init__ (self, control_channel, ok_reply_args):
self.control_channel = control_channel
self.ok_reply_args = ok_reply_args
self.set_sync()
DualModeChannel.__init__(self, None, None, control_channel.adj)
FTPDataChannel.__init__(self, control_channel)

def _open(self):
"""Signal the client to open the connection."""
Expand All @@ -825,7 +842,7 @@ def write(self, data):
raise IOError, 'Client FTP connection closed'
if not self.opened:
self._open()
DualModeChannel.write(self, data)
FTPDataChannel.write(self, data)

def readable(self):
return not self.connected
Expand All @@ -836,34 +853,26 @@ def handle_read(self):
self.recv(1)
except:
# The connection failed.
self.close('NO_DATA_CONN')
self.report('NO_DATA_CONN')
self.close()

def handle_connect(self):
pass

def handle_comm_error(self):
self.close('TRANSFER_ABORTED')
self.report('TRANSFER_ABORTED')
self.close()

def close(self, *reply_args):
try:
c = self.control_channel
if c is not None:
self.control_channel = None
if not reply_args:
if not len(self.outbuf):
# All data transferred
if not self.opened:
# Zero-length file
self._open()
reply_args = ('TRANS_SUCCESS',)
else:
# Not all data transferred
reply_args = ('TRANSFER_ABORTED',)
c.notifyClientDCClosing(*reply_args)
finally:
if self.socket is not None:
# XXX asyncore.dispatcher.close() doesn't like socket == None
DualModeChannel.close(self)
def reportDefault(self):
if not len(self.outbuf):
# All data transferred
if not self.opened:
# Zero-length file
self._open()
self.report('TRANS_SUCCESS')
else:
# Not all data transferred
self.report('TRANSFER_ABORTED')


class ApplicationXmitStream(object):
Expand Down
5 changes: 4 additions & 1 deletion ftp/tests/demofs.py
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,10 @@ def writefile(self, path, instream, start=None, end=None, append=False):
def writable(self, path):
"See zope.server.interfaces.ftp.IFileSystem"
path, name = posixpath.split(path)
d = self.getdir(path)
try:
d = self.getdir(path)
except OSError:
return False
if name not in d:
return d.accessable(self.user, write)
f = d[name]
Expand Down
1 change: 1 addition & 0 deletions ftp/tests/test_ftpserver.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ def setUp(self):
self.counter = 0
self.thread_started = Event()
self.thread = Thread(target=self.loop)
self.thread.setDaemon(True)
self.thread.start()
self.thread_started.wait(10.0)
self.assert_(self.thread_started.isSet())
Expand Down
1 change: 1 addition & 0 deletions http/tests/test_httpserver.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ def setUp(self):
self.counter = 0
self.thread_started = Event()
self.thread = Thread(target=self.loop)
self.thread.setDaemon(True)
self.thread.start()
self.thread_started.wait(10.0)
self.assert_(self.thread_started.isSet())
Expand Down
25 changes: 24 additions & 1 deletion linereceiver/lineserverchannel.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
$Id$
"""

from asyncore import compact_traceback
import os
import sys

Expand Down Expand Up @@ -106,6 +107,27 @@ def reply(self, code, args=(), flush=1):
# TODO: Some logging should go on here.


def handle_error_no_close(self):
"""See asyncore.dispatcher.handle_error()"""
nil, t, v, tbinfo = compact_traceback()

# sometimes a user repr method will crash.
try:
self_repr = repr(self)
except:
self_repr = '<__repr__(self) failed for object at %0x>' % id(self)

self.log_info(
'uncaptured python exception, closing channel %s (%s:%s %s)' % (
self_repr,
t,
v,
tbinfo
),
'error'
)


def exception(self):
if DEBUG:
import traceback
Expand All @@ -116,4 +138,5 @@ def exception(self):
except:
info = str(t)
self.reply('INTERNAL_ERROR', info)
self.handle_error()
self.handle_error_no_close()
self.close_when_done()

0 comments on commit 57dfda2

Please sign in to comment.