Commit 61b1bfb: Merge eb39c61 into 0116ebb
jamadden committed Aug 27, 2020
2 parents: 0116ebb + eb39c61
Showing 12 changed files with 317 additions and 51 deletions.
6 changes: 5 additions & 1 deletion .pylintrc
@@ -33,6 +33,8 @@
# chained-comparison: It wants you to rewrite `x > 2 and x < 3` using something like `2 < x < 3`,
# which I don't like; I find that less readable.
# import-outside-toplevel: New in pylint 2.4. We have a large number of deferred imports.
# super-with-arguments and raise-missing-from: New in pylint 2.7; we can't use them because
# we support Python 2.
disable=
invalid-name,
missing-docstring,
@@ -54,7 +56,9 @@ disable=
chained-comparison,
too-many-ancestors,
import-outside-toplevel,
relative-beyond-top-level
relative-beyond-top-level,
super-with-arguments,
raise-missing-from
# undefined-all-variable


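(For context, a minimal sketch of the Python 2-compatible spellings these two newly disabled checks flag; the names here are illustrative, not from this repository. ``super-with-arguments`` wants the zero-argument ``super()``, and ``raise-missing-from`` wants ``raise ... from ...``, neither of which exists on Python 2.)::

    class Base(object):
        pass

    class Derived(Base):
        def __init__(self):
            # super-with-arguments flags this spelling; Python 2 has no
            # zero-argument super(), so the arguments must stay.
            super(Derived, self).__init__()

    def parse(value):
        try:
            return int(value)
        except ValueError:
            # raise-missing-from flags this; ``raise ... from exc``
            # is a syntax error on Python 2.
            raise RuntimeError('bad value: %r' % (value,))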
14 changes: 12 additions & 2 deletions CHANGES.rst
@@ -5,8 +5,18 @@
3.2.1 (unreleased)
==================

- Nothing changed yet.

- Improve the speed of loading large cache files by reducing the cost
of cache validation.

- The timing metrics for ``current_object_tids`` are always collected,
not just sampled. MySQL and PostgreSQL will only call this method
once at startup during persistent cache validation. Other databases
may call this method once during the commit process.

- Add the ability to limit how long persistent cache validation will
spend polling the database for invalid OIDs. Set the environment
variable ``RS_CACHE_POLL_TIMEOUT`` to a number of seconds before
importing RelStorage to use this.
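  For example (a sketch; the 30-second value is illustrative), the
  variable has to be in the environment before the first RelStorage
  import, because it is consulted at import time::

      import os
      # Must happen before any ``import relstorage...`` statement.
      os.environ['RS_CACHE_POLL_TIMEOUT'] = '30'  # seconds

      import relstorage  # the timeout is now in effect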

3.2.0 (2020-07-20)
==================
25 changes: 18 additions & 7 deletions src/relstorage/_compat.py
@@ -70,6 +70,8 @@
'base64_decodebytes',
'update_wrapper',

# Clocks
'perf_counter',
]

PY3 = sys.version_info[0] == 3
@@ -79,6 +81,19 @@
WIN = sys.platform.startswith('win')
MAC = sys.platform.startswith('darwin')

try:
# Python 3.3+ (PEP 418)
from time import perf_counter
except ImportError:
import time

if sys.platform == "win32":
perf_counter = time.clock # pylint: disable=no-member
else:
perf_counter = time.time

del time

# Dict support

if PY3:
@@ -292,13 +307,13 @@ def iteroiditems(d):
return d.iteritems() if hasattr(d, 'iteritems') else d.items()

# Types
from perfmetrics import metricmethod # pylint:disable=wrong-import-position
from perfmetrics import Metric # pylint:disable=wrong-import-position

if PY3:
string_types = (str,)
number_types = (int, float)
from io import StringIO as NStringIO
from perfmetrics import metricmethod
from perfmetrics import Metric
from functools import wraps
else:
string_types = (basestring,) # pylint:disable=undefined-variable
@@ -317,10 +332,6 @@ def __call__(self, replacement):
replacement.__wrapped__ = self._orig
return replacement

from perfmetrics import Metric

metricmethod = Metric(method=True)

metricmethod_sampled = Metric(method=True, rate=0.1)

IN_TESTRUNNER = (
@@ -331,7 +342,7 @@ def __call__(self, replacement):
)


if IN_TESTRUNNER:
if IN_TESTRUNNER and os.environ.get('RS_TEST_DISABLE_METRICS'):
# If we're running under the testrunner,
# don't apply the metricmethod stuff. It makes
# backtraces ugly and makes stepping in the
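A short sketch of consuming the ``perf_counter`` shim added above
(``do_work`` is a hypothetical placeholder). On Python 3 this resolves to
the monotonic ``time.perf_counter`` from PEP 418; the Python 2 fallbacks
(``time.clock`` on Windows, ``time.time`` elsewhere) are only
approximations of it::

    from relstorage._compat import perf_counter

    def do_work():
        sum(range(1000000))  # hypothetical workload

    begin = perf_counter()
    do_work()
    # Float seconds; only differences between readings are meaningful.
    elapsed = perf_counter() - begin
    print('took %.6fs' % elapsed)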
23 changes: 14 additions & 9 deletions src/relstorage/_util.py
@@ -45,6 +45,7 @@

from relstorage._compat import wraps
from relstorage._compat import update_wrapper
from relstorage._compat import perf_counter

logger = logging.getLogger(__name__)
perf_logger = logger.getChild('timing')
@@ -71,6 +72,7 @@
'parse_boolean',
'parse_byte_size',
'positive_integer',
'get_time_from_environ',
]

positive_integer = RangeCheckedConversion(integer, min=1)
@@ -108,18 +110,12 @@ def timestamp_at_unixtime(now):
seconds = now % 60.0
return TimeStamp(*(gmtime[:5] + (seconds,)))

try:
from pyperf import perf_counter as _counter
except ImportError: # pragma: no cover
_counter = time.time


class timer(object):
__begin = None
__end = None
duration = None

counter = _counter
counter = perf_counter

def __enter__(self):
self.__begin = self.counter()
@@ -129,10 +125,19 @@ def __exit__(self, t, v, tb):
self.__end = self.counter()
self.duration = self.__end - self.__begin

def get_time_from_environ(environ_name, default):
env_val = os.environ.get(environ_name, default)
try:
result = float(env_val)
except (ValueError, TypeError):
result = default
logger.debug('Using %s from environ %r=%r', result, environ_name, env_val)
return result


def _get_log_time_level(level_int, default):
level_name = logging.getLevelName(level_int)
val = os.environ.get('RS_PERF_LOG_%s_MIN' % level_name, default)
val = get_time_from_environ('RS_PERF_LOG_%s_MIN' % level_name, default)
return (level_int, float(val))

# A list of tuples (level_int, min_duration), ordered by increasing
@@ -221,7 +226,7 @@ def log_timed(func):
func.__wrapped__ = None
return func

counter = _counter
counter = perf_counter
log = do_log_duration_info
func_logger = logging.getLogger(func.__module__).getChild('timing')

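A sketch combining the pieces above; the variable name
``RS_EXAMPLE_TIMEOUT`` and the workload are hypothetical, purely for
illustration::

    import logging
    from relstorage._util import get_time_from_environ, timer

    logging.basicConfig()
    log = logging.getLogger('example')

    # Falls back to 10.0 if the variable is unset or not parseable as a float.
    limit = get_time_from_environ('RS_EXAMPLE_TIMEOUT', 10.0)

    t = timer()
    with t:  # the context manager records begin/end with perf_counter
        sum(range(1000000))  # hypothetical workload
    if t.duration > limit:
        log.warning('Took %.3fs, over the %.1fs limit', t.duration, limit)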
33 changes: 30 additions & 3 deletions src/relstorage/adapters/batch.py
@@ -17,8 +17,11 @@
import itertools

from relstorage._compat import iteritems
from relstorage._compat import perf_counter
from relstorage._util import parse_byte_size

from .interfaces import AggregateOperationTimeoutError

# A cache
# {(placeholder, count): "string"}
_row_schemas = {}
@@ -34,6 +37,8 @@ class RowBatcher(object):
can be set in the ``delete_placeholder`` attribute.
"""

perf_counter = perf_counter

# How many total rows can be sent at once. Also used as
# ``bind_limit`` if that is 0 or None.
row_limit = 1024
@@ -143,21 +148,34 @@ def row_schema_of_length(self, param_count):
_row_schemas[key] = p
return p

def select_from(self, columns, table, suffix='', **kw):
def select_from(self, columns, table, suffix='', timeout=None, **kw):
"""
Handles a query of the ``WHERE col IN (?, ?,)`` type::
``SELECT columns FROM table WHERE col IN (?, ...)``
The keyword arguments should be of length 1, containing an
iterable of the values to check: ``col=(1, 2)`` or in the
dynamic case ``**{indirect_var: [1, 2]}``.
dynamic case ``**{indirect_var: [1, 2]}``::
batcher.select_from(('zoid', 'tid',), 'object_state',
zoid=oids)
The number of batches needed is determined by the length of
the iterator divided by this object's ``bind_limit``,
or, if that's not set, by the ``row_limit``.
Returns a iterator of matching rows.
Returns an iterator of matching rows. Rows are delivered
incrementally, so some rows may already have been delivered
when an exception is raised.
:keyword float timeout: If given, the approximate maximum
number of seconds this method is allowed to run.
:raises AggregateOperationTimeoutError: If *timeout* is given,
and the cumulative time taken to query and process
some subset of batches exceeds *timeout*. This is checked
after each individual batch.
"""
assert len(kw) == 1
filter_column, filter_values = kw.popitem()
@@ -168,15 +186,24 @@ def select_from(self, columns, table, suffix='', **kw):
chunk_size = self.bind_limit or self.row_limit
chunk_size -= 1

begin = self.perf_counter() if timeout else None

for head in filter_values:
filter_subset = list(itertools.islice(filter_values, chunk_size))
filter_subset.append(head)

descriptor = [[(table, (filter_column,)), filter_subset]]

self._do_batch(command, descriptor, rows_need_flattened=False, suffix=suffix)

for row in self.cursor.fetchall():
yield row

if timeout and self.perf_counter() - begin >= timeout:
# TODO: It'd be nice not to do this if we had no more
# batches to do.
raise AggregateOperationTimeoutError

def flush(self):
if self.deletes:
self.total_rows_deleted += self._do_deletes()
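A sketch of driving the new timeout from calling code; ``cursor`` is
assumed to come from an open DB-API connection and ``oids`` is an
assumed list of OID integers::

    from relstorage.adapters.batch import RowBatcher
    from relstorage.adapters.interfaces import AggregateOperationTimeoutError

    batcher = RowBatcher(cursor)
    rows = []
    try:
        for row in batcher.select_from(('zoid', 'tid'), 'object_state',
                                       timeout=5.0, zoid=oids):
            rows.append(row)
    except AggregateOperationTimeoutError:
        # The generator stops after the batch that crossed the 5-second
        # budget; ``rows`` keeps everything delivered before that.
        pass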
22 changes: 20 additions & 2 deletions src/relstorage/adapters/interfaces.py
@@ -767,11 +767,18 @@ def get_object_tid_after(cursor, oid, tid):
Returns None if no later state exists.
"""

def current_object_tids(cursor, oids):
def current_object_tids(cursor, oids, timeout=None):
"""
Returns the current {oid_int: tid_int} for specified object ids.
Returns the current ``{oid_int: tid_int}`` for specified object ids.
Note that this may be a BTree mapping, not a dictionary.
:param oids: An iterable of OID integers.
:keyword float timeout: If not None, this is an approximate upper bound
(in seconds) on how long this function will run.
:raises AggregateOperationTimeoutError: If the timeout was exceeded.
This will have one extra attribute set, ``partial_result``, which will be a
(partial) mapping of the results collected before the timeout.
"""

def on_store_opened(cursor, restart=False):
@@ -1647,3 +1654,14 @@ def from_prev_and_new_tid(cls, prev_polled_tid, new_polled_tid):
return cls(
"The database connection is stale: new_polled_tid=%d, "
"prev_polled_tid=%d." % (new_polled_tid, prev_polled_tid))


class AggregateOperationTimeoutError(Exception):
"""
Raised when an aggregate operation in RelStorage detects
that it has exceeded a specified timeout.
"""

#: If the function knows a useful, partial result, it may set this
#: attribute. Check it against the class value to see if it's been set.
partial_result = object()
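A sketch of the consumption pattern this sentinel enables (caller-side
code, not part of this commit; ``mover``, ``cursor``, ``oids``, and
``poll_timeout`` are assumed to exist)::

    from relstorage.adapters.interfaces import AggregateOperationTimeoutError

    try:
        tids = mover.current_object_tids(cursor, oids, timeout=poll_timeout)
    except AggregateOperationTimeoutError as ex:
        if ex.partial_result is not AggregateOperationTimeoutError.partial_result:
            # The instance attribute shadows the class sentinel, so a
            # partial {oid_int: tid_int} mapping was attached before raising.
            tids = ex.partial_result
        else:
            raise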
27 changes: 22 additions & 5 deletions src/relstorage/adapters/mover.py
@@ -23,12 +23,14 @@

from .._compat import OID_TID_MAP_TYPE
from .._compat import metricmethod_sampled
from .._compat import metricmethod
from ._util import noop_when_history_free
from ._util import query_property as _query_property
from ._util import DatabaseHelpersMixin
from .._compat import ABC
from .batch import RowBatcher
from .interfaces import IObjectMover
from .interfaces import AggregateOperationTimeoutError
from .schema import Schema
from .sql import it
from .sql.schema import ColumnExpression
@@ -207,15 +209,30 @@ def get_object_tid_after(self, cursor, oid, tid):

_current_object_tids_map_type = OID_TID_MAP_TYPE

@metricmethod_sampled
def current_object_tids(self, cursor, oids):
@metricmethod
def current_object_tids(self, cursor, oids, timeout=None):
"""Returns the current {oid: tid} for specified object ids."""
# This is a metricmethod, not a metricmethod_sampled, because only databases that
# use composed lock_objects_and_detect_conflicts call it for every transaction,
# and even then they only call it once, so it's like the tpc_* methods.
# Other databases use it only when restoring the cache at startup (just once), so it's
# unlikely to get sampled (see relstorage.cache.mvcc).
res = self._current_object_tids_map_type()
columns, table, filter_column = self._current_object_tids_query
batcher = self.make_batcher(cursor)
rows = batcher.select_from(columns, table, **{filter_column: oids})
res = self._current_object_tids_map_type(list(rows))

rows = batcher.select_from(columns, table, timeout=timeout, **{filter_column: oids})
if timeout:
# Do the collecting and iterating in Python so we can handle partial results
res = self._current_object_tids_map_type()
try:
for (oid, tid) in rows:
res[oid] = tid
except AggregateOperationTimeoutError as ex:
ex.partial_result = res
raise
else:
# Do the collecting and iterating in C
res = self._current_object_tids_map_type(list(rows))
return res


38 changes: 38 additions & 0 deletions src/relstorage/adapters/tests/test_batch.py
@@ -321,6 +321,44 @@ def test_select_multiple_many_batch_bind_limit(self):
self.test_select_multiple_many_batch(batch_limit_attr='bind_limit')


def test_select_from_timeout(self):
from relstorage.tests import mock
from relstorage.adapters.interfaces import AggregateOperationTimeoutError
cursor = MockCursor()
cursor.sort_sequence_params = True
cursor.many_results = [
[(1, 1)],
[(2, 1)],
[(3, 1)],
[]
]
batcher = self.getClass()(cursor)
batcher.bind_limit = 1
batcher.perf_counter = mock.Mock()
# These will be the time values returned from perf_counter()
batcher.perf_counter.side_effect = (
12345, # Begin
12346, # First batch
12347, # Second batch
)

gener = batcher.select_from(('zoid', 'tid',), 'object_state',
timeout=2,
oids=[1, 2, 3, 4, 5])
rows = []
with self.assertRaises(AggregateOperationTimeoutError):
for row in gener:
rows.append(row)

# We ran exactly twice before the perf_counter exceeded the timeout.
self.assertEqual(rows, [
(1, 1),
(2, 1),
])




class OracleRowBatcherTests(TestCase):

def getClass(self):
