Skip to content
This repository has been archived by the owner on Aug 4, 2020. It is now read-only.

Reduce critical section protected by global lock #189

Merged
merged 5 commits into from Feb 15, 2013
Merged
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
90 changes: 53 additions & 37 deletions pycassa/pool.py
Expand Up @@ -512,53 +512,69 @@ def _put_conn(self, conn):
self._q.put_nowait(conn)
return conn

def _new_if_required(self, max_conns):
""" Creates new connection if there is room """
try:
self._pool_lock.acquire()
if self._current_conns < max_conns:
new_conn = True
self._current_conns += 1
else:
new_conn = False
finally:
self._pool_lock.release()

if new_conn:
try:
return self._create_connection()
except:
try:
self._pool_lock.acquire()
self._current_conns -= 1
raise
finally:
self._pool_lock.release()
return None

def get(self):
""" Gets a connection from the pool. """
conn = None
if self._pool_threadlocal:
try:
conn = None
if self._tlocal.current:
conn = self._tlocal.current
if conn:
return conn
except AttributeError:
pass
try:
self._pool_lock.acquire()
if self._current_conns < self._pool_size:
# The pool was not prefilled, and we need to add connections to reach pool_size
conn = self._create_connection()
self._current_conns += 1
else:
try:
# We don't want to waste time blocking if overflow is not enabled; similarly,
# if we're not at the max overflow, we can fail quickly and create a new
# connection
timeout = self.pool_timeout
if timeout == -1:
timeout = None
block = self._current_conns >= self._max_conns
elif timeout == 0:
block = False
else:
block = self._current_conns >= self._max_conns

conn = self._q.get(block, timeout)
conn._checkout()
except Queue.Empty:
if self._current_conns < self._max_conns:
conn = self._create_connection()
self._current_conns += 1
else:
self._notify_on_pool_max(pool_max=self._max_conns)
size_msg = "size %d" % (self._pool_size, )
if self._overflow_enabled:
size_msg += "overflow %d" % (self._max_overflow)
message = "ConnectionPool limit of %s reached, unable to obtain connection after %d seconds" \
% (size_msg, self.pool_timeout)
raise NoConnectionAvailable(message)
finally:
self._pool_lock.release()

conn = self._new_if_required(self._pool_size)
if not conn:
try:
# We don't want to waste time blocking if overflow is not enabled; similarly,
# if we're not at the max overflow, we can fail quickly and create a new
# connection
timeout = self.pool_timeout
if timeout == -1:
timeout = None
block = self._current_conns >= self._max_conns
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we're not checking the value of _current_conns while holding the lock, I think there's a subtle race here:

  1. thread-1 goes through _new_if_required(), sees that _current_conns == _max_conns, and doesn't create a new connection
  2. In thread-2, _current_conns gets decremented
  3. thread-1 hits this check, sees that _current_conns < _max_conns, and sets block to False
  4. In thread-2 (or some other thread), _current_conns gets incremented, and the queue is still empty
  5. thread-1 doesn't block on trying to get something from the queue, so it fails immediately. It goes through _new_if_required again, but since _current_conns == _max_conns at this point, it doesn't create a connection.
  6. NoConnectionAvailable is raised in thread-1 without every having waited for a connection to become available

I'm not sure about what would need to happen for steps 2 and 4 to occur, but it's probably not safe to bet that they won't ever happen.

elif timeout == 0:
block = False
else:
block = self._current_conns >= self._max_conns

conn = self._q.get(block, timeout)
conn._checkout()
except Queue.Empty:
conn = self._new_if_required(self._max_conns)
if not conn:
self._notify_on_pool_max(pool_max=self._max_conns)
size_msg = "size %d" % (self._pool_size, )
if self._overflow_enabled:
size_msg += "overflow %d" % (self._max_overflow)
message = "ConnectionPool limit of %s reached, unable to obtain connection after %d seconds" \
% (size_msg, self.pool_timeout)
raise NoConnectionAvailable(message)

if self._pool_threadlocal:
self._tlocal.current = conn
Expand Down