Skip to content

Commit

Permalink
Merge pull request #66 from zopefoundation/MVCCAdapter
Browse files Browse the repository at this point in the history
Abstract ZODB's MVCC implementation into a storage adapter.
  • Loading branch information
jimfulton committed Jun 15, 2016
2 parents 9bcd944 + 61d2c75 commit b64344f
Show file tree
Hide file tree
Showing 11 changed files with 418 additions and 390 deletions.
2 changes: 1 addition & 1 deletion setup.py
Expand Up @@ -20,7 +20,7 @@
interface, rich transaction support, and undo.
"""

version = "4.3.0.dev0"
version = "5.0.dev0"

import os
from setuptools import setup, find_packages
Expand Down
239 changes: 50 additions & 189 deletions src/ZODB/Connection.py

Large diffs are not rendered by default.

81 changes: 27 additions & 54 deletions src/ZODB/DB.py
Expand Up @@ -401,12 +401,6 @@ def __init__(self, storage,
- `xrefs` - Boolian flag indicating whether implicit cross-database
references are allowed
"""
if isinstance(storage, six.string_types):
from ZODB import FileStorage
storage = ZODB.FileStorage.FileStorage(storage, **storage_args)
elif storage is None:
from ZODB import MappingStorage
storage = ZODB.MappingStorage.MappingStorage(**storage_args)

