Skip to content

Commit

Permalink
Merge pull request #467 from zodb/bigger_MVCC_depth_by_default
Browse files Browse the repository at this point in the history
Increase the default RS_CACHE_MVCC_MAX_DEPTH.
  • Loading branch information
jamadden committed May 12, 2021
2 parents 575d61f + 74a5910 commit 620efd5
Show file tree
Hide file tree
Showing 11 changed files with 147 additions and 66 deletions.
6 changes: 4 additions & 2 deletions .github/actions/install-native-libs/action.yml
Expand Up @@ -13,8 +13,10 @@ runs:
# into /usr/local/lib as of 1.1
brew install mysql
# homebrew tends to get a current-ish version of PostgreSQL. At this writing,
# Jan 2021, it has PostgreSQL 13.1.
brew install postgresql
# Jan 2021, it has PostgreSQL 13.1; as of Apr 28, 2021 it has
# 13.2_2 and thinks that postgresql 13.2_1 is already
# installed; it won't upgrade automatically
brew install postgresql || brew upgrade postgresql
brew install libmemcached
brew install openssl
echo LDFLAGS=$LDFLAGS -L/usr/local/opt/openssl@1.1/lib >> $GITHUB_ENV
Expand Down
6 changes: 5 additions & 1 deletion .pylintrc
Expand Up @@ -35,6 +35,9 @@
# import-outside-toplevel: New in pylint 2.4. We have a large number of deferred imports.
# super-with-arguments and raise-missing-from: New in pylint 2.7; we can't use them because
# we support Python 2.
# consider-using-with: New in pylint 2.8, catches things like ``return open(path)``, which
# obviously can't use a ``with`` statement. Disabled because ~90% of the hits (~20) were false positives,
# and a few didn't apply to Python 2.
disable=
invalid-name,
missing-docstring,
Expand All @@ -58,7 +61,8 @@ disable=
import-outside-toplevel,
relative-beyond-top-level,
super-with-arguments,
raise-missing-from
raise-missing-from,
consider-using-with
# undefined-all-variable


Expand Down
18 changes: 16 additions & 2 deletions CHANGES.rst
Expand Up @@ -2,10 +2,24 @@
Changes
=========

3.4.6 (unreleased)
3.5.0 (unreleased)
==================

- Nothing changed yet.
- Increase the default value of the ``RS_CACHE_MVCC_MAX_DEPTH``
advanced tuning parameter from 100 to 1000 based on observations of
production workloads. (Connections that haven't polled
for the last ``RS_CACHE_MVCC_MAX_DEPTH`` committed transactions ---
and thus are expected to have a large number of invalidations ---
are "detached" and forced to invalidate their entire persistent
object cache if they get used again.)

- Add StatsD counter metric
"relstorage.cache.mvcc.invalidate_all_detached" that is incremented
when a previously-detached Connection is required to invalidate its
entire persistent object cache. In a well-tuned environment, this
counter should be very low and as such is not sampled but always sent.

- Fix the logging of some environment variables RelStorage uses.


3.4.5 (2021-04-23)
Expand Down
70 changes: 40 additions & 30 deletions src/relstorage/_util.py
Expand Up @@ -51,40 +51,46 @@
from relstorage._compat import perf_counter
from relstorage._compat import IN_TESTRUNNER

logger = logging.getLogger(__name__)
perf_logger = logger.getChild('timing')
_logger = logging.getLogger('relstorage')
perf_logger = _logger.getChild('timing')

int64_to_8bytes = p64
bytes8_to_int64 = u64

__all__ = [
'int64_to_8bytes',
'bytes8_to_int64',
'timestamp_at_unixtime',
'timer',
'log_timed',
'log_timed_only_self',
'thread_spawn',
'spawn',
'get_memory_usage',
'byte_display',
'Lazy',

'CachedIn',
'to_utf8',
'consume',
'Lazy',
'TRACE',
'parse_boolean',
'parse_byte_size',
'positive_integer',
'byte_display',
'consume',

'get_duration_from_environ',
'get_positive_integer_from_environ',
'get_non_negative_float_from_environ',
'get_boolean_from_environ',

'get_memory_usage',
'log_timed',
'log_timed_only_self',
'metricmethod',
'metricmethod_sampled',
'parse_boolean',
'parse_byte_size',
'positive_integer',
'spawn',
'thread_spawn',
'timer',
'timestamp_at_unixtime',
'to_utf8',
]

positive_integer = RangeCheckedConversion(integer, min=1)
non_negative_float = RangeCheckedConversion(float, min=0)

