Skip to content

Commit

Permalink
Merge branch 'feature/issue-857-non-mapreduce-riak-search' into develop
Browse files Browse the repository at this point in the history
  • Loading branch information
jerith committed Oct 23, 2014
2 parents cd01344 + 0ee88bb commit 30d2d5d
Show file tree
Hide file tree
Showing 4 changed files with 94 additions and 0 deletions.
15 changes: 15 additions & 0 deletions vumi/persist/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -433,6 +433,15 @@ def raw_search(cls, manager, query):
"""
return manager.mr_from_search(cls, query)

@classmethod
def real_search(cls, manager, query, rows=None):
    """
    Run a real riak search for this model through the given manager.

    The query is handed to the manager untouched; no inspection of it is
    done here.

    :param manager: manager whose ``real_search`` performs the search.
    :param query: search query, passed through as-is.
    :param rows: forwarded to the manager as the ``rows`` keyword.
    :returns: list of keys.
    """
    found_keys = manager.real_search(cls, query, rows=rows)
    return found_keys

@classmethod
def enable_search(cls, manager):
"""Enable solr indexing over for this model and manager."""
Expand Down Expand Up @@ -807,6 +816,9 @@ def mr_from_field_match(self, model, query, field_name, start_value,
def mr_from_keys(self, model, keys):
    """
    Build a map-reduce over an explicit collection of keys.

    Delegates to ``VumiMapReduce.from_keys`` with this manager, the
    model and the keys.
    """
    mapreduce = VumiMapReduce.from_keys(self, model, keys)
    return mapreduce

def real_search(self, model, query, rows=None):
    """
    Perform a real riak search for ``model`` (abstract hook).

    This base implementation always raises; concrete managers provide
    the actual search.

    :param model: model class whose bucket should be searched.
    :param query: search query.
    :param rows: optional rows setting supplied by the caller.
    :raises NotImplementedError: always, in this base implementation.
    """
    # Same message style as riak_enable_search so that a missing override
    # is easy to identify from the traceback.
    raise NotImplementedError("Sub-classes of Manager should implement"
                              " .real_search(...)")

def riak_enable_search(self, model):
"""Enable search indexing for the model's bucket."""
raise NotImplementedError("Sub-classes of Manager should implement"
Expand Down Expand Up @@ -866,5 +878,8 @@ def search(self, **kw):
def raw_search(self, query):
    """
    Delegate to the proxied model class's ``raw_search``, supplying this
    proxy's manager along with the query.
    """
    model_cls = self._modelcls
    return model_cls.raw_search(self._manager, query)

def real_search(self, query, rows=None):
    """
    Delegate to the proxied model class's ``real_search``, supplying this
    proxy's manager and forwarding ``rows`` as a keyword argument.
    """
    model_cls = self._modelcls
    return model_cls.real_search(self._manager, query, rows=rows)

def enable_search(self):
    """
    Delegate to the proxied model class's ``enable_search``, supplying
    this proxy's manager.
    """
    model_cls = self._modelcls
    return model_cls.enable_search(self._manager)
15 changes: 15 additions & 0 deletions vumi/persist/riak_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,21 @@ def run_map_reduce(self, mapreduce, mapper_func=None, reducer_func=None):
results = reducer_func(self, results)
return results

def _search_iteration(self, bucket, query, rows, start):
    """
    Fetch a single page of search results from ``bucket``.

    :param bucket: object whose ``search`` method runs the query.
    :param query: search query, passed through unchanged.
    :param rows: passed to ``bucket.search`` as ``rows``.
    :param start: passed to ``bucket.search`` as ``start``.
    :returns: list of the ``"id"`` values of the returned docs.
    """
    docs = bucket.search(query, rows=rows, start=start)["docs"]
    return [entry["id"] for entry in docs]

def real_search(self, modelcls, query, rows=None):
    """
    Collect the keys of all search hits for ``query`` in the model's bucket.

    Results are fetched one page at a time via ``_search_iteration``,
    ``rows`` per page (1000 when ``rows`` is None), until a page comes
    back empty.

    :returns: list of keys, in the order the pages returned them.
    """
    if rows is None:
        rows = 1000
    bucket = self.client.bucket(self.bucket_name(modelcls))
    keys = []
    while True:
        # The number of keys gathered so far is the offset of the next page.
        page = self._search_iteration(bucket, query, rows, len(keys))
        if not page:
            return keys
        keys.extend(page)

def riak_enable_search(self, modelcls):
bucket_name = self.bucket_name(modelcls)
bucket = self.client.bucket(bucket_name)
Expand Down
46 changes: 46 additions & 0 deletions vumi/persist/tests/test_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,11 @@ def assert_mapreduce_results(self, expected_keys, mr_func, *args, **kw):
self.assertEqual(expected_keys, sorted(keys))
self.assertEqual(len(expected_keys), count)

@inlineCallbacks
def assert_search_results(self, expected_keys, func, *args, **kw):
    """
    Call ``func(*args, **kw)``, wait for its result and assert that the
    sorted result equals ``expected_keys``.
    """
    found = yield func(*args, **kw)
    found_sorted = sorted(found)
    self.assertEqual(expected_keys, found_sorted)

@Manager.calls_manager
def test_simple_search(self):
simple_model = self.manager.proxy(SimpleModel)
Expand Down Expand Up @@ -340,6 +345,47 @@ def test_simple_raw_search(self):
["one", "two"], search, 'b:abc OR b:def')
yield self.assert_mapreduce_results(["three", "two"], search, 'a:2')

