Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds max_consecutive_exceptions to pool #253

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Adds max_consecutive_exceptions to pool
  • Loading branch information
egalpin committed Apr 11, 2018
commit adef7d29313cea74acab79fc285b41e159b53caf
46 changes: 43 additions & 3 deletions asyncpg/pool.py
Original file line number Diff line number Diff line change
@@ -6,6 +6,7 @@


import asyncio
from concurrent.futures._base import TimeoutError
import functools
import inspect
import time
@@ -15,6 +16,15 @@
from . import exceptions


BAD_CONN_EXCEPTION = (
exceptions._base.PostgresError,
exceptions._base.FatalPostgresError,
exceptions._base.UnknownPostgresError,
TimeoutError,
ConnectionRefusedError,
)


class PoolConnectionProxyMeta(type):

def __new__(mcls, name, bases, dct, *, wrap=False):
@@ -96,10 +106,12 @@ class PoolConnectionHolder:
'_connect_args', '_connect_kwargs',
'_max_queries', '_setup', '_init',
'_max_inactive_time', '_in_use',
'_inactive_callback', '_timeout')
'_inactive_callback', '_timeout',
'_max_consecutive_exceptions', '_consecutive_exceptions')

def __init__(self, pool, *, connect_args, connect_kwargs,
max_queries, setup, init, max_inactive_time):
max_queries, setup, init, max_inactive_time,
max_consecutive_exceptions):

self._pool = pool
self._con = None
@@ -108,6 +120,8 @@ def __init__(self, pool, *, connect_args, connect_kwargs,
self._connect_kwargs = connect_kwargs
self._max_queries = max_queries
self._max_inactive_time = max_inactive_time
self._max_consecutive_exceptions = max_consecutive_exceptions
self._consecutive_exceptions = 0
self._setup = setup
self._init = init
self._inactive_callback = None
@@ -259,6 +273,16 @@ def _deactivate_connection(self):
self._con.terminate()
self._con = None

async def maybe_close_bad_connection(self, exc_type):
if self._max_consecutive_exceptions > 0 and \
isinstance(exc_type, BAD_CONN_EXCEPTION):

self._consecutive_exceptions += 1

if self._consecutive_exceptions > self._max_consecutive_exceptions:
await self.close()
self._consecutive_exceptions = 0


class Pool:
"""A connection pool.
@@ -285,6 +309,7 @@ def __init__(self, *connect_args,
init,
loop,
connection_class,
max_consecutive_exceptions,
**connect_kwargs):

if loop is None:
@@ -331,6 +356,7 @@ def __init__(self, *connect_args,
connect_kwargs=connect_kwargs,
max_queries=max_queries,
max_inactive_time=max_inactive_connection_lifetime,
max_consecutive_exceptions=max_consecutive_exceptions,
setup=setup,
init=init)

@@ -459,7 +485,8 @@ async def _acquire_impl():
ch = await self._queue.get() # type: PoolConnectionHolder
try:
proxy = await ch.acquire() # type: PoolConnectionProxy
except Exception:
except Exception as e:
await ch.maybe_close_bad_connection(e)
self._queue.put_nowait(ch)
raise
else:
@@ -580,6 +607,11 @@ async def __aexit__(self, *exc):
self.done = True
con = self.connection
self.connection = None
if not exc[0]:
con._holder._consecutive_exceptions = 0
else:
# Pass exception type to ConnectionHolder
await con._holder.maybe_close_bad_connection(exc[0])
await self.pool.release(con)

def __await__(self):
@@ -592,6 +624,7 @@ def create_pool(dsn=None, *,
max_size=10,
max_queries=50000,
max_inactive_connection_lifetime=300.0,
max_consecutive_exceptions=0,
setup=None,
init=None,
loop=None,
@@ -651,6 +684,12 @@ def create_pool(dsn=None, *,
Number of seconds after which inactive connections in the
pool will be closed. Pass ``0`` to disable this mechanism.

:param int max_consecutive_exceptions:
the maximum number of consecutive exceptions that may be raised by a
single connection before that connection is assumed corrupt (ex.
pointing to an old DB after a failover) and will therefore be closed.
Pass ``0`` to disable.

:param coroutine setup:
A coroutine to prepare a connection right before it is returned
from :meth:`Pool.acquire() <pool.Pool.acquire>`. An example use
@@ -699,4 +738,5 @@ def create_pool(dsn=None, *,
min_size=min_size, max_size=max_size,
max_queries=max_queries, loop=loop, setup=setup, init=init,
max_inactive_connection_lifetime=max_inactive_connection_lifetime,
max_consecutive_exceptions=max_consecutive_exceptions,
**connect_kwargs)