Fix deleteObject in history-preserving mode to conform to ZODB interface

ZODB specifies that deleteObject creates a new revision that indicates object
removal:

    def deleteObject(oid, serial, transaction):
        """Mark an object as deleted

        This method marks an object as deleted VIA A NEW OBJECT
        REVISION.  Subsequent attempts to load current data for the
        object will fail with a POSKeyError, but loads for
        non-current data will succeed if there are previous
        non-delete records.  The object will be removed from the
        storage when all not-delete records are removed.

https://github.com/zopefoundation/ZODB/blob/bc13ca74/src/ZODB/interfaces.py#L1292-L1307
(emphasis mine)

However, in history-preserving mode, as explained in
zopefoundation/ZODB#318 (comment),
RelStorage currently purges the latest object revision instead of creating a
new one with a whiteout indication. This goes against the deleteObject
specification and, as demonstrated by the attached test program, against the
actual FileStorage behaviour.

-> Fix it.
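
For concreteness, here is a minimal storage-level sketch of the expected
contract (illustrative only, not part of the patch; FileStorage and the file
path below are just placeholders for a reference implementation):

    import transaction
    from persistent.mapping import PersistentMapping
    from ZODB.DB import DB
    from ZODB.FileStorage import FileStorage
    from ZODB.Connection import TransactionMetaData
    from ZODB.POSException import POSKeyError

    stor = FileStorage('/tmp/whiteout-demo.fs')     # placeholder path
    db = DB(stor)
    conn = db.open()
    conn.root()['x'] = obj = PersistentMapping()
    transaction.commit()
    oid, serial = obj._p_oid, obj._p_serial

    # delete the object the way external GC does it
    t = TransactionMetaData()
    stor.tpc_begin(t)
    stor.deleteObject(oid, serial, t)
    stor.tpc_vote(t)
    stor.tpc_finish(t)

    # the pre-delete revision must still be loadable ...
    assert stor.loadSerial(oid, serial)
    # ... while loading current data must fail with POSKeyError
    try:
        stor.load(oid)
        raise AssertionError("load should have failed")
    except POSKeyError:
        pass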

P.S. I'm a complete RelStorage newbie and have only looked at the code briefly.
My patch could well be incomplete or suboptimal. However, it demonstrates a real
problem, and it fixes both the adjusted testcase and the failure of the attached
tdelete.py.

P.P.S. Tested only with SQLite backend.

    ---- 8< ---- (tdelete.py)
    #!/usr/bin/env python
    """tdelete.py demonstrates that deleteObject should create new whiteout
    record, and that older data records should be still accessible.

    e.g. with FileStorage:

       $ ./tdelete.py file://1.db
       @03e40964a0766f33 (= 280359404597309235) obj<0000000000000001>  ->  int(0)
       @03e40964a0790944 (= 280359404597479748) obj<0000000000000001>  ->  int(1)

       --------

       @03e40964a0766f33  obj<0000000000000001>  ->  int(0)    # must be int(0)
       @03e40964a0790944  obj<0000000000000001>  ->  int(1)    # must be int(1)

    However, it currently fails with RelStorage, because deleteObject does not
    create a new whiteout revision and instead purges already-committed data:

        $ rm x/*; ./tdelete.py sqlite://?data_dir=`pwd`/x
        @03e40972d5408022 (= 280359465612509218) obj<0000000000000001>  ->  int(0)
        @03e40972d541ddee (= 280359465612598766) obj<0000000000000001>  ->  int(1)

        --------

        @03e40972d5408022  obj<0000000000000001>  ->  int(0)    # must be int(0)
        Traceback (most recent call last):
          File "./tdelete.py", line 84, in <module>
            main()
          File "./tdelete.py", line 80, in main
            dumpObjAt(at1, "must be int(1)")
          File "./tdelete.py", line 75, in dumpObjAt
            obj = conn.get(oid)
          File "/home/kirr/src/wendelin/z/ZODB/src/ZODB/Connection.py", line 238, in get
            obj = self._reader.getGhost(p)
          File "/home/kirr/src/wendelin/z/ZODB/src/ZODB/serialize.py", line 598, in getGhost
            unpickler = self._get_unpickler(pickle)
          File "/home/kirr/src/wendelin/z/ZODB/src/ZODB/serialize.py", line 478, in _get_unpickler
            file = BytesIO(pickle)
        TypeError: StringIO() argument 1 must be string or buffer, not None
    """

    from __future__ import print_function

    import zodburi
    from persistent import Persistent
    from ZODB.DB import DB
    from ZODB.Connection import TransactionMetaData
    from ZODB.utils import u64
    import transaction
    import sys

    class PInt(Persistent):
        def __init__(self, i):
            self.i = i
        def __str__(self):
            return "int(%d)" % self.i

    def h(tid):
        return tid.encode('hex')

    def dump(obj):
        print("@%s (= %d) obj<%s>  ->  %s" % (h(obj._p_serial), u64(obj._p_serial), h(obj._p_oid), obj))

    def main():
        zurl = sys.argv[1]
        zstoropen, dbkw = zodburi.resolve_uri(zurl)

        stor = zstoropen()
        db = DB(stor, **dbkw)

        conn = db.open()
        root = conn.root()

        root['X'] = obj = PInt(0)
        transaction.commit()
        dump(obj)
        at0 = obj._p_serial
        oid = obj._p_oid

        obj.i += 1
        transaction.commit()
        dump(obj)
        at1 = obj._p_serial

        txn_meta = TransactionMetaData()
        stor.tpc_begin(txn_meta)
        stor.deleteObject(oid, at1, txn_meta)
        stor.tpc_vote(txn_meta)
        stor.tpc_finish(txn_meta)

        print('\n--------\n')

        def dumpObjAt(at, comment):
            conn = db.open(at=at)
            obj = conn.get(oid)
            print("@%s  obj<%s>  ->  %s\t# %s" % (h(at), h(oid), obj, comment))
            conn.close()

        dumpObjAt(at0, "must be int(0)")
        dumpObjAt(at1, "must be int(1)")

    if __name__ == '__main__':
        main()

P.P.P.S. The SQLite URI resolver is currently broken after 08259fa (Finer
control over sqlite storage locking, oid allocation and stats). I've used the
following local patch as a workaround:

    --- a/src/relstorage/zodburi_resolver.py
    +++ b/src/relstorage/zodburi_resolver.py
    @@ -121,14 +121,14 @@ def factory(options):
             return factory, unused

     class SqliteAdapterHelper(Resolver):
    -    _string_args = ('path',)
    +    _string_args = ('data_dir',)

         def __call__(self, parsed_uri, kw):
             kw, unused = self.interpret_kwargs(kw)

             def factory(options):
                 from relstorage.adapters.sqlite.adapter import Sqlite3Adapter
    -            return Sqlite3Adapter(options=options, **kw)
    +            return Sqlite3Adapter(options=options, pragmas={}, **kw)
             return factory, unused

     # The relstorage support is inspired by django-zodb.
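
With this workaround in place, the sqlite URI used above resolves as expected
(again only a sketch; the data_dir value is a placeholder):

    import zodburi
    zstoropen, dbkw = zodburi.resolve_uri('sqlite://?data_dir=/tmp/x')
    stor = zstoropen()      # RelStorage backed by the Sqlite3Adapter
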
navytux committed Nov 9, 2021
1 parent f3825ba commit 26e5bea
Showing 7 changed files with 48 additions and 66 deletions.
9 changes: 5 additions & 4 deletions src/relstorage/adapters/interfaces.py
@@ -1020,7 +1020,7 @@ def pack(pack_tid, packed_func=None):
accept two parameters, oid and tid (64 bit integers).
"""

def deleteObject(cursor, oid_int, tid_int):
def deleteObject(shared_state, oid_int, tid_int):
"""
Delete the revision of *oid_int* in transaction *tid_int*.
@@ -1042,9 +1042,10 @@ def deleteObject(cursor, oid_int, tid_int):
is at *tid_int*), leading all access to *oid_int* in the
future to throw ``POSKeyError``.
In history preserving databases, this means to set the state for the object
at the transaction to NULL, signifying that it's been deleted. A subsequent
pack operation is required to actually remove these deleted items.
In history preserving databases, this means to create a new object
revision with state=NULL, signifying that the object has been deleted.
A subsequent pack operation is required to actually remove these
deleted items.
"""

class IPoller(Interface):
2 changes: 1 addition & 1 deletion src/relstorage/adapters/mover.py
@@ -300,7 +300,7 @@ def store_temps(self, cursor, state_oid_tid_iter):
query.executemany(
cursor,
(
(oid_int, tid_int, do_md5(data), Binary(data))
(oid_int, tid_int, do_md5(data), Binary(data) if data is not None else None)
for (data, oid_int, tid_int)
in state_oid_tid_iter
)
46 changes: 20 additions & 26 deletions src/relstorage/adapters/packundo.py
@@ -309,10 +309,12 @@ def check_refs(self, pack_tid):
INNER JOIN object_ref USING (zoid)
WHERE keep = %(TRUE)s
AND NOT EXISTS (
SELECT 1
FROM object_state
WHERE object_state.zoid = to_zoid
AND object_state.state IS NOT NULL
SELECT state FROM (
SELECT state, MAX(tid)
FROM object_state
WHERE object_state.zoid = to_zoid
)
WHERE state IS NOT NULL
)
"""
self.runner.run_script_stmt(ss_load_cursor, stmt)
@@ -344,20 +346,6 @@ def check_refs(self, pack_tid):
# represents a deleted object; it shouldn't be reachable anyway
# and will be packed away next time we pack (without GC))

# We shouldn't *have* to verify the oldserial in the delete statement,
# because our only consumer is zc.zodbdgc which only calls us for
# unreachable objects, so they shouldn't be modified and get a new
# TID. But it's safer to do so.
_script_delete_object = None

def deleteObject(self, cursor, oid, oldserial):
params = {'oid': u64(oid), 'tid': u64(oldserial)}
self.runner.run_script_stmt(
cursor,
self._script_delete_object,
params)
return cursor.rowcount

def on_filling_object_refs_added(self, oids=None, tids=None):
"""Test injection point for packing."""

@@ -475,14 +463,12 @@ class HistoryPreservingPackUndo(PackUndo):
).limit(delete_empty_transactions_batch_size))
)

_script_delete_object = """
UPDATE object_state
SET state = NULL,
state_size = 0,
md5 = ''
WHERE zoid = %(oid)s
AND tid = %(tid)s
"""
def deleteObject(self, shared_state, oid, oldserial):
oid_int = u64(oid)
oldserial_int = u64(oldserial)
shared_state.temp_storage.store_temp(oid_int, None, oldserial_int)
# can't count how many objects are deleted in history-preserving mode
return None

_is_packed_tx_query = Schema.transaction.select(
1
@@ -1186,6 +1172,14 @@ class HistoryFreePackUndo(PackUndo):
WHERE zoid = %(oid)s
and tid = %(tid)s
"""
def deleteObject(self, shared_state, oid, oldserial):
cursor = shared_state.store_connection.cursor
params = {'oid': u64(oid), 'tid': u64(oldserial)}
self.runner.run_script_stmt(
cursor,
self._script_delete_object,
params)
return cursor.rowcount

def on_fill_object_ref_batch(self, oid_batch, num_refs_found):
"""Hook for testing."""
5 changes: 2 additions & 3 deletions src/relstorage/storage/tpc/begin.py
@@ -177,7 +177,7 @@ def _invalidated_oids(self, *oid_bytes):

def deleteObject(self, oid, oldserial, transaction):
"""
This method operates directly against the ``object_state`` table;
In history-free mode this method operates directly against the ``object_state`` table;
as such, it immediately takes out locks on that table.
This method is only expected to be called when performing
@@ -216,12 +216,11 @@ def deleteObject(self, oid, oldserial, transaction):

# We delegate the actual operation to the adapter's packundo,
# just like native pack
cursor = self.shared_state.store_connection.cursor
# When this is done, we get a tpc_vote,
# and a tpc_finish.
# The interface doesn't specify a return value, so for testing
# we return the count of rows deleted (should be 1 if successful)
deleted = self.shared_state.adapter.packundo.deleteObject(cursor, oid, oldserial)
deleted = self.shared_state.adapter.packundo.deleteObject(self.shared_state, oid, oldserial)
self._invalidated_oids(oid)
return deleted

14 changes: 11 additions & 3 deletions src/relstorage/storage/tpc/temporary_storage.py
@@ -57,8 +57,11 @@ def store_temp(self, oid_int, state, prev_tid_int=0):
queue = self._queue
queue.seek(0, 2) # seek to end
startpos = queue.tell()
queue.write(state)
endpos = queue.tell()
if state is not None:
queue.write(state)
endpos = queue.tell()
else:
endpos = None
self._queue_contents[oid_int] = (startpos, endpos, prev_tid_int)

def __len__(self):
@@ -88,6 +91,8 @@ def read_temp(self, oid_int):
Return the bytes for a previously stored temporary item.
"""
startpos, endpos, _ = self._queue_contents[oid_int]
if endpos is None:
return None
return self._read_temp_state(startpos, endpos)

def __iter__(self):
@@ -96,7 +101,10 @@ def __iter__(self):
def iter_for_oids(self, oids):
read_temp_state = self._read_temp_state
for startpos, endpos, oid_int, prev_tid_int in self.items(oids):
state = read_temp_state(startpos, endpos)
if endpos is None:
state = None
else:
state = read_temp_state(startpos, endpos)
yield state, oid_int, prev_tid_int

def items(self, oids=None):
28 changes: 5 additions & 23 deletions src/relstorage/storage/tpc/vote.py
@@ -630,29 +630,11 @@ class HistoryPreservingDeleteOnly(HistoryPreserving):
__slots__ = ()

def _vote(self, storage):
if self.shared_state.temp_storage and self.shared_state.temp_storage.stored_oids:
raise StorageTransactionError("Cannot store and delete at the same time.")
# We only get here if we've deleted objects, meaning we hold their row locks.
# We only delete objects once we hold the commit lock.
assert self.committing_tid_lock
# Holding the commit lock put an entry in the transaction table,
# but we don't want to bump the TID or store that data.
self.shared_state.adapter.txncontrol.delete_transaction(
self.shared_state.store_connection.cursor,
self.committing_tid_lock.tid_int
)
self.lock_and_vote_times[0] = time.time()
return ()

def _lock_and_move(self, vote_only=False):
# We don't do the final commit,
# we just prepare.
self._enter_critical_phase_until_transaction_end()
self.shared_state.prepared_txn = self.shared_state.adapter.txncontrol.commit_phase1(
self.shared_state.store_connection,
self.committing_tid_lock.tid_int
)
return False
if self.shared_state.temp_storage:
for (state, _, _) in self.shared_state.temp_storage:
if state is not None:
raise StorageTransactionError("Cannot store and delete at the same time.")
return super(HistoryPreservingDeleteOnly, self)._vote(storage)


class _CachedConflictResolver(ConflictResolvingStorage):
10 changes: 4 additions & 6 deletions src/relstorage/tests/hptestbase.py
@@ -400,15 +400,11 @@ def checkImplementsIExternalGC(self):
history = storage.history(obj_oid, size=100)
self.assertEqual(6, len(history))
latest_tid = history[0]['tid']
# We can delete the latest TID for the OID, and the whole
# We can record deletion for the OID, and the whole
# object goes away on a pack.
t = TransactionMetaData()
storage.tpc_begin(t)
count = storage.deleteObject(obj_oid, latest_tid, t)
self.assertEqual(count, 1)
# Doing it again will do nothing because it's already
# gone.
count = storage.deleteObject(obj_oid, latest_tid, t)
storage.deleteObject(obj_oid, latest_tid, t)
invalidations = storage.tpc_vote(t)
storage.tpc_finish(t)

@@ -424,6 +420,8 @@ def checkImplementsIExternalGC(self):
storage.load(obj_oid)

# But we can load a state before then.
state = storage.loadSerial(obj_oid, history[0]['tid'])
self.assertEqual(len(state), history[0]['size'])
state = storage.loadSerial(obj_oid, history[1]['tid'])
self.assertEqual(len(state), history[1]['size'])

