Skip to content

Commit

Permalink
Merge "Add LIFO for connection pooling"
Browse files Browse the repository at this point in the history
  • Loading branch information
zzzeek authored and Gerrit Code Review committed Sep 19, 2018
2 parents 5059412 + b64a3dd commit 5981e43
Show file tree
Hide file tree
Showing 8 changed files with 198 additions and 5 deletions.
28 changes: 28 additions & 0 deletions doc/build/changelog/migration_13.rst
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,34 @@ well as for casting decimal bind values for MySQL.

:ticket:`3981`

.. _change_pr467:

New last-in-first-out strategy for QueuePool
---------------------------------------------

The connection pool usually used by :func:`.create_engine` is known
as :class:`.QueuePool`. This pool uses an object equivalent to Python's
built-in ``Queue`` class in order to store database connections waiting
to be used. The ``Queue`` features first-in-first-out behavior, which is
intended to provide a round-robin use of the database connections that are
persistently in the pool. However, a potential downside of this is that
when the utilization of the pool is low, the re-use of each connection in series
means that a server-side timeout strategy that attempts to reduce unused
connections is prevented from shutting down these connections. To suit
this use case, a new flag :paramref:`.create_engine.pool_use_lifo` is added
which reverses the ``.get()`` method of the ``Queue`` to pull the connection
from the beginning of the queue instead of the end, essentially turning the
"queue" into a "stack" (adding a whole new pool called ``StackPool`` was
considered, however this was too much verbosity).

.. seealso::

:ref:`pool_use_lifo`





Key Behavioral Changes - Core
=============================

Expand Down
13 changes: 13 additions & 0 deletions doc/build/changelog/unreleased_13/pr467.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
.. change::
:tags: feature, engine

Added new "lifo" mode to :class:`.QueuePool`, typically enabled by setting
the flag :paramref:`.create_engine.pool_use_lifo` to True. "lifo" mode
means the same connection just checked in will be the first to be checked
out again, allowing excess connections to be cleaned up from the server
side during periods of the pool being only partially utilized. Pull request
courtesy Taem Park.

.. seealso::

:ref:`change_pr467`
35 changes: 35 additions & 0 deletions doc/build/core/pooling.rst
Original file line number Diff line number Diff line change
Expand Up @@ -396,6 +396,41 @@ a DBAPI connection might be invalidated include:
All invalidations which occur will invoke the :meth:`.PoolEvents.invalidate`
event.

.. _pool_use_lifo:

Using FIFO vs. LIFO
-------------------

The :class:`.QueuePool` class features a flag called
:paramref:`.QueuePool.use_lifo`, which can also be accessed from
:func:`.create_engine` via the flag :paramref:`.create_engine.pool_use_lifo`.
Setting this flag to ``True`` causes the pool's "queue" behavior to instead be
that of a "stack", e.g. the last connection to be returned to the pool is the
first one to be used on the next request. In contrast to the pool's long-
standing behavior of first-in-first-out, which produces a round-robin effect of
using each connection in the pool in series, lifo mode allows excess
connections to remain idle in the pool, allowing server-side timeout schemes to
close these connections out. The difference between FIFO and LIFO is
basically whether or not its desirable for the pool to keep a full set of
connections ready to go even during idle periods::

engine = create_engine(
"postgreql://", pool_use_lifo=True, pool_pre_ping=True)

Above, we also make use of the :paramref:`.create_engine.pool_pre_ping` flag
so that connections which are closed from the server side are gracefully
handled by the connection pool and replaced with a new connection.

Note that the flag only applies to :class:`.QueuePool` use.

.. versionadded:: 1.3

.. seealso::

:ref:`pool_disconnects`



Using Connection Pools with Multiprocessing
-------------------------------------------

