Skip to content
This repository has been archived by the owner on May 13, 2020. It is now read-only.

Commit

Permalink
- Feature: A new IConflictHandler interface now controls all aspe…
Browse files Browse the repository at this point in the history
…cts of

  conflict resolution. The following implementations are provided:

  * ``NoCheckConflictHandler``: This handler does nothing and when used, the
    system behaves as before when the ``detect_conflicts`` flag was set to
    ``False``.

  * ``SimpleSerialConflictHandler``: This handler uses serial numbers on each
    document to keep track of versions and then to detect conflicts. When a
    conflict is detected, a ``ConflictError`` is raised. This handler is
    identical to ``detect_conflicts`` being set to ``True``.

  * ``ResolvingSerialConflictHandler``: Another serial handler, but it has the
    ability to resolve a conflict. For this to happen, a persistent object
    must implement ``_p_resolveConflict(orig_state, cur_state, new_state)``,
    which returns the new, merged state.

  As a result, the ``detect_conflicts`` flag of the data manager was removed
  and replaced with the ``conflict_handler`` attribute. One can pass in the
  ``conflict_handler_factory`` to the data manager constructor. The factory
  needs to expect on argument, the data manager.
  • Loading branch information
strichter committed Mar 29, 2012
1 parent f24e20a commit 62100a1
Show file tree
Hide file tree
Showing 9 changed files with 592 additions and 96 deletions.
22 changes: 22 additions & 0 deletions CHANGES.txt
Expand Up @@ -5,6 +5,28 @@ CHANGES
0.7.0 (2012-03-??)
------------------

- Feature: A new ``IConflictHandler`` interface now controls all aspects of
conflict resolution. The following implementations are provided:

* ``NoCheckConflictHandler``: This handler does nothing and when used, the
system behaves as before when the ``detect_conflicts`` flag was set to
``False``.

* ``SimpleSerialConflictHandler``: This handler uses serial numbers on each
document to keep track of versions and then to detect conflicts. When a
conflict is detected, a ``ConflictError`` is raised. This handler is
identical to ``detect_conflicts`` being set to ``True``.

* ``ResolvingSerialConflictHandler``: Another serial handler, but it has the
ability to resolve a conflict. For this to happen, a persistent object
must implement ``_p_resolveConflict(orig_state, cur_state, new_state)``,
which returns the new, merged state.

As a result, the ``detect_conflicts`` flag of the data manager was removed
and replaced with the ``conflict_handler`` attribute. One can pass in the
``conflict_handler_factory`` to the data manager constructor. The factory
needs to expect on argument, the data manager.

- Feature: ``ConflictError`` has now a much more meaningful API. Instead of
just referencing the object and different serials, it now actual has the
original, current and new state documents.
Expand Down
4 changes: 2 additions & 2 deletions src/mongopersist/README.txt
Expand Up @@ -526,13 +526,13 @@ implemented using a serial number on the document.
Let's reset the database and create a data manager with enabled conflict
detection:

>>> from mongopersist import datamanager
>>> from mongopersist import conflict, datamanager
>>> conn.drop_database(DBNAME)
>>> dm2 = datamanager.MongoDataManager(
... conn,
... default_database=DBNAME,
... root_database=DBNAME,
... detect_conflicts=True)
... conflict_handler_factory=conflict.SimpleSerialConflictHandler)

Now we add a person and see that the serial got stored.

Expand Down
127 changes: 127 additions & 0 deletions src/mongopersist/conflict.py
@@ -0,0 +1,127 @@
##############################################################################
#
# Copyright (c) 2012 Zope Foundation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""Mongo Persistence Conflict Handler Implementations"""
from __future__ import absolute_import
import struct
import zope.interface
from mongopersist import interfaces, serialize

def p64(v):
"""Pack an integer or long into a 8-byte string"""
return struct.pack(">Q", v)

def u64(v):
"""Unpack an 8-byte string into a 64-bit long integer."""
return struct.unpack(">Q", v)[0]

def create_conflict_error(obj, orig_doc, cur_doc, new_doc):
return interfaces.ConflictError(None, obj, orig_doc, cur_doc, new_doc)

class NoCheckConflictHandler(object):
zope.interface.implements(interfaces.IConflictHandler)

def __init__(self, datamanager):
self.datamanager = datamanager

def on_before_set_state(self, obj, state):
pass

def on_before_store(self, obj, state):
pass

def on_after_store(self, obj, state):
pass

def on_modified(self, obj):
pass

def has_conflicts(self, objs):
return False

def check_conflicts(self, objs):
pass


class SerialConflictHandler(object):
zope.interface.implements(interfaces.IResolvingConflictHandler)

