Skip to content

Commit

Permalink
Merge branch 'mapreduce' of git://github.com/blackbrrr/mongoengine
Browse files Browse the repository at this point in the history
Conflicts:
	mongoengine/queryset.py
  • Loading branch information
hmarr committed Mar 17, 2010
2 parents 39fc862 + f47d926 commit 047cc21
Show file tree
Hide file tree
Showing 5 changed files with 346 additions and 78 deletions.
3 changes: 3 additions & 0 deletions docs/apireference.rst
Expand Up @@ -20,6 +20,9 @@ Documents

.. autoclass:: mongoengine.EmbeddedDocument
:members:

.. autoclass:: mongoengine.MapReduceDocument
:members:

Querying
========
Expand Down
2 changes: 1 addition & 1 deletion docs/conf.py
Expand Up @@ -22,7 +22,7 @@

# Add any Sphinx extension module names here, as strings. They can be extensions
# coming with Sphinx (named 'sphinx.ext.*') or your custom ones.
extensions = ['sphinx.ext.autodoc']
extensions = ['sphinx.ext.autodoc', 'sphinx.ext.todo']

# Add any paths that contain templates here, relative to this directory.
templates_path = ['_templates']
Expand Down
36 changes: 36 additions & 0 deletions mongoengine/document.py
Expand Up @@ -115,3 +115,39 @@ def drop_collection(cls):
"""
db = _get_db()
db.drop_collection(cls._meta['collection'])


class MapReduceDocument(object):
    """A document returned from a map/reduce query.

    :param document: The object whose ``objects`` attribute is used (via
                     ``objects.with_id``) to resolve ``key`` when the
                     ``object`` property is accessed.
    :param collection: An instance of :class:`~pymongo.Collection`
    :param key: Document/result key, often an instance of
                :class:`~pymongo.objectid.ObjectId`. If supplied as
                an ``ObjectId`` found in the given ``collection``,
                the object can be accessed via the ``object`` property.
    :param value: The result(s) for this key.

    .. versionadded:: 0.3
    """

    def __init__(self, document, collection, key, value):
        self._document = document
        self._collection = collection
        self.key = key
        self.value = value

    @property
    def object(self):
        """Lazy-load the object referenced by ``self.key``. If ``self.key``
        is not an ``ObjectId`` and cannot be converted to one, simply
        return ``self.key``.

        On a successful conversion ``self.key`` is replaced by the
        ``ObjectId`` instance, and the looked-up object is cached on the
        instance so the lookup runs at most once.
        """
        if not isinstance(self.key, pymongo.objectid.ObjectId):
            try:
                self.key = pymongo.objectid.ObjectId(self.key)
            except Exception:
                # The key is a plain value rather than an object id.
                # ``Exception`` (not a bare ``except``) keeps
                # KeyboardInterrupt/SystemExit propagating.
                return self.key
        if not hasattr(self, "_key_object"):
            self._key_object = self._document.objects.with_id(self.key)
        return self._key_object
131 changes: 100 additions & 31 deletions mongoengine/queryset.py
Expand Up @@ -5,15 +5,15 @@
import copy


__all__ = ['queryset_manager', 'Q', 'InvalidQueryError',
__all__ = ['queryset_manager', 'Q', 'InvalidQueryError',
'InvalidCollectionError']

# The maximum number of items to display in a QuerySet.__repr__
REPR_OUTPUT_SIZE = 20


class DoesNotExist(Exception):
pass
pass


class MultipleObjectsReturned(Exception):
Expand All @@ -30,8 +30,9 @@ class OperationError(Exception):

RE_TYPE = type(re.compile(''))


class Q(object):

OR = '||'
AND = '&&'
OPERATORS = {
Expand All @@ -52,7 +53,7 @@ class Q(object):
'regex_eq': '%(value)s.test(this.%(field)s)',
'regex_ne': '!%(value)s.test(this.%(field)s)',
}

def __init__(self, **query):
self.query = [query]

Expand Down Expand Up @@ -132,26 +133,27 @@ def _build_op_js(self, op, key, value, value_name):
return value, operation_js

class QuerySet(object):
"""A set of results returned from a query. Wraps a MongoDB cursor,
"""A set of results returned from a query. Wraps a MongoDB cursor,
providing :class:`~mongoengine.Document` objects as the results.
"""

def __init__(self, document, collection):
self._document = document
self._collection_obj = collection
self._accessed_collection = False
self._query = {}
self._where_clause = None
self._loaded_fields = []

self._ordering = []

# If inheritance is allowed, only return instances and instances of
# subclasses of the class being used
if document._meta.get('allow_inheritance'):
self._query = {'_types': self._document._class_name}
self._cursor_obj = None
self._limit = None
self._skip = None

