Skip to content

Commit

Permalink
Clear bucket properties when purging a manager.
Browse files Browse the repository at this point in the history
  • Loading branch information
jerith committed Sep 11, 2014
1 parent ac9e675 commit f36fc5f
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 4 deletions.
6 changes: 6 additions & 0 deletions vumi/persist/riak_manager.py
Expand Up @@ -269,6 +269,11 @@ def riak_enable_search(self, modelcls):
bucket = self.client.bucket(bucket_name)
return bucket.enable_search()

def riak_search_enabled(self, modelcls):
bucket_name = self.bucket_name(modelcls)
bucket = self.client.bucket(bucket_name)
return bucket.search_enabled()

def should_quote_index_values(self):
return False

Expand All @@ -279,3 +284,4 @@ def purge_all(self):
for key in bucket.get_keys():
obj = bucket.get(key)
obj.delete()
bucket.clear_properties()
18 changes: 18 additions & 0 deletions vumi/persist/tests/test_txriak_manager.py
Expand Up @@ -218,6 +218,24 @@ def test_purge_all(self):
result = yield self.manager.load(DummyModel, dummy.key)
self.assertEqual(result, None)

@Manager.calls_manager
def test_purge_all_clears_bucket_properties(self):
search_enabled = yield self.manager.riak_search_enabled(DummyModel)
self.assertEqual(search_enabled, False)

yield self.manager.riak_enable_search(DummyModel)
search_enabled = yield self.manager.riak_search_enabled(DummyModel)
self.assertEqual(search_enabled, True)

# We need at least one key in here so the bucket can be found and
# purged.
dummy = self.mkdummy("foo", {"baz": 0})
yield self.manager.store(dummy)

yield self.manager.purge_all()
search_enabled = yield self.manager.riak_search_enabled(DummyModel)
self.assertEqual(search_enabled, False)

@Manager.calls_manager
def test_json_decoding(self):
# Some versions of the riak client library use simplejson by
Expand Down
20 changes: 16 additions & 4 deletions vumi/persist/txriak_manager.py
Expand Up @@ -300,6 +300,11 @@ def riak_enable_search(self, modelcls):
bucket = self.client.bucket(bucket_name)
return deferToThread(bucket.enable_search)

def riak_search_enabled(self, modelcls):
bucket_name = self.bucket_name(modelcls)
bucket = self.client.bucket(bucket_name)
return deferToThread(bucket.search_enabled)

def should_quote_index_values(self):
return False

Expand All @@ -310,10 +315,17 @@ def delete_obj(bucket, key):
obj = bucket.get(key)
obj.delete()

deletes = []
def purge_bucket(bucket):
key_deletes = []
for key in bucket.get_keys():
key_deletes.append(deferToThread(delete_obj, bucket, key))
d = gatherResults(key_deletes)
d.addCallback(lambda _: deferToThread(bucket.clear_properties))
return d

bucket_deletes = []
buckets = yield deferToThread(self.client.get_buckets)
for bucket in buckets:
if bucket.name.startswith(self.bucket_prefix):
for key in bucket.get_keys():
deletes.append(deferToThread(delete_obj, bucket, key))
yield gatherResults(deletes)
bucket_deletes.append(purge_bucket(bucket))
yield gatherResults(bucket_deletes)

0 comments on commit f36fc5f

Please sign in to comment.