Skip to content

Commit

Permalink
Merge d524bb3 into 5ed42de
Browse files Browse the repository at this point in the history
  • Loading branch information
jamadden committed Jul 22, 2019
2 parents 5ed42de + d524bb3 commit c04e718
Show file tree
Hide file tree
Showing 53 changed files with 3,039 additions and 603 deletions.
4 changes: 4 additions & 0 deletions src/relstorage/_compat.py
Expand Up @@ -55,6 +55,8 @@ def list_values(d):
OID_OBJECT_MAP_TYPE = dict
OID_SET_TYPE = set

MAX_TID = BTrees.family64.maxint

def iteroiditems(d):
# Could be either a BTree, which always has 'iteritems',
# or a plain dict, which may or may not have iteritems.
Expand All @@ -65,9 +67,11 @@ def iteroiditems(d):
if PY3:
string_types = (str,)
unicode = str
from io import StringIO as NStringIO
else:
string_types = (basestring,)
unicode = unicode
from io import BytesIO as NStringIO

try:
from abc import ABC
Expand Down
39 changes: 39 additions & 0 deletions src/relstorage/_util.py
Expand Up @@ -143,6 +143,45 @@ def __get__(self, inst, class_):
inst.__dict__[name] = value
return value

class CachedIn(object):
"""Cached method with given cache attribute."""

def __init__(self, attribute_name, factory=dict):
self.attribute_name = attribute_name
self.factory = factory

def __call__(self, func):

@functools.wraps(func)
def decorated(instance):
cache = self.cache(instance)
key = () # We don't support arguments right now, so only one key.
try:
v = cache[key]
except KeyError:
v = cache[key] = func(instance)
return v

decorated.invalidate = self.invalidate
return decorated

def invalidate(self, instance):
cache = self.cache(instance)
key = ()
try:
del cache[key]
except KeyError:
pass

def cache(self, instance):
try:
cache = getattr(instance, self.attribute_name)
except AttributeError:
cache = self.factory()
setattr(instance, self.attribute_name, cache)
return cache


def to_utf8(data):
if data is None or isinstance(data, bytes):
return data
Expand Down
167 changes: 100 additions & 67 deletions src/relstorage/adapters/dbiter.py
Expand Up @@ -16,29 +16,38 @@

from zope.interface import implementer

from .interfaces import IDatabaseIterator
from relstorage._compat import MAX_TID

from .interfaces import IDatabaseIterator
from .schema import Schema
from .sql import it

class DatabaseIterator(object):
"""Abstract base class for database iteration.
"""
Abstract base class for database iteration.
"""

def __init__(self, database_driver, runner):
self.runner = runner
def __init__(self, database_driver):
"""
:param database_driver: Necessary to bind queries correctly.
"""
self.driver = database_driver

_iter_objects_query = Schema.object_state.select(
it.c.zoid,
it.c.state
).where(
it.c.tid == it.bindparam('tid')
).order_by(
it.c.zoid
)

def iter_objects(self, cursor, tid):
"""Iterate over object states in a transaction.
Yields (oid, prev_tid, state) for each object state.
Yields ``(oid, state)`` for each object in the transaction.
"""
stmt = """
SELECT zoid, state
FROM object_state
WHERE tid = %(tid)s
ORDER BY zoid
"""
self.runner.run_script_stmt(cursor, stmt, {'tid': tid})
self._iter_objects_query.execute(cursor, {'tid': tid})
for oid, state in cursor:
state = self.driver.binary_column_as_state_type(state)
yield oid, state
Expand All @@ -47,6 +56,8 @@ def iter_objects(self, cursor, tid):
@implementer(IDatabaseIterator)
class HistoryPreservingDatabaseIterator(DatabaseIterator):

keep_history = True

def _transaction_iterator(self, cursor):
"""
Iterate over a list of transactions returned from the database.
Expand All @@ -71,22 +82,38 @@ def _transaction_iterator(self, cursor):

yield (tid, username, description, ext) + tuple(row[4:])

_iter_transactions_query = Schema.transaction.select(
it.c.tid, it.c.username, it.c.description, it.c.extension
).where(
it.c.packed == False # pylint:disable=singleton-comparison
).and_(
it.c.tid != 0
).order_by(
it.c.tid, 'DESC'
)

def iter_transactions(self, cursor):
"""Iterate over the transaction log, newest first.
Skips packed transactions.
Yields (tid, username, description, extension) for each transaction.
"""
stmt = """
SELECT tid, username, description, extension
FROM transaction
WHERE packed = %(FALSE)s
AND tid != 0
ORDER BY tid DESC
"""
self.runner.run_script_stmt(cursor, stmt)
self._iter_transactions_query.execute(cursor)
return self._transaction_iterator(cursor)

_iter_transactions_range_query = Schema.transaction.select(
it.c.tid,
it.c.username,
it.c.description,
it.c.extension,
it.c.packed,
).where(
it.c.tid >= it.bindparam('min_tid')
).and_(
it.c.tid <= it.bindparam('max_tid')
).order_by(
it.c.tid
)

