Skip to content

Commit

Permalink
Merged from trunk:
Browse files Browse the repository at this point in the history
  r27408 | shane | 2004-09-02 01:31:25 -0400 (Thu, 02 Sep 2004) | 12 lines

  DualModeChannel no longer attempts to close except in the main thread.

  Also simplified by removing the experimental SimultaneousModeChannel 
  class.  DualModeChannel is ambitious enough already.  We don't need to 
  expose even more bugs in asyncore.

  The advantage of closing in application threads is that it forces the 
  TCP stack to flush buffers immediately, resulting in a quick response.  
  However, TCP_NODELAY should have the same effect, so in theory, no speed 
  has been lost.


  ------------------------------------------------------------------------
  r27407 | shane | 2004-09-02 01:24:05 -0400 (Thu, 02 Sep 2004) | 5 lines

  Added a method of setting socket options and turned on TCP_NODELAY.

  Zope buffers everything already, so the Nagle algorithm is likely 
  to delay unnecessarily in most forseeable cases.

  ------------------------------------------------------------------------
  r27406 | shane | 2004-09-02 01:20:28 -0400 (Thu, 02 Sep 2004) | 2 lines

  Improved wording in interface docstrings
  • Loading branch information
Jim Fulton committed Sep 2, 2004
1 parent 47fe0bb commit c9d01dc
Show file tree
Hide file tree
Showing 6 changed files with 795 additions and 115 deletions.
16 changes: 15 additions & 1 deletion adjustments.py
Expand Up @@ -15,6 +15,8 @@
$Id$
"""
import socket

from zope.server import maxsockets


Expand All @@ -33,7 +35,11 @@ class Adjustments(object):
recv_bytes = 8192

# send_bytes is the number of bytes to send to socket.send().
send_bytes = 8192
# Multiples of 9000 should avoid partly-filled packets, but don't
# set this larger than the TCP write buffer size. In Linux,
# /proc/sys/net/ipv4/tcp_wmem controls the minimum, default, and
# maximum sizes of TCP write buffers.
send_bytes = 9000

# copy_bytes is the number of bytes to copy from one file to another.
copy_bytes = 65536
Expand Down Expand Up @@ -61,5 +67,13 @@ class Adjustments(object):
# Boolean: turn off to not log premature client disconnects.
log_socket_errors = 1

# The socket options to set on receiving a connection.
# It is a list of (level, optname, value) tuples.
# TCP_NODELAY is probably good for Zope, since Zope buffers
# data itself.
socket_options = [
(socket.SOL_TCP, socket.TCP_NODELAY, 1),
]


default_adj = Adjustments()
122 changes: 16 additions & 106 deletions dualmodechannel.py
Expand Up @@ -68,9 +68,6 @@ def writable(self):
def handle_write(self):
if not self.async_mode:
return
self.inner_handle_write()

def inner_handle_write(self):
if self.outbuf:
try:
self._flush_some()
Expand All @@ -87,9 +84,6 @@ def readable(self):
def handle_read(self):
if not self.async_mode:
return
self.inner_handle_read()

def inner_handle_read(self):
try:
data = self.recv(self.adj.recv_bytes)
except socket.error:
Expand Down Expand Up @@ -122,16 +116,6 @@ def set_sync(self):
# SYNCHRONOUS METHODS
#

def write(self, data):
if data:
self.outbuf.append(data)
while len(self.outbuf) >= self.adj.send_bytes:
# Send what we can without blocking.
# We propagate errors to the application on purpose
# (to stop the application if the connection closes).
if not self._flush_some():
break

def flush(self, block=1):
"""Sends pending data.
Expand Down Expand Up @@ -167,6 +151,16 @@ def set_async(self):
# METHODS USED IN BOTH MODES
#

def write(self, data):
if data:
self.outbuf.append(data)
while len(self.outbuf) >= self.adj.send_bytes:
# Send what we can without blocking.
# We propagate errors to the application on purpose
# (to stop the application if the connection closes).
if not self._flush_some():
break

def pull_trigger(self):
"""Wakes up the main loop.
"""
Expand All @@ -186,96 +180,12 @@ def _flush_some(self):
return 0

def close_when_done(self):
# We might be able close immediately.
# Flush all possible.
while self._flush_some():
pass
if not self.outbuf:
# Quick exit.
self.close()
else:
# Wait until outbuf is flushed.
self.will_close = 1
if not self.async_mode:
self.async_mode = 1
self.pull_trigger()


allocate_lock = None