Expand Down
15 changes: 15 additions & 0 deletions lib/sqlalchemy/engine/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -399,6 +399,21 @@ def create_engine(*args, **kwargs):
up on getting a connection from the pool. This is only used
with :class:`~sqlalchemy.pool.QueuePool`.
:param pool_use_lifo=False: use LIFO (last-in-first-out) when retrieving
connections from :class:`.QueuePool` instead of FIFO
(first-in-first-out). Using LIFO, a server-side timeout scheme can
reduce the number of connections used during non- peak periods of
use. When planning for server-side timeouts, ensure that a recycle or
pre-ping strategy is in use to gracefully handle stale connections.
.. versionadded:: 1.3
.. seealso::
:ref:`pool_use_lifo`
:ref:`pool_disconnects`
:param plugins: string list of plugin names to load. See
:class:`.CreateEnginePlugin` for background.
Expand Down
3 changes: 2 additions & 1 deletion lib/sqlalchemy/engine/strategies.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,8 @@ def connect(connection_record=None):
'events': 'pool_events',
'use_threadlocal': 'pool_threadlocal',
'reset_on_return': 'pool_reset_on_return',
'pre_ping': 'pool_pre_ping'}
'pre_ping': 'pool_pre_ping',
'use_lifo': 'pool_use_lifo'}
for k in util.get_cls_kwargs(poolclass):
tk = translate.get(k, k)
if tk in kwargs:
Expand Down
19 changes: 17 additions & 2 deletions lib/sqlalchemy/pool/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ class QueuePool(Pool):
"""

def __init__(self, creator, pool_size=5, max_overflow=10, timeout=30,
def __init__(self, creator, pool_size=5, max_overflow=10, timeout=30, use_lifo=False,
**kw):
r"""
Construct a QueuePool.
Expand Down Expand Up @@ -63,14 +63,29 @@ def __init__(self, creator, pool_size=5, max_overflow=10, timeout=30,
:param timeout: The number of seconds to wait before giving up
on returning a connection. Defaults to 30.
:param use_lifo: use LIFO (last-in-first-out) when retrieving
connections instead of FIFO (first-in-first-out). Using LIFO, a
server-side timeout scheme can reduce the number of connections used
during non-peak periods of use. When planning for server-side
timeouts, ensure that a recycle or pre-ping strategy is in use to
gracefully handle stale connections.
.. versionadded:: 1.3
.. seealso::
:ref:`pool_use_lifo`
:ref:`pool_disconnects`
:param \**kw: Other keyword arguments including
:paramref:`.Pool.recycle`, :paramref:`.Pool.echo`,
:paramref:`.Pool.reset_on_return` and others are passed to the
:class:`.Pool` constructor.
"""
Pool.__init__(self, creator, **kw)
self._pool = sqla_queue.Queue(pool_size)
self._pool = sqla_queue.Queue(pool_size, use_lifo=use_lifo)
self._overflow = 0 - pool_size
self._max_overflow = max_overflow
self._timeout = timeout
Expand Down
13 changes: 11 additions & 2 deletions lib/sqlalchemy/util/queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,12 @@ class Full(Exception):


class Queue:
def __init__(self, maxsize=0):
def __init__(self, maxsize=0, use_lifo=False):
"""Initialize a queue object with a given maximum size.
If `maxsize` is <= 0, the queue size is infinite.
If `use_lifo` is True, this Queue acts like a Stack (LIFO).
"""

self._init(maxsize)
Expand All @@ -57,6 +59,8 @@ def __init__(self, maxsize=0):
# Notify not_full whenever an item is removed from the queue;
# a thread waiting to put is notified then.
self.not_full = threading.Condition(self.mutex)
# If this queue uses LIFO or FIFO
self.use_lifo = use_lifo

def qsize(self):
"""Return the approximate size of the queue (not reliable!)."""
Expand Down Expand Up @@ -196,4 +200,9 @@ def _put(self, item):

# Get an item from the queue
def _get(self):
return self.queue.popleft()
if self.use_lifo:
# LIFO
return self.queue.pop()
else:
# FIFO
return self.queue.popleft()
77 changes: 77 additions & 0 deletions test/engine/test_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -1983,6 +1983,83 @@ def test_no_double_checkin(self):
rec.checkin
)

def test_lifo(self):
c1, c2, c3 = Mock(), Mock(), Mock()
connections = [c1, c2, c3]

def creator():
return connections.pop(0)

p = pool.QueuePool(creator, use_lifo=True)

pc1 = p.connect()
pc2 = p.connect()
pc3 = p.connect()

pc1.close()
pc2.close()
pc3.close()

for i in range(5):
pc1 = p.connect()
is_(pc1.connection, c3)
pc1.close()

pc1 = p.connect()
is_(pc1.connection, c3)

pc2 = p.connect()
is_(pc2.connection, c2)
pc2.close()

pc3 = p.connect()
is_(pc3.connection, c2)

pc2 = p.connect()
is_(pc2.connection, c1)

pc2.close()
pc3.close()
pc1.close()

def test_fifo(self):
c1, c2, c3 = Mock(), Mock(), Mock()
connections = [c1, c2, c3]

def creator():
return connections.pop(0)

p = pool.QueuePool(creator)

pc1 = p.connect()
pc2 = p.connect()
pc3 = p.connect()

pc1.close()
pc2.close()
pc3.close()

pc1 = p.connect()
is_(pc1.connection, c1)
pc1.close()

pc1 = p.connect()
is_(pc1.connection, c2)

pc2 = p.connect()
is_(pc2.connection, c3)
pc2.close()

pc3 = p.connect()
is_(pc3.connection, c1)

pc2 = p.connect()
is_(pc2.connection, c3)

pc2.close()
pc3.close()
pc1.close()


class ResetOnReturnTest(PoolTestBase):
def _fixture(self, **kw):
Expand Down

0 comments on commit 5981e43

Please sign in to comment.