Permalink
Browse files

Refactor thread-/greenlet-local stuff from pool.py into thread_util.p…

…y PYTHON-428

Preparation for PYTHON-428
  • Loading branch information...
1 parent 622504b commit e31a07a84cf39fa9886f9d7f97212190a80b2c16 @ajdavis ajdavis committed Dec 19, 2012
View
@@ -235,23 +235,14 @@ def __init__(self, host=None, port=None, max_pool_size=10,
"2.6 you must install the ssl package "
"from PyPI.")
- if options.get('use_greenlets', False):
- if not pool.have_greenlet:
- raise ConfigurationError(
- "The greenlet module is not available. "
- "Install the greenlet package from PyPI."
- )
- self.pool_class = pool.GreenletPool
- else:
- self.pool_class = pool.Pool
-
- self.__pool = self.pool_class(
+ use_greenlets = options.get('use_greenlets', False)
+ self.__pool = pool.Pool(
None,
self.__max_pool_size,
self.__net_timeout,
self.__conn_timeout,
- self.__use_ssl
- )
+ self.__use_ssl,
+ use_greenlets)
self.__document_class = document_class
self.__tz_aware = common.validate_boolean('tz_aware', tz_aware)
@@ -413,15 +413,11 @@ def __init__(self, hosts_or_uri=None, max_pool_size=10,
self.__opts[option] = value
self.__opts.update(options)
- if self.__opts.get('use_greenlets', False):
- if not have_gevent:
- raise ConfigurationError(
- "The gevent module is not available. "
- "Install the gevent package from PyPI."
- )
- self.pool_class = pool.GreenletPool
- else:
- self.pool_class = pool.Pool
+ self.__use_greenlets = options.get('use_greenlets', False)
+ if self.__use_greenlets and not have_gevent:
+ raise ConfigurationError(
+ "The gevent module is not available. "
+ "Install the gevent package from PyPI.")
self.__auto_start_request = self.__opts.get('auto_start_request', False)
self.__in_request = self.__auto_start_request
@@ -703,9 +699,9 @@ def __is_master(self, host):
"""Directly call ismaster.
Returns (response, connection_pool, ping_time in seconds).
"""
- connection_pool = self.pool_class(
+ connection_pool = pool.Pool(
host, self.__max_pool_size, self.__net_timeout, self.__conn_timeout,
- self.__use_ssl)
+ self.__use_ssl, self.__use_greenlets)
sock_info = connection_pool.get_socket()
try:
View
@@ -19,7 +19,8 @@
import threading
import weakref
-from pymongo.errors import ConnectionFailure
+from pymongo import thread_util
+from pymongo.errors import ConnectionFailure, ConfigurationError
have_ssl = True
@@ -29,16 +30,6 @@
have_ssl = False
-# PyMongo does not use greenlet-aware connection pools by default, but it will
-# attempt to do so if you pass use_greenlets=True to Connection or
-# ReplicaSetConnection
-have_greenlet = True
-try:
- import greenlet
-except ImportError:
- have_greenlet = False
-
-
NO_REQUEST = None
NO_SOCKET_YET = -1
@@ -102,16 +93,26 @@ def __repr__(self):
# Do *not* explicitly inherit from object or Jython won't call __del__
# http://bugs.jython.org/issue1057
-class BasePool:
- def __init__(self, pair, max_size, net_timeout, conn_timeout, use_ssl):
+class Pool:
+ def __init__(self, pair, max_size, net_timeout, conn_timeout, use_ssl,
+ use_greenlets):
"""
:Parameters:
- `pair`: a (hostname, port) tuple
- `max_size`: approximate number of idle connections to keep open
- `net_timeout`: timeout in seconds for operations on open connection
- `conn_timeout`: timeout in seconds for establishing connection
- `use_ssl`: bool, if True use an encrypted connection
+ - `use_greenlets`: bool, if True then start_request() assigns a
+ socket to the current greenlet - otherwise it is assigned to the
+ current thread
"""
+ if use_greenlets and not thread_util.have_greenlet:
+ raise ConfigurationError(
+ "The greenlet module is not available. "
+ "Install the greenlet package from PyPI."
+ )
+
self.sockets = set()
self.lock = threading.Lock()
@@ -124,18 +125,13 @@ def __init__(self, pair, max_size, net_timeout, conn_timeout, use_ssl):
self.net_timeout = net_timeout
self.conn_timeout = conn_timeout
self.use_ssl = use_ssl
+ self._ident = thread_util.create_ident(use_greenlets)
- # Map self._get_thread_ident() -> request socket
+ # Map self._ident.get() -> request socket
self._tid_to_sock = {}
- # Weakrefs used by subclasses to watch for dead threads or greenlets.
- # We must keep a reference to the weakref to keep it alive for at least
- # as long as what it references, otherwise its delete-callback won't
- # fire.
- self._refs = {}
-
- # Map self._get_thread_ident() -> # times start_request called
- self._tid_to_request_count = {}
+ # Count the number of calls to start_request() per thread or greenlet
+ self._request_counter = thread_util.Counter(use_greenlets)
def reset(self):
# Ignore this race condition -- if many threads are resetting at once,
@@ -284,27 +280,20 @@ def start_request(self):
# have no socket assigned to the request yet.
self._set_request_state(NO_SOCKET_YET)
- tid = self._get_thread_ident()
- if tid in self._tid_to_request_count:
- self._tid_to_request_count[tid] += 1
- else:
- self._tid_to_request_count[tid] = 1
+ self._request_counter.inc()
def in_request(self):
- return self._get_request_state() != NO_REQUEST
+ return bool(self._request_counter.get())
def end_request(self):
- tid = self._get_thread_ident()
+ tid = self._ident.get()
# Check if start_request has ever been called in this thread / greenlet
- count = self._tid_to_request_count.get(tid)
- if count is not None:
- if count > 1:
- # Decrement start_request counter
- self._tid_to_request_count[tid] -= 1
- else:
+ count = self._request_counter.get()
+ if count:
+ self._request_counter.dec()
+ if count == 1:
# End request
- self._tid_to_request_count.pop(tid)
sock_info = self._get_request_state()
self._set_request_state(NO_REQUEST)
if sock_info not in (NO_REQUEST, NO_SOCKET_YET):
@@ -382,16 +371,16 @@ def _check(self, sock_info, pair):
raise
def _set_request_state(self, sock_info):
- tid = self._get_thread_ident()
+ tid = self._ident.get()
if sock_info == NO_REQUEST:
# Ending a request
- self._refs.pop(tid, None)
+ self._ident.unwatch()
self._tid_to_sock.pop(tid, None)
else:
self._tid_to_sock[tid] = sock_info
- if tid not in self._refs:
+ if not self._ident.watching():
# Closure over tid and poolref. Don't refer directly to self,
# otherwise there's a cycle.
@@ -408,7 +397,6 @@ def on_thread_died(ref):
pool = poolref()
if pool:
# End the request
- pool._refs.pop(tid, None)
request_sock = pool._tid_to_sock.pop(tid, None)
# Was thread ever assigned a socket before it died?
@@ -418,19 +406,12 @@ def on_thread_died(ref):
# Random exceptions on interpreter shutdown.
pass
- self._watch_current_thread(on_thread_died)
+ self._ident.watch(on_thread_died)
def _get_request_state(self):
- tid = self._get_thread_ident()
+ tid = self._ident.get()
return self._tid_to_sock.get(tid, NO_REQUEST)
- # Overridable methods for pools.
- def _get_thread_ident(self):
- raise NotImplementedError
-
- def _watch_current_thread(self, callback):
- raise NotImplementedError
-
def __del__(self):
# Avoid ResourceWarnings in Python 3
for sock_info in self.sockets:
@@ -441,61 +422,6 @@ def __del__(self):
request_sock.close()
-class Pool(BasePool):
- """A simple connection pool.
-
- Calling start_request() acquires a thread-local socket, which is returned
- to the pool when the thread calls end_request() or dies.
- """
- def __init__(self, *args, **kwargs):
- BasePool.__init__(self, *args, **kwargs)
- self._local = threading.local()
-
- # After a thread calls start_request() and we assign it a socket, we must
- # watch the thread to know if it dies without calling end_request so we can
- # return its socket to the idle pool, self.sockets. We watch for
- # thread-death using a weakref callback to a thread local. The weakref is
- # permitted on subclasses of object but not object() itself, so we make
- # this class.
- class ThreadVigil(object):
- pass
-
- def _get_thread_ident(self):
- if not hasattr(self._local, 'vigil'):
- self._local.vigil = Pool.ThreadVigil()
- return id(self._local.vigil)
-
- def _watch_current_thread(self, callback):
- tid = self._get_thread_ident()
- self._refs[tid] = weakref.ref(self._local.vigil, callback)
-
-
-class GreenletPool(BasePool):
- """A simple connection pool.
-
- Calling start_request() acquires a greenlet-local socket, which is returned
- to the pool when the greenlet calls end_request() or dies.
- """
- # Overrides
- def _get_thread_ident(self):
- return id(greenlet.getcurrent())
-
- def _watch_current_thread(self, callback):
- current = greenlet.getcurrent()
- tid = self._get_thread_ident()
-
- if hasattr(current, 'link'):
- # This is a Gevent Greenlet (capital G), which inherits from
- # greenlet and provides a 'link' method to detect when the
- # Greenlet exits.
- current.link(callback)
- self._refs[tid] = None
- else:
- # This is a non-Gevent greenlet (small g), or it's the main
- # greenlet.
- self._refs[tid] = weakref.ref(current, callback)
-
-
class Request(object):
"""
A context manager returned by Connection.start_request(), so you can do
Oops, something went wrong.

0 comments on commit e31a07a

Please sign in to comment.