field_name = '_py_serial'
conflict_error_factory = staticmethod(create_conflict_error)

def __init__(self, datamanager):
self.datamanager = datamanager

def on_before_set_state(self, obj, state):
obj._p_serial = p64(state.pop(self.field_name, 0))

def on_before_store(self, obj, state):
state[self.field_name] = u64(getattr(obj, '_p_serial', 0)) + 1
obj._p_serial = p64(state[self.field_name])

def on_after_store(self, obj, state):
pass

def on_modified(self, obj):
pass

def resolve(self, obj, orig_doc, cur_doc, new_doc):
raise NotImplementedError

def check_conflict(self, obj):
# This object is not even added to the database yet, so there
# cannot be a conflict.
if obj._p_oid is None:
return
coll = self.datamanager._get_collection_from_object(obj)
cur_doc = coll.find_one(obj._p_oid.id, fields=(self.field_name,))
if cur_doc is None:
return
if cur_doc.get(self.field_name, 0) != u64(obj._p_serial):
orig_doc = self.datamanager._original_states.get(obj._p_oid)
cur_doc = coll.find_one(obj._p_oid.id)
new_doc = self.datamanager._writer.get_full_state(obj)
resolved = self.resolve(obj, orig_doc, cur_doc, new_doc)
if not resolved:
return self.conflict_error_factory(
obj, orig_doc, cur_doc, new_doc)

def has_conflicts(self, objs):
for obj in objs:
if self.check_conflict(obj) is not None:
return True
return False

def check_conflicts(self, objs):
for obj in objs:
err = self.check_conflict(obj)
if err is not None:
raise err


class SimpleSerialConflictHandler(SerialConflictHandler):

def resolve(self, obj, orig_doc, cur_doc, new_doc):
return False


class ResolvingSerialConflictHandler(SerialConflictHandler):

def resolve(self, obj, orig_doc, cur_doc, new_doc):
if hasattr(obj, '_p_resolveConflict'):
doc = obj._p_resolveConflict(orig_doc, cur_doc, new_doc)
if doc is not None:
doc[self.field_name] = cur_doc[self.field_name]
self.datamanager._reader.set_ghost_state(obj, doc)
return True
return False
63 changes: 16 additions & 47 deletions src/mongopersist/datamanager.py
Expand Up @@ -24,16 +24,13 @@
import zope.interface

from zope.exceptions import exceptionformatter
from mongopersist import interfaces, serialize
from mongopersist import conflict, interfaces, serialize

MONGO_ACCESS_LOGGING = False
COLLECTION_LOG = logging.getLogger('mongopersist.collection')

LOG = logging.getLogger(__name__)

def create_conflict_error(obj, orig_doc, cur_doc, new_doc):
return interfaces.ConflictError(None, obj, orig_doc, cur_doc, new_doc)

def process_spec(collection, spec):
try:
adapter = interfaces.IMongoSpecProcessor(None)
Expand Down Expand Up @@ -173,14 +170,14 @@ def keys(self):
class MongoDataManager(object):
zope.interface.implements(interfaces.IMongoDataManager)

detect_conflicts = False
default_database = 'mongopersist'
name_map_collection = 'persistence_name_map'
conflict_error_factory = staticmethod(create_conflict_error)
conflict_handler = None

