Skip to content

Commit

Permalink
Use a lock around all updates to shared state.
Browse files Browse the repository at this point in the history
Prevent closed connections from ending up in the pool (Mutator uses "return_to_pool"). Since current_conns has already been decremented, the pool would reach a state where a new connection was always opened but could not be put in the queue since it was full.
  • Loading branch information
edevil committed Jul 26, 2013
1 parent fda2b4e commit 5171ee4
Showing 1 changed file with 12 additions and 7 deletions.
19 changes: 12 additions & 7 deletions pycassa/pool.py
Expand Up @@ -66,7 +66,8 @@ def return_to_pool(self):
on the wrapper.
"""
self._pool.put(self)
if self.transport.isOpen():
self._pool.put(self)

def _checkin(self):
if self._state == ConnectionWrapper._IN_QUEUE:
Expand Down Expand Up @@ -459,13 +460,17 @@ def _get_new_wrapper(self, server):
def _replace_wrapper(self):
"""Try to replace the connection."""
if not self._q.full():
conn = self._create_connection()
conn._checkin()

try:
conn = self._create_connection()
conn._checkin()
self._q.put(conn, False)
self._current_conns += 1
except Queue.Full:
pass
conn._dispose_wrapper(reason="pool is already full")
else:
self._pool_lock.acquire()
self._current_conns += 1
self._pool_lock.release()

def _clear_current(self):
""" If using threadlocal, clear our threadlocal current conn. """
Expand Down Expand Up @@ -581,8 +586,8 @@ def execute(self, f, *args, **kwargs):
conn = self.get()
return getattr(conn, f)(*args, **kwargs)
finally:
if conn and conn.transport.isOpen():
self.put(conn)
if conn:
conn.return_to_pool()

def dispose(self):
""" Closes all checked in connections in the pool. """
Expand Down

0 comments on commit 5171ee4

Please sign in to comment.