Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP

Loading…

Reduce critical section protected by global lock #189

Merged
merged 5 commits into from

2 participants

@edevil

This tries to reduce the amount of operations done while holding the global lock. Also addresses bug #184.

pycassa/pool.py
((71 lines not shown))
- message = "ConnectionPool limit of %s reached, unable to obtain connection after %d seconds" \
- % (size_msg, self.pool_timeout)
- raise NoConnectionAvailable(message)
- finally:
- self._pool_lock.release()
+
+ conn = self._new_if_required(self._pool_size)
+ if not conn:
+ try:
+ # We don't want to waste time blocking if overflow is not enabled; similarly,
+ # if we're not at the max overflow, we can fail quickly and create a new
+ # connection
+ timeout = self.pool_timeout
+ if timeout == -1:
+ timeout = None
+ block = self._current_conns >= self._max_conns
@thobbs Owner
thobbs added a note

Since we're not checking the value of _current_conns while holding the lock, I think there's a subtle race here:

  1. thread-1 goes through _new_if_required(), sees that _current_conns == _max_conns, and doesn't create a new connection
  2. In thread-2, _current_conns gets decremented
  3. thread-1 hits this check, sees that _current_conns < _max_conns, and sets block to False
  4. In thread-2 (or some other thread), _current_conns gets incremented, and the queue is still empty
  5. thread-1 doesn't block on trying to get something from the queue, so it fails immediately. It goes through _new_if_required again, but since _current_conns == _max_conns at this point, it doesn't create a connection.
  6. NoConnectionAvailable is raised in thread-1 without every having waited for a connection to become available

I'm not sure about what would need to happen for steps 2 and 4 to occur, but it's probably not safe to bet that they won't ever happen.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
@edevil

I've addressed the case you mentioned.

@thobbs
Owner

Well done. Thank you very much!

@thobbs thobbs merged commit 53454c1 into pycassa:master

1 check passed

Details default The Travis build passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Commits on Feb 13, 2013
Commits on Feb 14, 2013
  1. Small re-factorization.

    Andre Cruz authored
  2. Another round at re-factoring this.

    Andre Cruz authored
  3. Prevent situation in which an attempt to obtain a connection would fa…

    Andre Cruz authored
    …il without even waiting for queue.
This page is out of date. Refresh to see the latest.
Showing with 48 additions and 36 deletions.
  1. +48 −36 pycassa/pool.py
View
84 pycassa/pool.py
@@ -512,53 +512,65 @@ def _put_conn(self, conn):
self._q.put_nowait(conn)
return conn
+ def _new_if_required(self, max_conns, check_empty_queue=False):
+ """ Creates new connection if there is room """
+ try:
+ self._pool_lock.acquire()
+ if (not check_empty_queue or self._q.empty()) and self._current_conns < max_conns:
+ new_conn = True
+ self._current_conns += 1
+ else:
+ new_conn = False
+ finally:
+ self._pool_lock.release()
+
+ if new_conn:
+ try:
+ return self._create_connection()
+ except:
+ try:
+ self._pool_lock.acquire()
+ self._current_conns -= 1
+ raise
+ finally:
+ self._pool_lock.release()
+ return None
+
def get(self):
""" Gets a connection from the pool. """
+ conn = None
if self._pool_threadlocal:
try:
- conn = None
if self._tlocal.current:
conn = self._tlocal.current
if conn:
return conn
except AttributeError:
pass
- try:
- self._pool_lock.acquire()
- if self._current_conns < self._pool_size:
- # The pool was not prefilled, and we need to add connections to reach pool_size
- conn = self._create_connection()
- self._current_conns += 1
+
+ conn = self._new_if_required(self._pool_size)
+ if not conn:
+ # if queue is empty and max_overflow is not reached, create new conn
+ conn = self._new_if_required(self._max_conns, check_empty_queue=True)
+
+ if not conn:
+ # We will have to fetch from the queue, and maybe block
+ timeout = self.pool_timeout
+ if timeout == -1:
+ timeout = None
+
+ try:
+ conn = self._q.get(timeout=timeout)
+ except Queue.Empty:
+ self._notify_on_pool_max(pool_max=self._max_conns)
+ size_msg = "size %d" % (self._pool_size, )
+ if self._overflow_enabled:
+ size_msg += "overflow %d" % (self._max_overflow)
+ message = "ConnectionPool limit of %s reached, unable to obtain connection after %d seconds" \
+ % (size_msg, self.pool_timeout)
+ raise NoConnectionAvailable(message)
else:
- try:
- # We don't want to waste time blocking if overflow is not enabled; similarly,
- # if we're not at the max overflow, we can fail quickly and create a new
- # connection
- timeout = self.pool_timeout
- if timeout == -1:
- timeout = None
- block = self._current_conns >= self._max_conns
- elif timeout == 0:
- block = False
- else:
- block = self._current_conns >= self._max_conns
-
- conn = self._q.get(block, timeout)
- conn._checkout()
- except Queue.Empty:
- if self._current_conns < self._max_conns:
- conn = self._create_connection()
- self._current_conns += 1
- else:
- self._notify_on_pool_max(pool_max=self._max_conns)
- size_msg = "size %d" % (self._pool_size, )
- if self._overflow_enabled:
- size_msg += "overflow %d" % (self._max_overflow)
- message = "ConnectionPool limit of %s reached, unable to obtain connection after %d seconds" \
- % (size_msg, self.pool_timeout)
- raise NoConnectionAvailable(message)
- finally:
- self._pool_lock.release()
+ conn._checkout()
if self._pool_threadlocal:
self._tlocal.current = conn
Something went wrong with that request. Please try again.