Skip to content
This repository has been archived by the owner on Jan 21, 2021. It is now read-only.

Commit

Permalink
You can now supply alternative pool implementations.
Browse files Browse the repository at this point in the history
  Thanks to: #3
  • Loading branch information
Jim Fulton committed Feb 7, 2015
2 parents 11f46bf + 3698eb9 commit bdbef68
Show file tree
Hide file tree
Showing 5 changed files with 101 additions and 28 deletions.
4 changes: 4 additions & 0 deletions src/zc/resumelb/README.txt
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,10 @@ maintaining ZooKeeper trees.
Change History
==============

- You can now supply alternative pool implementations.

Thanks to: https://github.com/zopefoundation/zc.resumelb/pull/3

0.7.5 (2014-11-18)
------------------

Expand Down
28 changes: 27 additions & 1 deletion src/zc/resumelb/lb.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,13 @@ class LB:

def __init__(self, worker_addrs, classifier,
disconnect_message=default_disconnect_message,
pool_factory=None,
**pool_settings):
self.classifier = classifier
self.disconnect_message = disconnect_message
self.pool = Pool(**pool_settings)
if pool_factory is None:
pool_factory = Pool
self.pool = pool_factory(**pool_settings)
self.update_settings = self.pool.update_settings
self.workletts = {}
self.set_worker_addrs(worker_addrs)
Expand Down Expand Up @@ -328,6 +331,29 @@ def put(self, worker):
assert worker.backlog >= 0
_decay_backlog(worker, self.worker_decay)

def status(self):
return dict(
backlog = self.backlog,
mean_backlog = self.mbacklog,
workers = [
(worker.__name__,
worker.backlog,
worker.mbacklog,
(int(worker.oldest_time)
if worker.oldest_time else None),
)
for worker in sorted(
self.workers, key=lambda w: w.__name__)
],
workers_ex = [
(worker.__name__,
worker.write_queue.qsize(),
)
for worker in sorted(
self.workers, key=lambda w: w.__name__)
],
)

