Skip to content

Commit

Permalink
Use the local state to resolve conflicts instead of reading from the …
Browse files Browse the repository at this point in the history
…DB. Fixes #38.
  • Loading branch information
jamadden committed Jun 21, 2016
1 parent 5f6d36f commit 081f312
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 19 deletions.
5 changes: 5 additions & 0 deletions CHANGES.rst
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,10 @@
- MySQL: Use the "binary" character set to avoid producing "Invalid
utf8 character string" warnings. See `issue 57`_.

- Conflict resolution uses the locally cached state instead of
re-reading it from the database (they are guaranteed to be the
same). See `issue 38`_.

.. _`PR 18`: https://github.com/zodb/relstorage/pull/18/
.. _`PR 20`: https://github.com/zodb/relstorage/pull/20
.. _`PR 21`: https://github.com/zodb/relstorage/pull/21
Expand All @@ -66,6 +70,7 @@
.. _`PR 22`: https://github.com/zodb/relstorage/pull/22
.. _`PR 23`: https://github.com/zodb/relstorage/pull/23/
.. _`issue 57`: https://github.com/zodb/relstorage/issues/57
.. _`issue 38`: https://github.com/zodb/relstorage/issues/38

1.6.0b3 (2014-12-08)
--------------------
Expand Down
18 changes: 6 additions & 12 deletions relstorage/adapters/mover.py
Original file line number Diff line number Diff line change
Expand Up @@ -718,17 +718,15 @@ def postgresql_detect_conflict(self, cursor):
"""
if self.keep_history:
stmt = """
SELECT temp_store.zoid, current_object.tid, temp_store.prev_tid,
temp_store.state
SELECT temp_store.zoid, current_object.tid, temp_store.prev_tid
FROM temp_store
JOIN current_object ON (temp_store.zoid = current_object.zoid)
WHERE temp_store.prev_tid != current_object.tid
LIMIT 1
"""
else:
stmt = """
SELECT temp_store.zoid, object_state.tid, temp_store.prev_tid,
temp_store.state
SELECT temp_store.zoid, object_state.tid, temp_store.prev_tid
FROM temp_store
JOIN object_state ON (temp_store.zoid = object_state.zoid)
WHERE temp_store.prev_tid != object_state.tid
Expand All @@ -750,8 +748,7 @@ def mysql_detect_conflict(self, cursor):
# Lock in share mode to ensure the data being read is up to date.
if self.keep_history:
stmt = """
SELECT temp_store.zoid, current_object.tid, temp_store.prev_tid,
temp_store.state
SELECT temp_store.zoid, current_object.tid, temp_store.prev_tid
FROM temp_store
JOIN current_object ON (temp_store.zoid = current_object.zoid)
WHERE temp_store.prev_tid != current_object.tid
Expand All @@ -760,8 +757,7 @@ def mysql_detect_conflict(self, cursor):
"""
else:
stmt = """
SELECT temp_store.zoid, object_state.tid, temp_store.prev_tid,
temp_store.state
SELECT temp_store.zoid, object_state.tid, temp_store.prev_tid
FROM temp_store
JOIN object_state ON (temp_store.zoid = object_state.zoid)
WHERE temp_store.prev_tid != object_state.tid
Expand All @@ -782,16 +778,14 @@ def oracle_detect_conflict(self, cursor):
"""
if self.keep_history:
stmt = """
SELECT temp_store.zoid, current_object.tid, temp_store.prev_tid,
temp_store.state
SELECT temp_store.zoid, current_object.tid, temp_store.prev_tid
FROM temp_store
JOIN current_object ON (temp_store.zoid = current_object.zoid)
WHERE temp_store.prev_tid != current_object.tid
"""
else:
stmt = """
SELECT temp_store.zoid, object_state.tid, temp_store.prev_tid,
temp_store.state
SELECT temp_store.zoid, object_state.tid, temp_store.prev_tid
FROM temp_store
JOIN object_state ON (temp_store.zoid = object_state.zoid)
WHERE temp_store.prev_tid != object_state.tid
Expand Down
21 changes: 16 additions & 5 deletions relstorage/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,21 @@ def store_temp(self, oid_int, state):
endpos = queue.tell()
self.queue_contents[oid_int] = (startpos, endpos)

def _read_temp_state(self, startpos, endpos):
self.queue.seek(startpos)
length = endpos - startpos
state = self.queue.read(length)
if len(state) != length:
raise AssertionError("Queued cache data is truncated")
return state, length

def read_temp(self, oid_int):
"""
Return the bytes for a previously stored temporary item.
"""
startpos, endpos = self.queue_contents[oid_int]
return self._read_temp_state(startpos, endpos)[0]

def send_queue(self, tid):
"""Now that this tid is known, send all queued objects to the cache"""
tid_int = u64(tid)
Expand All @@ -327,11 +342,7 @@ def send_queue(self, tid):
items.sort()

for startpos, endpos, oid_int in items:
self.queue.seek(startpos)
length = endpos - startpos
state = self.queue.read(length)
if len(state) != length:
raise AssertionError("Queued cache data is truncated")
state, length = self._read_temp_state(startpos, endpos)
cachekey = '%s:state:%d:%d' % (prefix, tid_int, oid_int)
item_size = length + len(cachekey)
if send_size and send_size + item_size >= self.send_limit:
Expand Down
4 changes: 2 additions & 2 deletions relstorage/storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -776,8 +776,8 @@ def _finish_store(self):
if conflict is None:
break

oid_int, prev_tid_int, serial_int, data = conflict
assert isinstance(data, bytes) # XXX PY3 porting
oid_int, prev_tid_int, serial_int = conflict
data = self.cache.read_temp(oid_int)
oid = p64(oid_int)
prev_tid = p64(prev_tid_int)
serial = p64(serial_int)
Expand Down

0 comments on commit 081f312

Please sign in to comment.