Skip to content

Commit

Permalink
Use with statement for pool lock acquisition
Browse files Browse the repository at this point in the history
  • Loading branch information
thobbs committed Jul 30, 2013
1 parent b54e637 commit da20f70
Showing 1 changed file with 11 additions and 24 deletions.
35 changes: 11 additions & 24 deletions pycassa/pool.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
""" Connection pooling for Cassandra connections. """

from __future__ import with_statement

import time
import threading
import random
Expand Down Expand Up @@ -204,17 +206,13 @@ def _get_max_overflow(self):
return self._max_overflow

def _set_max_overflow(self, max_overflow):
try:
self._pool_lock.acquire()

with self._pool_lock:
self._max_overflow = max_overflow
self._overflow_enabled = max_overflow > 0 or max_overflow == -1
if max_overflow == -1:
self._max_conns = (2 ** 31) - 1
else:
self._max_conns = self._pool_size + max_overflow
finally:
self._pool_lock.release()

max_overflow = property(_get_max_overflow, _set_max_overflow)
""" Whether or not a new connection may be opened when the
Expand Down Expand Up @@ -438,15 +436,12 @@ def fill(self):
.. versionadded:: 1.2.0
"""
try:
self._pool_lock.acquire()
with self._pool_lock:
while self._current_conns < self._pool_size:
conn = self._create_connection()
conn._checkin()
self._q.put(conn, False)
self._current_conns += 1
finally:
self._pool_lock.release()

def _get_new_wrapper(self, server):
return ConnectionWrapper(self, self.max_retries,
Expand All @@ -467,9 +462,8 @@ def _replace_wrapper(self):
except Queue.Full:
conn._dispose_wrapper(reason="pool is already full")
else:
self._pool_lock.acquire()
self._current_conns += 1
self._pool_lock.release()
with self._pool_lock:
self._current_conns += 1

def _clear_current(self):
""" If using threadlocal, clear our threadlocal current conn. """
Expand Down Expand Up @@ -508,32 +502,25 @@ def put(self, conn):
return_conn = put

def _decrement_overflow(self):
self._pool_lock.acquire()
self._current_conns -= 1
self._pool_lock.release()
with self._pool_lock:
self._current_conns -= 1

def _new_if_required(self, max_conns, check_empty_queue=False):
""" Creates new connection if there is room """
try:
self._pool_lock.acquire()
with self._pool_lock:
if (not check_empty_queue or self._q.empty()) and self._current_conns < max_conns:
new_conn = True
self._current_conns += 1
else:
new_conn = False
finally:
self._pool_lock.release()

if new_conn:
try:
return self._create_connection()
except:
try:
self._pool_lock.acquire()
with self._pool_lock:
self._current_conns -= 1
raise
finally:
self._pool_lock.release()
raise
return None

def get(self):
Expand Down

0 comments on commit da20f70

Please sign in to comment.