Permalink
Browse files

Allow queries to use bonus filters ('in', 'not null', etc)

  • Loading branch information...
stevearc committed May 12, 2015
1 parent e559a04 commit 8b8854d7f50c7ae716258be13e31517ca8cde73a
Showing with 34 additions and 31 deletions.
  1. +6 −6 flywheel/fields/conditions.py
  2. +4 −1 flywheel/model_meta.py
  3. +24 −24 tests/test_queries.py
@@ -1,6 +1,8 @@
""" Query constraints """
import six
FILTER_ONLY = set(['contains', 'ncontains', 'null', 'in', 'ne'])
class Condition(object):
@@ -39,17 +41,15 @@ def scan_kwargs(self):
def query_kwargs(self, model):
""" Get the kwargs for doing a table query """
scan_only = set(['contains', 'ncontains', 'null', 'in', 'ne'])
for op, _ in six.itervalues(self.fields):
if op in scan_only:
raise ValueError("Operation '%s' cannot be used in a query!" %
op)
if self.index_name is not None:
ordering = model.meta_.get_ordering_from_index(self.index_name)
else:
queryable_keys = [k for k, (op, _) in six.iteritems(self.fields)
if op not in FILTER_ONLY]
ordering = model.meta_.get_ordering_from_fields(
self.eq_fields.keys(),
self.fields.keys())
queryable_keys,
)
if ordering is None:
raise ValueError("Bad query arguments. You must provide a hash "
@@ -3,10 +3,11 @@
import time
import inspect
from dynamo3 import DynamoKey, LocalIndex, GlobalIndex, Throughput
from dynamo3 import DynamoKey, Throughput
from collections import defaultdict
from .fields import Field
from .fields.conditions import FILTER_ONLY
class ValidationError(Exception):
@@ -50,6 +51,8 @@ def query_kwargs(self, eq_fields, fields):
else:
for field in self.range_key.can_resolve(fields):
(op, val) = fields[field]
if op in FILTER_ONLY:
continue
kwargs['%s__%s' % (field, op)] = val
remaining.remove(field)
@@ -235,54 +235,54 @@ def test_filter_beginswith(self):
self.assertEquals(results, [u])
def test_filter_ne(self):
""" Queries cannot filter ne """
""" Queries can filter ne """
u = User(id='a', name='Adam')
u2 = User(id='a', name='Aaron')
self.engine.save([u, u2])
with self.assertRaises(ValueError):
self.engine.query(User).filter(User.id == 'a')\
.filter(User.name != 'Adam').all()
ret = self.engine.query(User).filter(User.id == 'a')\
.filter(User.name != 'Adam').one()
self.assertEqual(ret, u2)
def test_filter_in(self):
""" Queries cannot filter in """
""" Queries can filter in """
u = User(id='a', name='Adam')
u2 = User(id='a', name='Aaron')
self.engine.save([u, u2])
with self.assertRaises(ValueError):
self.engine.query(User).filter(User.id == 'a')\
.filter(User.name.in_(set())).all()
ret = self.engine.query(User).filter(User.id == 'a')\
.filter(User.name.in_(set(['Adam']))).one()
self.assertEqual(ret, u)
def test_filter_contains(self):
""" Queries cannot filter contains """
u = User(id='a', name='Adam')
u2 = User(id='a', name='Aaron')
""" Queries can filter contains """
u = User(id='a', name='Adam', str_set=set(['foo', 'bar']))
u2 = User(id='a', name='Aaron', str_set=set(['bar']))
self.engine.save([u, u2])
with self.assertRaises(ValueError):
self.engine.query(User).filter(User.id == 'a')\
.filter(User.str_set.contains_('hi')).all()
ret = self.engine.query(User).filter(User.id == 'a')\
.filter(User.str_set.contains_('foo')).one()
self.assertEqual(ret, u)
def test_filter_null(self):
""" Queries cannot filter null """
u = User(id='a', name='Adam')
""" Queries can filter null """
u = User(id='a', name='Adam', str_set=set(['foo']))
u2 = User(id='a', name='Aaron')
self.engine.save([u, u2])
with self.assertRaises(ValueError):
self.engine.query(User).filter(User.id == 'a')\
.filter(User.name == None).all() # noqa
ret = self.engine.query(User).filter(User.id == 'a')\
.filter(User.str_set == None).one() # noqa
self.assertEqual(ret, u2)
def test_filter_not_null(self):
""" Queries cannot filter not null """
u = User(id='a', name='Adam')
""" Queries can filter not null """
u = User(id='a', name='Adam', str_set=set(['foo']))
u2 = User(id='a', name='Aaron')
self.engine.save([u, u2])
with self.assertRaises(ValueError):
self.engine.query(User).filter(User.id == 'a')\
.filter(User.name != None).all() # noqa
ret = self.engine.query(User).filter(User.id == 'a')\
.filter(User.str_set != None).one() # noqa
self.assertEqual(ret, u)
def test_smart_local_index(self):
""" Queries auto-select local secondary index """

0 comments on commit 8b8854d

Please sign in to comment.