Skip to content
This repository has been archived by the owner on Jan 21, 2021. It is now read-only.

Commit

Permalink
There's a new pool implementation
Browse files Browse the repository at this point in the history
  zc.resumelb.classlesspool.ClasslessPool that allocates work
  solely based on backlogs, ignoring resumes.  This is useful for
  smaller applications that don't have large resident sets or a good
  way to segregate requests, but that can benefit from ZooKeeper-aware
  load balancing.
  • Loading branch information
Jim Fulton committed Feb 15, 2015
1 parent 5dd5889 commit a197eb5
Show file tree
Hide file tree
Showing 7 changed files with 375 additions and 59 deletions.
7 changes: 7 additions & 0 deletions src/zc/resumelb/README.txt
Expand Up @@ -248,6 +248,13 @@ Change History

Thanks to: https://github.com/zopefoundation/zc.resumelb/pull/3

- There's a new pool implementation
``zc.resumelb.classlesspool.ClasslessPool`` that allocates work
solely based on backlogs, ignoring resumes. This is useful for
smaller applications that don't have large resident sets or a good
way to segregate requests, but that can benefit from ZooKeeper-aware
load balancing.

0.7.5 (2014-11-18)
------------------

Expand Down
86 changes: 86 additions & 0 deletions src/zc/resumelb/classlesspool.py
@@ -0,0 +1,86 @@
"""Load balancer pool that ignores request class and worker skill.
It allocates work by least backlog.
"""
import gevent.event
import llist
import zc.resumelb.lb

class ClasslessPool(zc.resumelb.lb.PoolBase):
"""Manage and assign work to workers.
"""
# This was added to define how the lb uses a pool, now that
# alternate pool implementations can be provided.

def __init__(self, single_version=False):
super(ClasslessPool, self).__init__(single_version)
self.backlog = 0
self.workers = set()
self.line = llist.dllist()
self.event = gevent.event.Event()

def get(self, request_class, timeout=None):
"""Get a worker to handle the given request class (string)
"""
line = self.line
if not line:
self.event.wait(timeout)
if not line:
return None

node = self.line.first
best_worker = None
best_backlog = 999999999
while node is not None:
worker = node.value
backlog = worker.backlog
if backlog == 0:
best_worker = worker
break
if backlog < best_backlog:
best_backlog = backlog
best_worker = worker
node = node.next

# Move worker from lru to mru end of queue
line.remove(best_worker.lnode)
best_worker.lnode = line.append(best_worker)

best_worker.backlog += 1
self.backlog += 1

return best_worker

@property
def mbacklog(self):
nworkers = len(self.workers)
if nworkers:
return self.backlog / nworkers
else:
return None

def _new_resume(self, worker, resume):
if worker not in self.workers:
self.workers.add(worker)
worker.lnode = self.line.appendleft(worker)
worker.backlog = 0
self.event.set()

def put(self, worker):
"""Notify the pool that the worker has completed a request.
"""
self.backlog -= 1
assert self.backlog >= 0, self.backlog
worker.backlog -= 1
assert worker.backlog >= 0, worker.backlog

def _remove(self, worker):
if getattr(worker, 'lnode', None) is not None:
self.line.remove(worker.lnode)
worker.lnode = None
self.workers.remove(worker)
if not self.workers:
self.event.clear()

def update_settings(self, settings):
pass
162 changes: 162 additions & 0 deletions src/zc/resumelb/classlesspool.test
@@ -0,0 +1,162 @@
===============================================
Classless (Non-resume-based) load balancer pool
===============================================

For smaller applications, it's useful to have a ZooKeeper-aware
load-balancer that avoids the need to update traditional load-balancer
configurations.

The load balancer works by connecting to workers, creating local
workers for each connection, adding local workers to the pool, and
by accepting wsgi request, getting local workers from the pool and
passing the wsgi requests to the local workers, which, in term,
forwward the requests to the remote workers.

We'll test the pool with stand-ins for the local workers.

>>> import zc.resumelb.classlesspool
>>> pool = zc.resumelb.classlesspool.ClasslessPool()

The get method is used to get a worker from the pool. A request class
and an optional timeout is passed. (The timeout is mainly useful for
testing.)

>>> pool.get('foo', 0.0)

We didn't get a worker (we timed out), because we haven't added one.

>>> from zc.resumelb.tests import FauxPoolWorker as Worker

>>> w1 = Worker('w1')

>>> pool.new_resume(w1, {})

>>> pool.get('foo', 0.0)
w1

This time, we got the one we registered.

If we create another and register it, we'll get it, because it
has a backlog of 0, while w1 has a backlog of 1:

>>> w2 = Worker('w2')
>>> pool.new_resume(w2, {})

>>> pool.get('foo')
w2

If we call get again, we'll get w1. This is because w1 and w2 have the
same backlog, but w2 went to the back of the line.

>>> pool.get('foo')
w1

And so on:

>>> pool.get('foo')
w2

The request class doesn't matter.

>>> pool.get('bar')
w1
>>> pool.get('bar')
w2

We can ask for a pool's status:

>>> import pprint
>>> pprint.pprint(pool.status()) # doctest: +NORMALIZE_WHITESPACE
{'backlog': 6,
'mean_backlog': 3,
'workers': [('w1', 3, 3, None),
('w2', 3, 3, None)],
'workers_ex': [('w1', 0), ('w2', 0)]}

When a worker is done doing its work, we put it back in the pool:

>>> pool.put(w2)
>>> pool.put(w2)

>>> pprint.pprint(pool.status()) # doctest: +NORMALIZE_WHITESPACE
{'backlog': 4,
'mean_backlog': 2,
'workers': [('w1', 3, 3, None),
('w2', 1, 1, None)],
'workers_ex': [('w1', 0), ('w2', 0)]}

It will be preferred until it's backlog grows:

>>> pool.get('bar')
w2
>>> pool.get('bar')
w2
>>> pool.get('bar')
w1


Worker disconnect
=================

When a worker disconnect, it's removed from the pool:

>>> pool.remove(w1)
>>> for i in range(4): pool.put(w1)

>>> pprint.pprint(pool.status())
{'backlog': 3,
'mean_backlog': 3,
'workers': [('w2', 3, 3, None)],
'workers_ex': [('w2', 0)]}

>>> pool.remove(w2)
>>> for i in range(3): pool.put(w2)

>>> pprint.pprint(pool.status())
{'backlog': 0, 'mean_backlog': None, 'workers': [], 'workers_ex': []}

Single-Version
==============

If a pool is in single-version mode, only workers from the majority
version will be used:

>>> pool = zc.resumelb.classlesspool.ClasslessPool(single_version=True)
>>> w1.version = 1
>>> pool.new_resume(w1, None)
>>> pool.version
1
>>> w2.version = 2
>>> pool.new_resume(w2, None)
>>> pool.version
1
>>> pool.get('')
w1
>>> pprint.pprint(pool.status())
{'backlog': 1,
'mean_backlog': 1,
'workers': [('w1', 1, 1, None)],
'workers_ex': [('w1', 0)]}

>>> w3 = Worker('w3')
>>> w3.version = 2
>>> pool.new_resume(w3, None)
>>> pool.version
2
>>> pool.get('')
w2
>>> pprint.pprint(pool.status())
{'backlog': 2,
'mean_backlog': 1,
'workers': [('w2', 1, 1, None), ('w3', 0, 0, None)],
'workers_ex': [('w2', 0), ('w3', 0)]}

Note that the pool backlog still reflects w1, which is still working
off a request.

>>> pool.put(w1)
>>> pprint.pprint(pool.status())
{'backlog': 1,
'mean_backlog': 0,
'workers': [('w2', 1, 1, None), ('w3', 0, 0, None)],
'workers_ex': [('w2', 0), ('w3', 0)]}
51 changes: 51 additions & 0 deletions src/zc/resumelb/interfaces.py
@@ -0,0 +1,51 @@
import zope.interfaces

class IPool(zope.interface.Interface):
"""Manage and assign work to workers.
"""
# This was added to define how the lb uses a pool, now that
# alternate pool implementations can be provided.

def __init__(single_version=False):
"""Initializd the pool
The ``single_version`` argument is supllied by name.
If true, then the pool should only use workers of the same
version for which the highest number of workers are running
that version.
"""

backlog = zope.interface.Attribute("number of active requests")

def get(request_class):
"""Get a worker to handle the given request class (string)
"""

mbacklog = zope.interface.Attribute(
"(possibly time-weighted) mean worker backlog for the load balancer")

def new_resume(worker, data):
"""Update the resume for a worker
If the worker isn't in the pool, add it.
"""

def put(worker):
"""Notify the pool that the worker has completed a request.
"""

def remove(worker):
"""Remove the worker from the pool.
It shouldn't get any more work and should be forgotten.
"""

def update_settings(settings):
"""Update the pool with the given settings (mapping).
Extra keys in the settings should be ignored.
The settings argument should be used once and not modified.
"""

workers = zope.interface.Attribute("iterable of workers")

0 comments on commit a197eb5

Please sign in to comment.