class SimultaneousModeChannel(DualModeChannel):
"""Layer on top of DualModeChannel that allows communication in
both the main thread and other threads at the same time.
The channel operates in synchronous mode with an asynchronous
helper. The asynchronous callbacks empty the output buffer
and fill the input buffer.
"""

def __init__(self, conn, addr, adj=None):
global allocate_lock
if allocate_lock is None:
from thread import allocate_lock

# writelock protects all accesses to outbuf, since reads and
# writes of buffers in this class need to be serialized.
writelock = allocate_lock()
self._writelock_acquire = writelock.acquire
self._writelock_release = writelock.release
self._writelock_locked = writelock.locked
DualModeChannel.__init__(self, conn, addr, adj)

#
# ASYNCHRONOUS METHODS
#

def writable(self):
return self.will_close or (
self.outbuf and not self._writelock_locked())

def handle_write(self):
if not self._writelock_acquire(0):
# A synchronous method is writing.
return
try:
self.inner_handle_write()
finally:
self._writelock_release()

def readable(self):
return not self.will_close

def handle_read(self):
self.inner_handle_read()

def set_sync(self):
pass

#
# SYNCHRONOUS METHODS
#

def write(self, data):
self._writelock_acquire()
try:
DualModeChannel.write(self, data)
finally:
self._writelock_release()

def flush(self, block=1):
self._writelock_acquire()
try:
DualModeChannel.flush(self, block)
finally:
self._writelock_release()

def set_async(self):
pass

#
# METHODS USED IN BOTH MODES
#

def close_when_done(self):
self.will_close = 1
self.pull_trigger()
if not self.async_mode:
# For safety, don't close the socket until the
# main thread calls handle_write().
self.async_mode = 1
self.pull_trigger()
16 changes: 8 additions & 8 deletions ftp/server.py
Expand Up @@ -29,7 +29,7 @@
from zope.server.interfaces.ftp import IFTPCommandHandler
from zope.server.linereceiver.lineserverchannel import LineServerChannel
from zope.server.serverbase import ServerBase
from zope.server.serverchannelbase import ChannelBaseClass
from zope.server.dualmodechannel import DualModeChannel

status_messages = {
'OPEN_DATA_CONN' : '150 Opening %s mode data connection for file list',
Expand Down Expand Up @@ -722,7 +722,7 @@ def handle_accept (self):
self.close()


class RecvChannel(ChannelBaseClass):
class RecvChannel(DualModeChannel):
""" """

complete_transfer = 0
Expand All @@ -732,7 +732,7 @@ def __init__ (self, control_channel, finish_args):
self.control_channel = control_channel
self.finish_args = finish_args
self.inbuf = OverflowableBuffer(control_channel.adj.inbuf_overflow)
ChannelBaseClass.__init__(self, None, None, control_channel.adj)
DualModeChannel.__init__(self, None, None, control_channel.adj)
# Note that this channel starts out in async mode.

def writable (self):
Expand Down Expand Up @@ -765,7 +765,7 @@ def close(self, *reply_args):
finally:
if self.socket is not None:
# XXX asyncore.dispatcher.close() doesn't like socket == None
ChannelBaseClass.close(self)
DualModeChannel.close(self)



Expand Down Expand Up @@ -804,7 +804,7 @@ def defer(self):
pass


class XmitChannel(ChannelBaseClass):
class XmitChannel(DualModeChannel):

opened = 0
_fileno = None # provide a default for asyncore.dispatcher._fileno
Expand All @@ -813,7 +813,7 @@ def __init__ (self, control_channel, ok_reply_args):
self.control_channel = control_channel
self.ok_reply_args = ok_reply_args
self.set_sync()
ChannelBaseClass.__init__(self, None, None, control_channel.adj)
DualModeChannel.__init__(self, None, None, control_channel.adj)

def _open(self):
"""Signal the client to open the connection."""
Expand All @@ -826,7 +826,7 @@ def write(self, data):
raise IOError, 'Client FTP connection closed'
if not self.opened:
self._open()
ChannelBaseClass.write(self, data)
DualModeChannel.write(self, data)

def readable(self):
return not self.connected
Expand Down Expand Up @@ -864,7 +864,7 @@ def close(self, *reply_args):
finally:
if self.socket is not None:
# XXX asyncore.dispatcher.close() doesn't like socket == None
ChannelBaseClass.close(self)
DualModeChannel.close(self)


class ApplicationXmitStream(object):
Expand Down

0 comments on commit c9d01dc

Please sign in to comment.