def __init__(self, conn, detect_conflicts=None, default_database=None,
def __init__(self, conn, default_database=None,
root_database=None, root_collection=None,
name_map_collection=None, conflict_error_factory=None):
name_map_collection=None,
conflict_handler_factory=conflict.NoCheckConflictHandler):
self._conn = conn
self._reader = serialize.ObjectReader(self)
self._writer = serialize.ObjectWriter(self)
Expand All @@ -193,14 +190,12 @@ def __init__(self, conn, detect_conflicts=None, default_database=None,
self._needs_to_join = True
self._object_cache = {}
self.annotations = {}
if detect_conflicts is not None:
self.detect_conflicts = detect_conflicts
if self.conflict_handler is None:
self.conflict_handler = conflict_handler_factory(self)
if default_database is not None:
self.default_database = default_database
if name_map_collection is not None:
self.name_map_collection = name_map_collection
if conflict_error_factory is not None:
self.conflict_error_factory = conflict_error_factory
self.transaction_manager = transaction.manager
self.root = Root(self, root_database, root_collection)

Expand All @@ -211,33 +206,6 @@ def _get_collection_from_object(self, obj):
db_name, coll_name = self._writer.get_collection_name(obj)
return self._get_collection(db_name, coll_name)

def _check_conflict(self, obj, can_raise=True):
# This object is not even added to the database yet, so there
# cannot be a conflict.
if obj._p_oid is None:
return None if can_raise else False
coll = self._get_collection_from_object(obj)
new_doc = coll.find_one(obj._p_oid.id, fields=('_py_serial',))
if new_doc is None:
return None if can_raise else False
if new_doc.get('_py_serial', 0) != serialize.u64(obj._p_serial):
if can_raise:
orig_doc = self._original_states.get(obj._p_oid)
cur_doc = coll.find_one(obj._p_oid.id)
raise self.conflict_error_factory(
obj, orig_doc, cur_doc, new_doc)
else:
return True
return None if can_raise else False

def _check_conflicts(self):
if not self.detect_conflicts:
return
# Check each modified object to see whether Mongo has a new version of
# the object.
for obj in self._registered_objects:
self._check_conflict(obj)

def _flush_objects(self):
# Now write every registered object, but make sure we write each
# object just once.
Expand Down Expand Up @@ -275,7 +243,7 @@ def reset(self):

def flush(self):
# Check for conflicts.
self._check_conflicts()
self.conflict_handler.check_conflicts(self._registered_objects)
# Now write every registered object, but make sure we write each
# object just once.
self._flush_objects()
Expand Down Expand Up @@ -335,10 +303,12 @@ def register(self, obj):
self.transaction_manager.get().join(self)
self._needs_to_join = False

if obj is not None and obj not in self._registered_objects:
self._registered_objects.append(obj)
if obj is not None and obj not in self._modified_objects:
self._modified_objects.append(obj)
if obj is not None:
if obj not in self._registered_objects:
self._registered_objects.append(obj)
if obj not in self._modified_objects:
self._modified_objects.append(obj)
self.conflict_handler.on_modified(obj)

def abort(self, transaction):
# Aborting the transaction requires three steps:
Expand All @@ -364,8 +334,7 @@ def abort(self, transaction):
'Original state not found while aborting: %r (%s)',
obj, db_ref.id if db_ref else '')
continue
if (self.detect_conflicts and
self._check_conflict(obj, can_raise=False)):
if self.conflict_handler.has_conflicts([obj]):
# If we have a conflict, we are not going to reset to the
# original state. (This is a policy that should be made
# pluggable.)
Expand All @@ -378,7 +347,7 @@ def abort(self, transaction):
self.reset()

def commit(self, transaction):
self._check_conflicts()
self.conflict_handler.check_conflicts(self._registered_objects)

def tpc_begin(self, transaction):
pass
Expand Down
53 changes: 51 additions & 2 deletions src/mongopersist/interfaces.py
Expand Up @@ -63,6 +63,55 @@ def __repr__(self):
class CircularReferenceError(Exception):
pass

class IConflictHandler(zope.interface.Interface):

datamanager = zope.interface.Attribute(
"""The datamanager for which to conduct the conflict resolution.""")

def on_before_set_state(obj, state):
"""Method called just before the object's state is set."""

def on_before_store(obj, state):
"""Method called just before the object state is written to MongoDB."""

def on_after_store(obj, state):
"""Method called right after the object state was written to MongoDB."""

def on_modified(obj):
"""Method called when an object is registered as modified."""

def has_conflicts(objs):
"""Checks whether any of the passed in objects have conflicts.
Returns False if conflicts were found, otherwise True is returned.
While calling this method, the conflict handler may try to resolve
conflicts.
"""

def check_conflicts(self, objs):
"""Checks whether any of the passed in objects have conflicts.
Raises a ``ConflictError`` for the first object with a conflict.
While calling this method, the conflict handler may try to resolve
conflicts.
"""

class IResolvingConflictHandler(IConflictHandler):
"""A conflict handler that is able to resolve conflicts."""

def resolve(obj, orig_doc, cur_doc, new_doc):
"""Tries to resolve a conflict.
This is usually done through some comparison of the states. The method
returns ``True`` if the conflict was resolved and ``False`` otherwise.
It is the responsibility of this method to modify the object and data
manager models, so that the resolution is valid in the next step.
"""


class IObjectSerializer(zope.interface.Interface):
"""An object serializer allows for custom serialization output for
objects."""
Expand Down Expand Up @@ -134,8 +183,8 @@ class IMongoDataManager(persistent.interfaces.IPersistentDataManager):
root = zope.interface.Attribute(
"""Get the root object, which is a mapping.""")

detect_conflicts = zope.interface.Attribute(
"""A flag, when set it enables write conflict detection.""")
conflict_handler = zope.interface.Attribute(
"""An ``IConflictHandler`` instance that handles all conflicts.""")

def get_collection(db_name, coll_name):
"""Return the collection for the given DB and collection names."""
Expand Down

0 comments on commit 62100a1

Please sign in to comment.