Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FIx #95 #107

Merged
merged 3 commits into from
Jul 12, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ notifications:
install:
- pip install -U pip setuptools
- pip install -U tox coveralls
# Install patched version of 4; waiting on 4.4.3.
# Same change was merged to 5.0 branch post a5
- pip install git+https://github.com/zopefoundation/ZODB.git@4#egg=ZODB
- if [[ $TRAVIS_PYTHON_VERSION == 'pypy' ]]; then pip install -U python-memcached; fi
- if [[ $TRAVIS_PYTHON_VERSION != 'pypy' ]]; then pip install -U pylibmc cffi; fi
- pip install -U -e ".[test]"
Expand Down
1 change: 0 additions & 1 deletion relstorage/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,6 @@ def _check_tid_after_load(self, oid_int, actual_tid_int,
# transaction order, leading it to sometimes put the
# wrong tid in delta_after*.
cp0, cp1 = self.checkpoints
import os

msg = ("Detected an inconsistency "
"between the RelStorage cache and the database "
Expand Down
42 changes: 27 additions & 15 deletions relstorage/storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ def release(self):

@implementer(ZODB.interfaces.IStorage,
ZODB.interfaces.IMVCCStorage,
ZODB.interfaces.IMultiCommitStorage,
ZODB.interfaces.IStorageRestoreable,
ZODB.interfaces.IStorageIteration,
ZODB.interfaces.IStorageUndoable,
Expand Down Expand Up @@ -177,6 +178,12 @@ class RelStorage(UndoLogCompatible,
# when the database connection is stale (due to async replication).
_stale_error = None

# OIDs resolved by undo()
_resolved = ()

# user, description, extension from transaction metadata.
_ude = None

def __init__(self, adapter, name=None, create=None,
options=None, cache=None, blobhelper=None,
# The top-level storage should use locks because
Expand Down Expand Up @@ -781,6 +788,7 @@ def tpc_begin(self, transaction, tid=None, status=' '):
self._lock.acquire()
self._clear_temp()
self._transaction = transaction
self._resolved = set()

user = _to_latin1(transaction.user)
desc = _to_latin1(transaction.description)
Expand Down Expand Up @@ -868,7 +876,7 @@ def _clear_temp(self):
def _finish_store(self):
"""Move stored objects from the temporary table to final storage.

Returns a list of (oid, tid) to be received by
Returns a sequence of OIDs that were resolved to be received by
Connection._handle_serial().
"""
assert self._tid is not None
Expand Down Expand Up @@ -909,21 +917,15 @@ def _finish_store(self):

# Move the new states into the permanent table
tid_int = u64(self._tid)
serials = []

if self.blobhelper is not None:
txn_has_blobs = self.blobhelper.txn_has_blobs
else:
txn_has_blobs = False
oid_ints = adapter.mover.move_from_temp(cursor, tid_int, txn_has_blobs)
for oid_int in oid_ints:
oid = p64(oid_int)
if oid in resolved:
serial = ConflictResolution.ResolvedSerial
else:
serial = self._tid
serials.append((oid, serial))

return serials

return resolved

@metricmethod
def tpc_vote(self, transaction):
Expand All @@ -933,7 +935,12 @@ def tpc_vote(self, transaction):
"tpc_vote called with wrong transaction")

try:
return self._vote()
resolved_by_vote = self._vote()
if self._resolved:
# self._resolved contains OIDs from undo()
self._resolved.update(resolved_by_vote)
return self._resolved
return resolved_by_vote
except:
if abort_early:
# abort early to avoid lockups while running the
Expand Down Expand Up @@ -978,15 +985,16 @@ def _vote(self):
raise ReadConflictError(
oid=oid, serials=(actual, expect))

serials = self._finish_store()
resolved_serials = self._finish_store()
self._adapter.mover.update_current(cursor, tid_int)
self._prepared_txn = self._adapter.txncontrol.commit_phase1(
conn, cursor, tid_int)

if self.blobhelper is not None:
self.blobhelper.vote(self._tid)

return serials
# New storage protocol
return resolved_serials

@metricmethod
def tpc_finish(self, transaction, f=None):
Expand All @@ -1000,6 +1008,7 @@ def tpc_finish(self, transaction, f=None):
f(self._tid)
u, d, e = self._ude
self._finish(self._tid, u, d, e)
return self._tid
finally:
self._clear_temp()
finally:
Expand Down Expand Up @@ -1216,7 +1225,10 @@ def undo(self, transaction_id, transaction):
if self.blobhelper is not None:
self.blobhelper.copy_undone(copied, self._tid)

return self._tid, oids
#return self._tid, oids
# new storage protocol
self._resolved.update(oids)
return None
finally:
adapter.locker.release_pack_lock(cursor)

Expand Down Expand Up @@ -1423,7 +1435,7 @@ def storeBlob(self, oid, serial, data, blobfilename, version, txn):
(or copy and remove it) immediately, or at transaction-commit
time. The file must not be open.

The new serial is returned.
Returns nothing.
"""
assert not version
with self._lock:
Expand Down
6 changes: 2 additions & 4 deletions relstorage/tests/RecoveryStorage.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
from transaction import Transaction
from ZODB import DB
from ZODB.serialize import referencesf
from ZODB.tests.StorageTestBase import handle_serials
from ZODB.tests.StorageTestBase import MinPO
from ZODB.tests.StorageTestBase import snooze
from ZODB.tests.StorageTestBase import zodb_pickle
Expand Down Expand Up @@ -215,10 +214,9 @@ def checkRestoreAfterDoubleCommit(self):
# Store an object
self._storage.store(oid, revid, data1, '', t)
# Store it again
r1 = self._storage.store(oid, revid, data2, '', t)
self._storage.store(oid, revid, data2, '', t)
# Finish the transaction
r2 = self._storage.tpc_vote(t)
revid = handle_serials(oid, r1, r2)
self._storage.tpc_vote(t)
self._storage.tpc_finish(t)
except:
self._storage.tpc_abort(t)
Expand Down
44 changes: 14 additions & 30 deletions relstorage/tests/blob/blob_packing.txt
Original file line number Diff line number Diff line change
Expand Up @@ -31,41 +31,25 @@ Put some revisions of a blob object in our database and on the filesystem:
>>> import os
>>> tids = []
>>> times = []
>>> nothing = transaction.begin()
>>> times.append(new_time())
>>> blob = Blob()
>>> with blob.open('w') as f: _ = f.write(b'this is blob data 0')
>>> root['blob'] = blob
>>> transaction.commit()
>>> tids.append(blob._p_serial)

>>> nothing = transaction.begin()
>>> times.append(new_time())
>>> with root['blob'].open('w') as f: _ = f.write(b'this is blob data 1')
>>> transaction.commit()
>>> tids.append(blob._p_serial)

>>> nothing = transaction.begin()
>>> times.append(new_time())
>>> with root['blob'].open('w') as f: _ = f.write(b'this is blob data 2')
>>> transaction.commit()
>>> tids.append(blob._p_serial)

>>> nothing = transaction.begin()
>>> times.append(new_time())
>>> with root['blob'].open('w') as f: _ = f.write(b'this is blob data 3')
>>> transaction.commit()
>>> tids.append(blob._p_serial)

>>> nothing = transaction.begin()
>>> times.append(new_time())
>>> with root['blob'].open('w') as f: _ = f.write(b'this is blob data 4')
>>> transaction.commit()
>>> for i in range(5):
... _ = transaction.begin()
... times.append(new_time())
... with blob.open('w') as file:
... _ = file.write(b'this is blob data ' + str(i).encode())
... if i:
... tids.append(blob._p_serial)
... else:
... root['blob'] = blob
... transaction.commit()

>>> blob._p_activate()
>>> tids.append(blob._p_serial)

>>> oid = root['blob']._p_oid
>>> fns = [ blob_storage.fshelper.getBlobFilename(oid, x) for x in tids ]
>>> [ os.path.exists(x) for x in fns ]
>>> [ os.path.exists(x) for x in fns ] # no pack
[True, True, True, True, True]

Do a pack to the slightly before the first revision was written:
Expand Down Expand Up @@ -125,4 +109,4 @@ revision as well as the entire directory:

Clean up our blob directory and database:

>>> blob_storage.close()
>>> database.close()
42 changes: 13 additions & 29 deletions relstorage/tests/blob/blob_packing_history_free.txt
Original file line number Diff line number Diff line change
Expand Up @@ -31,41 +31,25 @@ Put some revisions of a blob object in our database and on the filesystem:
>>> import os
>>> tids = []
>>> times = []
>>> nothing = transaction.begin()
>>> times.append(new_time())
>>> blob = Blob()
>>> with blob.open('w') as f: _ = f.write(b'this is blob data 0')
>>> root['blob'] = blob
>>> transaction.commit()
>>> tids.append(blob._p_serial)

>>> nothing = transaction.begin()
>>> times.append(new_time())
>>> with root['blob'].open('w') as f: _ = f.write(b'this is blob data 1')
>>> transaction.commit()
>>> tids.append(blob._p_serial)

>>> nothing = transaction.begin()
>>> times.append(new_time())
>>> with root['blob'].open('w') as f: _ = f.write(b'this is blob data 2')
>>> transaction.commit()
>>> tids.append(blob._p_serial)

>>> nothing = transaction.begin()
>>> times.append(new_time())
>>> with root['blob'].open('w') as f: _ = f.write(b'this is blob data 3')
>>> transaction.commit()
>>> tids.append(blob._p_serial)

>>> nothing = transaction.begin()
>>> times.append(new_time())
>>> with root['blob'].open('w') as f: _ = f.write(b'this is blob data 4')
>>> transaction.commit()
>>> for i in range(5):
... _ = transaction.begin()
... times.append(new_time())
... with blob.open('w') as file:
... _ = file.write(b'this is blob data ' + str(i).encode())
... if i:
... tids.append(blob._p_serial)
... else:
... root['blob'] = blob
... transaction.commit()

>>> blob._p_activate()
>>> tids.append(blob._p_serial)

>>> oid = root['blob']._p_oid
>>> fns = [ blob_storage.fshelper.getBlobFilename(oid, x) for x in tids ]
>>> [ os.path.exists(x) for x in fns ]
>>> [ os.path.exists(x) for x in fns ] # no pack
[True, True, True, True, True]


Expand Down
48 changes: 11 additions & 37 deletions relstorage/tests/blob/blob_transaction.txt
Original file line number Diff line number Diff line change
@@ -1,17 +1,3 @@
##############################################################################
#
# Copyright (c) 2005-2007 Zope Foundation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################

Transaction support for Blobs
=============================

Expand Down Expand Up @@ -314,7 +300,12 @@ We can also read committed data by calling open with a 'c' flag:

>>> f = blob.open('c')

This doesn't prevent us from opening the blob for writing:
This just returns a regular file object:

>>> type(f) == file_type
True

and doesn't prevent us from opening the blob for writing:

>>> with blob.open('w') as file:
... _ = file.write(b'x')
Expand Down Expand Up @@ -390,35 +381,23 @@ stored are discarded.
>>> blob_storage.tpc_begin(t)
>>> with open('blobfile', 'wb') as file:
... _ = file.write(b'This data should go away')
>>> s1 = blob_storage.storeBlob(blob._p_oid, oldserial, olddata, 'blobfile',
>>> blob_storage.storeBlob(blob._p_oid, oldserial, olddata, 'blobfile',
... '', t)
>>> new_oid = blob_storage.new_oid()
>>> with open('blobfile2', 'wb') as file:
... _ = file.write(b'This data should go away too')
>>> s2 = blob_storage.storeBlob(new_oid, '\0'*8, olddata, 'blobfile2',
>>> blob_storage.storeBlob(new_oid, '\0'*8, olddata, 'blobfile2',
... '', t)

>>> serials = blob_storage.tpc_vote(t)
>>> if s1 is None:
... s1 = [s for (oid, s) in serials if oid == blob._p_oid][0]
>>> if s2 is None:
... s2 = [s for (oid, s) in serials if oid == new_oid][0]

>>> bool(blob_storage.tpc_vote(t))
False
>>> blob_storage.tpc_abort(t)

Now, the serial for the existing blob should be the same:

>>> blob_storage.load(blob._p_oid, '') == (olddata, oldserial)
True

And we shouldn't be able to read the data that we saved:

>>> blob_storage.loadBlob(blob._p_oid, s1)
Traceback (most recent call last):
...
POSKeyError: 'No blob file...

Of course the old data should be unaffected:
The old data should be unaffected:

>>> with open(blob_storage.loadBlob(blob._p_oid, oldserial)) as fp:
... fp.read()
Expand All @@ -431,11 +410,6 @@ Similarly, the new object wasn't added to the storage:
...
POSKeyError: 0x...

>>> blob_storage.loadBlob(blob._p_oid, s2)
Traceback (most recent call last):
...
POSKeyError: 'No blob file...

.. clean up

>>> tm1.abort()
Expand Down
8 changes: 8 additions & 0 deletions relstorage/tests/blob/testblob.py
Original file line number Diff line number Diff line change
Expand Up @@ -632,6 +632,13 @@ def rmtree(path):
(re.compile(r'\%(sep)s' % dict(sep=os.path.sep)), '/'),
])

try:
file_type = file
except NameError:
# Py3: Python 3 does not have a file type.
import io
file_type = io.BufferedReader

def storage_reusable_suite(prefix, factory,
test_blob_storage_recovery=False,
test_packing=False,
Expand All @@ -652,6 +659,7 @@ def create_storage(name='data', blob_dir=None, **kw):
return factory(name, blob_dir, **kw)

test.globs['create_storage'] = create_storage
test.globs['file_type'] = file_type

suite = unittest.TestSuite()
suite.addTest(doctest.DocFileSuite(
Expand Down
Loading