-
Notifications
You must be signed in to change notification settings - Fork 163
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Rewrite connection pool exception handling
The connection pool now uses a much simpler try/except/finally block in the context manager function to detect network/Thrift errors. The pool will now only refreshes connections when Thrift or socket errors occur, and will not react to unrelated application errors anymore. With this approach, the _ClientProxy hack is not needed anymore, so it has been completely eliminated. See issue #25.
- Loading branch information
Showing
2 changed files
with
50 additions
and
86 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -5,8 +5,11 @@ | |
import contextlib | ||
import logging | ||
import Queue | ||
import socket | ||
import threading | ||
|
||
from thrift.Thrift import TException | ||
|
||
from .connection import Connection | ||
|
||
logger = logging.getLogger(__name__) | ||
|
@@ -19,44 +22,6 @@ | |
# | ||
|
||
|
||
class _ClientProxy(object): | ||
""" | ||
Proxy class to silently notice Thrift client exceptions. | ||
This class proxies all requests a Connection makes to the underlying | ||
Thrift client, and sets a flag when the client raised an exception, | ||
e.g. socket errors or Thrift protocol errors. | ||
The connection pool replaces tainted connections with fresh ones. | ||
""" | ||
def __init__(self, connection, client): | ||
self.connection = connection | ||
self.client = client | ||
self._cache = {} | ||
|
||
def __getattr__(self, name): | ||
""" | ||
Hook into attribute lookup and return wrapped methods. | ||
Since the client is only used for method calls, just treat | ||
every attribute access as a method to wrap. | ||
""" | ||
wrapped = self._cache.get(name) | ||
|
||
if wrapped is None: | ||
def wrapped(*args, **kwargs): | ||
method = getattr(self.client, name) | ||
assert callable(method) | ||
try: | ||
return method(*args, **kwargs) | ||
except: | ||
self.connection._tainted = True | ||
raise | ||
self._cache[name] = wrapped | ||
|
||
return wrapped | ||
|
||
|
||
class NoConnectionsAvailable(RuntimeError): | ||
""" | ||
Exception raised when no connections are available. | ||
|
@@ -99,11 +64,11 @@ def __init__(self, size, **kwargs): | |
self._queue = Queue.LifoQueue(maxsize=size) | ||
self._thread_connections = threading.local() | ||
|
||
self._connection_kwargs = kwargs | ||
self._connection_kwargs['autoconnect'] = False | ||
connection_kwargs = kwargs | ||
connection_kwargs['autoconnect'] = False | ||
|
||
for i in xrange(size): | ||
connection = self._create_connection() | ||
connection = Connection(**connection_kwargs) | ||
self._queue.put(connection) | ||
|
||
# The first connection is made immediately so that trivial | ||
|
@@ -112,12 +77,6 @@ def __init__(self, size, **kwargs): | |
with self.connection(): | ||
pass | ||
|
||
def _create_connection(self): | ||
"""Create a new connection with monkey-patched Thrift client.""" | ||
connection = Connection(**self._connection_kwargs) | ||
connection.client = _ClientProxy(connection, connection.client) | ||
return connection | ||
|
||
def _acquire_connection(self, timeout=None): | ||
"""Acquire a connection from the pool.""" | ||
try: | ||
|
@@ -152,27 +111,23 @@ def connection(self, timeout=None): | |
:rtype: :py:class:`happybase.Connection` | ||
""" | ||
|
||
# If this thread already holds a connection, just return it. | ||
# This is the short path for nested calls from the same thread. | ||
connection = getattr(self._thread_connections, 'current', None) | ||
if connection is not None: | ||
yield connection | ||
return | ||
|
||
# If this point is reached, this is the outermost connection | ||
# requests for a thread. Obtain a new connection from the pool | ||
# and keep a reference in a thread local so that nested | ||
# connection requests from the same thread can return the same | ||
# connection instance. | ||
# | ||
# Note: this code acquires a lock before assigning to the | ||
# thread local; see | ||
# http://emptysquare.net/blog/another-thing-about-pythons- | ||
# threadlocals/ | ||
|
||
connection = self._acquire_connection(timeout) | ||
with self._lock: | ||
self._thread_connections.current = connection | ||
|
||
return_after_use = False | ||
if connection is None: | ||
# This is the outermost connection requests for this thread. | ||
# Obtain a new connection from the pool and keep a reference | ||
# in a thread local so that nested connection requests from | ||
# the same thread can return the same connection instance. | ||
# | ||
# Note: this code acquires a lock before assigning to the | ||
# thread local; see | ||
# http://emptysquare.net/blog/another-thing-about-pythons- | ||
# threadlocals/ | ||
return_after_use = True | ||
connection = self._acquire_connection(timeout) | ||
with self._lock: | ||
self._thread_connections.current = connection | ||
|
||
try: | ||
# Open connection, because connections are opened lazily. | ||
|
@@ -182,23 +137,21 @@ def connection(self, timeout=None): | |
# Return value from the context manager's __enter__() | ||
yield connection | ||
|
||
finally: | ||
|
||
# Remove thread local reference, since the thread no longer | ||
# owns it. | ||
del self._thread_connections.current | ||
|
||
except (TException, socket.error): | ||
# Refresh the underlying Thrift client if an exception | ||
# occurred in the Thrift layer, since we don't know whether | ||
# the connection is still usable. | ||
if getattr(connection, '_tainted', False): | ||
logger.info("Replacing tainted pool connection") | ||
|
||
try: | ||
connection.close() | ||
except: | ||
pass | ||
logger.info("Replacing tainted pool connection") | ||
connection._refresh_thrift_client() | ||
connection.open() | ||
|
||
connection = self._create_connection() | ||
# Reraise to caller; see contextlib.contextmanager() docs | ||
raise | ||
This comment has been minimized.
Sorry, something went wrong.
This comment has been minimized.
Sorry, something went wrong.
rogerhu
Contributor
|
||
|
||
self._return_connection(connection) | ||
finally: | ||
# Remove thread local reference after the outermost 'with' | ||
# block ends. Afterwards the thread no longer owns the | ||
# connection. | ||
if return_after_use: | ||
del self._thread_connections.current | ||
self._return_connection(connection) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
If we're raising back to the caller, this means we're not continuing with the rest of the with statement? Wondering whether things like socket.timeout's should be explicitly caught and returned back to the pool without reraising...