# Allocate lock.
x = utils.RLock()
Expand All @@ -423,12 +417,24 @@ def __init__(self, storage,
self._historical_cache_size_bytes = historical_cache_size_bytes

# Setup storage
if isinstance(storage, six.string_types):
from ZODB import FileStorage
storage = ZODB.FileStorage.FileStorage(storage, **storage_args)
elif storage is None:
from ZODB import MappingStorage
storage = ZODB.MappingStorage.MappingStorage(**storage_args)
else:
assert not storage_args

self.storage = storage

if IMVCCStorage.providedBy(storage):
self._mvcc_storage = storage
else:
from .mvccadapter import MVCCAdapter
self._mvcc_storage = MVCCAdapter(storage)

self.references = ZODB.serialize.referencesf
try:
storage.registerDB(self)
except TypeError:
storage.registerDB(self, None) # Backward compat

if (not hasattr(storage, 'tpc_vote')) and not storage.isReadOnly():
warnings.warn(
Expand All @@ -438,12 +444,10 @@ def __init__(self, storage,
DeprecationWarning, 2)
storage.tpc_vote = lambda *args: None

if IMVCCStorage.providedBy(storage):
temp_storage = storage.new_instance()
else:
temp_storage = storage
temp_storage = self._mvcc_storage.new_instance()
try:
try:
temp_storage.poll_invalidations()
temp_storage.load(z64, '')
except KeyError:
# Create the database's root in the storage if it doesn't exist
Expand All @@ -462,8 +466,7 @@ def __init__(self, storage,
temp_storage.tpc_vote(t)
temp_storage.tpc_finish(t)
finally:
if IMVCCStorage.providedBy(temp_storage):
temp_storage.release()
temp_storage.release()

# Multi-database setup.
if databases is None:
Expand Down Expand Up @@ -634,16 +637,13 @@ def close(self):

@self._connectionMap
def _(c):
if c.opened:
c.transaction_manager.abort()
# Note that this will modify our pool, but this is safe, because
# _connectionMap makes a list of the pool to iterate over
c.close()
c.transaction_manager.abort()
c.afterCompletion = c.newTransaction = c.close = noop
c._release_resources()

self.storage.close()
self._mvcc_storage.close()
del self.storage
del self._mvcc_storage

def getCacheSize(self):
return self._cache_size
Expand Down Expand Up @@ -675,27 +675,6 @@ def getHistoricalPoolSize(self):
def getHistoricalTimeout(self):
return self.historical_pool.timeout

def invalidate(self, tid, oids, connection=None, version=''):
"""Invalidate references to a given oid.
This is used to indicate that one of the connections has committed a
change to the object. The connection commiting the change should be
passed in to prevent useless (but harmless) messages to the
connection.
"""
# Storages, esp. ZEO tests, need the version argument still. :-/
assert version==''
# Notify connections.
def inval(c):
if c is not connection:
c.invalidate(tid, oids)
self._connectionMap(inval)

def invalidateCache(self):
"""Invalidate each of the connection caches
"""
self._connectionMap(lambda c: c.invalidateCache())

transform_record_data = untransform_record_data = lambda self, data: data

def objectCount(self):
Expand Down Expand Up @@ -1001,9 +980,9 @@ class TransactionalUndo(object):

def __init__(self, db, tids):
self._db = db
self._storage = db.storage
self._storage = getattr(
db._mvcc_storage, 'undo_instance', db._mvcc_storage.new_instance)()
self._tids = tids
self._oids = set()

def abort(self, transaction):
pass
Expand All @@ -1013,19 +992,13 @@ def tpc_begin(self, transaction):

def commit(self, transaction):
for tid in self._tids:
result = self._storage.undo(tid, transaction)
if result:
self._oids.update(result[1])
self._storage.undo(tid, transaction)

def tpc_vote(self, transaction):
for oid, _ in self._storage.tpc_vote(transaction) or ():
self._oids.add(oid)
self._storage.tpc_vote(transaction)

def tpc_finish(self, transaction):
self._storage.tpc_finish(
transaction,
lambda tid: self._db.invalidate(tid, self._oids)
)
self._storage.tpc_finish(transaction)

def tpc_abort(self, transaction):
self._storage.tpc_abort(transaction)
Expand Down
2 changes: 0 additions & 2 deletions src/ZODB/historical_connections.txt
Expand Up @@ -273,8 +273,6 @@ test.
>>> conn.root()['first']['count'] += 1
>>> conn.root()['third'] = persistent.mapping.PersistentMapping()
>>> transaction.commit()
>>> len(historical_conn._invalidated)
0
>>> historical_conn.close()

Note that if you try to open an historical connection to a time in the future,
Expand Down
49 changes: 3 additions & 46 deletions src/ZODB/interfaces.py
Expand Up @@ -201,21 +201,6 @@ def db():
def isReadOnly():
"""Returns True if the storage for this connection is read only."""

def invalidate(tid, oids):
"""Notify the Connection that transaction 'tid' invalidated oids.
When the next transaction boundary is reached, objects will be
invalidated. If any of the invalidated objects are accessed by the
current transaction, the revision written before Connection.tid will be
used.
The DB calls this method, even when the Connection is closed.
Parameters:
tid: the storage-level id of the transaction that committed
oids: oids is an iterable of oids.
"""

def root():
"""Return the database root object.
Expand Down Expand Up @@ -278,14 +263,6 @@ def getTransferCounts(clear=False):
If clear is True, reset the counters.
"""

def invalidateCache():
"""Invalidate the connection cache
This invalidates *all* objects in the cache. If the connection
is open, subsequent reads will fail until a new transaction
begins or until the connection os reopned.
"""

def readCurrent(obj):
"""Make sure an object being read is current
Expand Down Expand Up @@ -625,19 +602,6 @@ def loadSerial(oid, serial):
otherwise, POSKeyError is raised.
"""

# The following two methods are effectively part of the interface,
# as they are generally needed when one storage wraps
# another. This deserves some thought, at probably debate, before
# adding them.
#
# def _lock_acquire():
# """Acquire the storage lock
# """

# def _lock_release():
# """Release the storage lock
# """

def new_oid():
"""Allocate a new object id.
Expand Down Expand Up @@ -675,11 +639,7 @@ def registerDB(wrapper):
The passed object is a wrapper object that provides an upcall
interface to support composition.
Note that, for historical reasons, an implementation may
require a second argument, however, if required, the None will
be passed as the second argument.
Also, for historical reasons, this is called registerDB rather
Note that, for historical reasons, this is called registerDB rather
than register_wrapper.
"""

Expand Down Expand Up @@ -818,7 +778,6 @@ def tpc_vote(transaction):
"""


class IStorageRestoreable(IStorage):
"""Copying Transactions
Expand Down Expand Up @@ -1110,11 +1069,9 @@ def new_instance():
"""

def release():
"""Release all persistent sessions used by this storage instance.
"""Release resources held by the storage instance.
After this call, the storage instance can still be used;
calling methods that use persistent sessions will cause the
persistent sessions to be reopened.
The storage instance won't be used again after this call.
"""

def poll_invalidations():
Expand Down

0 comments on commit b64344f

Please sign in to comment.