def _setting_from_environ(converter, environ_name, default):
def _setting_from_environ(converter, environ_name, default, logger):
result = default
env_val = os.environ.get(environ_name, default)
if env_val is not default:
Expand All @@ -100,11 +106,11 @@ def _setting_from_environ(converter, environ_name, default):
return result


def get_positive_integer_from_environ(environ_name, default):
return _setting_from_environ(positive_integer, environ_name, default)
def get_positive_integer_from_environ(environ_name, default, logger=_logger):
return _setting_from_environ(positive_integer, environ_name, default, logger)

def get_non_negative_float_from_environ(environ_name, default):
return _setting_from_environ(non_negative_float, environ_name, default)
def get_non_negative_float_from_environ(environ_name, default, logger=_logger):
return _setting_from_environ(non_negative_float, environ_name, default, logger)

def parse_boolean(val):
if val == '0':
Expand All @@ -115,6 +121,9 @@ def parse_boolean(val):

parse_byte_size = stock_datatypes['byte-size']

def get_boolean_from_environ(environ_name, default, logger=_logger):
return _setting_from_environ(parse_boolean, environ_name, default, logger)

def timestamp_at_unixtime(now):
"""
Return a :class:`persistent.timestamp.TimeStamp` for the moment
Expand Down Expand Up @@ -154,7 +163,7 @@ def __exit__(self, t, v, tb):
self.__end = self.counter()
self.duration = self.__end - self.__begin

def get_duration_from_environ(environ_name, default):
def get_duration_from_environ(environ_name, default, logger=_logger):
"""
Return a floating-point number of seconds from the environment *environ_name*,
or *default*.
Expand Down Expand Up @@ -187,11 +196,11 @@ def convert(val):
return delta.total_seconds()
return float(val)

return _setting_from_environ(convert, environ_name, default)
return _setting_from_environ(convert, environ_name, default, logger)

def _get_log_time_level(level_int, default):
level_name = logging.getLevelName(level_int)
val = get_duration_from_environ('RS_PERF_LOG_%s_MIN' % level_name, default)
val = get_duration_from_environ('RS_PERF_LOG_%s_MIN' % level_name, default, logger=perf_logger)
return (level_int, float(val))

# A list of tuples (level_int, min_duration), ordered by increasing
Expand All @@ -215,16 +224,16 @@ def _get_log_time_level(level_int, default):
# The 'log_details_threshold' property of the function can be
# assigned to make it different than the default.
_LOG_TIMED_DEFAULT_DETAILS_THRESHOLD = logging.getLevelName(
_setting_from_environ(str, 'RS_PERF_LOG_DETAILS_LEVEL', 'WARN')
_setting_from_environ(str, 'RS_PERF_LOG_DETAILS_LEVEL', 'WARN', logger=perf_logger)
)