def ensure_index(self, key_or_list):
"""Ensure that the given indexes are in place.
Expand Down Expand Up @@ -199,7 +201,7 @@ def _build_index_spec(cls, doc_cls, key_or_list):
return index_list

def __call__(self, q_obj=None, **query):
"""Filter the selected documents by calling the
"""Filter the selected documents by calling the
:class:`~mongoengine.queryset.QuerySet` with a query.
:param q_obj: a :class:`~mongoengine.queryset.Q` object to be used in
Expand All @@ -213,7 +215,7 @@ def __call__(self, q_obj=None, **query):
query = QuerySet._transform_query(_doc_cls=self._document, **query)
self._query.update(query)
return self

def filter(self, *q_objs, **query):
"""An alias of :meth:`~mongoengine.queryset.QuerySet.__call__`
"""
Expand Down Expand Up @@ -253,11 +255,11 @@ def _cursor(self):
# Apply where clauses to cursor
if self._where_clause:
self._cursor_obj.where(self._where_clause)

# apply default ordering
if self._document._meta['ordering']:
self.order_by(*self._document._meta['ordering'])

return self._cursor_obj

@classmethod
Expand Down Expand Up @@ -337,8 +339,8 @@ def _transform_query(cls, _doc_cls=None, **query):
return mongo_query

def get(self, *q_objs, **query):
"""Retrieve the the matching object raising
:class:`~mongoengine.queryset.MultipleObjectsReturned` or
"""Retrieve the the matching object raising
:class:`~mongoengine.queryset.MultipleObjectsReturned` or
:class:`~mongoengine.queryset.DoesNotExist` exceptions if multiple or
no results are found.
"""
Expand All @@ -354,15 +356,15 @@ def get(self, *q_objs, **query):

def get_or_create(self, *q_objs, **query):
"""Retreive unique object or create, if it doesn't exist. Raises
:class:`~mongoengine.queryset.MultipleObjectsReturned` if multiple
results are found. A new document will be created if the document
:class:`~mongoengine.queryset.MultipleObjectsReturned` if multiple
results are found. A new document will be created if the document
doesn't exist; a dictionary of default values for the new document
may be provided as a keyword argument called :attr:`defaults`.
"""
defaults = query.get('defaults', {})
if query.has_key('defaults'):
if 'defaults' in query:
del query['defaults']

self.__call__(*q_objs, **query)
count = self.count()
if count == 0:
Expand Down Expand Up @@ -439,6 +441,70 @@ def count(self):
def __len__(self):
return self.count()

def map_reduce(self, map_f, reduce_f, finalize_f=None, limit=None,
               scope=None, keep_temp=False):
    """Run a map/reduce job over the documents selected by the current
    query spec, applying the current ordering to the results.

    ``map_reduce`` honours earlier ``QuerySet`` chaining, but it must be
    the final call in a chain: it yields results instead of returning a
    malleable ``QuerySet``.

    See the :meth:`~mongoengine.tests.QuerySetTest.test_map_reduce`
    and :meth:`~mongoengine.tests.QuerySetTest.test_map_advanced`
    tests in ``tests.queryset.QuerySetTest`` for usage examples.

    :param map_f: map function, as :class:`~pymongo.code.Code` or string
    :param reduce_f: reduce function, as
                     :class:`~pymongo.code.Code` or string
    :param finalize_f: finalize function, an optional function that
                       performs any post-reduction processing.
    :param scope: values to insert into map/reduce global scope. Optional.
    :param limit: number of objects from current query to provide
                  to map/reduce method
    :param keep_temp: keep temporary table (boolean, default ``False``)

    Returns an iterator yielding
    :class:`~mongoengine.document.MapReduceDocument`.

    .. note:: Map/Reduce requires server version **>= 1.1.1**. The PyMongo
       :meth:`~pymongo.collection.Collection.map_reduce` helper requires
       PyMongo version **>= 1.2**.

    .. versionadded:: 0.3
    """
    from document import MapReduceDocument

    # NOTE(review): this inspects the collection object for the helper,
    # while the message below talks about the server version.
    if not hasattr(self._collection, "map_reduce"):
        raise NotImplementedError("Requires MongoDB >= 1.1.1")

    # Plain strings are wrapped in Code; Code instances pass through.
    code_type = pymongo.code.Code
    map_f = map_f if isinstance(map_f, code_type) else code_type(map_f)
    reduce_f = (reduce_f if isinstance(reduce_f, code_type)
                else code_type(reduce_f))

    options = dict(query=self._query, keeptemp=keep_temp)

    if finalize_f:
        options['finalize'] = (finalize_f
                               if isinstance(finalize_f, code_type)
                               else code_type(finalize_f))

    # Optional settings are only forwarded when they are truthy.
    for option_name, option_value in (('scope', scope), ('limit', limit)):
        if option_value:
            options[option_name] = option_value

    cursor = self._collection.map_reduce(map_f, reduce_f, **options).find()

    if self._ordering:
        cursor = cursor.sort(self._ordering)

    for entry in cursor:
        yield MapReduceDocument(self._document, self._collection,
                                entry['_id'], entry['value'])