def iter_transactions_range(self, cursor, start=None, stop=None):
"""Iterate over the transactions in the given range, oldest first.
Expand All @@ -95,21 +122,31 @@ def iter_transactions_range(self, cursor, start=None, stop=None):
Yields (tid, username, description, extension, packed)
for each transaction.
"""
stmt = """
SELECT tid, username, description, extension,
CASE WHEN packed = %(TRUE)s THEN 1 ELSE 0 END
FROM transaction
WHERE tid >= 0
"""
if start is not None:
stmt += " AND tid >= %(min_tid)s"
if stop is not None:
stmt += " AND tid <= %(max_tid)s"
stmt += " ORDER BY tid"
self.runner.run_script_stmt(cursor, stmt,
{'min_tid': start, 'max_tid': stop})
params = {
'min_tid': start if start else 0,
'max_tid': stop if stop else MAX_TID
}
self._iter_transactions_range_query.execute(cursor, params)
return self._transaction_iterator(cursor)

_object_exists_query = Schema.current_object.select(
1
).where(
it.c.zoid == it.bindparam('oid')
)

_object_history_query = Schema.transaction.natural_join(
Schema.object_state
).select(
it.c.tid, it.c.username, it.c.description, it.c.extension,
Schema.object_state.c.state_size
).where(
it.c.zoid == it.bindparam("oid")
).and_(
it.c.packed == False # pylint:disable=singleton-comparison
).order_by(
it.c.tid, "DESC"
)

def iter_object_history(self, cursor, oid):
"""Iterate over an object's history.
Expand All @@ -118,59 +155,60 @@ def iter_object_history(self, cursor, oid):
Yields (tid, username, description, extension, pickle_size)
for each modification.
"""
stmt = """
SELECT 1 FROM current_object WHERE zoid = %(oid)s
"""
self.runner.run_script_stmt(cursor, stmt, {'oid': oid})
params = {'oid': oid}
self._object_exists_query.execute(cursor, params)
if not cursor.fetchall():
raise KeyError(oid)

stmt = """
SELECT tid, username, description, extension, state_size
FROM transaction
JOIN object_state USING (tid)
WHERE zoid = %(oid)s
AND packed = %(FALSE)s
ORDER BY tid DESC
"""
self.runner.run_script_stmt(cursor, stmt, {'oid': oid})
self._object_history_query.execute(cursor, params)
return self._transaction_iterator(cursor)


@implementer(IDatabaseIterator)
class HistoryFreeDatabaseIterator(DatabaseIterator):

keep_history = False

def iter_transactions(self, cursor):
"""Iterate over the transaction log, newest first.
Skips packed transactions.
Yields (tid, username, description, extension) for each transaction.
Yields ``(tid, username, description, extension)`` for each transaction.
This always returns an empty iterable.
"""
# pylint:disable=unused-argument
return []

_iter_transactions_range_query = Schema.object_state.select(
it.c.tid,
).where(
it.c.tid >= it.bindparam('min_tid')
).and_(
it.c.tid <= it.bindparam('max_tid')
).order_by(
it.c.tid
).distinct()

def iter_transactions_range(self, cursor, start=None, stop=None):
"""Iterate over the transactions in the given range, oldest first.
Includes packed transactions.
Yields (tid, username, description, extension, packed)
Yields ``(tid, username, description, extension, packed)``
for each transaction.
"""
stmt = """
SELECT DISTINCT tid
FROM object_state
WHERE tid > 0
"""
if start is not None:
stmt += " AND tid >= %(min_tid)s"
if stop is not None:
stmt += " AND tid <= %(max_tid)s"
stmt += " ORDER BY tid"
self.runner.run_script_stmt(cursor, stmt,
{'min_tid': start, 'max_tid': stop})
return ((tid, '', '', '', True) for (tid,) in cursor)
params = {
'min_tid': start if start else 0,
'max_tid': stop if stop else MAX_TID
}
self._iter_transactions_range_query.execute(cursor, params)
return ((tid, b'', b'', b'', True) for (tid,) in cursor)

_iter_object_history_query = Schema.object_state.select(
it.c.tid, it.c.state_size
).where(
it.c.zoid == it.bindparam('oid')
)

def iter_object_history(self, cursor, oid):
"""
Expand All @@ -180,12 +218,7 @@ def iter_object_history(self, cursor, oid):
Yields a single row,
``(tid, username, description, extension, pickle_size)``
"""
stmt = """
SELECT tid, state_size
FROM object_state
WHERE zoid = %(oid)s
"""
self.runner.run_script_stmt(cursor, stmt, {'oid': oid})
self._iter_object_history_query.execute(cursor, {'oid': oid})
rows = cursor.fetchall()
if not rows:
raise KeyError(oid)
Expand Down
10 changes: 10 additions & 0 deletions src/relstorage/adapters/interfaces.py
Expand Up @@ -50,6 +50,14 @@ def __str__():
"""Return a short description of the adapter"""


class IDBDialect(Interface):
"""
Handles converting from our internal "standard" SQL queries to
something database specific.
"""

# TODO: Fill this in.

class IDBDriver(Interface):
"""
An abstraction over the information needed for RelStorage to work
Expand All @@ -71,6 +79,8 @@ class IDBDriver(Interface):

Binary = Attribute("A callable.")

dialect = Attribute("The IDBDialect for this driver.")

def binary_column_as_state_type(db_column_data):
"""
Turn *db_column_data* into something that's a valid pickle
Expand Down

0 comments on commit c04e718

Please sign in to comment.