Permalink
Browse files

Initial version. Incorporated tests from zzzeek's version

  • Loading branch information...
0 parents commit 15c5157cb3448e93ac64f01a256ba49647dc21ef @mitsuhiko committed Jul 19, 2011
Showing with 227 additions and 0 deletions.
  1. +31 −0 LICENSE
  2. +6 −0 README
  3. +109 −0 sqlalchemy_django_query.py
  4. +81 −0 tests.py
31 LICENSE
@@ -0,0 +1,31 @@
+Copyright (c) 2011 by Armin Ronacher and Mike Bayer.
+
+Some rights reserved.
+
+Redistribution and use in source and binary forms, with or without
+modification, are permitted provided that the following conditions are
+met:
+
+ * Redistributions of source code must retain the above copyright
+ notice, this list of conditions and the following disclaimer.
+
+ * Redistributions in binary form must reproduce the above
+ copyright notice, this list of conditions and the following
+ disclaimer in the documentation and/or other materials provided
+ with the distribution.
+
+ * The names of the contributors may not be used to endorse or
+ promote products derived from this software without specific
+ prior written permission.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
6 README
@@ -0,0 +1,6 @@
+django_sqlalchemy_query
+```````````````````````
+
+A module that implements Django like query objects for SQLAlchemy.
+This repository is waiting for a pull request that adds a setup.py
+and documentation. Look at the tests for some examples.
109 sqlalchemy_django_query.py
@@ -0,0 +1,109 @@
+# -*- coding: utf-8 -*-
+"""
+ sqlalchemy_django_query
+ ~~~~~~~~~~~~~~~~~~~~~~~
+
+ A module that implements a more Django like interface for SQLAlchemy
+ query objects. It's still API compatible with the regular one but
+ extends it with Djangoisms.
+
+ :copyright: 2011 by Armin Ronacher, Mike Bayer.
+ license: BSD, see LICENSE for more details.
+"""
+from sqlalchemy.orm.query import Query
+from sqlalchemy.orm.util import _entity_descriptor
+from sqlalchemy.util import to_list
+from sqlalchemy.sql import operators, extract
+
+
+class DjangoQuery(Query):
+ """A subclass of a regular SQLAlchemy query object that implements
+ more Django like behavior:
+
+ - `filter_by` supports implicit joining and subitem accessing with
+ double underscores.
+ - `exclude_by` works like `filter_by` just that every expression is
+ automatically negated.
+ - `order_by` supports ordering by field name with an optional `-`
+ in front.
+ """
+ _underscore_operators = {
+ 'gt': operators.gt,
+ 'lte': operators.lt,
+ 'gte': operators.ge,
+ 'le': operators.le,
+ 'contains': operators.contains_op,
+ 'in': operators.in_op,
+ 'exact': operators.eq,
+ 'iexact': operators.ilike_op,
+ 'startswith': operators.startswith_op,
+ 'istartswith': lambda c, x: c.ilike(x.replace('%', '%%') + '%'),
+ 'iendswith': lambda c, x: c.ilike('%' + x.replace('%', '%%')),
+ 'endswith': operators.endswith_op,
+ 'isnull': lambda c, x: x and c != None or c == None,
+ 'range': operators.between_op,
+ 'year': lambda c, x: extract('year', c) == x,
+ 'month': lambda c, x: extract('month', c) == x,
+ 'day': lambda c, x: extract('day', c) == x
+ }
+
+ def filter_by(self, **kwargs):
+ return self._filter_or_exclude(False, kwargs)
+
+ def exclude_by(self, **kwargs):
+ return self._filter_or_exclude(True, kwargs)
+
+ def order_by(self, *args):
+ args = list(args)
+ joins_needed = []
+ for idx, arg in enumerate(args):
+ q = self
+ if not isinstance(arg, basestring):
+ continue
+ if arg[0] in '+-':
+ desc = arg[0] == '-'
+ arg = arg[1:]
+ else:
+ desc = False
+ q = self
+ column = None
+ for token in arg.split('__'):
+ column = _entity_descriptor(q._joinpoint_zero(), token)
+ if column.impl.uses_objects:
+ q = q.join(column)
+ joins_needed.append(column)
+ column = None
+ if column is None:
+ raise ValueError('Tried to order by table, column expected')
+ if desc:
+ column = column.desc()
+ args[idx] = column
+
+ q = Query.order_by(self, *args)
+ for join in joins_needed:
+ q = q.join(join)
+ return q
+
+ def _filter_or_exclude(self, negate, kwargs):
+ q = self
+ negate_if = lambda expr: expr if not negate else ~expr
+ column = None
+
+ for arg, value in kwargs.iteritems():
+ for token in arg.split('__'):
+ if column is None:
+ column = _entity_descriptor(q._joinpoint_zero(), token)
+ if column.impl.uses_objects:
+ q = q.join(column)
+ column = None
+ elif token in self._underscore_operators:
+ op = self._underscore_operators[token]
+ q = q.filter(negate_if(op(column, *to_list(value))))
+ column = None
+ else:
+ raise ValueError('No idea what to do with %r' % token)
+ if column is not None:
+ q = q.filter(negate_if(column == value))
+ column = None
+ q = q.reset_joinpoint()
+ return q
81 tests.py
@@ -0,0 +1,81 @@
+import unittest
+from sqlalchemy import Column, Integer, String, ForeignKey, Date, create_engine
+from sqlalchemy.orm import Session, relationship
+from sqlalchemy.ext.declarative import declarative_base, declared_attr
+import datetime
+
+from sqlalchemy_django_query import DjangoQuery
+
+
+class BasicTestCase(unittest.TestCase):
+
+ def setUp(self):
+ class Base(object):
+ @declared_attr
+ def __tablename__(cls):
+ return cls.__name__.lower()
+ id = Column(Integer, primary_key=True)
+ Base = declarative_base(cls=Base)
+
+ class Blog(Base):
+ name = Column(String)
+ entries = relationship('Entry', backref='blog')
+
+ class Entry(Base):
+ blog_id = Column(Integer, ForeignKey('blog.id'))
+ pub_date = Column(Date)
+ headline = Column(String)
+ body = Column(String)
+
+ engine = create_engine('sqlite://')
+ Base.metadata.create_all(engine)
+ self.session = Session(engine, query_cls=DjangoQuery)
+ self.Base = Base
+ self.Blog = Blog
+ self.Entry = Entry
+ self.engine = engine
+
+ self.b1 = Blog(name='blog1', entries=[
+ Entry(headline='b1 headline 1', body='body 1',
+ pub_date=datetime.date(2010, 2, 5)),
+ Entry(headline='b1 headline 2', body='body 2',
+ pub_date=datetime.date(2010, 4, 8)),
+ Entry(headline='b1 headline 3', body='body 3',
+ pub_date=datetime.date(2010, 9, 14))
+ ])
+ self.b2 = Blog(name='blog2', entries=[
+ Entry(headline='b2 headline 1', body='body 1',
+ pub_date=datetime.date(2010, 5, 12)),
+ Entry(headline='b2 headline 2', body='body 2',
+ pub_date=datetime.date(2010, 7, 18)),
+ Entry(headline='b2 headline 3', body='body 3',
+ pub_date=datetime.date(2011, 8, 27))
+ ])
+
+ self.session.add_all([self.b1, self.b2])
+ self.session.commit()
+
+ def test_basic_filtering(self):
+ bq = self.session.query(self.Blog)
+ eq = self.session.query(self.Entry)
+ assert bq.filter_by(name__exact='blog1').one() is self.b1
+ assert bq.filter_by(name__contains='blog').all() == [self.b1, self.b2]
+ assert bq.filter_by(entries__headline__exact='b2 headline 2').one() is self.b2
+ assert bq.filter_by(entries__pub_date__range=(datetime.date(2010, 1, 1),
+ datetime.date(2010, 3, 1))).one() is self.b1
+ assert eq.filter_by(pub_date__year=2011).one() is self.b2.entries[2]
+ assert eq.filter_by(pub_date__year=2011, id=self.b2.entries[2].id
+ ).one() is self.b2.entries[2]
+
+ def test_basic_excluding(self):
+ eq = self.session.query(self.Entry)
+ assert eq.exclude_by(pub_date__year=2010).one() is self.b2.entries[2]
+
+ def test_basic_ordering(self):
+ eq = self.session.query(self.Entry)
+ assert eq.order_by('-blog__name', 'id').all() == \
+ self.b2.entries + self.b1.entries
+
+
+if __name__ == '__main__':
+ unittest.main()

0 comments on commit 15c5157

Please sign in to comment.