Merge pull request #428 from zodb/issue424

Improve the efficiency of packing databases

jamadden committed Oct 6, 2020
2 parents 2d8a1f4 + 5089c77 commit 16199d6

Showing 21 changed files with 738 additions and 279 deletions.
2 changes: 2 additions & 0 deletions CHANGES.rst
@@ -15,6 +15,8 @@
  (Sadly this just squashes the warning, it doesn't eliminate the
  round trip that generates it.)

- Improve the performance of packing databases, especially
  history-free databases. See :issue:`275`.

3.3.2 (2020-09-21)
==================
19 changes: 19 additions & 0 deletions docs/postgresql/setup.rst
@@ -36,3 +36,22 @@ PostgreSQL re-reads ``pg_hba.conf`` when you ask it to reload its
configuration file::

    /etc/init.d/postgresql reload

Configuration
=============

.. tip::

   For packing large databases, a larger value of the PostgreSQL
   configuration parameter ``work_mem`` is likely to yield improved
   performance. The default is 4MB; try 16MB if packing performance is
   unacceptable.
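
   For example, to raise it for just the session that performs the
   pack (assuming sufficient privileges)::

       SET work_mem = '16MB';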

.. tip::

   For packing large databases, setting the ``pack_object``,
   ``object_ref`` and ``object_refs_added`` tables to `UNLOGGED
   <https://www.postgresql.org/docs/12/sql-createtable.html#SQL-CREATETABLE-UNLOGGED>`_
   can provide a performance boost (if replication doesn't matter and
   you don't care about the contents of these tables). This can be
   done after the schema is created with ``ALTER TABLE table SET UNLOGGED``.
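
   For example, using the table names listed above::

       ALTER TABLE pack_object SET UNLOGGED;
       ALTER TABLE object_ref SET UNLOGGED;
       ALTER TABLE object_refs_added SET UNLOGGED;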
6 changes: 6 additions & 0 deletions docs/sqlite3/faq.rst
@@ -81,6 +81,12 @@ parallel commits?
database is locked as close to the end of the commit process as
possible.

Note that packing a SQLite database makes no effort to reduce the
amount of time spent writing to the database. It's unlikely you'll
get meaningful parallel writes to happen while packing the
database. If you plan to deploy SQLite databases to production,
also plan to schedule downtime to pack them.
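
For example, a cron entry along these lines (the schedule, retention,
and configuration path are purely illustrative)::

    # Pack every Sunday at 03:00, treating the last 7 days as current.
    0 3 * * 0  zodbpack --days 7 /etc/myapp/zodbpack.conf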

Q: Should I disable RelStorage's cache when used with SQLite?

A: Possibly (with ``cache-local-mb 0``). Let the operating system
27 changes: 27 additions & 0 deletions docs/things-to-know.rst
@@ -139,3 +139,30 @@ maintain a registry of open databases. Here's an example using
It might also be necessary to register a signal handler to perform the
same operation in the event of unclean shutdowns. See :issue:`183` for
more discussion.

.. _to-know-about-pack:

Packing A Database May Increase Disk Usage
==========================================

After packing a RelStorage, whether history-free or history-preserving,
whether through the ZODB APIs or the command-line tool
:doc:`zodbpack`, you may find that the database disk usage has
actually increased, sometimes by a substantial fraction of the main
database size.

RelStorage deliberately stores information in the database about the
object references it discovered during packing. The next time a pack
is run, this information is used for objects that haven't changed,
making it unnecessary for RelStorage to discover it again. In turn,
this makes subsequent packs substantially faster when there are many
objects that haven't changed (which is typically the case for many
applications).
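
On PostgreSQL, for example, you can gauge how much space this
reference data occupies with a query like::

    SELECT pg_total_relation_size('object_ref')
         + pg_total_relation_size('object_refs_added') AS pack_ref_bytes;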

A future version of RelStorage might provide the option to remove this
data when a pack finishes.
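
Until then, if reclaiming that space matters more to you than the
speed of the next pack, the stored reference data can be discarded
manually; the next pack will simply rediscover it. A sketch for
PostgreSQL, assuming no pack is running at the time::

    TRUNCATE object_ref, object_refs_added;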

You can also use the ``multi-zodb-gc`` script provided by the
``zc.zodbdgc`` project to pack a RelStorage. It does not store this
persistent data, but it may be substantially slower than the native
packing capabilities, especially on large databases.
2 changes: 2 additions & 0 deletions docs/zodbpack.rst
@@ -16,6 +16,8 @@ the storages to pack, in ZConfig format. An example configuration file::
    </mysql>
  </relstorage>

When packing a RelStorage, please read :ref:`to-know-about-pack`.
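
For example, to pack the storages named in a configuration file,
treating the last day of transactions as current (the file name is
illustrative)::

    zodbpack --days 1 zodbpack.conf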

Options for ``zodbpack``
========================

40 changes: 36 additions & 4 deletions src/relstorage/_util.py
@@ -72,7 +72,7 @@
    'parse_boolean',
    'parse_byte_size',
    'positive_integer',
    'get_time_from_environ',
    'get_duration_from_environ',
]

positive_integer = RangeCheckedConversion(integer, min=1)
@@ -144,12 +144,44 @@ def __exit__(self, t, v, tb):
        self.__end = self.counter()
        self.duration = self.__end - self.__begin

def get_time_from_environ(environ_name, default):
    return _setting_from_environ(float, environ_name, default)

def get_duration_from_environ(environ_name, default):
    """
    Return a floating-point number of seconds from the environment
    *environ_name*, or *default*.

    Examples: ``1.24s``, ``3m``, ``1m 3.6s``::

        >>> import os
        >>> os.environ['RS_TEST_VAL'] = '2.3'
        >>> get_duration_from_environ('RS_TEST_VAL', None)
        2.3
        >>> os.environ['RS_TEST_VAL'] = '5.4s'
        >>> get_duration_from_environ('RS_TEST_VAL', None)
        5.4
        >>> os.environ['RS_TEST_VAL'] = '1m 3.2s'
        >>> get_duration_from_environ('RS_TEST_VAL', None)
        63.2
        >>> os.environ['RS_TEST_VAL'] = 'Invalid' # No time specifier
        >>> get_duration_from_environ('RS_TEST_VAL', 42)
        42
        >>> os.environ['RS_TEST_VAL'] = 'Invalids' # The 's' time specifier
        >>> get_duration_from_environ('RS_TEST_VAL', 42)
        42
    """

    def convert(val):
        # The default time-interval datatype accepts only integers;
        # that's not fine-grained enough for these durations.
        if any(c in val for c in ' wdhms'):
            delta = stock_datatypes['timedelta'](val)
            return delta.total_seconds()
        return float(val)

    return _setting_from_environ(convert, environ_name, default)

def _get_log_time_level(level_int, default):
    level_name = logging.getLevelName(level_int)
    val = get_time_from_environ('RS_PERF_LOG_%s_MIN' % level_name, default)
    val = get_duration_from_environ('RS_PERF_LOG_%s_MIN' % level_name, default)
    return (level_int, float(val))
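
For instance, the performance-log thresholds read by
``_get_log_time_level`` above now accept these duration strings, so
sub-second values are possible; set in the environment before the
process starts (the variable name follows from the code; the value is
illustrative)::

    RS_PERF_LOG_DEBUG_MIN='0.5s'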

# A list of tuples (level_int, min_duration), ordered by increasing
99 changes: 84 additions & 15 deletions src/relstorage/adapters/batch.py
@@ -19,6 +19,7 @@
from relstorage._compat import iteritems
from relstorage._compat import perf_counter
from relstorage._util import parse_byte_size
from relstorage._util import consume

from .interfaces import AggregateOperationTimeoutError

@@ -93,34 +94,45 @@ def __repr__(self):
        )

    def _flush_if_needed(self):
        """
        Return the number of rows updated.
        """
        if self.rows_added >= self.row_limit:
            return self.flush()
        if self.bind_limit and self.bind_params_added >= self.bind_limit:
            return self.flush()
        if self.size_added >= self.size_limit:
            return self.flush()
        return 0

    def _flush_if_would_exceed_bind(self, addition):
        # The bind limit is a hard limit we cannot exceed.
        # If adding *addition* params would cause us to exceed,
        # flush now.
        if self.bind_limit and self.bind_params_added + addition >= self.bind_limit:
            self.flush()
            return True
            return self.flush() or True # This should always be at least one, right?
        return 0

    def delete_from(self, table, **kw):
        """
        Returns the number of rows flushed as a result of this operation.
        That can include inserts.
        """
        # XXX: When deleting a lot from a single table, a bulk function
        # might be a lot faster.
        if not kw:
            raise AssertionError("Need at least one column value")
        columns = tuple(sorted(kw))
        key = (table, columns)
        row = tuple(kw[column] for column in columns)
        bind_params_added = len(row) if row not in self.deletes[key] else 0
        self._flush_if_would_exceed_bind(bind_params_added)
        count = self._flush_if_would_exceed_bind(bind_params_added)

        self.deletes[key].add(row)
        self.rows_added += 1
        self.bind_params_added += bind_params_added
        self._flush_if_needed()
        count += self._flush_if_needed()
        return count

    def insert_into(self, header, row_schema, row, rowkey, size,
                    command='INSERT', suffix=''):
@@ -177,45 +189,97 @@ def select_from(self, columns, table, suffix='', timeout=None, **kw):
        some subset of batches exceeds *timeout*. This is checked
        after each individual batch.
        """
        command = 'SELECT %s' % (','.join(columns),)

        for cursor in self.__select_like(
                command,
                table,
                suffix,
                timeout,
                kw
        ):
            for row in cursor.fetchall():
                yield row

    def update_set_static(self, update_set, timeout=None, **kw):
        """
        As for :meth:`select_from`, but the first parameter is
        the complete static UPDATE statement. It must be uppercase
        and start with "UPDATE".

        Rows are not expected to be returned, so the cursor is
        completely consumed between batches.
        """
        # We actually just consume the iteration of the cursor itself;
        # we can't consume the cursor. Some drivers (pg8000) throw ProgrammingError
        # if we try to iterate a cursor that has no rows.
        consume(
            self.__select_like(
                update_set,
                None, # table not used
                '',   # suffix not used
                timeout,
                kw
            ))

        return self.__last_select_like_count

    __last_select_like_count = 0

    def __select_like(self, command, table, suffix, timeout, kw):
        assert len(kw) == 1
        # filter_values may be a generic iterable or even a generator;
        # be sure not to materialize the whole thing at any point. Never
        # more than a chunk_size at a time.
        filter_column, filter_values = kw.popitem()
        filter_values = iter(filter_values)

        command = 'SELECT %s' % (','.join(columns),)

        chunk_size = self.bind_limit or self.row_limit
        chunk_size -= 1

        begin = self.perf_counter() if timeout else None

        count = 0
        for head in filter_values:
            filter_subset = list(itertools.islice(filter_values, chunk_size))
            filter_subset.append(head)

            descriptor = [[(table, (filter_column,)), filter_subset]]

            self._do_batch(command, descriptor, rows_need_flattened=False, suffix=suffix)
            count += self._do_batch(command, descriptor, rows_need_flattened=False, suffix=suffix)

            for row in self.cursor.fetchall():
                yield row
            yield self.cursor

            if timeout and self.perf_counter() - begin >= timeout:
                # TODO: It'd be nice not to do this if we had no more
                # batches to do.
                raise AggregateOperationTimeoutError

        self.__last_select_like_count = count

    def flush(self):
        """
        Return the total number of rows deleted or inserted in this operation.
        (This is the number requested, in the case of deletes, not the number
        that actually matched.)

        This can be treated as a boolean to discover if anything was flushed.
        """
        count = 0
        if self.deletes:
            self.total_rows_deleted += self._do_deletes()
            count += self._do_deletes()
            self.total_rows_deleted += count
            self.deletes.clear()
        if self.inserts:
            self.total_rows_inserted += self._do_inserts()
            count += self._do_inserts()
            self.total_rows_inserted += count
            self.inserts.clear()
            self.total_size_inserted += self.size_added

        self.rows_added = 0
        self.size_added = 0
        self.bind_params_added = 0
        return count
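
A hypothetical usage sketch of this batching API (the class name,
constructor, and the ``zoid`` column are assumptions; only the methods
appear in this diff)::

    batcher = RowBatcher(cursor)
    for oid in oids_to_delete:
        # Buffered; flushes automatically when the row, bind-parameter,
        # or size limits are reached.
        batcher.delete_from('pack_object', zoid=oid)
    count = batcher.flush()  # rows deleted/inserted by the final flush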

    def _do_deletes(self):
        return self._do_batch('DELETE', sorted(iteritems(self.deletes)))
@@ -263,9 +327,14 @@ def _make_single_column_query(self, command, table,
                                  filter_column, filter_value,
                                  rows_need_flattened):
        placeholder_str = self._make_placeholder_list_of_length(len(filter_value))
        stmt = "%s FROM %s WHERE %s IN (%s)" % (
            command, table, filter_column, placeholder_str
        )
        if not command.startswith('UPDATE'):
            stmt = "%s FROM %s WHERE %s IN (%s)" % (
                command, table, filter_column, placeholder_str
            )
        else:
            stmt = "%s WHERE %s IN (%s)" % (
                command, filter_column, placeholder_str
            )
        return stmt, filter_value, rows_need_flattened

    def _do_inserts(self):
@@ -287,7 +356,7 @@ def _do_inserts(self):
        params = list(itertools.chain.from_iterable(rows))

        stmt = "%s INTO %s VALUES\n%s\n%s" % (
            command, header, ',\n'.join(values_template), suffix)
            command, header, ', '.join(values_template), suffix)
        # e.g.,
        # INSERT INTO table(c1, c2)
        # VALUES (%s, %s), (%s, %s), (%s, %s)
9 changes: 9 additions & 0 deletions src/relstorage/adapters/interfaces.py
@@ -41,6 +41,15 @@ class IDBDialect(Interface):

    # TODO: Fill this in.

    def boolean_str(value):
        """
        Given exactly a `bool` (`True` or `False`), return the string the
        database uses to represent that literal.

        By default, this will be "TRUE" or "FALSE", but older versions of
        SQLite need 1 or 0, while Oracle needs "'Y'" or "'N'".
        """

class IDBDriver(Interface):
    """
    An abstraction over the information needed for RelStorage to work
8 changes: 4 additions & 4 deletions src/relstorage/adapters/oracle/dialect.py
@@ -21,10 +21,6 @@

class OracleCompiler(Compiler):

    def visit_boolean_literal_expression(self, value):
        sql = "'Y'" if value else "'N'"
        self.emit(sql)

    def can_prepare(self):
        # We haven't investigated preparing statements manually
        # with cx_Oracle. There's a chance that `cx_Oracle.Connection.stmtcachesize`
@@ -117,3 +113,7 @@ def extra_constraints_for_column(self, column):

    def compiler_class(self):
        return OracleCompiler

    def boolean_str(self, value):
        sql = "'Y'" if value else "'N'"
        return sql
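
This moves the choice of boolean literal out of the per-database
compilers and into the dialect. A minimal sketch of how a generic
compiler could use it (``self.dialect`` is an assumed attribute name)::

    def visit_boolean_literal_expression(self, value):
        # 'TRUE'/'FALSE' by default; 1/0 on older SQLite; 'Y'/'N' on Oracle.
        self.emit(self.dialect.boolean_str(value))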
