Skip to content
This repository has been archived by the owner on May 13, 2020. It is now read-only.

Commit

Permalink
- Feature: Conflicts are now detected while aborting a transaction. The
Browse files Browse the repository at this point in the history
  implemented policy will not reset the document state, if a conflict is
  detected.

- Feature: Provide a flag to turn on MongoDB access logging. The flag is false
  by default, since access logging is very expensive.

- Bug: We have seen several occasions in production where we suddenly lost
  some state in some documents, which prohibited the objects from being
  loadable again. The cause was that the ``_original_states`` attribute did not
  store the raw MongoDB document, but a modified one. Since those states are
  used during abort to reset the state, however, the modified document got
  stored making the affected objects inaccessible.
  • Loading branch information
strichter committed Mar 29, 2012
1 parent 1d496f9 commit 1277b2c
Show file tree
Hide file tree
Showing 5 changed files with 119 additions and 22 deletions.
14 changes: 14 additions & 0 deletions CHANGES.txt
Expand Up @@ -5,6 +5,20 @@ CHANGES
0.7.0 (2012-03-??)
------------------

- Feature: Conflicts are now detected while aborting a transaction. The
implemented policy will not reset the document state if a conflict is
detected.

- Feature: Provide a flag to turn on MongoDB access logging. The flag is false
by default, since access logging is very expensive.

- Bug: We have seen several occasions in production where we suddenly lost
some state in some documents, which prevented the objects from being
loaded again. The cause was that the ``_original_states`` attribute did not
store the raw MongoDB document, but a modified one. Since those states are
used during abort to reset the state, the modified document got stored,
making the affected objects inaccessible.

- Bug: When a transaction was aborted, the states of all *loaded* objects were
reset. Now, only *modified* object states are reset. This should drastically
lower problems (by the ratio of read over modified objects) due to lack of
Expand Down
43 changes: 27 additions & 16 deletions src/mongopersist/datamanager.py
Expand Up @@ -26,6 +26,7 @@
from zope.exceptions import exceptionformatter
from mongopersist import interfaces, serialize

MONGO_ACCESS_LOGGING = False
COLLECTION_LOG = logging.getLogger('mongopersist.collection')

def create_conflict_error(obj, new_doc):
Expand Down Expand Up @@ -76,15 +77,15 @@ def __call__(self, *args, **kwargs):
class LoggingDecorator(object):

# these are here to be easily patched
ADDTB = True
ADD_TB = True
TB_LIMIT = 10 # 10 should be sufficient to figure

def __init__(self, collection, function):
self.collection = collection
self.function = function

def __call__(self, *args, **kwargs):
if self.ADDTB:
if self.ADD_TB:
try:
raise ValueError('boom')
except:
Expand Down Expand Up @@ -116,7 +117,7 @@ def __init__(self, collection, datamanager):

def __getattr__(self, name):
attr = getattr(self.collection, name)
if name in self.LOGGED_METHODS:
if MONGO_ACCESS_LOGGING and name in self.LOGGED_METHODS:
attr = LoggingDecorator(self.collection, attr)
if name in self.QUERY_METHODS:
attr = FlushDecorator(self._datamanager, attr)
Expand Down Expand Up @@ -210,22 +211,29 @@ def _get_collection_from_object(self, obj):
db_name, coll_name = self._writer.get_collection_name(obj)
return self._get_collection(db_name, coll_name)

def _check_conflict(self, obj, can_raise=True):
# This object is not even added to the database yet, so there
# cannot be a conflict.
if obj._p_oid is None:
return None if can_raise else False
coll = self._get_collection_from_object(obj)
new_doc = coll.find_one(obj._p_oid.id, fields=('_py_serial',))
if new_doc is None:
return None if can_raise else False
if new_doc.get('_py_serial', 0) != serialize.u64(obj._p_serial):
if can_raise:
raise self.conflict_error_factory(obj, new_doc)
else:
return True
return None if can_raise else False

def _check_conflicts(self):
if not self.detect_conflicts:
return
# Check each modified object to see whether Mongo has a new version of
# the object.
for obj in self._registered_objects:
# This object is not even added to the database yet, so there
# cannot be a conflict.
if obj._p_oid is None:
continue
coll = self._get_collection_from_object(obj)
new_doc = coll.find_one(obj._p_oid.id, fields=('_py_serial',))
if new_doc is None:
continue
if new_doc.get('_py_serial', 0) != serialize.u64(obj._p_serial):
raise self.conflict_error_factory(obj, new_doc)
self._check_conflict(obj)

def _flush_objects(self):
# Now write every registered object, but make sure we write each
Expand Down Expand Up @@ -349,10 +357,13 @@ def abort(self, transaction):
# the tests abort transactions often without having loaded
# objects through proper channels.
continue
if (self.detect_conflicts and
self._check_conflict(obj, can_raise=False)):
# If we have a conflict, we are not going to reset to the
# original state. (This is a policy that should be made
# pluggable.)
continue
coll = self.get_collection(db_ref.database, db_ref.collection)
# XXX: There should be a check here whether the state has been
# modified in the mean time by another transaction. Then a policy
# needs to decide what to do.
coll.update({'_id': db_ref.id}, state, True)
self.reset()