def limit(self, n):
"""Limit the number of returned documents to `n`. This may also be
achieved using array-slicing syntax (e.g. ``User.objects[:5]``).
Expand All @@ -450,6 +516,7 @@ def limit(self, n):
else:
self._cursor.limit(n)
self._limit = n

# Return self to allow chaining
return self

Expand Down Expand Up @@ -523,13 +590,14 @@ def order_by(self, *keys):
direction = pymongo.DESCENDING
if key[0] in ('-', '+'):
key = key[1:]
key_list.append((key, direction))
key_list.append((key, direction))

self._ordering = key_list
self._cursor.sort(key_list)
return self

def explain(self, format=False):
"""Return an explain plan record for the
"""Return an explain plan record for the
:class:`~mongoengine.queryset.QuerySet`\ 's cursor.
:param format: format the plan before returning it
Expand All @@ -540,7 +608,7 @@ def explain(self, format=False):
import pprint
plan = pprint.pformat(plan)
return plan

def delete(self, safe=False):
"""Delete the documents matched by the query.
Expand All @@ -552,7 +620,7 @@ def delete(self, safe=False):
def _transform_update(cls, _doc_cls=None, **update):
"""Transform an update spec from Django-style format to Mongo format.
"""
operators = ['set', 'unset', 'inc', 'dec', 'push', 'push_all', 'pull',
operators = ['set', 'unset', 'inc', 'dec', 'push', 'push_all', 'pull',
'pull_all']

mongo_update = {}
Expand Down Expand Up @@ -661,8 +729,8 @@ def exec_js(self, code, *fields, **options):
"""Execute a Javascript function on the server. A list of fields may be
provided, which will be translated to their correct names and supplied
as the arguments to the function. A few extra variables are added to
the function's scope: ``collection``, which is the name of the
collection in use; ``query``, which is an object representing the
the function's scope: ``collection``, which is the name of the
collection in use; ``query``, which is an object representing the
current query; and ``options``, which is an object containing any
options specified as keyword arguments.
Expand All @@ -676,7 +744,7 @@ def exec_js(self, code, *fields, **options):
:param code: a string of Javascript code to execute
:param fields: fields that you will be using in your function, which
will be passed in to your function as arguments
:param options: options that you want available to the function
:param options: options that you want available to the function
(accessed in Javascript through the ``options`` object)
"""
code = self._sub_js_fields(code)
Expand All @@ -693,7 +761,7 @@ def exec_js(self, code, *fields, **options):
query = self._query
if self._where_clause:
query['$where'] = self._where_clause

scope['query'] = query
code = pymongo.code.Code(code, scope=scope)

Expand Down Expand Up @@ -741,7 +809,7 @@ def average(self, field):
def item_frequencies(self, list_field, normalize=False):
"""Returns a dictionary of all items present in a list field across
the whole queried set of documents, and their corresponding frequency.
This is useful for generating tag clouds, or searching documents.
This is useful for generating tag clouds, or searching documents.
:param list_field: the list field to use
:param normalize: normalize the results so they add to 1.0
Expand Down Expand Up @@ -791,7 +859,7 @@ def __init__(self, manager_func=None):
self._collection = None

def __get__(self, instance, owner):
"""Descriptor for instantiating a new QuerySet object when
"""Descriptor for instantiating a new QuerySet object when
Document.objects is accessed.
"""
if instance is not None:
Expand All @@ -810,7 +878,7 @@ def __get__(self, instance, owner):

if collection in db.collection_names():
self._collection = db[collection]
# The collection already exists, check if its capped
# The collection already exists, check if its capped
# options match the specified capped options
options = self._collection.options()
if options.get('max') != max_documents or \
Expand All @@ -826,7 +894,7 @@ def __get__(self, instance, owner):
self._collection = db.create_collection(collection, opts)
else:
self._collection = db[collection]

# owner is the document that contains the QuerySetManager
queryset = QuerySet(owner, self._collection)
if self._manager_func:
Expand All @@ -836,6 +904,7 @@ def __get__(self, instance, owner):
queryset = self._manager_func(owner, queryset)
return queryset


def queryset_manager(func):
"""Decorator that allows you to define custom QuerySet managers on
:class:`~mongoengine.Document` classes. The manager must be a function that
Expand Down

0 comments on commit 047cc21

Please sign in to comment.