@Manager.calls_manager
def test_simple_real_search(self):
    """real_search returns the matching keys for a few simple queries."""
    proxy = self.manager.proxy(SimpleModel)
    yield proxy.enable_search()
    fixtures = [
        ("one", 1, u'abc'),
        ("two", 2, u'def'),
        ("three", 2, u'ghi'),
    ]
    for key, a_value, b_value in fixtures:
        yield proxy(key, a=a_value, b=b_value).save()

    search = proxy.real_search
    # (expected sorted keys, query) pairs, checked in this order.
    expectations = [
        (["one"], 'a:1'),
        (["two"], 'a:2 AND b:def'),
        (["one", "two"], 'b:abc OR b:def'),
        (["three", "two"], 'a:2'),
    ]
    for expected_keys, query in expectations:
        yield self.assert_search_results(expected_keys, search, query)

@Manager.calls_manager
def test_big_real_search(self):
    """real_search gathers every key when results span several pages."""
    proxy = self.manager.proxy(SimpleModel)
    yield proxy.enable_search()
    # 100 matching objects, keyed xx000001 .. xx000100.
    keys = ["xx%06d" % number for number in range(1, 101)]
    for key in keys:
        yield proxy(key, a=99, b=u'abc').save()
    # Two objects that must not match the query below.
    yield proxy("yy000001", a=98, b=u'def').save()
    yield proxy("yy000002", a=98, b=u'ghi').save()

    def search(q):
        # rows=11 is far below the number of matches, forcing paging.
        return proxy.real_search(q, rows=11)

    yield self.assert_search_results(keys, search, 'a:99')

@Manager.calls_manager
def test_empty_real_search(self):
    """real_search returns an empty list when nothing matches."""
    proxy = self.manager.proxy(SimpleModel)
    yield proxy.enable_search()
    fixtures = [
        ("one", 1, u'abc'),
        ("two", 2, u'def'),
        ("three", 2, u'ghi'),
    ]
    for key, a_value, b_value in fixtures:
        yield proxy(key, a=a_value, b=b_value).save()

    # No stored object has a == 7.
    yield self.assert_search_results([], proxy.real_search, 'a:7')

@Manager.calls_manager
def test_load_all_bunches(self):
self.assertFalse(self.manager.USE_MAPREDUCE_BUNCH_LOADING)
Expand Down
18 changes: 18 additions & 0 deletions vumi/persist/txriak_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,24 @@ def map_results(raw_results):
mapreduce_done.addCallback(lambda r: reducer_func(self, r))
return mapreduce_done

def _search_iteration(self, bucket, query, rows, start):
    """
    Fetch a single page of search results from ``bucket`` via
    ``deferToThread``.

    :returns: the deferred from ``deferToThread``, with a callback added
        that maps the search result to the list of its docs' ``"id"``
        values.
    """
    def extract_ids(results):
        return [doc["id"] for doc in results["docs"]]

    deferred = deferToThread(bucket.search, query, rows=rows, start=start)
    deferred.addCallback(extract_ids)
    return deferred

@inlineCallbacks
def real_search(self, modelcls, query, rows=None):
    """
    Collect the keys of all search hits for ``query`` in the model's bucket.

    Results are fetched one page at a time via ``_search_iteration``,
    ``rows`` per page (1000 when ``rows`` is None), until a page comes
    back empty. The collected list of keys is delivered through
    ``returnValue``.
    """
    if rows is None:
        rows = 1000
    bucket = self.client.bucket(self.bucket_name(modelcls))
    keys = []
    while True:
        # The number of keys gathered so far is the offset of the next page.
        page = yield self._search_iteration(bucket, query, rows, len(keys))
        if not page:
            break
        keys.extend(page)
    returnValue(keys)

def riak_enable_search(self, modelcls):
bucket_name = self.bucket_name(modelcls)
bucket = self.client.bucket(bucket_name)
Expand Down

0 comments on commit 30d2d5d

Please sign in to comment.