Skip to content
Browse files

Improved connection pooling documentation.

  • Loading branch information...
1 parent d92c601 commit f97a36635ccc81f3fc3594937622418ce3dc15bc @thobbs thobbs committed Sep 5, 2010
Showing with 116 additions and 19 deletions.
  1. +21 −0 README.mkd
  2. +95 −19 pycassa/pool.py
View
21 README.mkd
@@ -293,6 +293,27 @@ Calls to `insert` and `remove` can also be chained:
>>> cf.batch().remove('foo').remove('bar').send()
+Connection Pooling
+------------------
+
+pycassa offers several types of connection pools for different usage scenarios. These include:
+
+* QueuePool - typical connection pool that maintains a queue of open connections
+* SingletonThreadPool - one connection per thread
+* StaticPool - a single connection used for all operations
+* NullPool - no pooling is performed, but failover is supported
+* AssertionPool - asserts that at most one connection is open at a time; useful for debugging
+
+To create a pool and use a connection:
+
+ >>> pool = pycassa.QueuePool(keyspace='Keyspace1')
+ >>> connection = pool.get()
+ >>> cf = pycassa.ColumnFamily(connection, 'Standard1')
+ >>> cf.insert('key', {'col': 'val'})
+ >>> connection.return_to_pool()
+
+Automatic retries (or failover) are supported with all types of pools except for StaticPools. This means that if any operation fails, it will be transparently retried on other servers until it succeeds or a maximum number of failures is reached.
+
Advanced
--------
View
114 pycassa/pool.py
@@ -29,7 +29,7 @@ class Pool(object):
def __init__(self, keyspace, server_list=['localhost:9160'],
credentials=None, timeout=0.5, logging_name=None,
- use_threadlocal=True, listeners=None):
+ use_threadlocal=True, listeners=[]):
"""
Construct a Pool.
@@ -91,9 +91,8 @@ def __init__(self, keyspace, server_list=['localhost:9160'],
self.add_listener(PycassaLogger())
- if listeners:
- for l in listeners:
- self.add_listener(l)
+ for l in listeners:
+ self.add_listener(l)
self.set_server_list(server_list)
@@ -124,14 +123,15 @@ def set_server_list(self, server_list):
self.server_list[i] = temp
self._list_position = 0
-
- dic = self._get_dic()
- dic['server_list'] = self.server_list
- if self._on_server_list:
- for l in self._on_server_list:
- l.obtained_server_list(dic)
+ self._notify_on_server_list(self.server_list)
def _get_next_server(self):
+ """
+ Gets the next 'host:port' combination from the list of
+ servers and increments the position. This is not thread-safe,
+ but client-side load-balancing isn't so important that this is
+ a problem.
+ """
server = self.server_list[self._list_position % len(self.server_list)]
self._list_position += 1
return server
@@ -262,6 +262,13 @@ def _notify_on_dispose(self, conn_record, msg="", error=None):
for l in self._on_dispose:
l.connection_disposed(dic)
+ def _notify_on_server_list(self, server_list):
+ dic = self._get_dic()
+ dic['server_list'] = server_list
+ if self._on_server_list:
+ for l in self._on_server_list:
+ l.obtained_server_list(dic)
+
def _notify_on_recycle(self, old_conn, new_conn):
if self._on_recycle:
dic = self._get_dic()
@@ -326,6 +333,9 @@ class ConnectionWrapper(connection.Connection):
"""
+ # These mark the state of the connection so that we can
+ # check to see that they are not returned, checked out,
+ # or disposed twice (or from the wrong state).
_IN_QUEUE = 0
_CHECKED_OUT = 1
_DISPOSED = 2
@@ -342,6 +352,8 @@ def __init__(self, pool, *args, **kwargs):
self._pool._notify_on_connect(self)
def return_to_pool(self):
+ """Returns the Connection to the pool. This has the same
+ effect as calling Pool.return_conn() on the wrapper."""
self._pool.return_conn(self)
def _checkin(self):
@@ -389,6 +401,18 @@ def __getattr__(self, attr):
raise NotImplementedError()
class ImmutableConnectionWrapper(ConnectionWrapper):
+ """
+ A ConnectionWrapper that does not support retries through replacing
+ one wrapper with another or by swapping out the lower-level
+ pycassa.connection.Connection.
+
+ This is currently only used by a StaticPool. Here, the connection
+ is immutable because multiple threads may be using the same connection
+ at the same time.
+
+ These should not be created directly.
+
+ """
def __init__(self, pool, *args, **kwargs):
super(ImmutableConnectionWrapper, self).__init__(pool, *args, **kwargs)
@@ -407,13 +431,24 @@ def _client_call(*args, **kwargs):
return getattr(self, attr)
class ReplaceableConnectionWrapper(ConnectionWrapper):
+ """
+ A ConnectionWrapper that supports retries by obtaining another wrapper
+ from the pool and swapping all contents with it.
+
+ Caution should be used when this is not used with use_threadlocal=True.
+
+ These should not be created directly.
+
+ """
def __init__(self, pool, max_retries, *args, **kwargs):
super(ReplaceableConnectionWrapper, self).__init__(pool, *args, **kwargs)
self._retry_count = 0
self._max_retries = max_retries
def _replace(self, new_conn_wrapper):
+ """Replace our own contents with the contents of new_conn_wrapper,
+ another wrapper obtained from the pool."""
super(ConnectionWrapper, self)._replace(new_conn_wrapper)
self._lock = new_conn_wrapper._lock
self._info = new_conn_wrapper.info
@@ -450,18 +485,30 @@ def _client_call(*args, **kwargs):
return getattr(self, attr)
class MutableConnectionWrapper(ConnectionWrapper):
+ """A ConnectionWrapper that supports retries by opening a new
+ connection to the next server in Pool's list.
+
+ Caution should be used when this is not used with use_threadlocal=True.
+
+ These should not be created directly.
+
+ """
def __init__(self, pool, max_retries, *args, **kwargs):
super(MutableConnectionWrapper, self).__init__(pool, *args, **kwargs)
self._retry_count = 0
self._max_retries = max_retries
def _replace_conn(self):
+ """Try getting servers from Pool's list and open connections to them
+ until one succeeds or we have failed enough times; if we succeed,
+ swap the contents of our pycassa.connection.Connection attributes with
+ that connection's."""
+ self.close()
failure_count = 0
while failure_count < 2 * len(self._pool.server_list):
try:
new_serv = self._pool._get_next_server()
- self.close()
new_conn = connection.Connection(self._pool.keyspace, [new_serv],
credentials=self._pool.credentials,
use_threadlocal=self._pool._pool_threadlocal)
@@ -493,7 +540,16 @@ def _client_call(*args, **kwargs):
return getattr(self, attr)
class QueuePool(Pool):
- """A Pool that maintains a queue of open connections."""
+ """
+ A Pool that maintains a queue of open connections.
+
+ This is typically what you want to use for connection pooling.
+
+ Be careful when using a QueuePool with use_threadlocal=False,
+ especially with retries enabled. Synchronization may be required to
+ prevent the connection from changing while another thread is using it.
+
+ """
def __init__(self, pool_size=5, max_overflow=10,
pool_timeout=30, recycle=10000, max_retries=5,
@@ -505,6 +561,13 @@ def __init__(self, pool_size=5, max_overflow=10,
defaults to 5. This is the largest number of connections that
will be kept in the pool at one time.
+ A good choice for this is usually a multiple of the number of servers
+ passed to the Pool constructor. If a size less than the number of servers is chosen,
+ the last (len(server_list) - pool_size) servers may not be used until
+ either overflow occurs, a connection is recycled, or a connection
+ fails. Similarly, if a multiple of len(server_list) is not chosen,
+ those same servers would have a decreased load.
+
:param max_overflow: The maximum overflow size of the
pool. When the number of checked-out connections reaches the
size set in pool_size, additional connections will be
@@ -576,11 +639,12 @@ def _get_new_wrapper(self, server):
use_threadlocal=self._pool_threadlocal)
def _replace_wrapper(self):
- """Not the most efficient, but try to replace the connection."""
- try:
- self._q.put(self._create_connection(), False)
- except pool_queue.Full:
- pass
+ """Try to replace the connection."""
+ if not self._q.full:
+ try:
+ self._q.put(self._create_connection(), False)
+ except pool_queue.Full:
+ pass
def _do_return_conn(self, conn):
try:
@@ -666,6 +730,8 @@ def _do_get(self):
finally:
if self._overflow_lock is not None:
self._overflow_lock.release()
+
+ # Check to make sure the connection is good
try:
conn._ensure_connection()
except connection.NoServerAvailable:
@@ -803,7 +869,10 @@ class NullPool(Pool):
"""A Pool which does not pool connections.
Instead, it opens and closes the underlying Cassandra connection
- per each get()/return().
+ per each get()/return(). NullPools do offer retry behavior.
+
+ Instead of using this with threadlocal storage, you should use a
+ SingletonThreadPool.
"""
@@ -856,7 +925,12 @@ def dispose(self):
class StaticPool(Pool):
- """A Pool of exactly one connection, used for all requests."""
+ """
+ A Pool of exactly one connection, used for all requests.
+
+ StaticPools do not currently support automatic retries.
+
+ """
def __init__(self, *args, **kwargs):
Pool.__init__(self, *args, **kwargs)
@@ -908,6 +982,8 @@ class AssertionPool(Pool):
out at a time. Useful for debugging code that is using more connections
than desired.
+ AssertionPools do support automatic retries.
+
"""
def __init__(self, max_retries=5, *args, **kwargs):

0 comments on commit f97a366

Please sign in to comment.
Something went wrong with that request. Please try again.