Expand Down
10 changes: 6 additions & 4 deletions src/mongopersist/serialize.py
Expand Up @@ -384,14 +384,16 @@ def set_ghost_state(self, obj, doc=None):
coll = self._jar.get_collection(
obj._p_oid.database, obj._p_oid.collection)
doc = coll.find_one({'_id': obj._p_oid.id})
# Create a copy of the doc, so that we can modify it.
state_doc = doc.copy()
# Remove unwanted attributes.
doc.pop('_id')
doc.pop('_py_persistent_type', None)
state_doc.pop('_id')
state_doc.pop('_py_persistent_type', None)
# Store the serial, if conflict detection is enabled.
if self._jar.detect_conflicts:
obj._p_serial = p64(doc.pop('_py_serial', 0))
obj._p_serial = p64(state_doc.pop('_py_serial', 0))
# Now convert the document to a proper Python state dict.
state = dict(self.get_object(doc, obj))
state = dict(self.get_object(state_doc, obj))
# Now store the original state. It is assumed that the state dict is
# not modified later.
self._jar._original_states[obj._p_oid] = doc
Expand Down
64 changes: 64 additions & 0 deletions src/mongopersist/tests/test_datamanager.py
Expand Up @@ -338,6 +338,25 @@ def doctest_MongoDataManager_insert():
{u'_id': ObjectId('4f5c443837a08e37bf000001'), u'name': u'Foo 2'})
"""

def doctest_MongoDataManager_insert_conflict_detection():
r"""MongoDataManager: insert(obj): Conflict Detection.
This test ensures that if the datamanager has conflict detection turned
on, all the needed helper fields are written.
>>> dm.detect_conflicts = True
>>> foo = Foo('foo')
>>> foo_ref = dm.insert(foo)
Let's check that all the fields are there:
>>> coll = dm.get_collection_from_object(foo)
>>> coll.find_one({})
{u'_id': ObjectId('4f74837237a08e186f000000'), u'_py_serial': 1,
u'name': u'foo'}
"""


def doctest_MongoDataManager_remove():
r"""MongoDataManager: remove(obj)
Expand Down Expand Up @@ -581,6 +600,51 @@ def doctest_MongoDataManager_abort_modified_only():
"""

def doctest_MongoDataManager_abort_conflict_detection():
r"""MongoDataManager: abort(): Conflict detection while aborting.
When a transaction is aborting, we are usually resetting the state of the
modified objects. What happens, however, when the document was updated
since the last flush?
The implemented policy now does not reset the state in this case.
First let's create an initial state:
>>> dm.detect_conflicts = True
>>> dm.reset()
>>> foo_ref = dm.insert(Foo('one'))
>>> dm.reset()
>>> coll = dm._get_collection_from_object(Foo())
1. Transaction A loads the object and modifies it:
>>> foo_A = dm.load(foo_ref)
>>> foo_A.name = u'1'
>>> coll.find_one({})
{u'_id': ObjectId('4e7dd'), u'_py_serial': 1, u'name': u'one'}
2. Transaction B comes along and modifies the object as well and commits:
>>> dm_B = datamanager.MongoDataManager(
... conn, detect_conflicts=True,
... default_database=DBNAME, root_database=DBNAME)
>>> foo_B = dm_B.load(foo_ref)
>>> foo_B.name = 'Eins'
>>> dm_B.tpc_finish(None)
>>> coll.find_one({})
{u'_id': ObjectId('4e7dd'), u'_py_serial': 2, u'name': u'Eins'}
3. If transaction A is later aborted, it does not reset the state, since
it changed:
>>> dm.abort(None)
>>> coll.find_one({})
{u'_id': ObjectId('4e7dd'), u'_py_serial': 2, u'name': u'Eins'}
"""

def doctest_MongoDataManager_tpc_begin():
r"""MongoDataManager: tpc_begin()
Expand Down
10 changes: 8 additions & 2 deletions src/mongopersist/tests/test_serialize.py
Expand Up @@ -668,8 +668,14 @@ def doctest_ObjectReader_set_ghost_state():
Note that the original state is stored in the data manager:
>>> gobj._p_jar._original_states
{DBRef('Top', ObjectId('4f5bf4e437a08e2614000001'), 'mongopersist_test'):
{u'name': u'top'}}
{DBRef('Top', ObjectId('4f7487e237a08e1a86000001'), 'mongopersist_test'):
{u'_id': ObjectId('4f7487e237a08e1a86000001'),
u'_py_serial': 1,
u'name': u'top'}}
Note that it is important that the full document, exactly as returned by
Mongo, is stored here, since this document is taken and put back into Mongo
when a transaction is aborted instead of committed.
This state does not change, even when the object is modified:
Expand Down

0 comments on commit 1277b2c

Please sign in to comment.