Skip to content

Commit

Permalink
Merge pull request #452 from zodb/better_store_conn_cleanup
Browse files Browse the repository at this point in the history
Improve the way store connections are managed
  • Loading branch information
jamadden committed Apr 12, 2021
2 parents a8f517a + 825c0aa commit 5e6ef78
Show file tree
Hide file tree
Showing 9 changed files with 63 additions and 10 deletions.
3 changes: 2 additions & 1 deletion .github/actions/config-cc/action.yml
Expand Up @@ -3,10 +3,11 @@ description: Configure ccache and CFLAGS
runs:
using: "composite"
steps:
- name: Install ccache, configure CFLAGS
- name: Install ccache, configure CFLAGS, update brew
shell: bash
run: |
if [ "`uname`" = "Darwin" ]; then
brew update
brew install ccache
echo CFLAGS=$CFLAGS -Wno-parentheses-equality -Wno-constant-logical-operand >> $GITHUB_ENV
echo CXXFLAGS=$CXXFLAGS -Wno-parentheses-equality -Wno-constant-logical-operand >> $GITHUB_ENV
Expand Down
3 changes: 3 additions & 0 deletions CHANGES.rst
Expand Up @@ -11,6 +11,9 @@
- RelStorage is now tested with PostgreSQL 13.1. See :issue:`427`.
- RelStorage is now tested with PyMySQL 1.0. See :issue:`434`.
- Update the bundled boost C++ library from 1.71 to 1.75.
- Improve the way store connections are managed to make it less likely
a "stale" store connection that hasn't actually been checked for
liveness gets used.

3.4.0 (2020-10-19)
==================
Expand Down
21 changes: 18 additions & 3 deletions src/relstorage/adapters/connections.py
Expand Up @@ -357,8 +357,9 @@ class StoreConnectionPool(object):

def __init__(self, connmanager):
import threading
self._lock = threading.Lock()
self._lock = threading.Lock() # Not RLock, cheaper that way
self._connmanager = connmanager
# LIFO of connections: Next to use is at the right end.
self._connections = []
self._count = 1
self._factory = StoreConnection
Expand Down Expand Up @@ -442,7 +443,10 @@ def _replace(self, connection, needs_rollback):
else:
connection.exit_critical_phase()
with self._lock:
self._connections.append(connection)
if connection.connection is not None:
# If it's been dropped, don't add it; better just start
# fresh.
self._connections.append(connection)
self._shrink()

def _shrink(self):
Expand All @@ -454,7 +458,17 @@ def _shrink(self):
keep_connections = min(self._count, self.MAX_STORE_CONNECTIONS_IN_POOL or self._count)

while len(self._connections) > keep_connections and self._connections:
conn = self._connections.pop()
# Pop off the oldest connection to eliminate.
#
# Because of the MVCC protocol, we will have an instance
# of this class for the "root" object associated with the
# ZODB.DB, that is usually never actually used in a
# transaction, plus one for every extent ZODB
# Connection/RelStorage instance. That first index might be a very old
# connection indeed, and could continue to exist even if all
# ZODB Connection objects have been closed. To mitigate the risk of that
# causing a problem, remove starting with the oldest.
conn = self._connections.pop(0)
conn.drop() # TODO: This could potentially be slow? Might
# want to do this outside the lock.
conn.connmanager = None # It can't be opened again.
Expand All @@ -478,6 +492,7 @@ def pooled_connection_count(self):
def instance_count(self):
return self._count


class ClosedConnectionPool(object):

def borrow(self):
Expand Down
2 changes: 1 addition & 1 deletion src/relstorage/adapters/oracle/scriptrunner.py
Expand Up @@ -175,7 +175,7 @@ def run_lob_stmt(self, cursor, stmt, args=(), default=None):
try:
cursor.execute(stmt + ' ', args)
rows = [
tuple([self._read_lob(x) for x in row])
tuple(self._read_lob(x) for x in row)
for row in cursor
]
finally:
Expand Down
2 changes: 1 addition & 1 deletion src/relstorage/adapters/postgresql/drivers/psycopg2.py
Expand Up @@ -94,7 +94,7 @@ class RSPsycopg2Connection(mod.extensions.connection):
def _get_extension_module(self):
# Subclasses should override this method if they use a different
# DB-API module.
from psycopg2 import extensions # pylint:disable=no-name-in-module
from psycopg2 import extensions # pylint:disable=no-name-in-module,import-error
return extensions

_WANT_WAIT_CALLBACK = False
Expand Down
2 changes: 1 addition & 1 deletion src/relstorage/adapters/postgresql/drivers/psycopg2cffi.py
Expand Up @@ -63,7 +63,7 @@ def __init__(self, dsn, **kwargs):
return mod.RSPsycopg2cffiConnection

def _get_extension_module(self):
from psycopg2cffi import extensions # pylint:disable=no-name-in-module
from psycopg2cffi import extensions # pylint:disable=no-name-in-module,import-error
return extensions

# as of psycopg2cffi 2.8.1 connection has no '.info' attribute.
Expand Down
30 changes: 30 additions & 0 deletions src/relstorage/adapters/tests/test_connections.py
Expand Up @@ -173,3 +173,33 @@ def factory(_):
raise MyBeginException

check(factory, MyBeginException)

def test_shrinking_removes_at_beginning(self):
connmanager = MockConnectionManager(clean_rollback=True)
connmanager.begin = lambda *_args: None

pool = StoreConnectionPool(connmanager)

borrowed = [pool.new_instance().borrow() for _ in range(10)]

for c in borrowed:
pool.replace(c)

self.assertEqual(borrowed, pool._connections)

pool.MAX_STORE_CONNECTIONS_IN_POOL = 5
remaining = pool._connections[5:]
pool._shrink()
self.assertEqual(remaining, pool._connections)

def test_returning_dropped_does_not_add_to_pool(self):
connmanager = MockConnectionManager(clean_rollback=True)
connmanager.begin = lambda *_args: None

pool = StoreConnectionPool(connmanager)
conn = pool.borrow()
assert conn.connection is not None
conn.drop()
assert conn.connection is None
pool.replace(conn)
self.assertEqual(0, pool.pooled_connection_count)
4 changes: 2 additions & 2 deletions src/relstorage/storage/tpc/__init__.py
Expand Up @@ -126,8 +126,8 @@ def store_connection(self, storage, store_connection, force):
store_connection,
self.prepared_txn)

if force:
store_connection.drop()
if force:
store_connection.drop()
finally:
storage._store_connection_pool.replace(store_connection)
return _CLOSED_CONNECTION
Expand Down
6 changes: 5 additions & 1 deletion src/relstorage/tests/__init__.py
Expand Up @@ -296,14 +296,18 @@ def __setattr__(self, name, value):
class MockConnectionManager(object):

isolation_load = 'SERIALIZABLE'
clean_rollback = None

def __init__(self, driver=None):
def __init__(self, driver=None, clean_rollback=None):
if driver is None:
self.driver = MockDriver()
if clean_rollback is not None:
self.clean_rollback = clean_rollback

def rollback_quietly(self, conn, cursor): # pylint:disable=unused-argument
if hasattr(conn, 'rollback'):
conn.rollback()
return self.clean_rollback

rollback_store_quietly = rollback_quietly

Expand Down

0 comments on commit 5e6ef78

Please sign in to comment.