Permalink
Browse files

Move more write operations into ops.py for transactional purposes

  • Loading branch information...
1 parent 595bfad commit b7978b532ff36dc99c5e70f91c003324758ef74f @jeffjenkins committed Jul 29, 2012
Showing with 123 additions and 59 deletions.
  1. +3 −0 mongoalchemy/exceptions.py
  2. +70 −10 mongoalchemy/ops.py
  3. +44 −49 mongoalchemy/session.py
  4. +6 −0 test/test_session.py
@@ -39,3 +39,6 @@ class BadFieldSpecification(Exception):
field'''
pass
+
+class TransactionException(Exception):
+ pass
View
@@ -1,3 +1,4 @@
+from itertools import chain
from bson.objectid import ObjectId
from abc import ABCMeta, abstractmethod
@@ -8,13 +9,66 @@ class Operation(object):
def update_cache(self): pass
+ @property
+ def collection(self):
+ return self.session.db[self.type.get_collection_name()]
+
def ensure_indexes(self):
c = self.collection
for index in self.type.get_indexes():
index.ensure(c)
+class ClearCollectionOp(Operation):
+ def __init__(self, session, kind):
+ self.session = session
+ self.type = kind
+ def execute(self):
+ self.collection.remove()
+
+class UpdateDocumentOp(Operation):
+ def __init__(self, session, document, safe, id_expression=None, upsert=False, update_ops={}, **kwargs):
+ from mongoalchemy.query import Query
+ self.session = session
+ self.type = type(document)
+ self.safe = safe
+ self.upsert = upsert
+
+ if id_expression:
+ self.db_key = Query(self.type, session).filter(id_expression).query
+ else:
+ self.db_key = {'_id' : document.mongo_id}
+
+ self.dirty_ops = document.get_dirty_ops(with_required=upsert)
+ for key, op in chain(update_ops.items(), kwargs.items()):
+ key = str(key)
+ for current_op, keys in self.dirty_ops.items():
+ if key not in keys:
+ continue
+ self.dirty_ops.setdefault(op,{})[key] = keys[key]
+ del self.dirty_ops[current_op][key]
+ if len(self.dirty_ops[current_op]) == 0:
+ del self.dirty_ops[current_op]
+
+ def execute(self):
+ self.ensure_indexes()
+ return self.collection.update(self.db_key, self.dirty_ops, upsert=self.upsert, safe=self.safe)
+
+class UpdateOp(Operation):
+ def __init__(self, session, kind, safe, update_obj):
+ self.session = session
+ self.type = kind
+ self.safe = safe
+ self.query = update_obj.query.query
+ self.update_data = update_obj.update_data
+ self.upsert = update_obj.get_upsert()
+ self.multi = update_obj.get_multi()
+
+ def execute(self):
+ return self.collection.update(self.query, self.update_data, multi=self.multi,
+ upsert=self.upsert, safe=self.safe)
+
-class Save(Operation):
+class SaveOp(Operation):
def __init__(self, session, document, safe):
self.session = session
self.data = document.wrap()
@@ -24,25 +78,31 @@ def __init__(self, session, document, safe):
if '_id' not in self.data:
self.data['_id'] = ObjectId()
document.mongo_id = self.data['_id']
- @property
- def collection(self):
- return self.session.db[self.type.get_collection_name()]
def execute(self):
self.ensure_indexes()
- self.collection.save(self.data, safe=self.safe)
+ return self.collection.save(self.data, safe=self.safe)
+
+class RemoveOp(Operation):
+ def __init__(self, session, kind, safe, query):
+ self.session = session
+ self.query = query.query
+ self.safe = safe
+ self.type = kind
+
+ def execute(self):
+ self.ensure_indexes()
+ return self.collection.remove(self.query, safe=self.safe)
+
-class RemoveObject(Operation):
+class RemoveDocumentOp(Operation):
def __init__(self, session, obj, safe):
self.session = session
self.type = type(obj)
self.safe = safe
self.id = None
if obj.has_id():
self.id = obj.mongo_id
- @property
- def collection(self):
- return self.session.db[self.type.get_collection_name()]
def execute(self):
if self.id is None:
@@ -51,5 +111,5 @@ def execute(self):
self.ensure_indexes()
collection = db[self.type.get_collection_name()]
- collection.remove(self.id, safe=self.safe)
+ return collection.remove(self.id, safe=self.safe)
View
@@ -47,8 +47,8 @@
from mongoalchemy.query import Query, QueryResult, RemoveQuery
from mongoalchemy.document import FieldNotRetrieved, Document
from mongoalchemy.query_expression import FreeFormDoc
+from mongoalchemy.exceptions import TransactionException
from mongoalchemy.ops import *
-from itertools import chain
class Session(object):
@@ -72,9 +72,14 @@ def __init__(self, database, timezone=None, safe=False, cache_size=0):
self.timezone = timezone
self.cache_size = cache_size
self.cache = {}
- self.autoflush = True
- self.in_transation = True
-
+
+ self.transactions = []
+ @property
+ def autoflush(self):
+ return not self.in_transaction
+ @property
+ def in_transaction(self):
+ return len(self.transactions) > 0
@property
def tz_aware(self):
return self.timezone is not None
@@ -138,12 +143,12 @@ def add(self, item, safe=None):
item._set_session(self)
if safe is None:
safe = self.safe
- self.queue.append(Save(self, item, safe))
+ self.queue.append(SaveOp(self, item, safe))
# after the save op is recorded, the document has an _id and can be
# cached
self.cache_write(item)
if self.autoflush:
- self.flush()
+ return self.flush()
def update(self, item, id_expression=None, upsert=False, update_ops={}, safe=None, **kwargs):
''' Update an item in the database. Uses the on_update keyword to each
@@ -171,28 +176,13 @@ def update(self, item, id_expression=None, upsert=False, update_ops={}, safe=Non
This operation is **experimental** and **not fully tested**,
although it does have code coverage.
'''
- item._set_session(self)
- if id_expression:
- db_key = Query(type(item), self).filter(id_expression).query
- else:
- db_key = {'_id' : item.mongo_id}
-
- dirty_ops = item.get_dirty_ops(with_required=upsert)
- for key, op in chain(update_ops.items(), kwargs.items()):
- key = str(key)
- for current_op, keys in dirty_ops.items():
- if key not in keys:
- continue
- dirty_ops.setdefault(op,{})[key] = keys[key]
- del dirty_ops[current_op][key]
- if len(dirty_ops[current_op]) == 0:
- del dirty_ops[current_op]
if safe is None:
safe = self.safe
- self.flush(safe=safe)
- self.db[item.get_collection_name()].update(db_key, dirty_ops, upsert=upsert, safe=safe)
+ self.queue.append(UpdateDocumentOp(self, item, safe, id_expression=id_expression,
+ upsert=upsert, update_ops=update_ops, **kwargs))
+ if self.autoflush:
+ return self.flush()
-
def query(self, type):
''' Begin a query on the database's collection for `type`. If `type`
is an instance of basesting, the query will be in raw query mode
@@ -211,7 +201,10 @@ def add_to_session(self, obj):
obj._set_session(self)
def execute_query(self, query, session):
- ''' Get the results of ``query``. This method will flush the queue '''
+ ''' Get the results of ``query``. This method does flush in a
+ transaction, so any objects retrieved which are not in the cache
+ which would be updated when the transaction finishes will be
+ stale '''
collection = self.db[query.type.get_collection_name()]
for index in query.type.get_indexes():
index.ensure(collection)
@@ -250,42 +243,40 @@ def remove(self, obj, safe=None):
'''
if safe is None:
safe = self.safe
- remove = RemoveObject(self, obj, safe)
+ remove = RemoveDocumentOp(self, obj, safe)
self.queue.append(remove)
if self.autoflush:
- self.flush()
+ return self.flush()
def execute_remove(self, remove):
''' Execute a remove expression. Should generally only be called implicitly.
'''
- self.flush()
+
safe = self.safe
- if remove.safe != None:
+ if remove.safe is not None:
safe = remove.safe
- collection = self.db[remove.type.get_collection_name()]
- for index in remove.type.get_indexes():
- index.ensure(collection)
-
- return self.db[remove.type.get_collection_name()].remove(remove.query, safe=safe)
+ self.queue.append(RemoveOp(self, remove.type, safe, remove))
+ if self.autoflush:
+ return self.flush()
def execute_update(self, update, safe=False):
''' Execute an update expression. Should generally only be called implicitly.
'''
- self.flush()
+ # safe = self.safe
+ # if update.safe is not None:
+ # safe = remove.safe
+
assert len(update.update_data) > 0
- collection = self.db[update.query.type.get_collection_name()]
- for index in update.query.type.get_indexes():
- index.ensure(collection)
- kwargs = dict(
- upsert=update.get_upsert(),
- multi=update.get_multi(),
- safe=safe,
- )
- collection.update(update.query.query, update.update_data, **kwargs)
+ self.queue.append(UpdateOp(self, update.query.type, safe, update))
+ if self.autoflush:
+ return self.flush()
+
def execute_find_and_modify(self, fm_exp):
+ if self.in_transaction:
+ raise TransactionException('Cannot find and modify in a transaction.')
self.flush()
# assert len(fm_exp.update_data) > 0
collection = self.db[fm_exp.query.type.get_collection_name()]
@@ -345,18 +336,22 @@ def clear_collection(self, *classes):
''' Clear all objects from the collections associated with the
objects in `*cls`. **use with caution!**'''
for c in classes:
- self.db[c.get_collection_name()].remove()
+ self.queue.append(ClearCollectionOp(self, c))
+ if self.autoflush:
+ self.flush()
def flush(self, safe=None):
''' Perform all database operations currently in the queue'''
+ result = None
for index, op in enumerate(self.queue):
try:
- op.execute()
+ result = op.execute()
except:
self.cache = {}
self.clear()
raise
self.clear()
+ return result
def dereference(self, ref):
if isinstance(ref, Document):
@@ -393,11 +388,11 @@ def clone(self, document):
return type(document).unwrap(wrapped, session=self)
def __enter__(self):
- self.in_transation = True
+ self.transactions.append(None)
return self
def __exit__(self, exc_type, exc_val, exc_tb):
- self.in_transation = False
+ self.transactions.pop()
self.flush()
self.end()
return False
View
@@ -2,6 +2,7 @@
from mongoalchemy.session import Session
from mongoalchemy.document import Document, Index, DocumentField
from mongoalchemy.fields import *
+from mongoalchemy.exceptions import *
from test.util import known_failure
from pymongo.errors import DuplicateKeyError
@@ -49,6 +50,11 @@ class DT(Document):
assert x.created.tzinfo is not None
assert x.modified.tzinfo is not None
+@raises(TransactionException)
+def test_find_and_modify_in_session():
+ s = Session.connect('unit-testing')
+ with s:
+ s.execute_find_and_modify({})
def test_session():
s = Session.connect('unit-testing')

0 comments on commit b7978b5

Please sign in to comment.