Navigation Menu

Skip to content

Commit

Permalink
Implemented the ThreadPoolServer with a preallocated pool of threads.…
Browse files Browse the repository at this point in the history
… This server performs better than the ThreadedServer as it does not have to allocate a new thread for each request
  • Loading branch information
sponcec3 authored and sponcec3 committed Apr 18, 2011
1 parent 00aea82 commit 8901ae1
Showing 1 changed file with 54 additions and 0 deletions.
54 changes: 54 additions & 0 deletions rpyc/utils/server.py
Expand Up @@ -8,10 +8,12 @@
import threading
import errno
import logging
import Queue
from rpyc.core import SocketStream, Channel, Connection
from rpyc.utils.registry import UDPRegistryClient
from rpyc.utils.authenticators import AuthenticationError
from rpyc.lib import safe_import
from rpyc.core.async import AsyncResultTimeout
signal = safe_import("signal")


Expand Down Expand Up @@ -191,6 +193,58 @@ def _accept_method(self, sock):
t.start()


class ThreadPoolServer(Server):
'''This server is threaded like the ThreadedServer but reuses threads so that
recreation is not necessary for each request. The pool of threads has a fixed
size that can be set with the 'nbThreads' argument. Otherwise, the default is 20'''

def __init__(self, *args, **kwargs):
'''Initializes a ThreadPoolServer. In particular, instantiate the thread pool.'''
# get the number of threads in the pool
nbthreads = 20
if 'nbThreads' in kwargs:
nbthreads = kwargs['nbThreads']
del kwargs['nbThreads']
# init the parent
Server.__init__(self, *args, **kwargs)
# create a queue where requests will be pending until a thread is ready
self._client_queue = Queue.Queue(nbthreads)
# declare the pool as already active
self.active = True
# setup the thread pool
for i in range(nbthreads):
t = threading.Thread(target = self._authenticate_and_serve_clients, args=(self._client_queue,))
t.daemon = True
t.start()

def _authenticate_and_serve_clients(self, queue):
'''Main method run by the threads of the thread pool. It gets work from the
internal queue and calls the _authenticate_and_serve_client method'''
while self.active:
try:
sock = queue.get(True, 1)
self._authenticate_and_serve_client(sock)
except Queue.Empty:
# we've timed out, let's just retry. We only use the timeout so that this
# thread can stop even if there is nothing in the queue
pass
except Exception, e:
# "Caught exception in Worker thread" message
self.logger.info("failed to serve client, caught exception : %s", str(e))
# wait a bit so that we do not loop too fast in case of error
time.sleep(.2)

def _accept_method(self, sock):
'''Implementation of the accept method : only pushes the work to the internal queue.
In case the queue is full, raises an AsynResultTimeout error'''
try:
# try to put the request in the queue
self._client_queue.put_nowait(sock)
except Queue.Full:
# queue was full, reject request
raise AsyncResultTimeout("server is overloaded")


class ForkingServer(Server):
def __init__(self, *args, **kwargs):
if not signal:
Expand Down

1 comment on commit 8901ae1

@tomerfiliba
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice work. for clarity reasons, i'd raise a ThreadPoolFull exception (or something like that) instead of AsyncResultTimeout.

Please sign in to comment.