# If this is true when a module is imported, timer decorations
# are omitted.
_LOG_TIMED_COMPILETIME_ENABLE = _setting_from_environ(
parse_boolean,
_LOG_TIMED_COMPILETIME_ENABLE = get_boolean_from_environ(
'RS_PERF_LOG_ENABLE',
'on'
'on',
logger=perf_logger,
)

def do_log_duration_info(basic_msg, func,
Expand Down Expand Up @@ -311,7 +320,8 @@ def log_timed_only_self(func):

_ThreadWithReady = None

METRIC_SAMPLE_RATE = get_non_negative_float_from_environ('RS_PERF_STATSD_SAMPLE_RATE', 0.1)
METRIC_SAMPLE_RATE = get_non_negative_float_from_environ('RS_PERF_STATSD_SAMPLE_RATE', 0.1,
logger=perf_logger)

metricmethod_sampled = Metric(method=True, rate=METRIC_SAMPLE_RATE)

Expand Down Expand Up @@ -379,7 +389,7 @@ def spawn(func, args=()):
if gevent.monkey.is_module_patched('threading'):
submit = _gevent_pool_spawn

logger.debug("Using %s to run %s", submit, func)
_logger.log(TRACE, "Using %s to run %s", submit, func)
return submit(func, args)

def get_this_psutil_process():
Expand Down
3 changes: 2 additions & 1 deletion src/relstorage/adapters/connections.py
Expand Up @@ -368,7 +368,8 @@ class StoreConnectionPool(object):

MAX_STORE_CONNECTIONS_IN_POOL = get_positive_integer_from_environ(
'RS_MAX_POOLED_STORE_CONNECTIONS',
None
None,
logger=logger
)

def __init__(self, connmanager):
Expand Down
8 changes: 4 additions & 4 deletions src/relstorage/adapters/drivers.py
Expand Up @@ -19,15 +19,14 @@

import importlib
import sys
import os

from zope.interface import directlyProvides
from zope.interface import implementer

from .._compat import PYPY
from .._compat import PY3
from .._compat import casefold
from .._util import positive_integer
from .._util import get_positive_integer_from_environ
from .._util import consume

from .interfaces import IDBDriver
Expand Down Expand Up @@ -128,8 +127,9 @@ class AbstractModuleDriver(object):
#: DB-API extension. We default to 1024, but the environment variable
#: RS_CURSOR_ARRAYSIZE can be set to an int to change this default.
#: Individual drivers *might* choose a different default.
cursor_arraysize = positive_integer(
os.environ.get('RS_CURSOR_ARRAYSIZE', '1024')
cursor_arraysize = get_positive_integer_from_environ(
'RS_CURSOR_ARRAYSIZE', 1024,
logger=logger,
)

DriverNotAvailableError = DriverNotAvailableError
Expand Down
15 changes: 10 additions & 5 deletions src/relstorage/adapters/packundo.py
Expand Up @@ -66,22 +66,26 @@ class PackUndo(DatabaseHelpersMixin):
# implementations.
###
cursor_arraysize = get_positive_integer_from_environ('RS_PACK_CURSOR_ARRAYSIZE',
4096)
4096,
logger=logger)

# These are generally very small rows we're uploading (a few integers).
# Be generous.
store_batch_size = get_positive_integer_from_environ('RS_PACK_STORE_BATCH_SIZE', 4096)
store_batch_size = get_positive_integer_from_environ('RS_PACK_STORE_BATCH_SIZE', 4096,
logger=logger)

# How often, in seconds, to commit work in progress.
# This is a variable here for testing.
fill_object_refs_commit_frequency = get_duration_from_environ('RS_PACK_COMMIT_FREQUENCY', 120)
fill_object_refs_commit_frequency = get_duration_from_environ('RS_PACK_COMMIT_FREQUENCY', 120,
logger=logger)

# How many object states to find references in at any one time.
# This is a control on the amount of memory used by the Python
# process during packing, especially if the database driver
# doesn't use server-side cursors.
fill_object_refs_batch_size = get_positive_integer_from_environ('RS_PACK_DOWNLOAD_BATCH_SIZE',
1024)
1024,
logger=logger)

def __init__(self, database_driver, connmanager, runner, locker, options):
self.driver = database_driver
Expand Down Expand Up @@ -459,7 +463,8 @@ class HistoryPreservingPackUndo(PackUndo):

delete_empty_transactions_batch_size = get_positive_integer_from_environ(
'RS_PACK_HP_DELETE_BATCH_SIZE',
1000
1000,
logger=logger
)

_delete_empty_transactions_batch_query = Schema.transaction.delete(
Expand Down
15 changes: 6 additions & 9 deletions src/relstorage/adapters/tests/test_replica.py
Expand Up @@ -72,9 +72,8 @@ def test_current(self):
rs = ReplicaSelector(self.fn, 600.0)
self.assertEqual(rs.current(), 'example.com:1234')
# change the file and get the new current replica
f = open(self.fn, 'w')
f.write('localhost\nalternate\n')
f.close()
with open(self.fn, 'w') as f:
f.write('localhost\nalternate\n')
rs._config_checked = 0
rs._config_modified = 0
self.assertEqual(rs.current(), 'localhost')
Expand Down Expand Up @@ -106,9 +105,8 @@ def test_next_iteration(self):

def test_next_only_one_server(self):
from relstorage.adapters.replica import ReplicaSelector
f = open(self.fn, 'w')
f.write('localhost\n')
f.close()
with open(self.fn, 'w') as f:
f.write('localhost\n')
rs = ReplicaSelector(self.fn, 600.0)
self.assertEqual(rs.current(), 'localhost')
self.assertEqual(rs.next(), None)
Expand All @@ -119,9 +117,8 @@ def test_next_with_new_conf(self):
self.assertEqual(rs.current(), 'example.com:1234')
self.assertEqual(rs.next(), 'localhost:4321')
# interrupt the iteration by changing the replica conf file
f = open(self.fn, 'w')
f.write('example.com:9999\n')
f.close()
with open(self.fn, 'w') as f:
f.write('example.com:9999\n')
rs._config_checked = 0
rs._config_modified = 0
self.assertEqual(rs.next(), 'example.com:9999')
Expand Down

0 comments on commit 620efd5

Please sign in to comment.