Merge 6a5b188 into fdb8d92

jamadden committed Jun 11, 2021
2 parents fdb8d92 + 6a5b188 commit a7b8ce7
Showing 4 changed files with 139 additions and 18 deletions.
7 changes: 6 additions & 1 deletion CHANGES.rst
@@ -5,7 +5,12 @@
3.5.0a5 (unreleased)
====================

- Nothing changed yet.
- Fix ``RelStorage.zap_all()`` and ``zodbconvert --clear`` against
existing PostgreSQL databases with very large numbers of Blobs and
relatively small amounts of shared memory (e.g., default values for
``max_locks_per_transaction`` and ``max_connections``). Previously,
this could raise an ``out of shared memory`` error. See
:issue:`468`.


3.5.0a4 (2021-06-09)
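For context on the new changelog entry above: the two server limits it names are ordinary PostgreSQL settings and can be checked on any server. A minimal sketch using psycopg2 (the DSN is a placeholder, not taken from this commit):

    import psycopg2

    conn = psycopg2.connect("dbname=postgres")  # placeholder DSN
    cur = conn.cursor()
    for setting in ("max_locks_per_transaction", "max_connections"):
        # Defaults are 64 and 100; their product bounds the server's lock table,
        # which is what the unchunked lo_unlink calls used to exhaust.
        cur.execute("SELECT current_setting(%s)", (setting,))
        print(setting, cur.fetchone()[0])
    conn.close()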
64 changes: 57 additions & 7 deletions src/relstorage/adapters/postgresql/schema.py
@@ -345,21 +345,71 @@ def _reset_oid(self, cursor):
# back if the surrounding transaction does not commit."
_zap_all_tbl_stmt = 'TRUNCATE TABLE %s CASCADE'

def _before_zap_all_tables(self, cursor, tables, slow=False):
super(PostgreSQLSchemaInstaller, self)._before_zap_all_tables(cursor, tables, slow)
def _before_zap_all_tables(self, conn, cursor, tables, slow=False):
super(PostgreSQLSchemaInstaller, self)._before_zap_all_tables(conn, cursor, tables, slow)
if not slow and 'blob_chunk' in tables:
# If we're going to be truncating, it's important to
# remove the large objects through lo_unlink. We have a
# trigger that does that, but only for DELETE.
# The `vacuumlo` command cleans up any that might have been
# missed.

# This unfortunately results in returning a row for each
# This unfortunately results in executing a statement for each
# object unlinked, but it should still be faster than
# running a DELETE and firing the trigger for each row.

# We need to take care to do this in chunks, though,
# so as to not allocate all the locks available on the server
# (which results in a confusing "out of shared memory" error).
# See https://github.com/zodb/relstorage/issues/468
# For that reason, even a simple DELETE which *does* fire the
# trigger isn't good enough.

# XXX: When we run on PostgreSQL 11 and above only, we can
# probably use a complete server-side ``DO`` loop, because
# on that version you can COMMIT in the loop.

# We first copy the blob_chunk info into the temporary
# table so that we can DELETE from it as we go and keep
# track of our chunks. Once we start unlinking the blobs,
# we can't delete from the blob_chunk table without
# breaking the existing trigger that wants to unlink. And
# since we're spanning transactions here, we don't want to
# disable the trigger.
cursor.execute("""
CREATE TEMPORARY TABLE IF NOT EXISTS temp_zap_chunk(
chunk oid primary key
);
""")
cursor.execute("""
SELECT lo_unlink(t.chunk)
FROM
(SELECT DISTINCT chunk FROM blob_chunk)
AS t
INSERT INTO temp_zap_chunk
SELECT DISTINCT chunk
FROM blob_chunk;
""")
while True:
cursor.execute("""
DO $$BEGIN
PERFORM lo_unlink(chunk)
FROM temp_zap_chunk
ORDER BY chunk
LIMIT 1000;
DELETE FROM temp_zap_chunk
WHERE chunk IN (
SELECT chunk
FROM temp_zap_chunk
ORDER BY chunk
LIMIT 1000
);
END
$$ LANGUAGE plpgsql;
SELECT MIN(chunk) FROM temp_zap_chunk;
""")
cnt, = cursor.fetchone()
logger.info("Unlinked 1000 blob chunks. More? %s", bool(cnt))
self.driver.commit(conn)
if not cnt:
# Now we must truncate because the trigger won't let
# deletes happen.
cursor.execute('TRUNCATE TABLE blob_chunk;')
break
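The XXX comment above notes that on PostgreSQL 11 and later the whole batching loop could move server side, because those versions allow COMMIT inside a top-level DO block. A hedged sketch of that variant, not what this commit ships (the connection must be in autocommit mode so the DO block is not wrapped in an outer transaction, and temp_zap_chunk must already exist in the session):

    import psycopg2

    conn = psycopg2.connect("dbname=zodb")  # placeholder DSN
    conn.autocommit = True  # required for COMMIT inside a DO block
    cursor = conn.cursor()
    cursor.execute("""
    DO $$
    BEGIN
        WHILE EXISTS (SELECT 1 FROM temp_zap_chunk) LOOP
            -- Unlink a bounded batch of large objects, then forget them.
            PERFORM lo_unlink(chunk)
            FROM temp_zap_chunk
            ORDER BY chunk
            LIMIT 1000;
            DELETE FROM temp_zap_chunk
            WHERE chunk IN (
                SELECT chunk FROM temp_zap_chunk ORDER BY chunk LIMIT 1000
            );
            -- PostgreSQL 11+ only: committing inside the loop releases the
            -- locks taken so far instead of accumulating them.
            COMMIT;
        END LOOP;
    END
    $$ LANGUAGE plpgsql;
    """)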
10 changes: 5 additions & 5 deletions src/relstorage/adapters/schema.py
@@ -826,10 +826,10 @@ def zap_all(self, reset_oid=True, slow=False):
"""
stmt = self._zap_all_tbl_stmt if not slow else self._slow_zap_all_tbl_stmt

def zap_all(_conn, cursor):
def zap_all(conn, cursor):
existent = set(self.list_tables(cursor))
to_zap = {} # {normalized_name: recorded_name}
self._before_zap_all_tables(cursor, existent, slow)
self._before_zap_all_tables(conn, cursor, existent, slow)
for possible_table in existent:
norm_table = self._normalize_schema_object_names([possible_table])[0]
if norm_table.startswith('temp_'):
@@ -858,9 +858,9 @@ def zap_all(_conn, cursor):

# Hooks for subclasses

def _before_zap_all_tables(self, cursor, tables, slow=False):
logger.debug("Before zapping existing tables (%s) with %s; slow: %s",
tables, cursor, slow)
def _before_zap_all_tables(self, conn, cursor, tables, slow=False):
logger.debug("Before zapping existing tables (%s) with %s/%s; slow: %s",
tables, conn, cursor, slow)

def _after_zap_all_tables(self, cursor, slow=False):
logger.debug("Running init script. Slow: %s", slow)
76 changes: 71 additions & 5 deletions src/relstorage/tests/testpostgresql.py
@@ -66,10 +66,11 @@ def get_adapter_zconfig(self):
def verify_adapter_from_zconfig(self, adapter):
self.assertEqual(adapter._dsn, self.__get_adapter_zconfig_dsn())

class TestBlobMerge(PostgreSQLAdapterMixin,
StorageCreatingMixin,
TestCase,
StorageTestBase.StorageTestBase):
class TestBlobFunctionality(
PostgreSQLAdapterMixin,
StorageCreatingMixin,
TestCase,
StorageTestBase.StorageTestBase):
# pylint:disable=too-many-ancestors

def test_merge_blobs_on_open(self):
@@ -134,6 +135,71 @@ def test_merge_blobs_on_open(self):
conn.close()
db.close()

REALLY_EXHAUST_SHARED_MEMORY = False

def test_zapping_with_many_blobs(self):
# https://github.com/zodb/relstorage/issues/468
# If a database had many blobs (more than 4600 by default)
# it couldn't be zapped.

from ZODB.DB import DB
from ZODB.blob import Blob
import transaction
storage = self._closing(self.make_storage(
blob_dir='blobs', shared_blob_dir=False))
db = self._closing(DB(storage))
conn = db.open()

if self.REALLY_EXHAUST_SHARED_MEMORY: # pragma: no cover
# NOTE: When actually testing the shared memory exhaustion,
# this test is slow; it takes about 45s with default
# server settings and psycopg2, and 1:50 under pg8000

# First, figure out how many blobs we need to create to exceed
# the limit and fail: max_locks_per_transaction * max_connections
cursor = conn._storage._load_connection.cursor
cursor.execute("SELECT CURRENT_SETTING('max_locks_per_transaction')")
max_locks = cursor.fetchall()[0][0]
cursor.execute("SELECT CURRENT_SETTING('max_connections')")
max_conn = cursor.fetchall()[0][0]

# max_locks * max_conn is the documented limit of the locks,
# but it seems to actually be memory based. For example, with
# max_locks = 64 (the default) and max_conn = 300 (3x the default)
# we calculate a max_blobs of 19,200. And the server easily handles that.
# However, dropping down to max_conn = 100 (the default), the server
# fails to zap the 19,200 blobs, though it does zap the 6,400 blobs fine.
# Hence the final * 3
max_blobs = int(max_locks) * int(max_conn) * 3
else:
# Choose a number to let us loop a few times.
max_blobs = 3523


blobs = []
for i in range(max_blobs):

blob = Blob()
with blob.open('w') as f:
data = str(i)
if not isinstance(data, bytes):
data = data.encode('ascii')
f.write(data)

blobs.append(blob)

conn.root().blobs = blobs
transaction.commit()

conn.close()
# Now zap, being sure to use the fast method that originally triggered
# this bug.
storage.zap_all(slow=False)
cursor = storage._load_connection.cursor
cursor.execute('SELECT COUNT(*) FROM blob_chunk')
self.assertEqual(0, cursor.fetchone()[0])
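
Spelled out, the arithmetic behind the comments in that test, assuming the PostgreSQL defaults the comments cite (a back-of-the-envelope check, not part of the commit):

    # Defaults cited in the test's comments.
    max_locks_per_transaction = 64
    max_connections = 100

    # Documented ceiling on server-wide lock-table entries.
    documented_limit = max_locks_per_transaction * max_connections  # 6,400

    # The server actually fails somewhere above this (the limit appears to be
    # memory based), so the test triples the product to be sure it crosses the
    # old failure point when REALLY_EXHAUST_SHARED_MEMORY is enabled.
    max_blobs = documented_limit * 3  # 19,200

    # With the flag left False, 3523 blobs are used instead: enough for the
    # 1000-per-batch zap loop in schema.py to run about four iterations.
    print(documented_limit, max_blobs)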


class TestGenerateTIDPG(PostgreSQLAdapterMixin,
StorageCreatingMixin,
TestCase,
@@ -220,7 +286,7 @@ def __init__(self):
super(PostgreSQLTestSuiteBuilder, self).__init__(
drivers,
PostgreSQLAdapterMixin,
extra_test_classes=(TestBlobMerge, TestGenerateTIDPG)
extra_test_classes=(TestBlobFunctionality, TestGenerateTIDPG)
)

def _compute_large_blob_size(self, use_small_blobs):
