Skip to content

Commit

Permalink
Fix possible data corruption after FileStorage is truncated to roll b…
Browse files Browse the repository at this point in the history
…ack a transaction

Multi-threaded IO support, which is new to ZODB 3.10, allows clients to read
data (load & loadBefore) even after tpc_vote has started to write a new
transaction to disk. This is done by using different 'file' objects.

Issues start when a transaction is rolled back after data has been appended
(using the writing file object). Truncating is not enough because the FilePool
may have been used concurrently to read the end of the last transaction:
file objects have their own read buffers which, in this case, may also contain
the beginning of the aborted transaction.

So a solution is to invalidate read buffers whenever they may contain wrong
data. This patch does it on truncation, which happens rarely enough to not
affect performance.

We discovered this bug in the following conditions:
- ZODB splitted in several FileStorage
- many conflicts in the first committed DB, but always resolved
- unresolved conflict in another DB
If the transaction is replayed with success (no more conflict in the other DB),
a subsequent load of the object that could be resolved in the first DB may, for
example, return a wrong serial (tid of the aborted transaction) if the layout
of the committed transaction matches that of the aborted one.

The bug usually manifests with POSKeyError & CorruptedDataError exceptions in
ZEO logs, for example while trying to resolve a conflict (and restarting the
transaction does not help, causing Site Errors in Zope). But theorically,
this could also cause silent corruption or unpickling errors at client side.
  • Loading branch information
jmuchemb committed Apr 6, 2016
1 parent 6b70753 commit 5311b51
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 1 deletion.
11 changes: 11 additions & 0 deletions src/ZODB/FileStorage/FileStorage.py
Expand Up @@ -722,6 +722,7 @@ def tpc_vote(self, transaction):
# Hm, an error occurred writing out the data. Maybe the
# disk is full. We don't want any turd at the end.
self._file.truncate(self._pos)
self._files.flush()
raise
self._nextpos = self._pos + (tl + 8)

Expand Down Expand Up @@ -776,6 +777,7 @@ def _finish_finish(self, tid):
def _abort(self):
if self._nextpos:
self._file.truncate(self._pos)
self._files.flush()
self._nextpos=0
self._blob_tpc_abort()

Expand Down Expand Up @@ -2045,6 +2047,15 @@ def __init__(self, file_name):
self._out = []
self._cond = threading.Condition()

def flush(self):
"""Empty read buffers.
This is required if they contain data of rolled back transactions.
"""
with self.write_lock():
for f in self._files:
f.flush()

@contextlib.contextmanager
def write_lock(self):
with self._cond:
Expand Down
23 changes: 22 additions & 1 deletion src/ZODB/tests/testFileStorage.py
Expand Up @@ -25,6 +25,7 @@
from ZODB import POSException
from ZODB import DB
from ZODB.fsIndex import fsIndex
from ZODB.utils import U64, p64, z64

from ZODB.tests import StorageTestBase, BasicStorage, TransactionalUndoStorage
from ZODB.tests import PackableStorage, Synchronization, ConflictResolution
Expand Down Expand Up @@ -217,7 +218,6 @@ def checkCorruptionInPack(self):
# global.
import time

from ZODB.utils import U64, p64
from ZODB.FileStorage.format import CorruptedError
from ZODB.serialize import referencesf

Expand Down Expand Up @@ -285,6 +285,27 @@ def check_record_iternext(self):
else:
self.assertNotEqual(next_oid, None)

def checkFlushAfterTruncate(self, fail=False):
r0 = self._dostore(z64)
storage = self._storage
t = transaction.Transaction()
storage.tpc_begin(t)
storage.store(z64, r0, b'foo', b'', t)
storage.tpc_vote(t)
# Read operations are done with separate 'file' objects with their
# own buffers: here, the buffer also includes voted data.
storage.load(z64)
# This must invalidate all read buffers.
storage.tpc_abort(t)
self._dostore(z64, r0, b'bar', 1)
# In the case that read buffers were not invalidated, return value
# is based on what was cached during the first load.
self.assertEqual(storage.load(z64)[0], b'foo' if fail else b'bar')

def checkFlushNeededAfterTruncate(self):
self._storage._files.flush = lambda: None
self.checkFlushAfterTruncate(True)

class FileStorageHexTests(FileStorageTests):

def open(self, **kwargs):
Expand Down

0 comments on commit 5311b51

Please sign in to comment.