Skip to content

Commit

Permalink
Abstract ZODB's MVCC implementation into a storage adapter.
Browse files Browse the repository at this point in the history
That's applied to storages other than RelStorage.

See: https://groups.google.com/forum/#!topic/zodb/QJYmvF0eUUM
  • Loading branch information
Jim Fulton committed Jun 14, 2016
1 parent 9bcd944 commit 2909137
Show file tree
Hide file tree
Showing 11 changed files with 549 additions and 517 deletions.
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
interface, rich transaction support, and undo.
"""

version = "4.3.0.dev0"
version = "5.0.dev0"

import os
from setuptools import setup, find_packages
Expand Down
229 changes: 41 additions & 188 deletions src/ZODB/Connection.py

Large diffs are not rendered by default.

81 changes: 27 additions & 54 deletions src/ZODB/DB.py
Original file line number Diff line number Diff line change
Expand Up @@ -401,12 +401,6 @@ def __init__(self, storage,
- `xrefs` - Boolian flag indicating whether implicit cross-database
references are allowed
"""
if isinstance(storage, six.string_types):
from ZODB import FileStorage
storage = ZODB.FileStorage.FileStorage(storage, **storage_args)
elif storage is None:
from ZODB import MappingStorage
storage = ZODB.MappingStorage.MappingStorage(**storage_args)

# Allocate lock.
x = utils.RLock()
Expand All @@ -423,12 +417,24 @@ def __init__(self, storage,
self._historical_cache_size_bytes = historical_cache_size_bytes

# Setup storage
if isinstance(storage, six.string_types):
from ZODB import FileStorage
storage = ZODB.FileStorage.FileStorage(storage, **storage_args)
elif storage is None:
from ZODB import MappingStorage
storage = ZODB.MappingStorage.MappingStorage(**storage_args)
else:
assert not storage_args

self.storage = storage

if IMVCCStorage.providedBy(storage):
self._mvcc_storage = storage
else:
from .mvccadapter import MVCCAdapter
self._mvcc_storage = MVCCAdapter(storage)

self.references = ZODB.serialize.referencesf
try:
storage.registerDB(self)
except TypeError:
storage.registerDB(self, None) # Backward compat

if (not hasattr(storage, 'tpc_vote')) and not storage.isReadOnly():
warnings.warn(
Expand All @@ -438,12 +444,10 @@ def __init__(self, storage,
DeprecationWarning, 2)
storage.tpc_vote = lambda *args: None

if IMVCCStorage.providedBy(storage):
temp_storage = storage.new_instance()
else:
temp_storage = storage
temp_storage = self._mvcc_storage.new_instance()
try:
try:
temp_storage.poll_invalidations()
temp_storage.load(z64, '')
except KeyError:
# Create the database's root in the storage if it doesn't exist
Expand All @@ -462,8 +466,7 @@ def __init__(self, storage,
temp_storage.tpc_vote(t)
temp_storage.tpc_finish(t)
finally:
if IMVCCStorage.providedBy(temp_storage):
temp_storage.release()
temp_storage.release()

# Multi-database setup.
if databases is None:
Expand Down Expand Up @@ -634,16 +637,13 @@ def close(self):

@self._connectionMap
def _(c):
if c.opened:
c.transaction_manager.abort()
# Note that this will modify our pool, but this is safe, because
# _connectionMap makes a list of the pool to iterate over
c.close()
c.transaction_manager.abort()
c.afterCompletion = c.newTransaction = c.close = noop
c._release_resources()

self.storage.close()
self._mvcc_storage.close()
del self.storage
del self._mvcc_storage

def getCacheSize(self):
return self._cache_size
Expand Down Expand Up @@ -675,27 +675,6 @@ def getHistoricalPoolSize(self):
def getHistoricalTimeout(self):
return self.historical_pool.timeout

def invalidate(self, tid, oids, connection=None, version=''):
"""Invalidate references to a given oid.
This is used to indicate that one of the connections has committed a
change to the object. The connection commiting the change should be
passed in to prevent useless (but harmless) messages to the
connection.
"""
# Storages, esp. ZEO tests, need the version argument still. :-/
assert version==''
# Notify connections.
def inval(c):
if c is not connection:
c.invalidate(tid, oids)
self._connectionMap(inval)

def invalidateCache(self):
"""Invalidate each of the connection caches
"""
self._connectionMap(lambda c: c.invalidateCache())

transform_record_data = untransform_record_data = lambda self, data: data

def objectCount(self):
Expand Down Expand Up @@ -1001,9 +980,9 @@ class TransactionalUndo(object):

def __init__(self, db, tids):
self._db = db
self._storage = db.storage
self._storage = getattr(
db._mvcc_storage, 'undo_instance', db._mvcc_storage.new_instance)()
self._tids = tids
self._oids = set()

def abort(self, transaction):
pass
Expand All @@ -1013,19 +992,13 @@ def tpc_begin(self, transaction):

def commit(self, transaction):
for tid in self._tids:
result = self._storage.undo(tid, transaction)
if result:
self._oids.update(result[1])
self._storage.undo(tid, transaction)

def tpc_vote(self, transaction):
for oid, _ in self._storage.tpc_vote(transaction) or ():
self._oids.add(oid)
self._storage.tpc_vote(transaction)

def tpc_finish(self, transaction):
self._storage.tpc_finish(
transaction,
lambda tid: self._db.invalidate(tid, self._oids)
)
self._storage.tpc_finish(transaction)

def tpc_abort(self, transaction):
self._storage.tpc_abort(transaction)
Expand Down
2 changes: 0 additions & 2 deletions src/ZODB/historical_connections.txt
Original file line number Diff line number Diff line change
Expand Up @@ -273,8 +273,6 @@ test.
>>> conn.root()['first']['count'] += 1
>>> conn.root()['third'] = persistent.mapping.PersistentMapping()
>>> transaction.commit()
>>> len(historical_conn._invalidated)
0
>>> historical_conn.close()

Note that if you try to open an historical connection to a time in the future,
Expand Down
Loading

0 comments on commit 2909137

Please sign in to comment.