Merge 14c65a1 into 2df6b35
jamadden committed Jul 12, 2016
2 parents 2df6b35 + 14c65a1 commit cb4bf47
Showing 10 changed files with 84 additions and 130 deletions.
3 changes: 3 additions & 0 deletions .travis.yml
@@ -33,6 +33,9 @@ notifications:
install:
- pip install -U pip setuptools
- pip install -U tox coveralls
# Install patched version of 4; waiting on 4.4.3.
# Same change was merged to 5.0 branch post a5
- pip install git+https://github.com/zopefoundation/ZODB.git@4#egg=ZODB
- if [[ $TRAVIS_PYTHON_VERSION == 'pypy' ]]; then pip install -U python-memcached; fi
- if [[ $TRAVIS_PYTHON_VERSION != 'pypy' ]]; then pip install -U pylibmc cffi; fi
- pip install -U -e ".[test]"
1 change: 0 additions & 1 deletion relstorage/cache.py
@@ -210,7 +210,6 @@ def _check_tid_after_load(self, oid_int, actual_tid_int,
# transaction order, leading it to sometimes put the
# wrong tid in delta_after*.
cp0, cp1 = self.checkpoints
import os

msg = ("Detected an inconsistency "
"between the RelStorage cache and the database "
42 changes: 27 additions & 15 deletions relstorage/storage.py
@@ -101,6 +101,7 @@ def release(self):

@implementer(ZODB.interfaces.IStorage,
ZODB.interfaces.IMVCCStorage,
ZODB.interfaces.IMultiCommitStorage,
ZODB.interfaces.IStorageRestoreable,
ZODB.interfaces.IStorageIteration,
ZODB.interfaces.IStorageUndoable,
@@ -177,6 +178,12 @@ class RelStorage(UndoLogCompatible,
# when the database connection is stale (due to async replication).
_stale_error = None

# OIDs resolved by undo()
_resolved = ()

# user, description, extension from transaction metadata.
_ude = None

def __init__(self, adapter, name=None, create=None,
options=None, cache=None, blobhelper=None,
# The top-level storage should use locks because
@@ -781,6 +788,7 @@ def tpc_begin(self, transaction, tid=None, status=' '):
self._lock.acquire()
self._clear_temp()
self._transaction = transaction
self._resolved = set()

user = _to_latin1(transaction.user)
desc = _to_latin1(transaction.description)
@@ -868,7 +876,7 @@ def _clear_temp(self):
def _finish_store(self):
"""Move stored objects from the temporary table to final storage.
Returns a list of (oid, tid) to be received by
Returns a sequence of OIDs whose conflicts were resolved, to be passed to
Connection._handle_serial().
"""
assert self._tid is not None
@@ -909,21 +917,15 @@ def _finish_store(self):

# Move the new states into the permanent table
tid_int = u64(self._tid)
serials = []

if self.blobhelper is not None:
txn_has_blobs = self.blobhelper.txn_has_blobs
else:
txn_has_blobs = False
oid_ints = adapter.mover.move_from_temp(cursor, tid_int, txn_has_blobs)
for oid_int in oid_ints:
oid = p64(oid_int)
if oid in resolved:
serial = ConflictResolution.ResolvedSerial
else:
serial = self._tid
serials.append((oid, serial))

return serials

return resolved

@metricmethod
def tpc_vote(self, transaction):
@@ -933,7 +935,12 @@ def tpc_vote(self, transaction):
"tpc_vote called with wrong transaction")

try:
return self._vote()
resolved_by_vote = self._vote()
if self._resolved:
# self._resolved contains OIDs from undo()
self._resolved.update(resolved_by_vote)
return self._resolved
return resolved_by_vote
except:
if abort_early:
# abort early to avoid lockups while running the
@@ -978,15 +985,16 @@ def _vote(self):
raise ReadConflictError(
oid=oid, serials=(actual, expect))

serials = self._finish_store()
resolved_serials = self._finish_store()
self._adapter.mover.update_current(cursor, tid_int)
self._prepared_txn = self._adapter.txncontrol.commit_phase1(
conn, cursor, tid_int)

if self.blobhelper is not None:
self.blobhelper.vote(self._tid)

return serials
# New storage protocol
return resolved_serials

@metricmethod
def tpc_finish(self, transaction, f=None):
@@ -1000,6 +1008,7 @@ def tpc_finish(self, transaction, f=None):
f(self._tid)
u, d, e = self._ude
self._finish(self._tid, u, d, e)
return self._tid
finally:
self._clear_temp()
finally:
@@ -1216,7 +1225,10 @@ def undo(self, transaction_id, transaction):
if self.blobhelper is not None:
self.blobhelper.copy_undone(copied, self._tid)

return self._tid, oids
#return self._tid, oids
# new storage protocol
self._resolved.update(oids)
return None
finally:
adapter.locker.release_pack_lock(cursor)

@@ -1423,7 +1435,7 @@ def storeBlob(self, oid, serial, data, blobfilename, version, txn):
(or copy and remove it) immediately, or at transaction-commit
time. The file must not be open.
The new serial is returned.
Returns nothing.
"""
assert not version
with self._lock:
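
Taken together, the storage.py changes switch RelStorage to the IMultiCommitStorage calling convention: store() and storeBlob() no longer hand back serials, tpc_vote() returns only the OIDs whose conflicts were resolved (by conflict resolution or by undo()), and tpc_finish() returns the committed tid. Below is a minimal sketch of how a caller drives the second half of a commit under this convention; `storage` and `txn` are placeholder names, not identifiers from this diff.

def finish_two_phase_commit(storage, txn):
    # Vote first: under IMultiCommitStorage the return value is just the
    # OIDs that were conflict-resolved (possibly empty), not (oid, serial) pairs.
    resolved_oids = storage.tpc_vote(txn) or ()
    # The committed tid now comes from tpc_finish() and applies to every
    # object stored in the transaction.
    committed_tid = storage.tpc_finish(txn)
    return committed_tid, set(resolved_oids)
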
6 changes: 2 additions & 4 deletions relstorage/tests/RecoveryStorage.py
@@ -20,7 +20,6 @@
from transaction import Transaction
from ZODB import DB
from ZODB.serialize import referencesf
from ZODB.tests.StorageTestBase import handle_serials
from ZODB.tests.StorageTestBase import MinPO
from ZODB.tests.StorageTestBase import snooze
from ZODB.tests.StorageTestBase import zodb_pickle
@@ -215,10 +214,9 @@ def checkRestoreAfterDoubleCommit(self):
# Store an object
self._storage.store(oid, revid, data1, '', t)
# Store it again
r1 = self._storage.store(oid, revid, data2, '', t)
self._storage.store(oid, revid, data2, '', t)
# Finish the transaction
r2 = self._storage.tpc_vote(t)
revid = handle_serials(oid, r1, r2)
self._storage.tpc_vote(t)
self._storage.tpc_finish(t)
except:
self._storage.tpc_abort(t)
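
With handle_serials gone, the bookkeeping around store() and tpc_vote() disappears and a double store in a test reduces to roughly the sketch below. MinPO, zodb_pickle and Transaction are the imports this module already uses; commit_double_store and its arguments are illustrative names only, not part of this change.

from transaction import Transaction
from ZODB.tests.StorageTestBase import MinPO, zodb_pickle

def commit_double_store(storage, oid, prev_serial=b'\x00' * 8):
    # Store the same oid twice in one transaction; neither store() nor
    # tpc_vote() returns a serial that needs to be tracked any more.
    t = Transaction()
    storage.tpc_begin(t)
    storage.store(oid, prev_serial, zodb_pickle(MinPO(1)), '', t)
    storage.store(oid, prev_serial, zodb_pickle(MinPO(2)), '', t)
    storage.tpc_vote(t)            # conflict-resolved OIDs, normally empty here
    return storage.tpc_finish(t)   # the committed tid replaces handle_serials()
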
44 changes: 14 additions & 30 deletions relstorage/tests/blob/blob_packing.txt
@@ -31,41 +31,25 @@ Put some revisions of a blob object in our database and on the filesystem:
>>> import os
>>> tids = []
>>> times = []
>>> nothing = transaction.begin()
>>> times.append(new_time())
>>> blob = Blob()
>>> with blob.open('w') as f: _ = f.write(b'this is blob data 0')
>>> root['blob'] = blob
>>> transaction.commit()
>>> tids.append(blob._p_serial)

>>> nothing = transaction.begin()
>>> times.append(new_time())
>>> with root['blob'].open('w') as f: _ = f.write(b'this is blob data 1')
>>> transaction.commit()
>>> tids.append(blob._p_serial)

>>> nothing = transaction.begin()
>>> times.append(new_time())
>>> with root['blob'].open('w') as f: _ = f.write(b'this is blob data 2')
>>> transaction.commit()
>>> tids.append(blob._p_serial)

>>> nothing = transaction.begin()
>>> times.append(new_time())
>>> with root['blob'].open('w') as f: _ = f.write(b'this is blob data 3')
>>> transaction.commit()
>>> tids.append(blob._p_serial)

>>> nothing = transaction.begin()
>>> times.append(new_time())
>>> with root['blob'].open('w') as f: _ = f.write(b'this is blob data 4')
>>> transaction.commit()
>>> for i in range(5):
... _ = transaction.begin()
... times.append(new_time())
... with blob.open('w') as file:
... _ = file.write(b'this is blob data ' + str(i).encode())
... if i:
... tids.append(blob._p_serial)
... else:
... root['blob'] = blob
... transaction.commit()

>>> blob._p_activate()
>>> tids.append(blob._p_serial)

>>> oid = root['blob']._p_oid
>>> fns = [ blob_storage.fshelper.getBlobFilename(oid, x) for x in tids ]
>>> [ os.path.exists(x) for x in fns ]
>>> [ os.path.exists(x) for x in fns ] # no pack
[True, True, True, True, True]

Do a pack to slightly before the first revision was written:
@@ -125,4 +109,4 @@ revision as well as the entire directory:

Clean up our blob directory and database:

>>> blob_storage.close()
>>> database.close()
42 changes: 13 additions & 29 deletions relstorage/tests/blob/blob_packing_history_free.txt
@@ -31,41 +31,25 @@ Put some revisions of a blob object in our database and on the filesystem:
>>> import os
>>> tids = []
>>> times = []
>>> nothing = transaction.begin()
>>> times.append(new_time())
>>> blob = Blob()
>>> with blob.open('w') as f: _ = f.write(b'this is blob data 0')
>>> root['blob'] = blob
>>> transaction.commit()
>>> tids.append(blob._p_serial)

>>> nothing = transaction.begin()
>>> times.append(new_time())
>>> with root['blob'].open('w') as f: _ = f.write(b'this is blob data 1')
>>> transaction.commit()
>>> tids.append(blob._p_serial)

>>> nothing = transaction.begin()
>>> times.append(new_time())
>>> with root['blob'].open('w') as f: _ = f.write(b'this is blob data 2')
>>> transaction.commit()
>>> tids.append(blob._p_serial)

>>> nothing = transaction.begin()
>>> times.append(new_time())
>>> with root['blob'].open('w') as f: _ = f.write(b'this is blob data 3')
>>> transaction.commit()
>>> tids.append(blob._p_serial)

>>> nothing = transaction.begin()
>>> times.append(new_time())
>>> with root['blob'].open('w') as f: _ = f.write(b'this is blob data 4')
>>> transaction.commit()
>>> for i in range(5):
... _ = transaction.begin()
... times.append(new_time())
... with blob.open('w') as file:
... _ = file.write(b'this is blob data ' + str(i).encode())
... if i:
... tids.append(blob._p_serial)
... else:
... root['blob'] = blob
... transaction.commit()

>>> blob._p_activate()
>>> tids.append(blob._p_serial)

>>> oid = root['blob']._p_oid
>>> fns = [ blob_storage.fshelper.getBlobFilename(oid, x) for x in tids ]
>>> [ os.path.exists(x) for x in fns ]
>>> [ os.path.exists(x) for x in fns ] # no pack
[True, True, True, True, True]


48 changes: 11 additions & 37 deletions relstorage/tests/blob/blob_transaction.txt
@@ -1,17 +1,3 @@
##############################################################################
#
# Copyright (c) 2005-2007 Zope Foundation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################

Transaction support for Blobs
=============================

@@ -314,7 +300,12 @@ We can also read committed data by calling open with a 'c' flag:

>>> f = blob.open('c')

This doesn't prevent us from opening the blob for writing:
This just returns a regular file object:

>>> type(f) == file_type
True

and doesn't prevent us from opening the blob for writing:

>>> with blob.open('w') as file:
... _ = file.write(b'x')
@@ -390,35 +381,23 @@ stored are discarded.
>>> blob_storage.tpc_begin(t)
>>> with open('blobfile', 'wb') as file:
... _ = file.write(b'This data should go away')
>>> s1 = blob_storage.storeBlob(blob._p_oid, oldserial, olddata, 'blobfile',
>>> blob_storage.storeBlob(blob._p_oid, oldserial, olddata, 'blobfile',
... '', t)
>>> new_oid = blob_storage.new_oid()
>>> with open('blobfile2', 'wb') as file:
... _ = file.write(b'This data should go away too')
>>> s2 = blob_storage.storeBlob(new_oid, '\0'*8, olddata, 'blobfile2',
>>> blob_storage.storeBlob(new_oid, '\0'*8, olddata, 'blobfile2',
... '', t)

>>> serials = blob_storage.tpc_vote(t)
>>> if s1 is None:
... s1 = [s for (oid, s) in serials if oid == blob._p_oid][0]
>>> if s2 is None:
... s2 = [s for (oid, s) in serials if oid == new_oid][0]

>>> bool(blob_storage.tpc_vote(t))
False
>>> blob_storage.tpc_abort(t)

Now, the serial for the existing blob should be the same:

>>> blob_storage.load(blob._p_oid, '') == (olddata, oldserial)
True

And we shouldn't be able to read the data that we saved:

>>> blob_storage.loadBlob(blob._p_oid, s1)
Traceback (most recent call last):
...
POSKeyError: 'No blob file...

Of course the old data should be unaffected:
The old data should be unaffected:

>>> with open(blob_storage.loadBlob(blob._p_oid, oldserial)) as fp:
... fp.read()
@@ -431,11 +410,6 @@ Similarly, the new object wasn't added to the storage:
...
POSKeyError: 0x...

>>> blob_storage.loadBlob(blob._p_oid, s2)
Traceback (most recent call last):
...
POSKeyError: 'No blob file...

.. clean up

>>> tm1.abort()
8 changes: 8 additions & 0 deletions relstorage/tests/blob/testblob.py
@@ -632,6 +632,13 @@ def rmtree(path):
(re.compile(r'\%(sep)s' % dict(sep=os.path.sep)), '/'),
])

try:
file_type = file
except NameError:
# Py3: Python 3 does not have a file type.
import io
file_type = io.BufferedReader

def storage_reusable_suite(prefix, factory,
test_blob_storage_recovery=False,
test_packing=False,
@@ -652,6 +659,7 @@ def create_storage(name='data', blob_dir=None, **kw):
return factory(name, blob_dir, **kw)

test.globs['create_storage'] = create_storage
test.globs['file_type'] = file_type

suite = unittest.TestSuite()
suite.addTest(doctest.DocFileSuite(
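
The file_type global injected above lets the blob doctests compare the object returned by Blob.open('c') against the platform's plain binary-file type. A standalone sketch of the same check follows; the open(__file__) probe is illustrative, not part of the test suite.

import io

try:
    file_type = file               # Python 2: the builtin ``file`` type
except NameError:                  # Python 3 has no ``file`` builtin
    file_type = io.BufferedReader  # what open(..., 'rb') returns on Python 3

with open(__file__, 'rb') as fp:
    # True under both versions: a plain ``file`` on Python 2,
    # an io.BufferedReader on Python 3.
    print(isinstance(fp, file_type))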