Skip to content
This repository has been archived by the owner on May 13, 2020. It is now read-only.

Commit

Permalink
Added an experimental listener option to run each client (server
Browse files Browse the repository at this point in the history
connection) in a separate thread.
  • Loading branch information
Jim Fulton committed Jul 14, 2010
1 parent 2e2faff commit e3d61c4
Show file tree
Hide file tree
Showing 3 changed files with 118 additions and 17 deletions.
9 changes: 9 additions & 0 deletions README.txt
Expand Up @@ -19,6 +19,15 @@ To learn more, see http://packages.python.org/zc.ngi/
Changes
*******

====================
2.0.0a2 (2010-07-??)
====================

New Features:

- There's a new experimental zc.ngi.async.Implementation.listener
option to run each client (server connection) in it's own thread.

====================
2.0.0a1 (2010-07-08)
====================
Expand Down
52 changes: 35 additions & 17 deletions src/zc/ngi/async.py
Expand Up @@ -21,6 +21,7 @@
import os
import socket
import sys
import thread
import threading
import time
import warnings
Expand Down Expand Up @@ -57,7 +58,11 @@ def __init__(self, daemon=True, name='zc.ngi.async application created'):
self._callbacks = []
self._start_lock = threading.Lock()

thread_ident = None
def call_from_thread(self, func):
if thread.get_ident() == self.thread_ident:
func()
return
self._callbacks.append(func)
self.notify_select()
self.start_thread()
Expand All @@ -69,8 +74,8 @@ def connect(self, addr, handler):
self.call_from_thread(lambda : _Connector(addr, handler, self))
self.start_thread()

def listener(self, addr, handler):
result = _Listener(addr, handler, self)
def listener(self, addr, handler, thready=False):
result = _Listener(addr, handler, self, thready)
self.start_thread()
return result

Expand All @@ -93,7 +98,7 @@ def udp_listener(self, addr, handler, buffer_size=4096):
return result

_thread = None
def start_thread(self, daemon=True):
def start_thread(self):
with self._start_lock:
if self._thread is None:
self._thread = threading.Thread(
Expand All @@ -111,6 +116,7 @@ def wait(self, timeout=None):
raise zc.ngi.interfaces.Timeout

def loop(self, timeout=None):
self.thread_ident = thread.get_ident()
if timeout is not None:
deadline = time.time() + timeout
else:
Expand Down Expand Up @@ -156,6 +162,7 @@ def loop(self, timeout=None):
if timeout <= 0:
raise zc.ngi.interfaces.Timeout
finally:
del self.thread_ident
del self.notify_select
trigger.close()

Expand Down Expand Up @@ -380,9 +387,9 @@ def __hash__(self):
class _ServerConnection(_Connection):
zc.ngi.interfaces.implements(zc.ngi.interfaces.IServerConnection)

def __init__(self, sock, addr, logger, listener):
def __init__(self, sock, addr, logger, listener, implementation):
self.control = listener
_Connection.__init__(self, sock, addr, logger, listener.implementation)
_Connection.__init__(self, sock, addr, logger, implementation)

def close(self):
_Connection.close(self)
Expand Down Expand Up @@ -503,8 +510,7 @@ def add_channel(self, map=None):

def handle_error(self):
reason = sys.exc_info()[1]
#self.logger.exception('listener error')
traceback.print_exception(*sys.exc_info())
self.logger.exception('listener error')
self.close()
self.implementation.handle_error()

Expand All @@ -513,10 +519,11 @@ class _Listener(BaseListener):

logger = logging.getLogger('zc.ngi.async.server')

def __init__(self, addr, handler, implementation):
def __init__(self, addr, handler, implementation, thready):
self.__handler = handler
self.__close_handler = None
self.__connections = {}
self._thready = thready
self.__connections = set()
BaseListener.__init__(self, implementation)
if isinstance(addr, str):
family = socket.AF_UNIX
Expand Down Expand Up @@ -574,20 +581,31 @@ def handle_accept(self):
if __debug__:
self.logger.debug('incoming connection %r', addr)

connection = _ServerConnection(sock, addr, self.logger, self)
self.__connections[connection] = 1
try:
self.__handler(connection)
except:
self.logger.exception("server handler failed")
self.close()
if self._thready:
impl = Implementation(name="%r client" % (self.address,))
else:
impl = self.implementation
connection = _ServerConnection(sock, addr, self.logger, self, impl)
self.__connections.add(connection)

@impl.call_from_thread
def _():
try:
self.__handler(connection)
except:
self.logger.exception("server handler failed")
self.close()

if impl is not self.implementation:
impl.start_thread()


def connections(self):
return iter(self.__connections)

def closed(self, connection):
if connection in self.__connections:
del self.__connections[connection]
self.__connections.remove(connection)
if not self.__connections and self.__close_handler:
self.__close_handler(self)

Expand Down
74 changes: 74 additions & 0 deletions src/zc/ngi/tests.py
Expand Up @@ -479,6 +479,80 @@ def setHandler_compatibility():
"""

def EXPERIMENTAL_thready_async_servers():
r"""
When creating a listener with a zc.ngi.async.Implementation, you can
pass a thready keyword options to cause each client to get it's own thread.
>>> import functools, threading, zc.ngi.generator
>>> @functools.partial(zc.ngi.async.listener, None, thready=True)
... @zc.ngi.generator.handler
... def listener(conn):
... if 'client' not in threading.current_thread().name:
... print 'oops'
... yield
>>> addr = listener.address
So, now we're listening on listener.address, let's connect to it.
>>> event = threading.Event()
>>> class Connect:
... def __init__(self, name):
... self.name = name
... event.clear()
... zc.ngi.async.connect(addr, self)
... event.wait(1)
... def connected(self, connection):
... globals()[self.name] = connection
... zc.ngi.testing.PrintingHandler(connection)
... event.set()
Initially, we have no client handling threads:
>>> def count_client_threads():
... return len([t for t in threading.enumerate()
... if ("%r client" % (addr, )) in t.name])
>>> count_client_threads()
0
>>> _ = Connect('c1')
>>> _ = Connect('c2')
So now we have 2 connections and we have 2 corresponding threads:
>>> count_client_threads()
2
If we close the connections and wait a bit, the threads will be cleaned up:
>>> c1.close()
>>> c2.close()
>>> time.sleep(.1)
>>> count_client_threads()
0
Let's create another connection
>>> _ = Connect('c1')
>>> count_client_threads()
1
Now, we'll close the listener and the connection threads will be cleaned up.
>>> listener.close()
>>> time.sleep(.5)
-> CLOSE end of input
>>> count_client_threads()
0
>>> zc.ngi.async.wait(1)
"""


if sys.version_info < (2, 6):
del setHandler_compatibility

Expand Down

0 comments on commit e3d61c4

Please sign in to comment.