def _init_backlog(worker):
worker.backlog = getattr(worker, 'backlog', 0)
worker.dbacklog = getattr(worker, 'dbacklog', worker.backlog)
Expand Down
20 changes: 20 additions & 0 deletions src/zc/resumelb/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -550,9 +550,29 @@ def worker_closes_socket():
<socket at 0x... fileno=[Errno 9] Bad file descriptor>
"""

def pool_status():
"""
We can ask the pool for it's status. It will give us back a dictionary.
>>> worker = mock.Mock()
>>> worker.__name__ = 'foo'
>>> worker.oldest_time = 1.0
>>> pool = zc.resumelb.lb.Pool()
>>> pool.new_resume(worker, {})
>>> status = pool.status()
>>> status['backlog']
0
>>> status['workers'][0][0]
'foo'
"""

def test_classifier(env):
return "yup, it's a test"

class TestPool(zc.resumelb.lb.Pool):
pass

def setUp(test):
zope.testing.setupstack.setUpDirectory(test)
zope.testing.setupstack.context_manager(test, mock.patch('gevent.signal'))
Expand Down
45 changes: 18 additions & 27 deletions src/zc/resumelb/zk.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,12 @@ def handle(self, socket, address):
socket.settimeout(self.__socket_timeout)
return gevent.pywsgi.WSGIServer.handle(self, socket, address)

def _resolve(path):
rcmod, rcexpr = path.split(':')
__import__(rcmod)
rcmod = sys.modules[rcmod]
return eval(rcexpr, rcmod.__dict__)

def lbmain(args=None, run=True):
"""%prog [options] zookeeper_connection path
Expand Down Expand Up @@ -144,6 +150,12 @@ def lbmain(args=None, run=True):
'-r', '--request-classifier', default='zc.resumelb.lb:host_classifier',
help="Request classification function (module:expr)"
)
parser.add_option(
'-p', '--pool-factory', default='zc.resumelb.lb:Pool',
help=
"Callable which creates a pool (module:expr)."
" Will be called with settings as keyword arguments."
)
parser.add_option(
'-s', '--status-server',
help=("Run a status server for getting pool information. "
Expand Down Expand Up @@ -180,10 +192,8 @@ def lbmain(args=None, run=True):

zk = zc.zk.ZooKeeper(zookeeper)
addrs = zk.children(path+'/workers/providers')
rcmod, rcexpr = options.request_classifier.split(':')
__import__(rcmod)
rcmod = sys.modules[rcmod]
request_classifier = eval(rcexpr, rcmod.__dict__)
request_classifier = _resolve(options.request_classifier)
pool_factory = _resolve(options.pool_factory)

disconnect_message = options.disconnect_message
if disconnect_message:
Expand All @@ -195,6 +205,7 @@ def lbmain(args=None, run=True):
from zc.resumelb.lb import LB
lb = LB(map(zc.parse_addr.parse_addr, ()),
request_classifier, disconnect_message,
pool_factory=pool_factory,
single_version=options.single_version)


Expand All @@ -205,7 +216,7 @@ def lbmain(args=None, run=True):
def _():
lb.set_worker_addrs(to_send[0])

if options.single_version:
if options.single_version or pool_factory != zc.resumelb.lb.Pool:
@addrs
def get_addrs(a):
to_send[0] = dict(
Expand Down Expand Up @@ -258,29 +269,9 @@ def get_addrs(a):
if options.status_server:
def status(socket, addr):
pool = lb.pool
status = pool.status()
writer = socket.makefile('w')
writer.write(json.dumps(
dict(
backlog = pool.backlog,
mean_backlog = pool.mbacklog,
workers = [
(worker.__name__,
worker.backlog,
worker.mbacklog,
(int(worker.oldest_time)
if worker.oldest_time else None),
)
for worker in sorted(
pool.workers, key=lambda w: w.__name__)
],
workers_ex = [
(worker.__name__,
worker.write_queue.qsize(),
)
for worker in sorted(
pool.workers, key=lambda w: w.__name__)
],
))+'\n')
writer.write(json.dumps(status) + '\n')
writer.close()
socket.close()

Expand Down
32 changes: 32 additions & 0 deletions src/zc/resumelb/zk.test
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,10 @@ string, the string is split and testing mode (run=False) is assumed.
-r REQUEST_CLASSIFIER, --request-classifier=REQUEST_CLASSIFIER
Request classification function
(module:expr)
-p POOL_FACTORY, --pool-factory=POOL_FACTORY
Callable which creates a pool (module:expr).
Will be called with settings as keyword
arguments.
-s STATUS_SERVER, --status-server=STATUS_SERVER
Run a status server for getting pool
information. The argument is a unix-domain
Expand Down Expand Up @@ -542,3 +546,31 @@ workers must register with a version, or they'll be ignored.
overall backlog: 0 Decayed: 0 Avg: 0
0: [127.0.0.1:39208, 127.0.0.1:60073]

Overriding the default Pool class
=================================

If you wish to override the default Pool class, you can use the pool-factory
(``-p``, ``--pool-factory``) option. Pool factories can be any callable that
takes keyword arguments.

>>> lb, server = zc.resumelb.zk.lbmain(
... 'zookeeper.example.com:2181 /test/lb -pzc.resumelb.tests:TestPool')
>>> gevent.sleep(1)
>>> with mock.patch('ZConfig.configureLoggers') as configureLoggers:
... with mock.patch('logging.basicConfig') as basicConfig:
... worker = zc.resumelb.zk.worker(
... app, None,
... zookeeper='zookeeper.example.com:2181', path='/test/lb/workers',
... address='127.0.0.1:0', run=False, version='42')
>>> gevent.sleep(.01)

Our pool is an instance of the TestPool class we specified:

>>> isinstance(lb.pool, zc.resumelb.tests.TestPool)
True

This also causes the loadbalancer to load the worker versions as the new pool
factory may need them:

>>> print list(lb.pool.workers)[0].version
42

0 comments on commit bdbef68

Please sign in to comment.